source: ogClient-Git/src/virtual/ogOperations.py

Last change on this file was 0c00f64, checked in by OpenGnSys Support Team <soporte-og@…>, 4 years ago

#1059 virtual: replace qmp polling for event listening

Polling for a qmp port availability is undesirable, as QEMU only handles
one connection to the qmp port at a time, ogClient may interfere with
cloneer-manager.

Check vm thread now connects to a separate qmp tcp socket, listening for
a shutdown guest event.

When ogClient is run just after ogVDI installation (before guest
installation) it will try to connect until it's possible, ie: after an
iso is specified and a qemu vm is started that exposes the appropiate
qmp tcp port.

  • Property mode set to 100644
File size: 22.1 KB
Line 
1#
2# Copyright (C) 2020-2021 Soleta Networks <info@soleta.eu>
3#
4# This program is free software: you can redistribute it and/or modify it under
5# the terms of the GNU Affero General Public License as published by the
6# Free Software Foundation; either version 3 of the License, or
7# (at your option) any later version.
8
9from src.ogRest import ThreadState
10from src.virtual import poweroffd
11import socket
12import errno
13import select
14import json
15import subprocess
16import shutil
17import os
18import guestfs
19import hivex
20import pathlib
21import re
22import math
23import sys
24import enum
25import time
26
27class OgVM:
28    DEFAULT_CPU = 'host'
29    DEFAULT_VGA = 'virtio-vga'
30    DEFAULT_QMP_IP = 'localhost'
31    DEFAULT_QMP_PORT = 4444
32
33    class State(enum.Enum):
34        STOPPED = 0
35        RUNNING = 1
36
37    def __init__(self,
38                 partition_path,
39                 memory=None,
40                 cpu=DEFAULT_CPU,
41                 vga=DEFAULT_VGA,
42                 qmp_ip=DEFAULT_QMP_IP,
43                 qmp_port=DEFAULT_QMP_PORT,
44                 vnc_params=None):
45        self.partition_path = partition_path
46        self.cpu = cpu
47        self.vga = vga
48        self.qmp_ip = qmp_ip
49        self.qmp_port = qmp_port
50        self.proc = None
51        self.vnc_params = vnc_params
52
53        if memory:
54            self.mem = memory
55        else:
56            available_ram = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
57            available_ram = available_ram / 2 ** 20
58            # Calculate the lower power of 2 amout of RAM memory for the VM.
59            self.mem = 2 ** math.floor(math.log(available_ram) / math.log(2))
60
61
62    def run_vm(self):
63        if self.vnc_params:
64            vnc_str = f'-vnc 0.0.0.0:0,password'
65        else:
66            vnc_str = ''
67
68        cmd = (f'qemu-system-x86_64 -accel kvm -cpu {self.cpu} -smp 4 '
69               f'-drive file={self.partition_path},if=virtio '
70               f'-qmp tcp:localhost:4444,server,nowait '
71               f'-device {self.vga} -display gtk '
72               f'-m {self.mem}M -boot c -full-screen {vnc_str}')
73        self.proc = subprocess.Popen([cmd], shell=True)
74
75        if self.vnc_params:
76            # Wait for QMP to be available.
77            time.sleep(20)
78            cmd = { "execute": "change",
79                    "arguments": { "device": "vnc",
80                                   "target": "password",
81                                   "arg": str(self.vnc_params['pass']) } }
82            with OgQMP(self.qmp_ip, self.qmp_port) as qmp:
83                qmp.talk(str(cmd))
84
85class OgQMP:
86    QMP_TIMEOUT = 5
87    QMP_POWEROFF_TIMEOUT = 300
88
89    def __init__(self, ip, port):
90        self.ip = ip
91        self.port = port
92
93    def connect(self):
94        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
95        self.sock.setblocking(0)
96        try:
97            self.sock.connect((self.ip, self.port))
98        except socket.error as err:
99            if err.errno == errno.ECONNREFUSED:
100                raise Exception('cannot connect to qemu')
101            elif err.errno == errno.EINPROGRESS:
102                pass
103
104        readset = [ self.sock ]
105        readable, writable, exception = select.select(readset,
106                                                      [],
107                                                      [],
108                                                      OgQMP.QMP_TIMEOUT)
109
110        if self.sock in readable:
111            try:
112                out = self.recv()
113            except:
114                pass
115
116        if 'QMP' not in out:
117            raise Exception('cannot handshake qemu')
118
119        out = self.talk(str({"execute": "qmp_capabilities"}))
120        if 'return' not in out:
121            raise Exception('cannot handshake qemu')
122
123    def disconnect(self):
124        try:
125            self.sock.close()
126        except:
127            pass
128
129    def __enter__(self):
130        self.connect()
131        return self
132
133    def __exit__(self, exc_type, exc_val, exc_tb):
134        self.disconnect()
135
136    def talk(self, data, timeout=QMP_TIMEOUT):
137        writeset = [ self.sock ]
138        readable, writable, exception = select.select([],
139                                                      writeset,
140                                                      [],
141                                                      timeout)
142        if self.sock in writable:
143            try:
144                self.sock.send(bytes(data, 'utf-8'))
145            except:
146                raise Exception('cannot talk to qemu')
147        else:
148            raise Exception('timeout when talking to qemu')
149
150        return self.recv(timeout=timeout)
151
152    def recv(self, timeout=QMP_TIMEOUT):
153        readset = [self.sock]
154        readable, _, _ = select.select(readset, [], [], timeout)
155
156        if self.sock in readable:
157            try:
158                out = self.sock.recv(4096).decode('utf-8')
159                out = json.loads(out)
160            except socket.error as err:
161                raise Exception('cannot talk to qemu')
162        else:
163            raise Exception('timeout when talking to qemu')
164        return out
165
166class OgVirtualOperations:
167    def __init__(self):
168        self.IP = '127.0.0.1'
169        self.VIRTUAL_PORT = 4444
170        self.USABLE_DISK = 0.75
171        self.OG_PATH = os.path.dirname(os.path.realpath(sys.argv[0]))
172        self.OG_IMAGES_PATH = f'{self.OG_PATH}/images'
173        self.OG_PARTITIONS_PATH = f'{self.OG_PATH}/partitions'
174        self.OG_PARTITIONS_CFG_PATH = f'{self.OG_PATH}/partitions.json'
175
176        if not os.path.exists(self.OG_IMAGES_PATH):
177            os.mkdir(self.OG_IMAGES_PATH, mode=0o755)
178        if not os.path.exists(self.OG_PARTITIONS_PATH):
179            os.mkdir(self.OG_PARTITIONS_PATH, mode=0o755)
180
181    def poweroff_guest(self):
182        try:
183            with OgQMP(self.IP, self.VIRTUAL_PORT) as qmp:
184                qmp.talk(str({"execute": "system_powerdown"}))
185                out = qmp.recv()
186                assert(out['event'] == 'POWERDOWN')
187                out = qmp.recv(timeout=OgQMP.QMP_POWEROFF_TIMEOUT)
188                assert(out['event'] == 'SHUTDOWN')
189        except:
190            return
191
192    def poweroff_host(self):
193        subprocess.run(['/sbin/poweroff'])
194
195    def poweroff(self):
196        self.poweroff_guest()
197        self.poweroff_host()
198
199    def reboot(self):
200        try:
201            with OgQMP(self.IP, self.VIRTUAL_PORT) as qmp:
202                qmp.talk(str({"execute": "system_reset"}))
203        except:
204            pass
205
206    def check_vm_state(self):
207        try:
208            with OgQMP(self.IP, self.VIRTUAL_PORT) as qmp:
209                pass
210            return OgVM.State.RUNNING
211        except:
212            return OgVM.State.STOPPED
213
214    def get_installed_os(self):
215        installed_os = {}
216        try:
217            with open(self.OG_PARTITIONS_CFG_PATH, 'r') as f:
218                cfg = json.loads(f.read())
219            for part in cfg['partition_setup']:
220                if len(part['os']) > 0:
221                    installed_os[part['os']] = (part['disk'], part['partition'])
222        except:
223            pass
224        return installed_os
225
226    def check_vm_state_loop(self, ogRest):
227        # If we can't connect, wait until it's possible.
228        while True:
229            try:
230                with socket.create_connection((poweroffd.QMP_DEFAULT_HOST,
231                                               poweroffd.QMP_DEFAULT_PORT)):
232                    break
233            except ConnectionRefusedError:
234                time.sleep(1)
235
236        qmpconn = poweroffd.init()
237        if poweroffd.run(qmpconn) < 0:
238            return
239        self.poweroff_host()
240
241    def shellrun(self, request, ogRest):
242        return
243
244    def session(self, request, ogRest):
245        disk = request.getDisk()
246        partition = request.getPartition()
247
248        part_path = f'{self.OG_PARTITIONS_PATH}/disk{disk}_part{partition}.qcow2'
249        if ogRest.CONFIG['vnc']['activate']:
250            qemu = OgVM(part_path, vnc_params=ogRest.CONFIG['vnc'])
251        else:
252            qemu = OgVM(part_path)
253        qemu.run_vm()
254
255    def partitions_cfg_to_json(self, data):
256        for part in data['partition_setup']:
257            part.pop('virt-drive')
258            for k, v in part.items():
259                part[k] = str(v)
260        for disk in data['disk_setup']:
261            for k, v in disk.items():
262                disk[k] = str(v)
263        return data
264
265    def refresh(self, ogRest):
266        try:
267            # Return last partitions setup in case VM is running.
268            with OgQMP(self.IP, self.VIRTUAL_PORT) as qmp:
269                pass
270            with open(self.OG_PARTITIONS_CFG_PATH, 'r') as f:
271                data = json.loads(f.read())
272            data = self.partitions_cfg_to_json(data)
273            return data
274        except:
275            pass
276
277        try:
278            with open(self.OG_PARTITIONS_CFG_PATH, 'r+') as f:
279                data = json.loads(f.read())
280                for part in data['partition_setup']:
281                    if len(part['virt-drive']) > 0:
282                        if not os.path.exists(part['virt-drive']):
283                            part['code'] = '',
284                            part['filesystem'] = 'EMPTY'
285                            part['os'] = ''
286                            part['size'] = 0
287                            part['used_size'] = 0
288                            part['virt-drive'] = ''
289                            continue
290                        g = guestfs.GuestFS(python_return_dict=True)
291                        g.add_drive_opts(part['virt-drive'],
292                                         format="qcow2",
293                                         readonly=1)
294                        g.launch()
295                        devices = g.list_devices()
296                        assert(len(devices) == 1)
297                        partitions = g.list_partitions()
298                        assert(len(partitions) == 1)
299                        filesystems_dict = g.list_filesystems()
300                        assert(len(filesystems_dict) == 1)
301                        g.mount(partitions[0], '/')
302                        used_disk = g.du('/')
303                        g.umount_all()
304                        total_size = g.disk_virtual_size(part['virt-drive']) / 1024
305
306                        part['used_size'] = int(100 * used_disk / total_size)
307                        part['size'] = total_size
308                        root = g.inspect_os()
309                        if len(root) == 1:
310                            part['os'] = f'{g.inspect_get_distro(root[0])} ' \
311                                         f'{g.inspect_get_product_name(root[0])}'
312                        else:
313                            part['os'] = ''
314                        filesystem = [fs for fs in filesystems_dict.values()][0]
315                        part['filesystem'] = filesystem.upper()
316                        if filesystem == 'ext4':
317                            part['code'] = '0083'
318                        elif filesystem == 'ntfs':
319                            part['code'] = '0007'
320                        g.close()
321                f.seek(0)
322                f.write(json.dumps(data, indent=4))
323                f.truncate()
324        except FileNotFoundError:
325            total_disk, used_disk, free_disk = shutil.disk_usage("/")
326            free_disk = int(free_disk * self.USABLE_DISK)
327            data = {'serial_number': '',
328                    'disk_setup': [{'disk': 1,
329                                   'partition': 0,
330                                   'code': '0',
331                                   'filesystem': '',
332                                   'os': '',
333                                   'size': int(free_disk / 1024),
334                                   'used_size': int(100 * used_disk / total_disk)}],
335                    'partition_setup': []}
336            for i in range(4):
337                part_json = {'disk': 1,
338                             'partition': i + 1,
339                             'code': '',
340                             'filesystem': 'EMPTY',
341                             'os': '',
342                             'size': 0,
343                             'used_size': 0,
344                             'virt-drive': ''}
345                data['partition_setup'].append(part_json)
346            with open(self.OG_PARTITIONS_CFG_PATH, 'w+') as f:
347                f.write(json.dumps(data, indent=4))
348        except:
349            with open(self.OG_PARTITIONS_CFG_PATH, 'r') as f:
350                data = json.load(f)
351
352        data = self.partitions_cfg_to_json(data)
353
354        return data
355
356    def setup(self, request, ogRest):
357        self.poweroff_guest()
358        self.refresh(ogRest)
359
360        part_setup = request.getPartitionSetup()
361        disk = request.getDisk()
362
363        for i, part in enumerate(part_setup):
364            if int(part['format']) == 0:
365                continue
366
367            drive_path = f'{self.OG_PARTITIONS_PATH}/disk{disk}_part{part["partition"]}.qcow2'
368            g = guestfs.GuestFS(python_return_dict=True)
369            g.disk_create(drive_path, "qcow2", int(part['size']) * 1024)
370            g.add_drive_opts(drive_path, format="qcow2", readonly=0)
371            g.launch()
372            devices = g.list_devices()
373            assert(len(devices) == 1)
374            g.part_disk(devices[0], "gpt")
375            partitions = g.list_partitions()
376            assert(len(partitions) == 1)
377            g.mkfs(part["filesystem"].lower(), partitions[0])
378            g.close()
379
380            with open(self.OG_PARTITIONS_CFG_PATH, 'r+') as f:
381                data = json.loads(f.read())
382                if part['code'] == 'LINUX':
383                    data['partition_setup'][i]['code'] = '0083'
384                elif part['code'] == 'NTFS':
385                    data['partition_setup'][i]['code'] = '0007'
386                elif part['code'] == 'DATA':
387                    data['partition_setup'][i]['code'] = '00da'
388                data['partition_setup'][i]['filesystem'] = part['filesystem']
389                data['partition_setup'][i]['size'] = int(part['size'])
390                data['partition_setup'][i]['virt-drive'] = drive_path
391                f.seek(0)
392                f.write(json.dumps(data, indent=4))
393                f.truncate()
394
395        return self.refresh(ogRest)
396
397    def image_create(self, path, request, ogRest):
398        disk = request.getDisk()
399        partition = request.getPartition()
400        name = request.getName()
401        repo = request.getRepo()
402        samba_config = ogRest.samba_config
403
404        self.poweroff_guest()
405
406        self.refresh(ogRest)
407
408        drive_path = f'{self.OG_PARTITIONS_PATH}/disk{disk}_part{partition}.qcow2'
409
410        cmd = f'mount -t cifs //{repo}/ogimages {self.OG_IMAGES_PATH} -o ' \
411              f'rw,nolock,serverino,acl,' \
412              f'username={samba_config["user"]},' \
413              f'password={samba_config["pass"]}'
414        subprocess.run([cmd], shell=True)
415
416        try:
417            shutil.copy(drive_path, f'{self.OG_IMAGES_PATH}/{name}')
418        except:
419            return None
420
421        subprocess.run([f'umount {self.OG_IMAGES_PATH}'], shell=True)
422
423        return True
424
425    def image_restore(self, request, ogRest):
426        disk = request.getDisk()
427        partition = request.getPartition()
428        name = request.getName()
429        repo = request.getRepo()
430        # TODO Multicast? Unicast? Solo copia con samba.
431        ctype = request.getType()
432        profile = request.getProfile()
433        cid = request.getId()
434        samba_config = ogRest.samba_config
435
436        self.poweroff_guest()
437        self.refresh(ogRest)
438
439        drive_path = f'{self.OG_PARTITIONS_PATH}/disk{disk}_part{partition}.qcow2'
440
441        if os.path.exists(drive_path):
442            os.remove(drive_path)
443
444        cmd = f'mount -t cifs //{repo}/ogimages {self.OG_IMAGES_PATH} -o ' \
445              f'ro,nolock,serverino,acl,' \
446              f'username={samba_config["user"]},' \
447              f'password={samba_config["pass"]}'
448        subprocess.run([cmd], shell=True)
449
450        try:
451            shutil.copy(f'{self.OG_IMAGES_PATH}/{name}', drive_path)
452        except:
453            return None
454
455        subprocess.run([f'umount {self.OG_IMAGES_PATH}'], shell=True)
456        self.refresh(ogRest)
457
458        return True
459
460    def software(self, request, path, ogRest):
461        DPKG_PATH = '/var/lib/dpkg/status'
462
463        disk = request.getDisk()
464        partition = request.getPartition()
465        drive_path = f'{self.OG_PARTITIONS_PATH}/disk{disk}_part{partition}.qcow2'
466        g = guestfs.GuestFS(python_return_dict=True)
467        g.add_drive_opts(drive_path, readonly=1)
468        g.launch()
469        root = g.inspect_os()[0]
470
471        os_type = g.inspect_get_type(root)
472        os_major_version = g.inspect_get_major_version(root)
473        os_minor_version = g.inspect_get_minor_version(root)
474        os_distro = g.inspect_get_distro(root)
475
476        software = []
477
478        if 'linux' in os_type:
479            g.mount_ro(g.list_partitions()[0], '/')
480            try:
481                g.download('/' + DPKG_PATH, 'dpkg_list')
482            except:
483                pass
484            g.umount_all()
485            if os.path.isfile('dpkg_list'):
486                pkg_pattern = re.compile('Package: (.+)')
487                version_pattern = re.compile('Version: (.+)')
488                with open('dpkg_list', 'r') as f:
489                    for line in f:
490                        pkg_match = pkg_pattern.match(line)
491                        version_match = version_pattern.match(line)
492                        if pkg_match:
493                            pkg = pkg_match.group(1)
494                        elif version_match:
495                            version = version_match.group(1)
496                        elif line == '\n':
497                            software.append(pkg + ' ' + version)
498                        else:
499                            continue
500                os.remove('dpkg_list')
501        elif 'windows' in os_type:
502            g.mount_ro(g.list_partitions()[0], '/')
503            hive_file_path = g.inspect_get_windows_software_hive(root)
504            g.download('/' + hive_file_path, 'win_reg')
505            g.umount_all()
506            h = hivex.Hivex('win_reg')
507            key = h.root()
508            key = h.node_get_child (key, 'Microsoft')
509            key = h.node_get_child (key, 'Windows')
510            key = h.node_get_child (key, 'CurrentVersion')
511            key = h.node_get_child (key, 'Uninstall')
512            software += [h.node_name(x) for x in h.node_children(key)]
513            # Just for 64 bit Windows versions, check for 32 bit software.
514            if (os_major_version == 5 and os_minor_version >= 2) or \
515               (os_major_version >= 6):
516                key = h.root()
517                key = h.node_get_child (key, 'Wow6432Node')
518                key = h.node_get_child (key, 'Microsoft')
519                key = h.node_get_child (key, 'Windows')
520                key = h.node_get_child (key, 'CurrentVersion')
521                key = h.node_get_child (key, 'Uninstall')
522                software += [h.node_name(x) for x in h.node_children(key)]
523            os.remove('win_reg')
524
525        return '\n'.join(software)
526
527    def parse_pci(self, path='/usr/share/misc/pci.ids'):
528        data = {}
529        with open(path, 'r') as f:
530            for line in f:
531                if line[0] == '#':
532                    continue
533                elif len(line.strip()) == 0:
534                    continue
535                else:
536                    if line[:2] == '\t\t':
537                        fields = line.strip().split(maxsplit=2)
538                        data[last_vendor][last_device][(fields[0], fields[1])] = fields[2]
539                    elif line[:1] == '\t':
540                        fields = line.strip().split(maxsplit=1)
541                        last_device = fields[0]
542                        data[last_vendor][fields[0]] = {'name': fields[1]}
543                    else:
544                        fields = line.strip().split(maxsplit=1)
545                        if fields[0] == 'ffff':
546                            break
547                        last_vendor = fields[0]
548                        data[fields[0]] = {'name': fields[1]}
549        return data
550
551    def hardware(self, path, ogRest):
552        try:
553            with OgQMP(self.IP, self.VIRTUAL_PORT) as qmp:
554                pci_data = qmp.talk(str({"execute": "query-pci"}))
555                mem_data = qmp.talk(str({"execute": "query-memory-size-summary"}))
556                cpu_data = qmp.talk(str({"execute": "query-cpus-fast"}))
557        except:
558            return
559
560        pci_data = pci_data['return'][0]['devices']
561        pci_list = self.parse_pci()
562        device_names = {}
563        for device in pci_data:
564            vendor_id = hex(device['id']['vendor'])[2:]
565            device_id = hex(device['id']['device'])[2:]
566            subvendor_id = hex(device['id']['subsystem-vendor'])[2:]
567            subdevice_id = hex(device['id']['subsystem'])[2:]
568            description = device['class_info']['desc'].lower()
569            name = ''
570            try:
571                name = pci_list[vendor_id]['name']
572                name += ' ' + pci_list[vendor_id][device_id]['name']
573                name += ' ' + pci_list[vendor_id][device_id][(subvendor_id, subdevice_id)]
574            except KeyError:
575                if vendor_id == '1234':
576                    name = 'VGA Cirrus Logic GD 5446'
577                else:
578                    pass
579
580            if 'usb' in description:
581                device_names['usb'] = name
582            elif 'ide' in description:
583                device_names['ide'] = name
584            elif 'ethernet' in description:
585                device_names['net'] = name
586            elif 'vga' in description:
587                device_names['vga'] = name
588
589            elif 'audio' in description or 'sound' in description:
590                device_names['aud'] = name
591            elif 'dvd' in description:
592                device_names['cdr'] = name
593
594        ram_size = int(mem_data['return']['base-memory']) * 2 ** -20
595        device_names['mem'] = f'QEMU {int(ram_size)}MiB'
596
597        cpu_arch = cpu_data['return'][0]['arch']
598        cpu_target = cpu_data['return'][0]['target']
599        cpu_cores = len(cpu_data['return'])
600        device_names['cpu'] = f'CPU arch:{cpu_arch} target:{cpu_target} ' \
601                              f'cores:{cpu_cores}'
602
603        with open(path, 'w+') as f:
604            f.seek(0)
605            for k, v in device_names.items():
606                f.write(f'{k}={v}\n')
607            f.truncate()
608
609    def probe(self, ogRest):
610        return {'status': 'VDI' if ogRest.state != ThreadState.BUSY else 'BSY'}
Note: See TracBrowser for help on using the repository browser.