Merge pull request 'job manager' (#5) from ogagent-jobs into main

Reviewed-on: #5
ogadmcli
Natalia Serrano 2024-08-07 14:03:24 +02:00
commit 4b5193105c
13 changed files with 464 additions and 99 deletions

View File

@ -1,3 +1,9 @@
ogagent (1.3.4-1) stable; urgency=medium
* Implement JobMgr
-- OpenGnsys developers <info@opengnsys.es> Tue, 30 Jul 2024 13:39:55 +0200
ogagent (1.3.1-1) stable; urgency=medium ogagent (1.3.1-1) stable; urgency=medium
* Migrate the update script from shell to python * Migrate the update script from shell to python

297
openapi.yml 100644
View File

@ -0,0 +1,297 @@
openapi: 3.0.3
info:
title: OgAgent API
description: OgAgent API
version: 0.0.1
paths:
/opengnsys/status:
post:
summary: Get status of the agent
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/StatusReq'
responses:
'200':
description: Success
content:
application/json:
schema:
$ref: '#/components/schemas/StatusRes'
/opengnsys/poweroff:
post:
summary: Power agent off
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/EmptyObj'
responses:
'200':
description: Success
content:
application/json:
schema:
$ref: '#/components/schemas/LaunchedRes'
/opengnsys/reboot:
post:
summary: Reboot agent
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/EmptyObj'
responses:
'200':
description: Success
content:
application/json:
schema:
$ref: '#/components/schemas/LaunchedRes'
/opengnsys/script:
post:
summary: Run script on agent
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/ScriptReq'
required: true
responses:
'200':
description: Success
content:
application/json:
schema:
$ref: '#/components/schemas/ScriptRes'
/opengnsys/terminatescript:
post:
summary: Terminate running script on agent
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/TerminateScriptReq'
responses:
'200':
description: Success
content:
application/json:
schema:
$ref: '#/components/schemas/EmptyObj'
/opengnsys/preparescripts:
post:
summary: Prepare list of scripts running on agent
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/EmptyObj'
responses:
'200':
description: Success
content:
application/json:
schema:
$ref: '#/components/schemas/EmptyObj'
/opengnsys/getscripts:
post:
summary: Get the list of scripts running on agent
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/EmptyObj'
responses:
'200':
description: Success
content:
application/json:
schema:
$ref: '#/components/schemas/GetScriptsRes'
/opengnsys/logoff:
post:
summary: Log remote user off
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/EmptyObj'
responses:
'200':
description: Success
content:
application/json:
schema:
$ref: '#/components/schemas/LogoffRes'
/opengnsys/popup:
post:
summary: Show message on agent
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/Popup'
required: true
responses:
'200':
description: Success
content:
application/json:
schema:
$ref: '#/components/schemas/LaunchedRes'
components:
schemas:
EmptyObj:
type: object
additionalProperties: false
Jobid:
type: string
example:
- "deadbeef"
StatusReq:
type: object
properties:
detail:
type: boolean
StatusRes:
type: object
required:
- status
properties:
status:
type: string
enum:
- "LNX"
- "OSX"
- "WIN"
loggedin:
type: boolean
session:
type: string
example:
- "x11"
agent_version:
type: string
os_version:
type: string
sys_load:
type: number
LaunchedRes:
type: object
required:
- op
properties:
op:
type: string
enum:
- "launched"
ScriptReq:
type: object
required:
- script
properties:
script:
type: string
example:
- "uptime\nwho"
- "Start-Process notepad.exe"
client:
type: boolean
default: false
ScriptRes:
type: object
required:
- op
properties:
op:
type: string
enum:
- "launched"
jobid:
$ref: '#/components/schemas/Jobid'
TerminateScriptReq:
type: object
required:
- jobid
properties:
jobid:
$ref: '#/components/schemas/Jobid'
RunningScript:
type: object
required:
- jobid
- pid
- starttime
- script
- client
- status
- stdout
- stderr
properties:
jobid:
$ref: '#/components/schemas/Jobid'
pid:
type: integer
starttime:
type: string
example: "2024-12-31 23:59:59.123456+0000"
script:
type: string
client:
type: boolean
status:
type: string
enum:
- "running"
- "finished"
stdout:
type: string
stderr:
type: string
rc:
type: integer
GetScriptsRes:
type: array
items:
$ref: '#/components/schemas/RunningScript'
LogoffRes:
type: object
required:
- op
properties:
op:
type: string
enum:
- "sent to client"
Popup:
type: object
properties:
title:
type: string
message:
type: string

View File

@ -1,5 +1,9 @@
# -*- mode: python ; coding: utf-8 -*- # -*- mode: python ; coding: utf-8 -*-
## generated on windows using:
## pyi-makespec.exe --windowed --icon img\oga.ico --manifest OGAgent.manifest OGAgentUser.py opengnsys\windows\OGAgentService.py
## move OGAgentUser.spec OGAgent.spec
ogausr_a = Analysis( ogausr_a = Analysis(
['OGAgentUser.py'], ['OGAgentUser.py'],

View File

@ -43,7 +43,7 @@ from opengnsys import VERSION, ipc, operations, utils
from opengnsys.config import readConfig from opengnsys.config import readConfig
from opengnsys.loader import loadModules from opengnsys.loader import loadModules
from opengnsys.log import logger from opengnsys.log import logger
from opengnsys.scriptThread import ScriptExecutorThread from opengnsys.jobmgr import JobMgr
from opengnsys.service import IPC_PORT from opengnsys.service import IPC_PORT
trayIcon = None trayIcon = None
@ -117,6 +117,10 @@ class MessagesProcessor(QtCore.QThread):
if self.ipc: if self.ipc:
self.ipc.sendLogout(username) self.ipc.sendLogout(username)
def sendMessage(self, module, message, data):
if self.ipc:
self.ipc.sendMessage(module, message, data)
def run(self): def run(self):
if self.ipc is None: if self.ipc is None:
return return
@ -149,6 +153,7 @@ class MessagesProcessor(QtCore.QThread):
class OGASystemTray(QtWidgets.QSystemTrayIcon): class OGASystemTray(QtWidgets.QSystemTrayIcon):
jobmgr = JobMgr()
def __init__(self, app_, parent=None): def __init__(self, app_, parent=None):
self.app = app_ self.app = app_
self.config = readConfig(client=True) self.config = readConfig(client=True)
@ -245,11 +250,11 @@ class OGASystemTray(QtWidgets.QSystemTrayIcon):
logger.error('Module {} not found, messsage {} not sent'.format(module, message)) logger.error('Module {} not found, messsage {} not sent'.format(module, message))
## when is this run??
def executeScript(self, script): def executeScript(self, script):
logger.debug('Executing script')
script = base64.b64decode(script.encode('ascii')) script = base64.b64decode(script.encode('ascii'))
th = ScriptExecutorThread(script) logger.debug('Executing received script "{}"'.format(script))
th.start() self.jobmgr.launch_job (script, True)
def logoff(self): def logoff(self):
logger.debug('Logoff invoked') logger.debug('Logoff invoked')
@ -277,7 +282,10 @@ class OGASystemTray(QtWidgets.QSystemTrayIcon):
except Exception: except Exception:
# May we have lost connection with server, simply log and exit in that case # May we have lost connection with server, simply log and exit in that case
logger.exception() logger.exception()
logger.exception("Got an exception, processing quit") # File "/home/nati/Downloads/work/opengnsys/ogagent/src/OGAgentUser.py", line 286, in cleanup
# logger.exception("Got an exception, processing quit")
#TypeError: Logger.exception() takes 1 positional argument but 2 were given
#logger.exception("Got an exception, processing quit")
try: try:
# operations.logoff() # Uncomment this after testing to logoff user # operations.logoff() # Uncomment this after testing to logoff user

View File

@ -1 +1 @@
1.3.3 1.3.4

View File

@ -58,7 +58,7 @@ class HTTPServerHandler(BaseHTTPRequestHandler):
def sendJsonResponse(self, data): def sendJsonResponse(self, data):
try: self.send_response(200) try: self.send_response(200)
except Exception as e: logger.warn (str(e)) except Exception as e: logger.warn ('exception: "{}"'.format(str(e)))
data = json.dumps(data) data = json.dumps(data)
self.send_header('Content-type', 'application/json') self.send_header('Content-type', 'application/json')
self.send_header('Content-Length', str(len(data))) self.send_header('Content-Length', str(len(data)))

View File

@ -0,0 +1,62 @@
import threading
import subprocess
import hashlib
import time
from datetime import datetime, timezone
from opengnsys import operations
from opengnsys.log import logger
def job_readstdout(job):
for l in iter(job['p'].stdout.readline, b''):
job['stdout'] += l.decode ('utf-8', 'ignore')
def job_readstderr(job):
for l in iter(job['p'].stderr.readline, b''):
job['stderr'] += l.decode ('utf-8', 'ignore')
class JobMgr():
jobs = {}
def launch_job(self, script, is_client):
logger.debug ('in launch_job(), is_client "{}"'.format(is_client))
args = operations.build_popen_args (script)
logger.debug ('args "{}"'.format (args))
now = datetime.now (tz=timezone.utc)
ts = now.strftime ('%Y-%m-%d %H:%M:%S.%f%z') ## '%s' doesn't work on windows
jobid = hashlib.sha256 (now.isoformat().encode('UTF-8') + script.encode ('UTF-8')).hexdigest()[0:12]
p = subprocess.Popen (args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.jobs[jobid] = { 'p': p, 'pid': p.pid, 'starttime': ts, 'script': script, 'client': is_client, 'status': 'running', 'stdout': '', 'stderr': '' }
self.jobs[jobid]['t1'] = threading.Thread (target=job_readstdout, args=(self.jobs[jobid],))
self.jobs[jobid]['t2'] = threading.Thread (target=job_readstderr, args=(self.jobs[jobid],))
self.jobs[jobid]['t1'].start()
self.jobs[jobid]['t2'].start()
logger.debug ('jobs "{}"'.format (self.jobs))
return jobid
def prepare_jobs(self):
## can't return self.jobs because the Popen object at self.jobs[id]['p'] is not serializable. So, need to create a new dict to return
st = []
for jobid in self.jobs:
j = self.jobs[jobid]
entry = dict ((k, j[k]) for k in ['pid', 'starttime', 'script', 'client', 'status', 'stdout', 'stderr'])
entry['jobid'] = jobid
if j['p'].poll() is not None: ## process finished
entry['rc'] = j['p'].returncode
entry['status'] = 'finished'
st.append (entry)
return st
def terminate_job(self, jobid):
if jobid not in self.jobs: return {}
p = self.jobs[jobid]['p']
p.terminate()
time.sleep (1)
if p.poll() is not None:
return { 'terminated': True }
p.kill()
time.sleep (1)
if p.poll() is not None:
return { 'killed': True }
return { 'killed': False }

View File

@ -303,3 +303,7 @@ def get_etc_path():
Returns etc directory path. Returns etc directory path.
""" """
return os.sep + 'etc' return os.sep + 'etc'
def build_popen_args(script):
return ['/bin/sh', '-c', script]

View File

@ -297,3 +297,7 @@ def get_etc_path():
Returns etc directory path. Returns etc directory path.
""" """
return os.sep + 'etc' return os.sep + 'etc'
def build_popen_args(script):
return ['/bin/sh', '-c', script]

View File

@ -34,35 +34,41 @@ from opengnsys.workers import ClientWorker
from opengnsys import operations from opengnsys import operations
from opengnsys.log import logger from opengnsys.log import logger
from opengnsys.scriptThread import ScriptExecutorThread from opengnsys.jobmgr import JobMgr
class OpenGnSysWorker(ClientWorker): class OpenGnSysWorker(ClientWorker):
name = 'opengnsys' name = 'opengnsys'
jobmgr = JobMgr()
@staticmethod def onActivation(self):
def onActivation():
logger.debug('Activate invoked') logger.debug('Activate invoked')
@staticmethod def onDeactivation(self):
def onDeactivation():
logger.debug('Deactivate invoked') logger.debug('Deactivate invoked')
# Processes script execution def process_script(self, json_params):
@staticmethod script = json_params['code']
def process_script(json_params): logger.debug('Processing message: script({})'.format(script))
logger.debug('Processed message: script({})'.format(json_params)) self.jobmgr.launch_job (script, True)
thr = ScriptExecutorThread(json_params['code'])
thr.start()
#self.sendServerMessage('script', {'op', 'launched'}) #self.sendServerMessage('script', {'op', 'launched'})
@staticmethod def process_terminatescript(self, json_params):
def process_logoff(json_params): jobid = json_params['jobid']
logger.debug('Processing terminatescript request, jobid "{}"'.format (jobid))
self.jobmgr.terminate_job (jobid)
def process_preparescripts(self, json_params):
logger.debug('Processing preparescripts request')
st = self.jobmgr.prepare_jobs()
logger.debug('Sending preparescripts to server with data "{}"'.format(st))
self.sendServerMessage('preparescripts', st)
def process_logoff(self, json_params):
logger.debug('Processed message: logoff({})'.format(json_params)) logger.debug('Processed message: logoff({})'.format(json_params))
operations.logoff() operations.logoff()
@staticmethod def process_popup(self, json_params):
def process_popup(json_params):
logger.debug('Processed message: popup({})'.format(json_params)) logger.debug('Processed message: popup({})'.format(json_params))
ret = operations.showPopup(json_params['title'], json_params['message']) ret = operations.showPopup(json_params['title'], json_params['message'])
#self.sendServerMessage('popup', {'op', ret}) #self.sendServerMessage('popup', {'op', ret})

View File

@ -45,7 +45,7 @@ import urllib.request
from configparser import NoOptionError from configparser import NoOptionError
from opengnsys import REST, operations, VERSION from opengnsys import REST, operations, VERSION
from opengnsys.log import logger from opengnsys.log import logger
from opengnsys.scriptThread import ScriptExecutorThread from opengnsys.jobmgr import JobMgr
from opengnsys.workers import ServerWorker from opengnsys.workers import ServerWorker
@ -111,6 +111,7 @@ class OpenGnSysWorker(ServerWorker):
random = None # Random string for secure connections random = None # Random string for secure connections
length = 32 # Random string length length = 32 # Random string length
exec_level = None # Execution level (permitted operations) exec_level = None # Execution level (permitted operations)
jobmgr = JobMgr()
def onActivation(self): def onActivation(self):
""" """
@ -202,9 +203,6 @@ class OpenGnSysWorker(ServerWorker):
self.REST.sendMessage('ogagent/stopped', {'mac': self.interface.mac, 'ip': self.interface.ip, self.REST.sendMessage('ogagent/stopped', {'mac': self.interface.mac, 'ip': self.interface.ip,
'ostype': operations.os_type, 'osversion': operations.os_version}) 'ostype': operations.os_type, 'osversion': operations.os_version})
def processClientMessage(self, message, data):
logger.debug('Got OpenGnsys message from client: {}, data {}'.format(message, data))
def onLogin(self, data): def onLogin(self, data):
""" """
Sends session login notification to OpenGnsys server Sends session login notification to OpenGnsys server
@ -335,31 +333,50 @@ class OpenGnSysWorker(ServerWorker):
logger.debug('Processing script request') logger.debug('Processing script request')
# Decoding script # Decoding script
script = urllib.parse.unquote(base64.b64decode(post_params.get('script')).decode('utf-8')) script = urllib.parse.unquote(base64.b64decode(post_params.get('script')).decode('utf-8'))
logger.debug('received script {}'.format(script)) logger.debug('received script "{}"'.format(script))
if operations.os_type == 'Windows':
## for windows, we turn the script into utf16le, then to b64 again, and feed the blob to powershell
u16 = script.encode ('utf-16le') ## utf16
b64 = base64.b64encode (u16).decode ('utf-8') ## b64 (which returns bytes, so we need an additional decode(utf8))
script = """
import os
import tempfile
import subprocess
cp = subprocess.run ("powershell -WindowStyle Hidden -EncodedCommand {}", capture_output=True)
subprocs_log = os.path.join (tempfile.gettempdir(), 'opengnsys-subprocs.log')
with open (subprocs_log, 'ab') as fd: ## TODO improve this logging
fd.write (cp.stdout)
fd.write (cp.stderr)
""".format (b64)
else:
script = 'import subprocess; subprocess.check_output("""{0}""",shell=True)'.format(script)
# Executing script.
if post_params.get('client', 'false') == 'false': if post_params.get('client', 'false') == 'false':
thr = ScriptExecutorThread(script) jobid = self.jobmgr.launch_job (script, False)
thr.start() return {'op': 'launched', 'jobid': jobid}
else:
else: ## post_params.get('client') is not 'false'
## send script as-is
self.sendClientMessage('script', {'code': script}) self.sendClientMessage('script', {'code': script})
#return {'op': 'launched', 'jobid': jobid} ## TODO obtain jobid generated at the client (can it be done?)
return {'op': 'launched'} return {'op': 'launched'}
@execution_level('full')
@check_secret
def process_terminatescript(self, path, get_params, post_params, server):
jobid = post_params.get('jobid', None)
logger.debug('Processing terminate_script request, jobid "{}"'.format (jobid))
if jobid is None:
return {}
self.sendClientMessage('terminatescript', {'jobid': jobid})
self.jobmgr.terminate_job (jobid)
return {}
@execution_level('full')
@check_secret
def process_preparescripts(self, path, get_params, post_params, server):
logger.debug('Processing preparescripts request')
self.st = self.jobmgr.prepare_jobs()
logger.debug('Sending preparescripts to client')
self.sendClientMessage('preparescripts', None)
return {}
def process_client_preparescripts(self, params):
logger.debug('Processing preparescripts message from client')
for p in params:
#logger.debug ('p "{}"'.format(p))
self.st.append (p)
@execution_level('full')
@check_secret
def process_getscripts(self, path, get_params, post_params, server):
logger.debug('Processing getscripts request')
return self.st
@execution_level('full') @execution_level('full')
@check_secret @check_secret
def process_logoff(self, path, get_params, post_params, server): def process_logoff(self, path, get_params, post_params, server):

View File

@ -1,51 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 201 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
'''
# pylint: disable-msg=E1101,W0703
from opengnsys.log import logger
import threading
import six
class ScriptExecutorThread(threading.Thread):
def __init__(self, script):
super(ScriptExecutorThread, self).__init__()
self.script = script
def run(self):
try:
logger.debug('Executing script: {}'.format(self.script))
six.exec_(self.script, globals(), None)
except Exception as e:
logger.error('Error executing script: {}'.format(e))

View File

@ -35,6 +35,7 @@ import os
import locale import locale
import subprocess import subprocess
import ctypes import ctypes
import base64
from ctypes.wintypes import DWORD, LPCWSTR from ctypes.wintypes import DWORD, LPCWSTR
import win32com.client # @UnresolvedImport, pylint: disable=import-error import win32com.client # @UnresolvedImport, pylint: disable=import-error
import win32net # @UnresolvedImport, pylint: disable=import-error import win32net # @UnresolvedImport, pylint: disable=import-error
@ -275,3 +276,10 @@ def get_etc_path():
Returns etc directory path. Returns etc directory path.
""" """
return os.path.join('C:', os.sep, 'Windows', 'System32', 'drivers', 'etc') return os.path.join('C:', os.sep, 'Windows', 'System32', 'drivers', 'etc')
def build_popen_args(script):
## turn the script into utf16le, then to b64 again, and feed the blob to powershell
u16 = script.encode ('utf-16le') ## utf16
b64 = base64.b64encode (u16).decode ('utf-8') ## b64 (which returns bytes, so we need an additional decode(utf8))
return ['powershell', '-WindowStyle', 'Hidden', '-EncodedCommand', b64]