mirror of https://git.48k.eu/ogcli/
311 lines
11 KiB
Python
311 lines
11 KiB
Python
# Copyright (C) 2020-2024 Soleta Networks <opengnsys@soleta.eu>
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify it under
|
|
# the terms of the GNU Affero General Public License as published by the
|
|
# Free Software Foundation; either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
import argparse
|
|
|
|
from cli.config import cfg, OG_CLI_CFG_PATH
|
|
from cli.utils import *
|
|
import requests
|
|
import shutil
|
|
import sys
|
|
import os
|
|
|
|
class OgLive():
    """Handlers for the ``ogcli ... live`` subcommands.

    Live images are stored locally as one sub-directory per image under the
    directory configured as ``local_live`` and are fetched from the base URL
    configured as ``server_live``. All public handlers take ``(rest, args)``
    and return 0 on success or a non-zero value on failure.
    """

    # Files that make up one live image on the server and on disk.
    live_files = [
        'initrd.img',
        'vmlinuz',
        'filesystem.squashfs',
    ]
    # Suffix appended to files while they are being downloaded/verified.
    tmp_extension = '.tmp'

    @staticmethod
    def _get_local_live_dir():
        """Return the local live directory, creating it if it is missing.

        Returns:
            The directory path, or None if it is not configured or cannot be
            created (an error is printed in both cases).
        """
        local_live_dir = cfg.get('local_live', '/var/www/html/ogrelive')
        if not local_live_dir:
            print(f'Error: local_live not defined in {OG_CLI_CFG_PATH}')
            return None

        if not os.path.isdir(local_live_dir):
            print(f'Warning: {local_live_dir} directoy does not exist, creating directory')
            try:
                os.makedirs(local_live_dir, exist_ok=True)
            except OSError as e:
                print(f'ERROR: Failed to create directory {local_live_dir}: {e}')
                return None
        return local_live_dir

    @staticmethod
    def _get_server_base_url():
        """Return the configured base URL of the remote live repository.

        Returns:
            The URL string, or None (after printing an error) if the
            configured value is empty.
        """
        server_live = cfg.get('server_live', 'https://opengnsys.soleta.eu/ogrelive')
        if not server_live:
            print(f'Error: server_live not defined in {OG_CLI_CFG_PATH}')
            return None
        return server_live

    @staticmethod
    def _is_live_in_server(live_name):
        """Tell whether ``live_name`` is published on the remote server.

        The check is a HEAD request for the first entry of ``live_files``
        inside the remote ``live_name`` folder.

        Returns:
            True only if the server answers 200. False if the server URL is
            not configured, the request fails, or any other status is
            returned.
        """
        server_live = OgLive._get_server_base_url()
        if not server_live:
            return False

        target_url = f'{server_live}/{live_name}/{OgLive.live_files[0]}'
        try:
            # Bounded timeouts so an unreachable server cannot hang the CLI.
            response = requests.head(target_url, timeout=(5, 30))
        except requests.exceptions.RequestException as e:
            print(f'Request failed for {target_url}: {e}')
            return False
        return response.status_code == 200

    @staticmethod
    def _delete_tmp_live_files(live_name):
        """Remove every ``tmp_extension`` file from a local live folder.

        Args:
            live_name: Sub-directory of the local live directory to clean.
                An empty string cleans the local live directory itself.

        Returns:
            0 if all temporary files were removed, 1 if the directory is
            unavailable or at least one file could not be deleted.
        """
        local_live_dir = OgLive._get_local_live_dir()
        if not local_live_dir:
            return 1

        folder_path = os.path.join(local_live_dir, live_name)
        try:
            file_names = os.listdir(folder_path)
        except OSError as e:
            print(f'ERROR: Failed to list directory {folder_path}: {e}')
            return 1

        status = 0
        for file_name in file_names:
            if not file_name.endswith(OgLive.tmp_extension):
                continue
            target_file = os.path.join(folder_path, file_name)
            try:
                os.remove(target_file)
            except OSError as e:
                # Keep going: this is cleanup, so remove as much as possible.
                print(f'ERROR: Failed to delete temporary file {target_file}: {e}')
                status = 1
        return status

    @staticmethod
    def _download_from_server(file_path, local_extension):
        """Download one file from the remote repository to the local one.

        Args:
            file_path: Path relative to both the server base URL and the
                local live directory.
            local_extension: Suffix appended to the local file name; falsy
                to store the file under its final name.

        Returns:
            0 on success, 1 on any configuration, network, HTTP status or
            file system error (an error message is printed).
        """
        live_file = os.path.basename(file_path)

        server_live = OgLive._get_server_base_url()
        if not server_live:
            return 1
        file_url = f'{server_live}/{file_path}'

        local_live_dir = OgLive._get_local_live_dir()
        if not local_live_dir:
            return 1

        local_path = os.path.join(local_live_dir, file_path)
        if local_extension:
            local_path += local_extension

        try:
            response = requests.get(file_url, stream=True, timeout=(5, None))
        except requests.exceptions.RequestException as e:
            print(f'Request failed for {file_url}: {e}')
            return 1

        # With stream=True the connection stays open until the response is
        # closed, so release it on every exit path.
        with response:
            if response.status_code != 200:
                print(f'ERROR: Failed to download {live_file}. Status code: {response.status_code}')
                return 1

            try:
                with open(local_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
            except requests.exceptions.RequestException as e:
                # RequestException derives from OSError, so it must be caught
                # first to report mid-transfer network failures accurately.
                print(f'Request failed for {file_url}: {e}')
                return 1
            except OSError as e:
                print(f'File system error occurred: {e}')
                return 1
        return 0

    @staticmethod
    def list_live(rest, args):
        """Handle ``ogcli list live``.

        Without ``--remote`` the list is requested from ``/oglive/list``
        through ``rest``. With ``--remote`` the server's ``ogrelive.json`` is
        downloaded into the local live directory and printed.

        Args:
            rest: Client object providing ``get(path)``.
            args: Command-line arguments for this subcommand.

        Returns:
            0 on success, 1 on failure.
        """
        # Imported here because the module does not import json itself.
        import json

        parser = argparse.ArgumentParser(prog='ogcli list live')
        parser.add_argument('--remote',
                            action='store_true',
                            help='(Optional) Obtain the list of the remote instead of the local lives')
        parsed_args = parser.parse_args(args)

        if parsed_args.remote:
            local_live_dir = OgLive._get_local_live_dir()
            if not local_live_dir:
                return 1

            download_err = OgLive._download_from_server('ogrelive.json',
                                                        local_extension=OgLive.tmp_extension)
            if download_err:
                OgLive._delete_tmp_live_files('')
                return 1

            remote_json = os.path.join(local_live_dir, 'ogrelive.json')
            remote_json_tmp = remote_json + OgLive.tmp_extension
            try:
                shutil.move(remote_json_tmp, remote_json)
            except OSError as e:
                print(f'ERROR: cannot move {remote_json_tmp} into {remote_json}: {e}')
                return 1

            try:
                with open(remote_json, 'r') as json_file:
                    remote_data = json_file.read()
                # Validate the document so malformed content is reported
                # here instead of failing later while printing.
                json.loads(remote_data)
            except json.JSONDecodeError:
                print(f'ERROR: Failed parse malformed JSON file {remote_json}')
                return 1
            except OSError as e:
                print(f'ERROR: cannot open {remote_json}: {e}')
                return 1

            print_json(remote_data)
            return 0

        res = rest.get('/oglive/list')
        if not res:
            return 1
        print_json(res.text)
        return 0

    @staticmethod
    def _is_same_checksum(file_path, checksum_path):
        """Compare the MD5 of ``file_path`` with the one in ``checksum_path``.

        The checksum file is read first so that an unreadable checksum is
        reported without hashing the (potentially large) data file.

        Returns:
            True if the stripped content of ``checksum_path`` equals
            ``compute_md5(file_path)``; False if they differ or the checksum
            file cannot be read.
        """
        try:
            with open(checksum_path, 'r') as f:
                remote_checksum = f.read().strip()
        except OSError as e:
            print(f'ERROR: Cannot read checksum file {checksum_path}: {e}')
            return False

        return compute_md5(file_path) == remote_checksum

    @staticmethod
    def install_live(rest, args):
        """Handle ``ogcli install live``: download or update a live image.

        Each entry of ``live_files`` and its ``.full.sum`` checksum is
        downloaded with ``tmp_extension``. Files whose local checksum already
        matches are skipped. Once every file is verified, the temporary files
        are renamed into place and the image is registered through
        ``/oglive/add``.

        Args:
            rest: Client object providing ``post(path, payload=...)``.
            args: Command-line arguments for this subcommand.

        Returns:
            0 on success, non-zero on failure. Temporary files are removed
            when a download, verification or rename step fails.
        """
        parser = argparse.ArgumentParser(prog='ogcli install live')
        parser.add_argument('--name',
                            nargs='?',
                            required=True,
                            help='Name of the live')
        parsed_args = parser.parse_args(args)

        live_name = parsed_args.name
        # nargs='?' lets "--name" appear without a value (None).
        if not live_name:
            print('ERROR: --name requires a non-empty value')
            return 1

        local_live_dir = OgLive._get_local_live_dir()
        if not local_live_dir:
            return 1

        if not OgLive._is_live_in_server(live_name):
            print(f'{live_name} is not available on the server, it cannot be installed')
            return 1

        local_dir = os.path.join(local_live_dir, live_name)
        if os.path.exists(local_dir):
            print(f'{live_name} already exists, checking for updates...')
        try:
            os.makedirs(local_dir, exist_ok=True)
        except OSError as e:
            print(f'ERROR: Failed to create directory {local_dir}: {e}')
            return 1

        for live_file in OgLive.live_files:
            # Always fetch the remote checksum first; it decides whether the
            # data file needs to be downloaded at all.
            download_err = OgLive._download_from_server(os.path.join(live_name,
                                                                     live_file + '.full.sum'),
                                                        local_extension=OgLive.tmp_extension)
            if download_err:
                OgLive._delete_tmp_live_files(live_name)
                return download_err

            file_path = os.path.join(local_dir, live_file)
            file_path_tmp = file_path + OgLive.tmp_extension
            checksum_path_tmp = file_path + '.full.sum' + OgLive.tmp_extension

            is_first_install = not os.path.exists(file_path)
            if is_first_install:
                print(f'Downloading {live_file}...')
            else:
                if OgLive._is_same_checksum(file_path, checksum_path_tmp):
                    print(f'{live_file} is up-to-date, skipping')
                    continue
                print(f'Updating {live_file}...')

            download_err = OgLive._download_from_server(os.path.join(live_name,
                                                                     live_file),
                                                        local_extension=OgLive.tmp_extension)
            if download_err:
                OgLive._delete_tmp_live_files(live_name)
                return download_err

            if not OgLive._is_same_checksum(file_path_tmp, checksum_path_tmp):
                print(f'ERROR: Checksum mismatch for {live_file}')
                OgLive._delete_tmp_live_files(live_name)
                return 1

            print(f'Checksum is OK for {live_file}')

        # Everything verified: promote the temporary files to final names.
        for file_name in os.listdir(local_dir):
            if not file_name.endswith(OgLive.tmp_extension):
                continue
            file_path_tmp = os.path.join(local_dir, file_name)
            file_path = file_path_tmp[:-len(OgLive.tmp_extension)]
            try:
                shutil.move(file_path_tmp, file_path)
            except OSError as e:
                print(f'ERROR: cannot move {file_path_tmp} into {file_path}: {e}')
                OgLive._delete_tmp_live_files(live_name)
                return 1

        payload = {'name': live_name}
        res = rest.post('/oglive/add', payload=payload)
        if not res:
            return 1
        return 0

    @staticmethod
    def delete_live(rest, args):
        """Handle ``ogcli delete live``.

        Posts the name to ``/oglive/delete`` and then removes the matching
        folder from the local live directory.

        Args:
            rest: Client object providing ``post(path, payload=...)``.
            args: Command-line arguments for this subcommand.

        Returns:
            0 on success (also when no local folder exists, after printing a
            message), 1 on failure.
        """
        parser = argparse.ArgumentParser(prog='ogcli delete live')
        parser.add_argument('--name',
                            nargs='?',
                            required=True,
                            help='Name of the live')
        parsed_args = parser.parse_args(args)
        live_name = parsed_args.name

        # An empty name would make os.path.join() resolve to the live root
        # directory, which rmtree() below would then delete entirely.
        if not live_name:
            print('ERROR: --name requires a non-empty value')
            return 1

        payload = {'name': live_name}
        res = rest.post('/oglive/delete', payload=payload)
        if not res:
            return 1

        local_live_dir = OgLive._get_local_live_dir()
        if not local_live_dir:
            return 1

        local_dir = os.path.join(local_live_dir, live_name)
        if os.path.exists(local_dir):
            try:
                shutil.rmtree(local_dir)
            except OSError as e:
                print(f'Error deleting directory {local_dir}: {e}')
                return 1
        else:
            print(f'Error: no directory found for {live_name}')

        return 0

    @staticmethod
    def set_live(rest, args):
        """Handle ``ogcli set live``: post the name to ``/oglive/default``.

        Args:
            rest: Client object providing ``post(path, payload=...)``.
            args: Command-line arguments; ``--default`` and ``--name`` are
                both required.

        Returns:
            0 on success, 1 if the request fails.
        """
        parser = argparse.ArgumentParser(prog='ogcli set live')
        parser.add_argument('--default',
                            action='store_true',
                            required=True,
                            help='set the default live image')
        parser.add_argument('--name',
                            nargs='?',
                            required=True,
                            help='Name of the live')
        parsed_args = parser.parse_args(args)
        live_name = parsed_args.name

        payload = {'name': live_name}
        res = rest.post('/oglive/default', payload=payload)
        if not res:
            return 1

        return 0
|