ogclient/src/live/ogOperations.py

704 lines
25 KiB
Python

#
# Copyright (C) 2020-2021 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
import datetime
import hashlib
import logging
import os
import subprocess
import shlex
import shutil
from subprocess import Popen, PIPE
import fdisk
from src.ogRest import ThreadState
from src.live.partcodes import GUID_MAP
from src.live.parttypes import get_parttype
from src.utils.image import *
from src.utils.net import ethtool
from src.utils.menu import generate_menu
from src.utils.fs import *
from src.utils.probe import os_probe, get_cache_dev_path
from src.utils.disk import *
from src.utils.cache import *
from src.utils.tiptorrent import *
from src.utils.uefi import *
from src.utils.boot import *
from src.utils.sw_inventory import get_package_set
from src.utils.hw_inventory import get_hardware_inventory, legacy_list_hardware_inventory
from src.log import OgError
OG_SHELL = '/bin/bash'
class OgLiveOperations:
def __init__(self, config):
self._url = config['opengnsys']['url']
self._url_log = config['opengnsys']['url_log']
self._smb_user = config['samba']['user']
self._smb_pass = config['samba']['pass']
def _restartBrowser(self, url):
try:
proc = subprocess.call(["pkill", "-9", "browser"])
proc = subprocess.Popen(["browser", "-qws", url])
except OSError as e:
raise OgError('Cannot restart browser') from e
def _refresh_payload_disk(self, cxt, part_setup, num_disk):
part_setup['disk'] = str(num_disk)
part_setup['disk_type'] = 'DISK'
part_setup['partition'] = '0'
part_setup['filesystem'] = ''
part_setup['os'] = ''
part_setup['size'] = str(cxt.nsectors * cxt.sector_size // 1024)
part_setup['used_size'] = '0'
if not cxt.label:
part_setup['code'] = '0'
else:
part_setup['code'] = '2' if cxt.label.name == 'gpt' else '1'
def _refresh_payload_partition(self, cxt, pa, part_setup, disk):
parttype = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_TYPEID)
fstype = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_FSTYPE)
padev = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_DEVICE)
size = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_SIZE)
partnum = pa.partno + 1
source = padev
target = padev.replace('dev', 'mnt')
if cxt.label.name == 'gpt':
code = GUID_MAP.get(parttype, 0x0)
else:
code = int(parttype, base=16)
if mount_mkdir(source, target):
probe_result = os_probe(target)
part_setup['os'] = probe_result
part_setup['used_size'] = get_usedperc(target)
umount(target)
else:
part_setup['os'] = ''
part_setup['used_size'] = '0'
part_setup['disk_type'] = ''
part_setup['partition'] = str(partnum)
part_setup['filesystem'] = fstype.upper() if fstype else 'EMPTY'
# part_setup['code'] = hex(code).removeprefix('0x')
part_setup['code'] = hex(code)[2:]
part_setup['size'] = str(int(size) // 1024)
if (part_setup['filesystem'] == 'VFAT'):
part_setup['filesystem'] = 'FAT32'
if (part_setup['filesystem'] == 'SWAP'):
part_setup['filesystem'] = 'LINUX-SWAP'
def _refresh_part_setup_cache(self, cxt, pa, part_setup, cache):
padev = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_DEVICE)
if padev == cache:
part_setup['filesystem'] = 'CACHE'
part_setup['code'] = 'ca'
def _get_cache_contents(self):
cache_contents = []
if not mount_cache():
return cache_contents
img_dir = OG_CACHE_IMAGE_PATH
if not os.path.isdir(img_dir):
return cache_contents
for file_name in os.listdir(img_dir):
file_path = os.path.join(img_dir, file_name)
if not os.path.isfile(file_path):
continue
if not file_name.endswith('.img'):
continue
checksum_file_path = file_path + '.full.sum'
image_checksum = ''
try:
with open(checksum_file_path, 'r') as f:
image_checksum = f.read()
except (FileNotFoundError, PermissionError, OSError) as e:
logging.info(f'Error reading file at {checksum_file_path}: {e}')
if not image_checksum:
logging.info(f'Warning: empty checksum for image {file_name}')
image_size = os.stat(file_path).st_size
cache_contents.append({
'name': file_name,
'size': image_size,
'checksum': image_checksum})
return cache_contents
def _compute_md5(self, path, bs=2**20):
m = hashlib.md5()
with open(path, 'rb') as f:
while True:
buf = f.read(bs)
if not buf:
break
m.update(buf)
return m.hexdigest()
def _md5_file(self, path):
if not os.path.exists(path):
raise OgError(f'Failed to calculate checksum, image file {path} does not exist')
return self._compute_md5(path)
def _write_md5_file(self, path, checksum):
if not os.path.exists(path):
raise OgError(f'Failed to calculate checksum, image file {path} does not exist')
filename = path + ".full.sum"
try:
with open(filename, 'w') as f:
f.write(checksum)
except:
logging.error(f'Cannot write checksum {filename}')
return -1
return 0
def _copy_image_to_cache(self, image_name):
"""
Copies /opt/opengnsys/image/{image_name} into
/opt/opengnsys/cache/opt/opengnsys/images/
Implies a unicast transfer. Does not use tiptorrent.
"""
src = f'/opt/opengnsys/images/{image_name}.img'
dst = f'{OG_CACHE_IMAGE_PATH}{image_name}.img'
try:
logging.info(f'Fetching image {image_name} from {src}')
logging.info('*DO NOT REBOOT OR POWEROFF* the client during this time')
r = shutil.copy(src, dst)
tip_write_csum(image_name)
except (OSError, OgError) as e:
raise OgError(f'Error copying image {image_name} to cache. Reported: {e}') from e
def _restore_image_unicast(self, repo, name, devpath, cache=False):
if ogChangeRepo(repo, smb_user=self._smb_user, smb_pass=self._smb_pass) != 0:
self._restartBrowser(self._url)
raise OgError(f'Cannot change repository to {repo}')
if cache:
if not get_cache_dev_path():
raise OgError('No cache partition is mounted')
image_path = f'{OG_CACHE_IMAGE_PATH}{name}.img'
if (not os.path.exists(image_path) or
not tip_check_csum(repo, name)):
self._copy_image_to_cache(name)
else:
if os.access(f'/opt/opengnsys/images', os.R_OK) == False:
raise OgError('Cannot access /opt/opengnsys/images in read mode, check permissions')
image_path = f'/opt/opengnsys/images/{name}.img'
self._restore_image(image_path, devpath)
def _restore_image_tiptorrent(self, repo, name, devpath):
if not get_cache_dev_path():
raise OgError('No cache partition is mounted')
image_path = f'{OG_CACHE_IMAGE_PATH}{name}.img'
try:
if (not os.path.exists(image_path) or not tip_check_csum(repo, name)):
tip_client_get(repo, name)
except:
self._restartBrowser(self._url)
raise
if (not os.path.exists(image_path)):
raise OgError(f'could not find {image_path} in cache')
if (not tip_check_csum(repo, name)):
raise OgError(f'Failed to validate checksum for {name}.img')
self._restore_image(image_path, devpath)
def _restore_image(self, image_path, devpath):
logging.info(f'Restoring image at {image_path} into {devpath}')
logging.info('*DO NOT REBOOT OR POWEROFF* the client during this time')
cmd_lzop = shlex.split(f'lzop -dc {image_path}')
cmd_pc = shlex.split(f'partclone.restore -d0 -C -I -o {devpath}')
if not os.path.exists(image_path):
raise OgError(f'Image not found at {image_path} during image restore')
with open('/tmp/command.log', 'wb', 0) as logfile:
proc_lzop = subprocess.Popen(cmd_lzop,
stdout=subprocess.PIPE)
proc_pc = subprocess.Popen(cmd_pc,
stdin=proc_lzop.stdout,
stderr=logfile)
proc_lzop.stdout.close()
proc_pc.communicate()
def _ogbrowser_clear_logs(self):
logfiles = ['/tmp/command.log', '/tmp/session.log']
for logfile in logfiles:
with open(logfile, 'wb', 0) as f:
f.truncate(0)
def _poweroff_oglive(self, operation='poweroff'):
interface = os.getenv('DEVICE')
cmd_ethtool = shlex.split(f'ethtool -s {interface} wol g')
cmd_browser = shlex.split('pkill -9 browser')
if not shutil.which('busyboxOLD'):
busybox = 'busybox'
else:
busybox = shutil.which('busyboxOLD')
cmd_busybox = shlex.split(f'{busybox} {operation}')
umount_all()
umount_cache()
if subprocess.run(cmd_ethtool).returncode != 0:
logging.error('Error running ethtool subprocess')
if subprocess.run(cmd_browser).returncode != 0:
logging.error('Error terminating ogBrowser process')
if subprocess.run(cmd_busybox) != 0:
logging.error('Error running "busybox poweroff" subprocess')
def poweroff(self):
logging.info('Powering off client')
self._poweroff_oglive()
def reboot(self):
logging.info('Rebooting client')
self._poweroff_oglive(operation='reboot')
def shellrun(self, request, ogRest):
cmd = request.getrun()
cmds = cmd.split(";|\n\r")
self._restartBrowser(self._url_log)
shell_path = '/opt/opengnsys/shell/'
restricted_mode = False
for file_name in os.listdir(shell_path):
file_path = os.path.join(shell_path, file_name)
if cmds[0] == file_name:
cmds[0] = file_path
restricted_mode = True
break
try:
if restricted_mode:
ogRest.proc = subprocess.Popen(cmds, stdout=subprocess.PIPE)
else:
ogRest.proc = subprocess.Popen(cmds,
stdout=subprocess.PIPE,
shell=True,
executable=OG_SHELL)
(output, error) = ogRest.proc.communicate()
except OSError as e:
raise OgError(f'Error when running "shell run" subprocess: {e}') from e
if ogRest.proc.returncode != 0:
logging.warn('Non zero exit code when running: %s', ' '.join(cmds))
else:
logging.info('Shell run command OK')
self.refresh(ogRest)
return (ogRest.proc.returncode, " ".join(cmds), output.decode('utf-8'))
def session(self, request, ogRest):
disk = request.getDisk()
partition = request.getPartition()
boot_os_at(int(disk), int(partition))
self.reboot()
def software(self, request, ogRest):
disk = request.getDisk()
partition = request.getPartition()
partdev = get_partition_device(int(disk), int(partition))
mountpoint = partdev.replace('dev', 'mnt')
if not mount_mkdir(partdev, mountpoint):
raise OgError(f'Error mounting {partdev} at {mountpoint}')
if not os.path.ismount(mountpoint):
raise OgError(f'Invalid mountpoint {mountpoint} for software inventory')
self._restartBrowser(self._url_log)
pkgset = get_package_set(mountpoint)
self._restartBrowser(self._url)
umount(mountpoint)
logging.info('Software inventory command OK')
# Software inventory result is still processed by legacy server code
# (ogAdmServer.c). Legacy response format is string where each
# software package is separated by a newline '\n'.
# Each package/software line follows this format:
# "{package_name} {package_version}"
return '\n'.join(map(str,pkgset))
def hardware(self, ogRest):
self._restartBrowser(self._url_log)
logging.info('Running hardware inventory command')
try:
inventory = get_hardware_inventory()
except Exception as e:
raise OgError(f'Error while running hardware inventory. {e}') from e
finally:
self._restartBrowser(self._url)
result = legacy_list_hardware_inventory(inventory)
logging.info('Successful hardware inventory command execution')
return result
def setup(self, request, ogRest):
table_type = request.getType()
disk = request.getDisk()
cache = request.getCache()
cache_size = request.getCacheSize()
partlist = request.getPartitionSetup()
self._ogbrowser_clear_logs()
self._restartBrowser(self._url_log)
umount_all()
umount_cache()
diskname = get_disks()[int(disk)-1]
cxt = fdisk.Context(f'/dev/{diskname}',
details=True)
if table_type == 'MSDOS':
cxt.create_disklabel('dos')
elif table_type == 'GPT':
cxt.create_disklabel('gpt')
else:
raise OgError(f'Unsupported partition scheme {table_type}, only MSDOS and GPT are supported')
logging.info(f'Setting up partition layout to {table_type}')
for part in partlist:
logging.info(f'Creating partition {part["partition"]} with {part["code"]} of {int(part["size"])//1024} Mbytes')
if part["code"] == 'EMPTY':
continue
if ogRest.terminated:
break
pa = fdisk.Partition(start_follow_default=True,
end_follow_default=False,
partno_follow_default=False)
parttype = get_parttype(cxt, part["code"])
size = int(part["size"])
pa.size = (size * (1 << 10)) // cxt.sector_size
pa.partno = int(part["partition"]) - 1
pa.type = parttype
cxt.add_partition(pa)
cxt.write_disklabel()
os.sync()
ret = subprocess.run(['partprobe', f'/dev/{diskname}'])
logging.info(f'first partprobe /dev/{diskname} reports {ret.returncode}')
for part in partlist:
if part["filesystem"] == 'EMPTY':
continue
fs = part["filesystem"].lower()
logging.info(f'Formatting partition {part["partition"]} with filesystem {part["filesystem"]}')
partition = int(part["partition"])
if fs == 'cache':
err = mkfs('ext4', int(disk), partition, label='CACHE')
else:
err = mkfs(fs, int(disk), partition)
if err == -1:
raise OgError(f'Failed to format {part["partition"]} with filesystem {part["filesystem"]}')
ret = subprocess.run(['partprobe', f'/dev/{diskname}'])
logging.info(f'second partprobe /dev/{diskname} reports {ret.returncode}')
for part in partlist:
if part["filesystem"] == 'EMPTY':
continue
fs = part["filesystem"].lower()
if fs == 'cache':
init_cache()
logging.info('Partition setup command OK')
result = self.refresh(ogRest)
self._restartBrowser(self._url)
return result
def image_restore(self, request, ogRest):
disk = request.getDisk()
partition = request.getPartition()
name = request.getName()
repo = request.getRepo()
ctype = request.getType()
profile = request.getProfile()
cid = request.getId()
partdev = get_partition_device(int(disk), int(partition))
self._ogbrowser_clear_logs()
self._restartBrowser(self._url_log)
logging.info(f'Request to restore image {name}.img via {ctype} from {repo}')
if shutil.which('restoreImageCustom'):
restoreImageCustom(repo, name, disk, partition, ctype)
elif 'UNICAST' in ctype:
cache = 'DIRECT' not in ctype
self._restore_image_unicast(repo, name, partdev, cache)
elif ctype == 'TIPTORRENT':
self._restore_image_tiptorrent(repo, name, partdev)
configureOs(disk, partition)
self.refresh(ogRest)
logging.info('Image restore command OK')
json_dict = {
'disk': request.getDisk(),
'partition': request.getPartition(),
'image_id': request.getId(),
'cache': self._get_cache_contents(),
}
return json_dict
def image_create(self, request, ogRest):
disk = int(request.getDisk())
partition = int(request.getPartition())
name = request.getName()
repo = request.getRepo()
backup = request.getBackup()
image_path = f'/opt/opengnsys/images/{name}.img'
self._ogbrowser_clear_logs()
self._restartBrowser(self._url_log)
logging.info(f'Request to create image {name}.img at repository {repo}')
if ogChangeRepo(repo, smb_user=self._smb_user, smb_pass=self._smb_pass) != 0:
self._restartBrowser(self._url)
raise OgError(f'Cannot change image repository to {repo}')
if ogRest.terminated:
return
diskname = get_disks()[disk-1]
cxt = fdisk.Context(f'/dev/{diskname}', details=True)
pa = None
for i, p in enumerate(cxt.partitions):
if (p.partno + 1) == partition:
pa = cxt.partitions[i]
if pa is None:
self._restartBrowser(self._url)
raise OgError(f'Target partition /dev/{diskname} not found')
padev = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_DEVICE)
fstype = cxt.partition_to_string(pa, fdisk.FDISK_FIELD_FSTYPE)
if not fstype:
raise OgError(f'No filesystem detected in {padev}. Aborting image creation')
if change_access(user=self._smb_user, pwd=self._smb_pass) == -1:
raise OgError('remount of /opt/opengnsys/images has failed')
if os.access(f'/opt/opengnsys/images', os.R_OK | os.W_OK) == False:
raise OgError('Cannot access /opt/opengnsys/images in read and write mode, check permissions')
if os.access(f'{image_path}', os.R_OK) == True:
logging.info(f'image file {image_path} already exists, updating.')
copy_windows_efi_bootloader(disk, partition)
if ogReduceFs(disk, partition) == -1:
raise OgError(f'Failed to shrink {fstype} filesystem in {padev}')
cmd1 = shlex.split(f'partclone.{fstype} -I -C --clone -s {padev} -O -')
cmd2 = shlex.split(f'lzop -1 -fo {image_path}')
logfile = open('/tmp/command.log', 'wb', 0)
try:
if os.path.exists(image_path) and backup:
shutil.move(image_path, f'{image_path}.ant')
except OSError as e:
raise OgError(f'Cannot create backup for {image_path}: {e}') from e
try:
p1 = Popen(cmd1, stdout=PIPE, stderr=logfile)
p2 = Popen(cmd2, stdin=p1.stdout)
p1.stdout.close()
logging.info(f'Creating image at {image_path} from {padev} using {fstype}')
logging.info('*DO NOT REBOOT OR POWEROFF* the client during this time')
try:
retdata = p2.communicate()
except OSError as e:
raise OgError(f'Unexpected error when running partclone and lzop commands: {e}') from e
finally:
logfile.close()
p2.terminate()
p1.poll()
logging.info(f'partclone process exited with code {p1.returncode}')
logging.info(f'lzop process exited with code {p2.returncode}')
ogExtendFs(disk, partition)
if os.access(f'{image_path}', os.R_OK) == False:
raise OgError(f'Cannot access partclone image file {image_path}')
image_info = get_image_info(image_path)
except Exception as e:
if os.path.exists(image_path):
os.unlink(image_path)
if os.path.exists(f'{image_path}.ant'):
shutil.move(f'{image_path}.ant', image_path)
self._restartBrowser(self._url)
if isinstance(e, OgError):
raise OgError(f'Failed to create image for {fstype} filesystem in device {padev}: {e}') from e
else:
raise
logging.info(f'Writing checksum file {name}.img.full.sum...')
logging.info('*DO NOT REBOOT OR POWEROFF* the client during this time')
checksum = self._md5_file(f'/opt/opengnsys/images/{name}.img')
if checksum == -1:
raise OgError(f'Cannot access {name}.full.sum file')
if self._write_md5_file(f'/opt/opengnsys/images/{name}.img', checksum) == -1:
raise OgError(f'Cannot write {name}.full.sum file')
image_info.checksum = checksum
self._restartBrowser(self._url)
logging.info('Image creation command OK')
return image_info
def cache_delete(self, request, ogRest):
images = request.getImages()
deleted_images = []
logging.info(f'Request to remove files from cache')
if not mount_cache():
logging.error(f'Cache is not mounted')
return
img_dir = OG_CACHE_IMAGE_PATH
if not os.path.isdir(img_dir):
logging.error(f'{img_dir} is not a directory')
return cache_contents
for file_name in os.listdir(img_dir):
file_path = os.path.join(img_dir, file_name)
if not os.path.isfile(file_path):
continue
if not file_name.endswith('.img'):
continue
if os.sep in file_name:
logging.info(f'Detected cache deletion request of filename with a path, ignoring {file_name}')
continue
if file_name in images:
logging.info(f'Removing file {file_path} from cache')
os.remove(file_path)
csum_path = file_path + '.full.sum'
if not os.path.isfile(csum_path):
logging.info(f'Missing checksum file {csum_path}')
continue
logging.info(f'Removing checksum file {csum_path} from cache')
os.remove(csum_path)
result = {'cache': self._get_cache_contents()}
self._restartBrowser(self._url)
logging.info('Delete file from cache request OK')
return result
def refresh(self, ogRest):
self._restartBrowser(self._url_log)
cache = get_cache_dev_path()
disks = get_disks()
interface = os.getenv('DEVICE')
link = ethtool(interface)
json_body = { 'serial_number': '',
'disk_setup': [],
'partition_setup': [],
'link': link
}
for num_disk, disk in enumerate(get_disks(), start=1):
logging.debug('refresh: processing %s', disk)
part_setup = {}
try:
cxt = fdisk.Context(device=f'/dev/{disk}', details=True)
except:
continue
self._refresh_payload_disk(cxt, part_setup, num_disk)
json_body['disk_setup'].append(part_setup)
for pa in cxt.partitions:
part_setup = part_setup.copy()
self._refresh_payload_partition(cxt, pa, part_setup, disk)
self._refresh_part_setup_cache(cxt, pa, part_setup, cache)
json_body['partition_setup'].append(part_setup)
json_body['cache'] = self._get_cache_contents()
generate_menu(json_body['partition_setup'])
generate_cache_txt()
self._restartBrowser(self._url)
logging.info('Sending response to refresh request')
return json_body
def probe(self, ogRest):
interface = os.getenv('DEVICE')
speed = ethtool(interface)
return {'status': 'OPG' if ogRest.state != ThreadState.BUSY else 'BSY',
'speed': speed}