mirror of https://git.48k.eu/ogcli/
223 lines
7.2 KiB
Python
223 lines
7.2 KiB
Python
# Copyright (C) 2020-2024 Soleta Networks <opengnsys@soleta.eu>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it under
|
|
# the terms of the GNU Affero General Public License as published by the
|
|
# Free Software Foundation; either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
import argparse
|
|
|
|
from cli.utils import *
|
|
|
|
class OgRepo():
|
|
|
|
@staticmethod
|
|
def list_repos(rest):
|
|
res = rest.get('/repositories')
|
|
if not res:
|
|
return 1
|
|
print_json(res.text)
|
|
return 0
|
|
|
|
@staticmethod
|
|
def _add_repo(rest, parsed_args):
|
|
payload = {
|
|
'addr': parsed_args.ip,
|
|
'name': parsed_args.name,
|
|
}
|
|
res = rest.post('/repository/add', payload=payload)
|
|
if not res:
|
|
return 1
|
|
return 0
|
|
|
|
@staticmethod
|
|
def _add_repo_ip(rest, parsed_args):
|
|
for ip in parsed_args.ip:
|
|
if not check_address(ip):
|
|
print(f'Invalid IP address: {ip}')
|
|
return 1
|
|
|
|
res = rest.get('/repositories')
|
|
target_repo = None
|
|
for repo in res.json()['repositories']:
|
|
if repo['id'] == parsed_args.id:
|
|
target_repo = repo
|
|
break
|
|
|
|
if not target_repo:
|
|
print('Invalid repository id specified')
|
|
return 1
|
|
|
|
for ip in parsed_args.ip:
|
|
if ip in target_repo['addr']:
|
|
print(f'The repository already contains the address {ip}')
|
|
return 1
|
|
|
|
payload = {
|
|
'id': parsed_args.id,
|
|
'addr': target_repo['addr'] + parsed_args.ip,
|
|
'name': target_repo['name'],
|
|
}
|
|
res = rest.post('/repository/update', payload=payload)
|
|
if not res:
|
|
return 1
|
|
return 0
|
|
|
|
@staticmethod
|
|
def add_repo(rest, args):
|
|
parser = argparse.ArgumentParser(prog='ogcli add repo')
|
|
|
|
# Mutually exclusive group for adding new repo or adding address to existing repo
|
|
group = parser.add_mutually_exclusive_group(required=True)
|
|
group.add_argument('--name',
|
|
nargs='?',
|
|
help='name of the repository')
|
|
group.add_argument('--id',
|
|
type=int,
|
|
help='repo id (list repos using "ogcli list repos")')
|
|
parser.add_argument('--ip',
|
|
nargs='+',
|
|
required=True,
|
|
help='IP list separated by spaces')
|
|
|
|
parsed_args = parser.parse_args(args)
|
|
|
|
if parsed_args.name:
|
|
OgRepo._add_repo(rest, parsed_args)
|
|
elif parsed_args.id:
|
|
OgRepo._add_repo_ip(rest, parsed_args)
|
|
|
|
@staticmethod
|
|
def update_repo(rest, args):
|
|
parser = argparse.ArgumentParser(prog='ogcli update repo')
|
|
parser.add_argument('--id',
|
|
type=int,
|
|
nargs='?',
|
|
required=True,
|
|
help='repo id (list repos using "ogcli list repos")')
|
|
parser.add_argument('--ip',
|
|
nargs='+',
|
|
required=True,
|
|
help='IP list separated by spaces')
|
|
parser.add_argument('--name',
|
|
nargs='?',
|
|
required=True,
|
|
help='name of the repository')
|
|
parsed_args = parser.parse_args(args)
|
|
payload = {
|
|
'id': parsed_args.id,
|
|
'addr': parsed_args.ip,
|
|
'name': parsed_args.name,
|
|
}
|
|
|
|
res = rest.post('/repository/update', payload=payload)
|
|
if not res:
|
|
return 1
|
|
return 0
|
|
|
|
@staticmethod
|
|
def _delete_repo_ip(rest, parsed_args):
|
|
for ip in parsed_args.ip:
|
|
if not check_address(ip):
|
|
print(f'Invalid IP address: {ip}')
|
|
return 1
|
|
|
|
res = rest.get('/repositories')
|
|
target_repo = None
|
|
for repo in res.json()['repositories']:
|
|
if repo['id'] == parsed_args.id:
|
|
target_repo = repo
|
|
break
|
|
|
|
if not target_repo:
|
|
print('Invalid repository id specified')
|
|
return 1
|
|
|
|
for ip in parsed_args.ip:
|
|
if ip not in target_repo['addr']:
|
|
print(f'Invalid address {ip}: The repository has the following IPs: {target_repo["addr"]}')
|
|
return 1
|
|
|
|
target_repo['addr'].remove(ip)
|
|
|
|
payload = {
|
|
'id': parsed_args.id,
|
|
'addr': target_repo['addr'],
|
|
'name': target_repo['name'],
|
|
}
|
|
res = rest.post('/repository/update', payload=payload)
|
|
if not res:
|
|
return 1
|
|
return 0
|
|
|
|
@staticmethod
|
|
def _delete_repo(rest, parsed_args):
|
|
payload = {'id': parsed_args.id}
|
|
res = rest.post('/repository/delete', payload=payload)
|
|
if not res:
|
|
return 1
|
|
return 0
|
|
|
|
@staticmethod
|
|
def delete_repo(rest, args):
|
|
parser = argparse.ArgumentParser(prog='ogcli delete repo')
|
|
|
|
parser.add_argument('--id',
|
|
type=int,
|
|
required=True,
|
|
help='repo id (list repos using "ogcli list repos")')
|
|
|
|
# --ip is optional; if provided, the IP will be removed, otherwise,
|
|
# the repo will be deleted
|
|
parser.add_argument('--ip',
|
|
nargs='+',
|
|
help='IP address to remove from the repository')
|
|
|
|
parsed_args = parser.parse_args(args)
|
|
|
|
if parsed_args.ip:
|
|
OgRepo._delete_repo_ip(rest, parsed_args)
|
|
else:
|
|
OgRepo._delete_repo(rest, parsed_args)
|
|
|
|
@staticmethod
|
|
def set_repo(rest, args):
|
|
parser = argparse.ArgumentParser(prog='ogcli set repo')
|
|
parser.add_argument('--id',
|
|
type=int,
|
|
nargs='?',
|
|
required=True,
|
|
help='repo id (list repos using "ogcli list repos")')
|
|
group = parser.add_argument_group('clients', 'Target client arguments')
|
|
group.add_argument('--room-id',
|
|
type=int,
|
|
action='append',
|
|
default=[],
|
|
required=False,
|
|
help='Clients from a given room id')
|
|
group.add_argument('--client-ip',
|
|
action='append',
|
|
default=[],
|
|
required=False,
|
|
help='Any valid client ip address')
|
|
parsed_args = parser.parse_args(args)
|
|
|
|
res = rest.get('/scopes')
|
|
scopes = res.json()
|
|
ips = set()
|
|
|
|
for room in parsed_args.room_id:
|
|
room_scope = scope_lookup(room, 'room', scopes)
|
|
ips.update(ips_in_scope(room_scope))
|
|
for l in parsed_args.client_ip:
|
|
ips.add(l)
|
|
if not ips:
|
|
print('No valid clients specified.')
|
|
return 1
|
|
|
|
payload = {'id': parsed_args.id, 'clients': list(ips)}
|
|
res = rest.post('/client/repo', payload=payload)
|
|
if not res:
|
|
return 1
|
|
return 0
|