cli: add live management commands

Add new parameters to the config file ogcli.json with the
following default values:
'local_live': '/var/www/html/ogrelive'
'server_live': 'https://opengnsys.soleta.eu/ogrelive'

Add command to install the files of a live system. Example:
ogcli install live --name ogrelive-6.1.0-26

Perform an update if live files are already present.

Add command to delete the files of a live system. Example:
ogcli delete live --name ogrelive-6.1.0-26

Update ogcli list live to show the lives in the server when
invoked with the --remote flag. Example:
ogcli list live --remote
master v0.3.3-14
Alejandro Sirgo Rica 2024-11-07 16:26:55 +01:00 committed by OpenGnSys Support Team
parent c4eb5d165a
commit 85d910d020
3 changed files with 309 additions and 3 deletions

View File

@ -113,7 +113,7 @@ class OgCLI():
elif parsed_args.item == 'servers':
ret = OgServer.list_servers(self.rest)
elif parsed_args.item == 'live':
ret = OgLive.list_live(self.rest)
ret = OgLive.list_live(self.rest, args[1:])
return ret
def set(self, args):
@ -231,7 +231,7 @@ class OgCLI():
return ret
def delete(self, args):
choices = ['server', 'repo', 'center', 'room', 'client', 'folder']
choices = ['server', 'repo', 'center', 'room', 'client', 'folder', 'live']
parser = argparse.ArgumentParser(prog='ogcli delete')
parser.add_argument('delete_obj', choices=choices)
@ -253,6 +253,8 @@ class OgCLI():
ret = OgClient.delete_client(self.rest, args[1:])
elif parsed_args.delete_obj == 'folder':
ret = OgFolder.delete_folder(self.rest, args[1:])
elif parsed_args.delete_obj == 'live':
ret = OgLive.delete_live(self.rest, args[1:])
return ret
def add(self, args):
@ -280,3 +282,19 @@ class OgCLI():
elif parsed_args.add_obj == 'folder':
ret = OgFolder.add_folder(self.rest, args[1:])
return ret
def install(self, args):
choices = ['live']
parser = argparse.ArgumentParser(prog='ogcli install')
parser.add_argument('install_obj', choices=choices)
if not args:
print('Missing install subcommand', file=sys.stderr)
parser.print_help(file=sys.stderr)
return 1
parsed_args = parser.parse_args([args[0]])
ret = 0
if parsed_args.install_obj == 'live':
ret = OgLive.install_live(self.rest, args[1:])
return ret

View File

