refs #500 #501 #502 implement job manager

ogagent-jobs
Natalia Serrano 2024-07-30 13:14:50 +02:00
parent 360d0f8fb8
commit 8c6a6523d8
9 changed files with 125 additions and 97 deletions

View File

@ -43,7 +43,7 @@ from opengnsys import VERSION, ipc, operations, utils
from opengnsys.config import readConfig
from opengnsys.loader import loadModules
from opengnsys.log import logger
from opengnsys.scriptThread import ScriptExecutorThread
from opengnsys.jobmgr import JobMgr
from opengnsys.service import IPC_PORT
trayIcon = None
@ -117,6 +117,10 @@ class MessagesProcessor(QtCore.QThread):
if self.ipc:
self.ipc.sendLogout(username)
def sendMessage(self, module, message, data):
if self.ipc:
self.ipc.sendMessage(module, message, data)
def run(self):
if self.ipc is None:
return
@ -149,6 +153,7 @@ class MessagesProcessor(QtCore.QThread):
class OGASystemTray(QtWidgets.QSystemTrayIcon):
jobmgr = JobMgr()
def __init__(self, app_, parent=None):
self.app = app_
self.config = readConfig(client=True)
@ -245,11 +250,11 @@ class OGASystemTray(QtWidgets.QSystemTrayIcon):
logger.error('Module {} not found, messsage {} not sent'.format(module, message))
## when is this run??
def executeScript(self, script):
logger.debug('Executing script')
script = base64.b64decode(script.encode('ascii'))
th = ScriptExecutorThread(script)
th.start()
logger.debug('Executing received script "{}"'.format(script))
self.jobmgr.launch_job (script, True)
def logoff(self):
logger.debug('Logoff invoked')
@ -277,7 +282,10 @@ class OGASystemTray(QtWidgets.QSystemTrayIcon):
except Exception:
# May we have lost connection with server, simply log and exit in that case
logger.exception()
logger.exception("Got an exception, processing quit")
# File "/home/nati/Downloads/work/opengnsys/ogagent/src/OGAgentUser.py", line 286, in cleanup
# logger.exception("Got an exception, processing quit")
#TypeError: Logger.exception() takes 1 positional argument but 2 were given
#logger.exception("Got an exception, processing quit")
try:
# operations.logoff() # Uncomment this after testing to logoff user

View File

@ -58,7 +58,7 @@ class HTTPServerHandler(BaseHTTPRequestHandler):
def sendJsonResponse(self, data):
try: self.send_response(200)
except Exception as e: logger.warn (str(e))
except Exception as e: logger.warn ('exception: "{}"'.format(str(e)))
data = json.dumps(data)
self.send_header('Content-type', 'application/json')
self.send_header('Content-Length', str(len(data)))

View File

