mirror of https://git.48k.eu/ogcli/
231 lines
8.4 KiB
Python
231 lines
8.4 KiB
Python
# Copyright (C) 2020-2021 Soleta Networks <info@soleta.eu>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it under
|
|
# the terms of the GNU Affero General Public License as published by the
|
|
# Free Software Foundation; either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
from cli.objects.client import OgClient
|
|
from cli.objects.scopes import OgScope
|
|
from cli.objects.modes import OgModes
|
|
from cli.objects.wol import OgWol
|
|
from cli.objects.images import OgImage
|
|
from cli.objects.disks import OgDisk
|
|
from cli.objects.poweroff import OgPoweroff
|
|
from cli.objects.reboot import OgReboot
|
|
from cli.objects.repo import OgRepo
|
|
from cli.objects.server import OgServer
|
|
from cli.objects.center import OgCenter
|
|
import argparse
|
|
import requests
|
|
import sys
|
|
|
|
|
|
class OgREST():
    """Small HTTP client for the ogServer REST API.

    Requests go to ``http://{ip}:{port}{path}`` with the API token sent in
    the ``Authorization`` header and the payload sent as a JSON body.  Any
    ``IOError`` raised while performing a request, and any response whose
    status code is not accepted, terminates the program via ``sys.exit``
    with a one-line message.
    """

    def __init__(self, ip, port, api_token):
        """Store the base URL and the authorization header.

        :param ip: host part of the ogServer URL
        :param port: port part of the ogServer URL
        :param api_token: value sent as the ``Authorization`` header
        """
        self.URL = f'http://{ip}:{port}'
        self.HEADERS = {'Authorization': api_token}

    def _request(self, method, path, payload, ok_codes, echo):
        """Perform one request and validate its status code.

        :param method: callable that performs the request
            (``requests.get``, ``requests.post`` or ``requests.delete``)
        :param path: path appended verbatim to the base URL
        :param payload: object sent as the JSON body (may be ``None``)
        :param ok_codes: collection of accepted HTTP status codes
        :param echo: when true, print the response text to stdout
        :return: the response object returned by ``method``
        :raises SystemExit: on ``IOError`` or on an unaccepted status code
        """
        try:
            r = method(f'{self.URL}{path}',
                       headers=self.HEADERS,
                       json=payload)
            if echo:
                print(r.text)
            if r.status_code not in ok_codes:
                # The server did answer, so this is a rejected request and
                # not a connection failure; word it the same way for every
                # HTTP verb.
                sys.exit(f"Unsuccessful request to ogServer: "
                         f"{r.status_code} HTTP status code")
        except IOError as e:
            # SystemExit is not an IOError, so the exit above is not caught
            # here.
            sys.exit(f"Cannot connect to ogServer: {e}")
        return r

    def get(self, path, payload=None):
        """Send a GET request; only status 200 is accepted.

        :param path: path appended to the base URL
        :param payload: optional object sent as the JSON body
        :return: the response object
        :raises SystemExit: on ``IOError`` or on a non-200 status code
        """
        return self._request(requests.get, path, payload, (200,), False)

    def post(self, path, payload):
        """Send a POST request, print the response text, accept 200 or 202.

        :param path: path appended to the base URL
        :param payload: object sent as the JSON body
        :return: the response object
        :raises SystemExit: on ``IOError`` or on a status other than 200/202
        """
        return self._request(requests.post, path, payload, (200, 202), True)

    def delete(self, path, payload):
        """Send a DELETE request, print the response text, accept only 200.

        :param path: path appended to the base URL
        :param payload: object sent as the JSON body
        :return: the response object
        :raises SystemExit: on ``IOError`` or on a non-200 status code
        """
        return self._request(requests.delete, path, payload, (200,), True)