@ -7,14 +7,282 @@
import argparse
from cli.config import cfg, OG_CLI_CFG_PATH
from cli.utils import *
import requests
import shutil
import sys
import os
class OgLive():
live_files = [
'initrd.img',
'vmlinuz',
'filesystem.squashfs',
]
tmp_extension = '.tmp'
@staticmethod
def list_live(rest):
def _get_local_live_dir():
local_live_dir = cfg.get('local_live', '/var/www/html/ogrelive')
if not local_live_dir:
print(f'Error: local_live not defined in {OG_CLI_CFG_PATH}')
return None
if not os.path.isdir(local_live_dir):
print(f'Warning: {local_live_dir} directoy does not exist, creating directory')
try:
os.makedirs(local_live_dir, exist_ok=True)
except OSError as e:
print(f'ERROR: Failed to create directory {local_live_dir}: {e}')
return None
return local_live_dir
@staticmethod
def _get_server_base_url():
server_live = cfg.get('server_live', 'https://opengnsys.soleta.eu/ogrelive')
if not server_live:
print(f'Error: server_live not defined in {OG_CLI_CFG_PATH}')
return None
return server_live
def _is_live_in_server(live_name):
server_live = OgLive._get_server_base_url()
target_url = f'{server_live}/{live_name}/{OgLive.live_files[0]}'
response = requests.head(target_url)
return response.status_code == 200
@staticmethod
def _delete_tmp_live_files(live_name):
local_live_dir = OgLive._get_local_live_dir()
if not local_live_dir:
return 1
folder_path = os.path.join(local_live_dir, live_name)
for file_name in os.listdir(folder_path):
if not file_name.endswith(OgLive.tmp_extension):
continue
target_file = os.path.join(folder_path, file_name)
try:
os.remove(target_file)
except OSError as e:
print(f'ERROR: Failed to delete temporary file {target_file}: {e}')
return 1
return 0
@staticmethod
def _download_from_server(file_path, local_extension):
live_file = os.path.basename(file_path)
server_live = OgLive._get_server_base_url()
if not server_live:
return 1
file_url = f'{server_live}/{file_path}'
local_live_dir = OgLive._get_local_live_dir()
if not local_live_dir:
return 1
local_path = os.path.join(local_live_dir, file_path)
if local_extension:
local_path += local_extension
try:
response = requests.get(file_url, stream=True, timeout=(5, None))
except requests.exceptions.RequestException as e:
print(f'Request failed for {file_url}: {e}')
return 1
if response.status_code == 200:
try:
with open(local_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
except OSError as e:
print(f'File system error occurred: {e}')
return 1
else:
print(f'ERROR: Failed to download {live_file}. Status code: {response.status_code}')
return 1
return 0
@staticmethod
def list_live(rest, args):
parser = argparse.ArgumentParser(prog='ogcli list live')
parser.add_argument('--remote',
action='store_true',
help='(Optional) Obtain the list of the remote instead of the local lives')
parsed_args = parser.parse_args(args)
if parsed_args.remote:
local_live_dir = OgLive._get_local_live_dir()
if not local_live_dir:
return 1
download_err = OgLive._download_from_server('ogrelive.json',
local_extension=OgLive.tmp_extension)
if download_err:
OgLive._delete_tmp_live_files('')
return 1
remote_json = os.path.join(local_live_dir, 'ogrelive.json')
remote_json_tmp = remote_json + OgLive.tmp_extension
try:
shutil.move(remote_json_tmp, remote_json)
except OSError as e:
print(f'ERROR: cannot move {remote_json_tmp} into {remote_json}: {e}')
return 1
try:
with open(remote_json, 'r') as json_file:
remote_data = json_file.read()
except json.JSONDecodeError:
print(f'ERROR: Failed parse malformed JSON file {remote_json}')
return 1
except OSError as e:
print(f'ERROR: cannot open {remote_json}: {e}')
return 1
print_json(remote_data)
return 0
res = rest.get('/oglive/list')
if not res:
return 1
print_json(res.text)
return 0
@staticmethod
def _is_same_checksum(file_path, checksum_path):
local_checksum = compute_md5(file_path)
try:
with open(checksum_path, 'r') as f:
remote_checksum = f.read().strip()
except (FileNotFoundError, PermissionError, OSError) as e:
print(f'ERROR: Cannot read checksum file for {live_file}: {e}')
return False
return local_checksum == remote_checksum
@staticmethod
def install_live(rest, args):
parser = argparse.ArgumentParser(prog='ogcli install live')
parser.add_argument('--name',
nargs='?',
required=True,
help='Name of the center')
parsed_args = parser.parse_args(args)
live_name = parsed_args.name
local_live_dir = OgLive._get_local_live_dir()
if not local_live_dir:
return 1
if not OgLive._is_live_in_server(live_name):
print(f'{live_name} is not available on the server, it cannot be installed')
return 1
local_dir = os.path.join(local_live_dir, live_name)
if os.path.exists(local_dir):
print(f'{live_name} already exists, checking for updates...')
try:
os.makedirs(local_dir, exist_ok=True)
except OSError as e:
print(f'ERROR: Failed to create directory {local_dir}: {e}')
return 1
for live_file in OgLive.live_files:
download_err = OgLive._download_from_server(os.path.join(live_name,
live_file + '.full.sum'),
local_extension=OgLive.tmp_extension)
if download_err:
OgLive._delete_tmp_live_files(live_name)
return download_err
file_path = os.path.join(local_dir, live_file)
file_path_tmp = file_path + OgLive.tmp_extension
checksum_path_tmp = file_path + '.full.sum' + OgLive.tmp_extension
is_first_install = not os.path.exists(file_path)
if is_first_install:
print(f'Downloading {live_file}...')
else:
requires_update = not OgLive._is_same_checksum(file_path, checksum_path_tmp)
if not requires_update:
print(f'{live_file} is up-to-date, skipping')
continue
print(f'Updating {live_file}...')
download_err = OgLive._download_from_server(os.path.join(live_name,
live_file),
local_extension=OgLive.tmp_extension)
if download_err:
OgLive._delete_tmp_live_files(live_name)
return download_err
if not OgLive._is_same_checksum(file_path_tmp, checksum_path_tmp):
print(f'ERROR: Checksum mismatch for {live_file}')
OgLive._delete_tmp_live_files(live_name)
return 1
print(f'Checksum is OK for {live_file}')
for file_name in os.listdir(local_dir):
if not file_name.endswith(OgLive.tmp_extension):
continue
file_path_tmp = os.path.join(local_dir, file_name)
file_path = file_path_tmp[:-len(OgLive.tmp_extension)]
try:
shutil.move(file_path_tmp, file_path)
except OSError as e:
print(f'ERROR: cannot move {src_file} into {target_file}: {e}')
OgLive._delete_tmp_live_files(live_name)
return 1
payload = {'name': live_name}
res = rest.post('/oglive/add', payload=payload)
if not res:
return 1
return 0
@staticmethod
def delete_live(rest, args):
parser = argparse.ArgumentParser(prog='ogcli delete live')
parser.add_argument('--name',
nargs='?',
required=True,
help='Name of the center')
parsed_args = parser.parse_args(args)
live_name = parsed_args.name
payload = {'name': live_name}
res = rest.post('/oglive/delete', payload=payload)
if not res:
return 1
local_live_dir = OgLive._get_local_live_dir()
if not local_live_dir:
return 1
local_dir = os.path.join(local_live_dir, live_name)
if os.path.exists(local_dir):
try:
shutil.rmtree(local_dir)
except OSError as e:
print(f'Error deleting directory {local_dir}: {e}')
return 1
else:
print(f'Error: no directory found for {live_name}')
return 0

View File

@ -8,7 +8,9 @@
import unicodedata
import json
import ipaddress
import hashlib
import re
import os
def scope_lookup(scope_id, scope_type, d):
if scope_id == d.get('id') and scope_type == d.get('type'):
@ -69,3 +71,21 @@ def check_mac_address(addr):
def remove_accents(text):
normalized_text = unicodedata.normalize('NFD', text)
return ''.join(c for c in normalized_text if unicodedata.category(c) != 'Mn')
def compute_md5(path, bs=2**20):
if not os.path.exists(path):
print(f"Failed to calculate checksum, image file {path} does not exist")
return None
m = hashlib.md5()
try:
with open(path, 'rb') as f:
while True:
buf = f.read(bs)
if not buf:
break
m.update(buf)
except Exception as e:
print(f'Failed to calculate checksum for {path}: {e}')
return None
return m.hexdigest()