@ -0,0 +1,47 @@
import threading
import subprocess
import hashlib
from datetime import datetime, timezone
from opengnsys import operations
from opengnsys.log import logger
def job_readstdout(job):
for l in iter(job['p'].stdout.readline, b''):
job['stdout'] += l.decode ('utf-8', 'ignore')
def job_readstderr(job):
for l in iter(job['p'].stderr.readline, b''):
job['stderr'] += l.decode ('utf-8', 'ignore')
class JobMgr():
jobs = {}
def launch_job(self, script, is_client):
logger.debug ('in launch_job(), is_client "{}"'.format(is_client))
args = operations.build_popen_args (script)
logger.debug ('args "{}"'.format (args))
now = datetime.now (tz=timezone.utc)
ts = float (now.strftime ('%s.%f'))
jobid = hashlib.sha256 (now.isoformat().encode('UTF-8') + script.encode ('UTF-8')).hexdigest()[0:8]
p = subprocess.Popen (args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.jobs[jobid] = { 'p': p, 'pid': p.pid, 'starttime': ts, 'script': script, 'client': is_client, 'status': 'running', 'stdout': '', 'stderr': '' }
self.jobs[jobid]['t1'] = threading.Thread (target=job_readstdout, args=(self.jobs[jobid],))
self.jobs[jobid]['t2'] = threading.Thread (target=job_readstderr, args=(self.jobs[jobid],))
self.jobs[jobid]['t1'].start()
self.jobs[jobid]['t2'].start()
logger.debug ('jobs "{}"'.format (self.jobs))
return jobid
def prepare_jobs(self):
## can't return self.jobs because the Popen object at self.jobs[id]['p'] is not serializable. So, need to create a new dict to return
st = {}
for jobid in self.jobs:
j = self.jobs[jobid]
st[jobid] = dict ((k, j[k]) for k in ['pid', 'starttime', 'script', 'client', 'status', 'stdout', 'stderr'])
if j['p'].poll() is not None: ## process finished
st[jobid]['rc'] = j['p'].returncode
st[jobid]['status'] = 'finished'
return st
#def kill_job(self, jobid):
# self.jobs[jobid]['p'].kill()

View File

@ -303,3 +303,7 @@ def get_etc_path():
Returns etc directory path.
"""
return os.sep + 'etc'
def build_popen_args(script):
return ['/bin/sh', '-c', script]

View File

@ -297,3 +297,7 @@ def get_etc_path():
Returns etc directory path.
"""
return os.sep + 'etc'
def build_popen_args(script):
return ['/bin/sh', '-c', script]

View File

@ -34,35 +34,37 @@ from opengnsys.workers import ClientWorker
from opengnsys import operations
from opengnsys.log import logger
from opengnsys.scriptThread import ScriptExecutorThread
from opengnsys.jobmgr import JobMgr
class OpenGnSysWorker(ClientWorker):
name = 'opengnsys'
jobmgr = JobMgr()
@staticmethod
def onActivation():
def onActivation(self):
logger.debug('Activate invoked')
@staticmethod
def onDeactivation():
def onDeactivation(self):
logger.debug('Deactivate invoked')
# Processes script execution
@staticmethod
def process_script(json_params):
logger.debug('Processed message: script({})'.format(json_params))
thr = ScriptExecutorThread(json_params['code'])
thr.start()
def process_script(self, json_params):
script = json_params['code']
logger.debug('Processing message: script({})'.format(script))
self.jobmgr.launch_job (script, True)
#self.sendServerMessage('script', {'op', 'launched'})
@staticmethod
def process_logoff(json_params):
def process_preparescripts(self, json_params):
logger.debug('Processing preparescripts request')
st = self.jobmgr.prepare_jobs()
logger.debug('Sending preparescripts to server with data "{}"'.format(st))
self.sendServerMessage('preparescripts', st)
def process_logoff(self, json_params):
logger.debug('Processed message: logoff({})'.format(json_params))
operations.logoff()
@staticmethod
def process_popup(json_params):
def process_popup(self, json_params):
logger.debug('Processed message: popup({})'.format(json_params))
ret = operations.showPopup(json_params['title'], json_params['message'])
#self.sendServerMessage('popup', {'op', ret})

View File

@ -45,7 +45,7 @@ import urllib.request
from configparser import NoOptionError
from opengnsys import REST, operations, VERSION
from opengnsys.log import logger
from opengnsys.scriptThread import ScriptExecutorThread
from opengnsys.jobmgr import JobMgr
from opengnsys.workers import ServerWorker
@ -111,6 +111,7 @@ class OpenGnSysWorker(ServerWorker):
random = None # Random string for secure connections
length = 32 # Random string length
exec_level = None # Execution level (permitted operations)
jobmgr = JobMgr()
def onActivation(self):
"""
@ -202,9 +203,6 @@ class OpenGnSysWorker(ServerWorker):
self.REST.sendMessage('ogagent/stopped', {'mac': self.interface.mac, 'ip': self.interface.ip,
'ostype': operations.os_type, 'osversion': operations.os_version})
def processClientMessage(self, message, data):
logger.debug('Got OpenGnsys message from client: {}, data {}'.format(message, data))
def onLogin(self, data):
"""
Sends session login notification to OpenGnsys server
@ -335,30 +333,38 @@ class OpenGnSysWorker(ServerWorker):
logger.debug('Processing script request')
# Decoding script
script = urllib.parse.unquote(base64.b64decode(post_params.get('script')).decode('utf-8'))
logger.debug('received script {}'.format(script))
if operations.os_type == 'Windows':
## for windows, we turn the script into utf16le, then to b64 again, and feed the blob to powershell
u16 = script.encode ('utf-16le') ## utf16
b64 = base64.b64encode (u16).decode ('utf-8') ## b64 (which returns bytes, so we need an additional decode(utf8))
script = """
import os
import tempfile
import subprocess
cp = subprocess.run ("powershell -WindowStyle Hidden -EncodedCommand {}", capture_output=True)
subprocs_log = os.path.join (tempfile.gettempdir(), 'opengnsys-subprocs.log')
with open (subprocs_log, 'ab') as fd: ## TODO improve this logging
fd.write (cp.stdout)
fd.write (cp.stderr)
""".format (b64)
else:
script = 'import subprocess; subprocess.check_output("""{0}""",shell=True)'.format(script)
# Executing script.
logger.debug('received script "{}"'.format(script))
if post_params.get('client', 'false') == 'false':
thr = ScriptExecutorThread(script)
thr.start()
else:
jobid = self.jobmgr.launch_job (script, False)
return {'op': 'launched', 'jobid': jobid}
else: ## post_params.get('client') is not 'false'
## send script as-is
self.sendClientMessage('script', {'code': script})
return {'op': 'launched'}
#return {'op': 'launched', 'jobid': jobid} ## TODO obtain jobid generated at the client (can it be done?)
return {'op': 'launched'}
@execution_level('full')
@check_secret
def process_preparescripts(self, path, get_params, post_params, server):
logger.debug('Processing preparescripts request')
self.st = self.jobmgr.prepare_jobs()
logger.debug('Sending preparescripts to client')
self.sendClientMessage('preparescripts', None)
return {}
def process_client_preparescripts(self, params):
logger.debug('Processing preparescripts message from client')
for p in params:
#logger.debug ('p "{}"'.format(p))
self.st[p] = params[p]
@execution_level('full')
@check_secret
def process_getscripts(self, path, get_params, post_params, server):
logger.debug('Processing getscripts request')
return self.st
@execution_level('full')
@check_secret

View File

@ -1,51 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 201 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
'''
# pylint: disable-msg=E1101,W0703
from opengnsys.log import logger
import threading
import six
class ScriptExecutorThread(threading.Thread):
def __init__(self, script):
super(ScriptExecutorThread, self).__init__()
self.script = script
def run(self):
try:
logger.debug('Executing script: {}'.format(self.script))
six.exec_(self.script, globals(), None)
except Exception as e:
logger.error('Error executing script: {}'.format(e))

View File

@ -35,6 +35,7 @@ import os
import locale
import subprocess
import ctypes
import base64
from ctypes.wintypes import DWORD, LPCWSTR
import win32com.client # @UnresolvedImport, pylint: disable=import-error
import win32net # @UnresolvedImport, pylint: disable=import-error
@ -275,3 +276,10 @@ def get_etc_path():
Returns etc directory path.
"""
return os.path.join('C:', os.sep, 'Windows', 'System32', 'drivers', 'etc')
def build_popen_args(script):
## turn the script into utf16le, then to b64 again, and feed the blob to powershell
u16 = script.encode ('utf-16le') ## utf16
b64 = base64.b64encode (u16).decode ('utf-8') ## b64 (which returns bytes, so we need an additional decode(utf8))
return ['powershell', '-WindowStyle', 'Hidden', '-EncodedCommand', b64]