source: ogClient-Git/src/virtual/qmp.py

Last change on this file was 0c00f64, checked in by OpenGnSys Support Team <soporte-og@…>, 4 years ago

#1059 virtual: replace qmp polling for event listening

Polling for a qmp port availability is undesirable, as QEMU only handles
one connection to the qmp port at a time, ogClient may interfere with
cloneer-manager.

Check vm thread now connects to a separate qmp tcp socket, listening for
a shutdown guest event.

When ogClient is run just after ogVDI installation (before guest
installation) it will try to connect until it's possible, ie: after an
iso is specified and a qemu vm is started that exposes the appropiate
qmp tcp port.

  • Property mode set to 100644
File size: 8.1 KB
Line 
1# QEMU Monitor Protocol Python class
2#
3# Copyright (C) 2009, 2010 Red Hat Inc.
4#
5# Authors:
6#  Luiz Capitulino <lcapitulino@redhat.com>
7#
8# This work is licensed under the terms of the GNU GPL, version 2.  See
9# the COPYING file in the top-level directory.
10
11import json
12import errno
13import socket
14import logging
15
16
17class QMPError(Exception):
18    pass
19
20
21class QMPConnectError(QMPError):
22    pass
23
24
25class QMPCapabilitiesError(QMPError):
26    pass
27
28
29class QMPTimeoutError(QMPError):
30    pass
31
32
33class QEMUMonitorProtocol(object):
34
35    #: Logger object for debugging messages
36    logger = logging.getLogger('QMP')
37    #: Socket's error class
38    error = socket.error
39    #: Socket's timeout
40    timeout = socket.timeout
41
42    def __init__(self, address, server=False):
43        """
44        Create a QEMUMonitorProtocol class.
45
46        @param address: QEMU address, can be either a unix socket path (string)
47                        or a tuple in the form ( address, port ) for a TCP
48                        connection
49        @param server: server mode listens on the socket (bool)
50        @raise socket.error on socket connection errors
51        @note No connection is established, this is done by the connect() or
52              accept() methods
53        """
54        self.__events = []
55        self.__address = address
56        self.__sock = self.__get_sock()
57        self.__sockfile = None
58        if server:
59            self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
60            self.__sock.bind(self.__address)
61            self.__sock.listen(1)
62
63    def __get_sock(self):
64        if isinstance(self.__address, tuple):
65            family = socket.AF_INET
66        else:
67            family = socket.AF_UNIX
68        return socket.socket(family, socket.SOCK_STREAM)
69
70    def __negotiate_capabilities(self):
71        greeting = self.__json_read()
72        if greeting is None or "QMP" not in greeting:
73            raise QMPConnectError
74        # Greeting seems ok, negotiate capabilities
75        resp = self.cmd('qmp_capabilities')
76        if "return" in resp:
77            return greeting
78        raise QMPCapabilitiesError
79
80    def __json_read(self, only_event=False):
81        while True:
82            data = self.__sockfile.readline()
83            if not data:
84                return
85            resp = json.loads(data)
86            if 'event' in resp:
87                self.logger.debug("<<< %s", resp)
88                self.__events.append(resp)
89                if not only_event:
90                    continue
91            return resp
92
93    def __get_events(self, wait=False):
94        """
95        Check for new events in the stream and cache them in __events.
96
97        @param wait (bool): block until an event is available.
98        @param wait (float): If wait is a float, treat it as a timeout value.
99
100        @raise QMPTimeoutError: If a timeout float is provided and the timeout
101                                period elapses.
102        @raise QMPConnectError: If wait is True but no events could be
103                                retrieved or if some other error occurred.
104        """
105
106        # Check for new events regardless and pull them into the cache:
107        self.__sock.setblocking(0)
108        try:
109            self.__json_read()
110        except socket.error as err:
111            if err[0] == errno.EAGAIN:
112                # No data available
113                pass
114        self.__sock.setblocking(1)
115
116        # Wait for new events, if needed.
117        # if wait is 0.0, this means "no wait" and is also implicitly false.
118        if not self.__events and wait:
119            if isinstance(wait, float):
120                self.__sock.settimeout(wait)
121            try:
122                ret = self.__json_read(only_event=True)
123            except socket.timeout:
124                raise QMPTimeoutError("Timeout waiting for event")
125            except:
126                raise QMPConnectError("Error while reading from socket")
127            if ret is None:
128                raise QMPConnectError("Error while reading from socket")
129            self.__sock.settimeout(None)
130
131    def connect(self, negotiate=True):
132        """
133        Connect to the QMP Monitor and perform capabilities negotiation.
134
135        @return QMP greeting dict
136        @raise socket.error on socket connection errors
137        @raise QMPConnectError if the greeting is not received
138        @raise QMPCapabilitiesError if fails to negotiate capabilities
139        """
140        self.__sock.connect(self.__address)
141        self.__sockfile = self.__sock.makefile()
142        if negotiate:
143            return self.__negotiate_capabilities()
144
145    def accept(self):
146        """
147        Await connection from QMP Monitor and perform capabilities negotiation.
148
149        @return QMP greeting dict
150        @raise socket.error on socket connection errors
151        @raise QMPConnectError if the greeting is not received
152        @raise QMPCapabilitiesError if fails to negotiate capabilities
153        """
154        self.__sock.settimeout(15)
155        self.__sock, _ = self.__sock.accept()
156        self.__sockfile = self.__sock.makefile()
157        return self.__negotiate_capabilities()
158
159    def cmd_obj(self, qmp_cmd):
160        """
161        Send a QMP command to the QMP Monitor.
162
163        @param qmp_cmd: QMP command to be sent as a Python dict
164        @return QMP response as a Python dict or None if the connection has
165                been closed
166        """
167        self.logger.debug(">>> %s", qmp_cmd)
168        try:
169            self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8'))
170        except socket.error as err:
171            if err[0] == errno.EPIPE:
172                return
173            raise socket.error(err)
174        resp = self.__json_read()
175        self.logger.debug("<<< %s", resp)
176        return resp
177
178    def cmd(self, name, args=None, cmd_id=None):
179        """
180        Build a QMP command and send it to the QMP Monitor.
181
182        @param name: command name (string)
183        @param args: command arguments (dict)
184        @param cmd_id: command id (dict, list, string or int)
185        """
186        qmp_cmd = {'execute': name}
187        if args:
188            qmp_cmd['arguments'] = args
189        if cmd_id:
190            qmp_cmd['id'] = cmd_id
191        return self.cmd_obj(qmp_cmd)
192
193    def command(self, cmd, **kwds):
194        """
195        Build and send a QMP command to the monitor, report errors if any
196        """
197        ret = self.cmd(cmd, kwds)
198        if "error" in ret:
199            raise Exception(ret['error']['desc'])
200        return ret['return']
201
202    def pull_event(self, wait=False):
203        """
204        Pulls a single event.
205
206        @param wait (bool): block until an event is available.
207        @param wait (float): If wait is a float, treat it as a timeout value.
208
209        @raise QMPTimeoutError: If a timeout float is provided and the timeout
210                                period elapses.
211        @raise QMPConnectError: If wait is True but no events could be
212                                retrieved or if some other error occurred.
213
214        @return The first available QMP event, or None.
215        """
216        self.__get_events(wait)
217
218        if self.__events:
219            return self.__events.pop(0)
220        return None
221
222    def get_events(self, wait=False):
223        """
224        Get a list of available QMP events.
225
226        @param wait (bool): block until an event is available.
227        @param wait (float): If wait is a float, treat it as a timeout value.
228
229        @raise QMPTimeoutError: If a timeout float is provided and the timeout
230                                period elapses.
231        @raise QMPConnectError: If wait is True but no events could be
232                                retrieved or if some other error occurred.
233
234        @return The list of available QMP events.
235        """
236        self.__get_events(wait)
237        return self.__events
238
239    def clear_events(self):
240        """
241        Clear current list of pending events.
242        """
243        self.__events = []
244
245    def close(self):
246        self.__sock.close()
247        self.__sockfile.close()
248
249    def settimeout(self, timeout):
250        self.__sock.settimeout(timeout)
251
252    def get_sock_fd(self):
253        return self.__sock.fileno()
254
255    def is_scm_available(self):
256        return self.__sock.family == socket.AF_UNIX
Note: See TracBrowser for help on using the repository browser.