ogclient/src/virtual/ogOperations.py

626 lines
23 KiB
Python

#
# Copyright (C) 2020-2021 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
from src.ogRest import ThreadState
from src.virtual import poweroffd
import socket
import errno
import select
import json
import subprocess
import shutil
import os
import guestfs
import hivex
import pathlib
import re
import math
import sys
import enum
import time
from src.log import OgError
class OgVM:
DEFAULT_CPU = 'host'
DEFAULT_VGA = 'virtio-vga'
DEFAULT_QMP_IP = 'localhost'
DEFAULT_QMP_PORT = 4444
class State(enum.Enum):
STOPPED = 0
RUNNING = 1
def __init__(self,
partition_path,
memory=None,
cpu=DEFAULT_CPU,
vga=DEFAULT_VGA,
qmp_ip=DEFAULT_QMP_IP,
qmp_port=DEFAULT_QMP_PORT,
vnc_params=None):
self.partition_path = partition_path
self.cpu = cpu
self.vga = vga
self.qmp_ip = qmp_ip
self.qmp_port = qmp_port
self.proc = None
self.vnc_params = vnc_params
if memory:
self.mem = memory
else:
available_ram = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
available_ram = available_ram / 2 ** 20
# Calculate the lower power of 2 amout of RAM memory for the VM.
self.mem = 2 ** math.floor(math.log(available_ram) / math.log(2))
def run_vm(self):
if self.vnc_params:
vnc_str = f'-vnc 0.0.0.0:0,password'
else:
vnc_str = ''
cmd = (f'qemu-system-x86_64 -accel kvm -cpu {self.cpu} -smp 4 '
f'-drive file={self.partition_path},if=virtio '
f'-qmp tcp:localhost:4444,server,nowait '
f'-device {self.vga} -display gtk '
f'-m {self.mem}M -boot c -full-screen {vnc_str}')
self.proc = subprocess.Popen([cmd], shell=True)
if self.vnc_params:
# Wait for QMP to be available.
time.sleep(20)
cmd = { "execute": "change",
"arguments": { "device": "vnc",
"target": "password",
"arg": str(self.vnc_params['pass']) } }
with OgQMP(self.qmp_ip, self.qmp_port) as qmp:
qmp.talk(str(cmd))
class OgQMP:
QMP_TIMEOUT = 5
QMP_POWEROFF_TIMEOUT = 300
def __init__(self, ip, port):
self.ip = ip
self.port = port
def connect(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setblocking(0)
try:
self.sock.connect((self.ip, self.port))
except socket.error as err:
if err.errno == errno.ECONNREFUSED:
raise OgError('Cannot connect to QEMU')
elif err.errno == errno.EINPROGRESS:
pass
readset = [ self.sock ]
readable, writable, exception = select.select(readset,
[],
[],
OgQMP.QMP_TIMEOUT)
if self.sock in readable:
try:
out = self.recv()
except:
pass
if 'QMP' not in out:
raise OgError('Cannot handshake QEMU')
out = self.talk(str({"execute": "qmp_capabilities"}))
if 'return' not in out:
raise OgError('Cannot handshake QEMU')
def disconnect(self):
try:
self.sock.close()
except:
pass
def __enter__(self):
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.disconnect()
def talk(self, data, timeout=QMP_TIMEOUT):
writeset = [ self.sock ]
readable, writable, exception = select.select([],
writeset,
[],
timeout)
if self.sock in writable:
try:
self.sock.send(bytes(data, 'utf-8'))
except Exception as e:
raise OgError('Cannot talk to QEMU') from e
else:
raise OgError('Timeout when talking to QEMU')
return self.recv(timeout=timeout)
def recv(self, timeout=QMP_TIMEOUT):
readset = [self.sock]
readable, _, _ = select.select(readset, [], [], timeout)
if self.sock in readable:
try:
out = self.sock.recv(4096).decode('utf-8')
out = json.loads(out)
except socket.error as err:
raise OgError('Cannot talk to QEMU') from err
else:
raise OgError('Timeout when talking to QEMU')
return out
class OgVirtualOperations:
def __init__(self):
self.IP = '127.0.0.1'
self.VIRTUAL_PORT = 4444
self.USABLE_DISK = 0.75
self.OG_PATH = os.path.dirname(os.path.realpath(sys.argv[0]))
self.OG_IMAGES_PATH = f'{self.OG_PATH}/images'
self.OG_PARTITIONS_PATH = f'{self.OG_PATH}/partitions'
self.OG_PARTITIONS_CFG_PATH = f'{self.OG_PATH}/partitions.json'
if not os.path.exists(self.OG_IMAGES_PATH):
os.mkdir(self.OG_IMAGES_PATH, mode=0o755)
if not os.path.exists(self.OG_PARTITIONS_PATH):
os.mkdir(self.OG_PARTITIONS_PATH, mode=0o755)
def poweroff_guest(self):
try:
with OgQMP(self.IP, self.VIRTUAL_PORT) as qmp:
qmp.talk(str({"execute": "system_powerdown"}))
out = qmp.recv()
assert(out['event'] == 'POWERDOWN')
out = qmp.recv(timeout=OgQMP.QMP_POWEROFF_TIMEOUT)
assert(out['event'] == 'SHUTDOWN')
except:
return
def poweroff_host(self):
subprocess.run(['/sbin/poweroff'])
def poweroff(self):
self.poweroff_guest()
self.poweroff_host()
def reboot(self):
try:
with OgQMP(self.IP, self.VIRTUAL_PORT) as qmp:
qmp.talk(str({"execute": "system_reset"}))
except:
pass
def check_vm_state(self):
try:
with OgQMP(self.IP, self.VIRTUAL_PORT) as qmp:
pass
return OgVM.State.RUNNING
except:
return OgVM.State.STOPPED
def get_installed_os(self):
installed_os = {}
try:
with open(self.OG_PARTITIONS_CFG_PATH, 'r') as f:
cfg = json.loads(f.read())
for part in cfg['partition_setup']:
if len(part['os']) > 0:
installed_os[part['os']] = (part['disk'], part['partition'])
except:
pass
return installed_os
def check_vm_state_loop(self, ogRest):
# If we can't connect, wait until it's possible.
while True:
try:
with socket.create_connection((poweroffd.QMP_DEFAULT_HOST,
poweroffd.QMP_DEFAULT_PORT)):
break
except ConnectionRefusedError:
time.sleep(1)
qmpconn = poweroffd.init()
if poweroffd.run(qmpconn) < 0:
return
self.poweroff_host()
def shellrun(self, request, ogRest):
return (0, "", "")
def session(self, request, ogRest):
disk = request.getDisk()
partition = request.getPartition()
part_path = f'{self.OG_PARTITIONS_PATH}/disk{disk}_part{partition}.qcow2'
if ogRest.CONFIG['vnc']['activate']:
qemu = OgVM(part_path, vnc_params=ogRest.CONFIG['vnc'])
else:
qemu = OgVM(part_path)
qemu.run_vm()
def partitions_cfg_to_json(self, data):
for part in data['partition_setup']:
part.pop('virt-drive')
for k, v in part.items():
part[k] = str(v)
for disk in data['disk_setup']:
for k, v in disk.items():
disk[k] = str(v)
return data
def refresh(self, ogRest):
try:
# Return last partitions setup in case VM is running.
with OgQMP(self.IP, self.VIRTUAL_PORT) as qmp:
pass
with open(self.OG_PARTITIONS_CFG_PATH, 'r') as f:
data = json.loads(f.read())
data = self.partitions_cfg_to_json(data)
return data
except:
pass
try:
with open(self.OG_PARTITIONS_CFG_PATH, 'r+') as f:
data = json.loads(f.read())
for part in data['partition_setup']:
if len(part['virt-drive']) > 0:
if not os.path.exists(part['virt-drive']):
part['code'] = '',
part['filesystem'] = 'EMPTY'
part['os'] = ''
part['size'] = 0
part['used_size'] = 0
part['virt-drive'] = ''
continue
g = guestfs.GuestFS(python_return_dict=True)
g.add_drive_opts(part['virt-drive'],
format="qcow2",
readonly=1)
g.launch()
devices = g.list_devices()
assert(len(devices) == 1)
partitions = g.list_partitions()
assert(len(partitions) == 1)
filesystems_dict = g.list_filesystems()
assert(len(filesystems_dict) == 1)
g.mount(partitions[0], '/')
used_disk = g.du('/')
g.umount_all()
total_size = g.disk_virtual_size(part['virt-drive']) / 1024
part['used_size'] = int(100 * used_disk / total_size)
part['size'] = total_size
root = g.inspect_os()
if len(root) == 1:
part['os'] = f'{g.inspect_get_distro(root[0])} ' \
f'{g.inspect_get_product_name(root[0])}'
else:
part['os'] = ''
filesystem = [fs for fs in filesystems_dict.values()][0]
part['filesystem'] = filesystem.upper()
if filesystem == 'ext4':
part['code'] = '0083'
elif filesystem == 'ntfs':
part['code'] = '0007'
g.close()
f.seek(0)
f.write(json.dumps(data, indent=4))
f.truncate()
except FileNotFoundError:
total_disk, used_disk, free_disk = shutil.disk_usage("/")
free_disk = int(free_disk * self.USABLE_DISK)
data = {'serial_number': '',
'disk_setup': [{'disk': 1,
'partition': 0,
'code': '0',
'filesystem': '',
'os': '',
'size': int(free_disk / 1024),
'used_size': int(100 * used_disk / total_disk)}],
'partition_setup': []}
for i in range(4):
part_json = {'disk': 1,
'partition': i + 1,
'code': '',
'filesystem': 'EMPTY',
'os': '',
'size': 0,
'used_size': 0,
'virt-drive': ''}
data['partition_setup'].append(part_json)
with open(self.OG_PARTITIONS_CFG_PATH, 'w+') as f:
f.write(json.dumps(data, indent=4))
except:
with open(self.OG_PARTITIONS_CFG_PATH, 'r') as f:
data = json.load(f)
data = self.partitions_cfg_to_json(data)
return data
def setup(self, request, ogRest):
self.poweroff_guest()
self.refresh(ogRest)
part_setup = request.getPartitionSetup()
disk = request.getDisk()
for i, part in enumerate(part_setup):
if int(part['format']) == 0:
continue
drive_path = f'{self.OG_PARTITIONS_PATH}/disk{disk}_part{part["partition"]}.qcow2'
g = guestfs.GuestFS(python_return_dict=True)
g.disk_create(drive_path, "qcow2", int(part['size']) * 1024)
g.add_drive_opts(drive_path, format="qcow2", readonly=0)
g.launch()
devices = g.list_devices()
assert(len(devices) == 1)
g.part_disk(devices[0], "gpt")
partitions = g.list_partitions()
assert(len(partitions) == 1)
g.mkfs(part["filesystem"].lower(), partitions[0])
g.close()
with open(self.OG_PARTITIONS_CFG_PATH, 'r+') as f:
data = json.loads(f.read())
if part['code'] == 'LINUX':
data['partition_setup'][i]['code'] = '0083'
elif part['code'] == 'NTFS':
data['partition_setup'][i]['code'] = '0007'
elif part['code'] == 'DATA':
data['partition_setup'][i]['code'] = '00da'
data['partition_setup'][i]['filesystem'] = part['filesystem']
data['partition_setup'][i]['size'] = int(part['size'])
data['partition_setup'][i]['virt-drive'] = drive_path
f.seek(0)
f.write(json.dumps(data, indent=4))
f.truncate()
return self.refresh(ogRest)
def image_create(self, path, request, ogRest):
disk = request.getDisk()
partition = request.getPartition()
name = request.getName()
repo = request.getRepo()
samba_config = ogRest.samba_config
self.poweroff_guest()
self.refresh(ogRest)
drive_path = f'{self.OG_PARTITIONS_PATH}/disk{disk}_part{partition}.qcow2'
cmd = f'mount -t cifs //{repo}/ogimages {self.OG_IMAGES_PATH} -o ' \
f'rw,nolock,serverino,acl,' \
f'username={samba_config["user"]},' \
f'password={samba_config["pass"]}'
subprocess.run([cmd], shell=True)
try:
shutil.copy(drive_path, f'{self.OG_IMAGES_PATH}/{name}')
except:
return None
subprocess.run([f'umount {self.OG_IMAGES_PATH}'], shell=True, check=True)
return True
def image_restore(self, request, ogRest):
disk = request.getDisk()
partition = request.getPartition()
name = request.getName()
repo = request.getRepo()
# TODO Multicast? Unicast? Solo copia con samba.
ctype = request.getType()
profile = request.getProfile()
cid = request.getId()
samba_config = ogRest.samba_config
self.poweroff_guest()
self.refresh(ogRest)
drive_path = f'{self.OG_PARTITIONS_PATH}/disk{disk}_part{partition}.qcow2'
if os.path.exists(drive_path):
os.remove(drive_path)
cmd = f'mount -t cifs //{repo}/ogimages {self.OG_IMAGES_PATH} -o ' \
f'ro,nolock,serverino,acl,' \
f'username={samba_config["user"]},' \
f'password={samba_config["pass"]}'
subprocess.run([cmd], shell=True, check=True)
try:
copy_src = f'{self.OG_IMAGES_PATH}/{name}'
shutil.copy(copy_src, drive_path)
except Exception as e:
raise OgError(f'Error trying to copy {copy_src} into {drive_path}: {e}') from e
subprocess.run([f'umount {self.OG_IMAGES_PATH}'], shell=True)
self.refresh(ogRest)
json_dict = {
'disk': request.getDisk(),
'partition': request.getPartition(),
'image_id': request.getId(),
'cache': [],
}
return json_dict
def cache_delete(self, request, ogRest):
raise NotImplementedError
def cache_fetch(self, request, ogRest):
raise NotImplementedError
def software(self, request, path, ogRest):
DPKG_PATH = '/var/lib/dpkg/status'
disk = request.getDisk()
partition = request.getPartition()
drive_path = f'{self.OG_PARTITIONS_PATH}/disk{disk}_part{partition}.qcow2'
g = guestfs.GuestFS(python_return_dict=True)
g.add_drive_opts(drive_path, readonly=1)
g.launch()
root = g.inspect_os()[0]
os_type = g.inspect_get_type(root)
os_major_version = g.inspect_get_major_version(root)
os_minor_version = g.inspect_get_minor_version(root)
os_distro = g.inspect_get_distro(root)
software = []
if 'linux' in os_type:
g.mount_ro(g.list_partitions()[0], '/')
try:
g.download('/' + DPKG_PATH, 'dpkg_list')
except:
pass
g.umount_all()
if os.path.isfile('dpkg_list'):
pkg_pattern = re.compile('Package: (.+)')
version_pattern = re.compile('Version: (.+)')
with open('dpkg_list', 'r') as f:
for line in f:
pkg_match = pkg_pattern.match(line)
version_match = version_pattern.match(line)
if pkg_match:
pkg = pkg_match.group(1)
elif version_match:
version = version_match.group(1)
elif line == '\n':
software.append(pkg + ' ' + version)
else:
continue
os.remove('dpkg_list')
elif 'windows' in os_type:
g.mount_ro(g.list_partitions()[0], '/')
hive_file_path = g.inspect_get_windows_software_hive(root)
g.download('/' + hive_file_path, 'win_reg')
g.umount_all()
h = hivex.Hivex('win_reg')
key = h.root()
key = h.node_get_child (key, 'Microsoft')
key = h.node_get_child (key, 'Windows')
key = h.node_get_child (key, 'CurrentVersion')
key = h.node_get_child (key, 'Uninstall')
software += [h.node_name(x) for x in h.node_children(key)]
# Just for 64 bit Windows versions, check for 32 bit software.
if (os_major_version == 5 and os_minor_version >= 2) or \
(os_major_version >= 6):
key = h.root()
key = h.node_get_child (key, 'Wow6432Node')
key = h.node_get_child (key, 'Microsoft')
key = h.node_get_child (key, 'Windows')
key = h.node_get_child (key, 'CurrentVersion')
key = h.node_get_child (key, 'Uninstall')
software += [h.node_name(x) for x in h.node_children(key)]
os.remove('win_reg')
return '\n'.join(software)
def parse_pci(self, path='/usr/share/misc/pci.ids'):
data = {}
with open(path, 'r') as f:
for line in f:
if line[0] == '#':
continue
elif len(line.strip()) == 0:
continue
else:
if line[:2] == '\t\t':
fields = line.strip().split(maxsplit=2)
data[last_vendor][last_device][(fields[0], fields[1])] = fields[2]
elif line[:1] == '\t':
fields = line.strip().split(maxsplit=1)
last_device = fields[0]
data[last_vendor][fields[0]] = {'name': fields[1]}
else:
fields = line.strip().split(maxsplit=1)
if fields[0] == 'ffff':
break
last_vendor = fields[0]
data[fields[0]] = {'name': fields[1]}
return data
def hardware(self, path, ogRest):
try:
with OgQMP(self.IP, self.VIRTUAL_PORT) as qmp:
pci_data = qmp.talk(str({"execute": "query-pci"}))
mem_data = qmp.talk(str({"execute": "query-memory-size-summary"}))
cpu_data = qmp.talk(str({"execute": "query-cpus-fast"}))
except:
return
pci_data = pci_data['return'][0]['devices']
pci_list = self.parse_pci()
device_names = {}
for device in pci_data:
vendor_id = hex(device['id']['vendor'])[2:]
device_id = hex(device['id']['device'])[2:]
subvendor_id = hex(device['id']['subsystem-vendor'])[2:]
subdevice_id = hex(device['id']['subsystem'])[2:]
description = device['class_info']['desc'].lower()
name = ''
try:
name = pci_list[vendor_id]['name']
name += ' ' + pci_list[vendor_id][device_id]['name']
name += ' ' + pci_list[vendor_id][device_id][(subvendor_id, subdevice_id)]
except KeyError:
if vendor_id == '1234':
name = 'VGA Cirrus Logic GD 5446'
else:
pass
if 'usb' in description:
device_names['usb'] = name
elif 'ide' in description:
device_names['ide'] = name
elif 'ethernet' in description:
device_names['net'] = name
elif 'vga' in description:
device_names['vga'] = name
elif 'audio' in description or 'sound' in description:
device_names['aud'] = name
elif 'dvd' in description:
device_names['cdr'] = name
ram_size = int(mem_data['return']['base-memory']) * 2 ** -20
device_names['mem'] = f'QEMU {int(ram_size)}MiB'
cpu_arch = cpu_data['return'][0]['arch']
cpu_target = cpu_data['return'][0]['target']
cpu_cores = len(cpu_data['return'])
device_names['cpu'] = f'CPU arch:{cpu_arch} target:{cpu_target} ' \
f'cores:{cpu_cores}'
with open(path, 'w+') as f:
f.seek(0)
for k, v in device_names.items():
f.write(f'{k}={v}\n')
f.truncate()
def probe(self, ogRest):
return {'status': 'VDI' if ogRest.state != ThreadState.BUSY else 'BSY'}