|
|
|
|
|
|
class OgCLI():
    """Dispatcher for the ogcli verbs.

    Each public method implements one verb.  The first element of ``args``
    names the object the verb acts on and is validated with argparse; the
    matching handler is then called with the REST client and, for handlers
    that accept them, the remaining elements of ``args``.
    """

    def __init__(self, cfg):
        """Create the REST client from cfg['ip'], cfg['port'], cfg['api_token']."""
        self.rest = OgREST(cfg['ip'], cfg['port'], cfg['api_token'])

    @staticmethod
    def _select(prog, dest, choices, args, missing):
        """Validate args[0] against ``choices`` and return it.

        When ``args`` is empty, ``missing`` and the parser help are written
        to stderr and the program exits with status 1.  An invalid choice is
        reported by argparse itself.
        """
        parser = argparse.ArgumentParser(prog=prog)
        parser.add_argument(dest, choices=choices)

        if not args:
            print(missing, file=sys.stderr)
            parser.print_help(file=sys.stderr)
            sys.exit(1)

        return getattr(parser.parse_args([args[0]]), dest)

    def list(self, args):
        """Handle ``ogcli list <item> [...]``."""
        item = self._select(
            'ogcli list', 'item',
            ['clients', 'scopes', 'modes', 'hardware',
             'client', 'images', 'disks', 'server',
             'servers', 'repos'],
            args, 'Missing list subcommand')
        extra = args[1:]
        # Lambdas keep the handler lookup lazy: only the chosen one is resolved.
        handlers = {
            'clients': lambda: OgClient.list_clients(self.rest),
            'client': lambda: OgClient.get_client_properties(self.rest, extra),
            'hardware': lambda: OgClient.list_client_hardware(self.rest, extra),
            'modes': lambda: OgModes.list_available_modes(self.rest),
            'scopes': lambda: OgScope.list_scopes(self.rest),
            'images': lambda: OgImage.list_images(self.rest),
            'disks': lambda: OgDisk.list_disks(self.rest, extra),
            'repos': lambda: OgRepo.list_repos(self.rest),
            'server': lambda: OgServer.list_servers(self.rest),
            'servers': lambda: OgServer.list_servers(self.rest),
        }
        handlers[item]()

    def set(self, args):
        """Handle ``ogcli set <item> [...]``."""
        item = self._select(
            'ogcli set', 'item',
            ['modes', 'mode', 'server', 'repo'],
            args, 'Missing set subcommand')
        extra = args[1:]
        handlers = {
            'modes': lambda: OgModes.set_modes(self.rest, extra),
            'mode': lambda: OgModes.set_modes(self.rest, extra),
            'repo': lambda: OgRepo.set_repo(self.rest, extra),
            'server': lambda: OgServer.set_server(self.rest, extra),
        }
        handlers[item]()

    def send(self, args):
        """Handle ``ogcli send <send_obj> [...]``."""
        target = self._select(
            'ogcli send', 'send_obj',
            ['reboot', 'refresh', 'poweroff', 'wol'],
            args, 'Missing send subcommand')
        extra = args[1:]
        handlers = {
            'wol': lambda: OgWol.send_wol(self.rest, extra),
            'poweroff': lambda: OgPoweroff.send_poweroff(self.rest, extra),
            'refresh': lambda: OgClient.send_refresh(self.rest, extra),
            'reboot': lambda: OgReboot.send_reboot(self.rest, extra),
        }
        handlers[target]()

    def restore(self, args):
        """Handle ``ogcli restore <send_obj> [...]``."""
        target = self._select(
            'ogcli restore', 'send_obj',
            ['image'],
            args, 'Missing restore subcommand')
        handlers = {
            'image': lambda: OgImage.restore_image(self.rest, args[1:]),
        }
        handlers[target]()

    def create(self, args):
        """Handle ``ogcli create <create_obj> [...]``."""
        target = self._select(
            'ogcli create', 'create_obj',
            ['image'],
            args, 'Missing create subcommand')
        handlers = {
            'image': lambda: OgImage.create_image(self.rest, args[1:]),
        }
        handlers[target]()

    def setup(self, args):
        """Handle ``ogcli setup <setup_obj> [...]``."""
        target = self._select(
            'ogcli setup', 'setup_obj',
            ['disk'],
            args, 'Missing setup subcommand')
        handlers = {
            'disk': lambda: OgDisk.setup_disk(self.rest, args[1:]),
        }
        handlers[target]()

    def update(self, args):
        """Handle ``ogcli update <update_obj> [...]``."""
        target = self._select(
            'ogcli update', 'update_obj',
            ['image'],
            args, 'Missing update subcommand')
        handlers = {
            'image': lambda: OgImage.update_image(self.rest, args[1:]),
        }
        handlers[target]()

    def delete(self, args):
        """Handle ``ogcli delete <delete_obj> [...]``."""
        target = self._select(
            'ogcli delete', 'delete_obj',
            ['server', 'center'],
            args, 'Missing delete subcommand')
        extra = args[1:]
        handlers = {
            'server': lambda: OgServer.delete_server(self.rest, extra),
            'center': lambda: OgCenter.delete_center(self.rest, extra),
        }
        handlers[target]()

    def add(self, args):
        """Handle ``ogcli add <add_obj> [...]``."""
        target = self._select(
            'ogcli add', 'add_obj',
            ['server', 'repo', 'center'],
            args, 'Missing add subcommand')
        extra = args[1:]
        handlers = {
            'server': lambda: OgServer.add_server(self.rest, extra),
            'repo': lambda: OgRepo.add_repo(self.rest, extra),
            'center': lambda: OgCenter.add_center(self.rest, extra),
        }
        handlers[target]()
|