Compare commits

..

No commits in common. "main" and "setsid" have entirely different histories.
main ... setsid

12 changed files with 44 additions and 173 deletions

View File

@ -6,47 +6,6 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [8.0.0] - 2025-08-08
### Changed
- Return HTTP 409 on more than one background jobs
## [7.3.3] - 2025-08-07
### Fixed
- wait() for the browser
## [7.3.2] - 2025-08-04
### Fixed
- Fix syntax
## [7.3.1] - 2025-08-04
### Fixed
- On user logout, write the user to the log
- On EjecutarScript with client=true, return a job ID
## [7.3.0] - 2025-07-31
### Fixed
- Wait for zombies
### Changed
- Change "jobid" for "job_id" for consistency
## [7.2.2] - 2025-07-30
### Added
- Add missing file stop-agent.ps1
## [7.2.1] - 2025-07-29
### Changed

View File

@ -1,41 +1,3 @@
ogagent (8.0.0-1) stable; urgency=medium
* Return HTTP 409 on more than one background jobs
-- OpenGnsys developers <info@opengnsys.es> Fri, 08 Aug 2025 13:13:33 +0200
ogagent (7.3.3-1) stable; urgency=medium
* wait() for the browser
-- OpenGnsys developers <info@opengnsys.es> Thu, 07 Aug 2025 12:08:39 +0200
ogagent (7.3.2-1) stable; urgency=medium
* Fix syntax
-- OpenGnsys developers <info@opengnsys.es> Mon, 04 Aug 2025 14:36:47 +0200
ogagent (7.3.1-1) stable; urgency=medium
* On user logout, write the user to the log
* On EjecutarScript with client=true, return a job ID
-- OpenGnsys developers <info@opengnsys.es> Mon, 04 Aug 2025 14:22:52 +0200
ogagent (7.3.0-1) stable; urgency=medium
* Wait for zombies
* Change "jobid" for "job_id" for consistency
-- OpenGnsys developers <info@opengnsys.es> Thu, 31 Jul 2025 10:35:16 +0200
ogagent (7.2.2-1) stable; urgency=medium
* Add missing file stop-agent.ps1
-- OpenGnsys developers <info@opengnsys.es> Wed, 30 Jul 2025 10:20:21 +0200
ogagent (7.2.1-1) stable; urgency=medium
* Run process in a new POSIX session and group, send termination signals to the whole process group

View File

@ -1 +1 @@
8.0.0
7.2.1

View File

@ -105,7 +105,6 @@ class HTTPServerHandler(BaseHTTPRequestHandler):
self.sendJsonError(500, exceptionToMessage(e))
else:
arg0 = e.args[0]
while isinstance (arg0, Exception): arg0 = arg0.args[0] ## handle nested exceptions
if type (arg0) is str:
logger.debug ('Message processor for "{}" returned exception string "{}"'.format(path[0], str(e)))
self.sendJsonError (500, exceptionToMessage(e))

View File

@ -110,11 +110,6 @@ class ClientProcessor(threading.Thread):
logger.debug('Got Client message {}={}'.format(msg, REV_DICT.get(msg)))
if self.parent.clientMessageProcessor is not None:
self.parent.clientMessageProcessor(msg, data)
if msg == REQ_LOGIN:
if b',' in data:
self.user = data.split (b',')[0]
else:
self.user = data
def run(self):
self.running = True
@ -202,14 +197,8 @@ class ClientProcessor(threading.Thread):
logger.error('Invalid message in queue: {}'.format(e))
logger.debug('Client processor stopped')
if os.path.exists ('/windows/temp'):
fd = open ('/windows/temp/ogagentuser_died', 'wb')
fd.write (self.user)
fd.close()
else:
fd = open ('/tmp/ogagentuser_died', 'wb')
fd.write (self.user)
fd.close()
if os.path.exists ('/windows/temp'): open ('/windows/temp/ogagentuser_died', 'w').close()
else: open ( '/tmp/ogagentuser_died', 'w').close()
try:
self.clientSocket.close()
except Exception:

View File

@ -23,32 +23,32 @@ class JobMgr():
logger.debug ('args "{}"'.format (args))
now = datetime.now (tz=timezone.utc)
ts = now.strftime ('%Y-%m-%d %H:%M:%S.%f%z') ## '%s' doesn't work on windows
job_id = hashlib.sha256 (now.isoformat().encode('UTF-8') + script.encode ('UTF-8')).hexdigest()[0:12]
jobid = hashlib.sha256 (now.isoformat().encode('UTF-8') + script.encode ('UTF-8')).hexdigest()[0:12]
p = subprocess.Popen (args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.jobs[job_id] = { 'p': p, 'pid': p.pid, 'starttime': ts, 'script': script, 'client': is_client, 'status': 'running', 'stdout': '', 'stderr': '' }
self.jobs[job_id]['t1'] = threading.Thread (target=job_readstdout, args=(self.jobs[job_id],))
self.jobs[job_id]['t2'] = threading.Thread (target=job_readstderr, args=(self.jobs[job_id],))
self.jobs[job_id]['t1'].start()
self.jobs[job_id]['t2'].start()
self.jobs[jobid] = { 'p': p, 'pid': p.pid, 'starttime': ts, 'script': script, 'client': is_client, 'status': 'running', 'stdout': '', 'stderr': '' }
self.jobs[jobid]['t1'] = threading.Thread (target=job_readstdout, args=(self.jobs[jobid],))
self.jobs[jobid]['t2'] = threading.Thread (target=job_readstderr, args=(self.jobs[jobid],))
self.jobs[jobid]['t1'].start()
self.jobs[jobid]['t2'].start()
logger.debug ('jobs "{}"'.format (self.jobs))
return job_id
return jobid
def prepare_jobs(self):
## can't return self.jobs because the Popen object at self.jobs[id]['p'] is not serializable. So, need to create a new dict to return
st = []
for job_id in self.jobs:
j = self.jobs[job_id]
for jobid in self.jobs:
j = self.jobs[jobid]
entry = dict ((k, j[k]) for k in ['pid', 'starttime', 'script', 'client', 'status', 'stdout', 'stderr'])
entry['job_id'] = job_id
entry['jobid'] = jobid
if j['p'].poll() is not None: ## process finished
entry['rc'] = j['p'].returncode
entry['status'] = 'finished'
st.append (entry)
return st
def terminate_job(self, job_id):
if job_id not in self.jobs: return {}
p = self.jobs[job_id]['p']
def terminate_job(self, jobid):
if jobid not in self.jobs: return {}
p = self.jobs[jobid]['p']
p.terminate()
time.sleep (1)
if p.poll() is not None:

View File

@ -74,12 +74,10 @@ class OGAgentSvc(Daemon, CommonService):
while self.isAlive:
client_died=False
if os.path.exists ('/tmp/ogagentuser_died'):
with open ('/tmp/ogagentuser_died', 'rb') as fd:
u = fd.read()
os.unlink ('/tmp/ogagentuser_died')
client_died=True
if client_died:
self.notifyLogout (u)
self.notifyLogout (b'')
# In milliseconds, will break
self.doWait(1000)

View File

@ -50,13 +50,13 @@ class OpenGnSysWorker(ClientWorker):
def process_script(self, json_params):
script = json_params['code']
logger.debug('Processing message: script({})'.format(script))
job_id = self.jobmgr.launch_job (script, True)
self.sendServerMessage('script_launched', {'op': 'launched', 'job_id': job_id})
self.jobmgr.launch_job (script, True)
#self.sendServerMessage('script', {'op', 'launched'})
def process_terminatescript(self, json_params):
job_id = json_params['job_id']
logger.debug('Processing terminatescript request, job_id "{}"'.format (job_id))
self.jobmgr.terminate_job (job_id)
jobid = json_params['jobid']
logger.debug('Processing terminatescript request, jobid "{}"'.format (jobid))
self.jobmgr.terminate_job (jobid)
def process_preparescripts(self, json_params):
logger.debug('Processing preparescripts request')

View File

@ -369,46 +369,24 @@ class OpenGnSysWorker(ServerWorker):
logger.debug('received script "{}"'.format(script))
if post_params.get('client', 'false') == 'false':
job_id = self.jobmgr.launch_job (script, False)
return {'op': 'launched', 'job_id': job_id}
jobid = self.jobmgr.launch_job (script, False)
return {'op': 'launched', 'jobid': jobid}
else: ## post_params.get('client') is not 'false'
## send script as-is
self.sendClientMessage('script', {'code': script})
## wait for job_id generated at the client
job_id = None
iters = 0
while True:
time.sleep (0.2)
if os.path.exists ('/tmp/EjecutarScript-jobid'):
with open ('/tmp/EjecutarScript-jobid', 'r') as fd:
job_id = fd.read()
break
iters += 1
if iters >= 10: break
try: os.unlink ('/tmp/EjecutarScript-jobid')
except: pass
if job_id is None: return {'op': 'launched'}
else: return {'op': 'launched', 'job_id': job_id}
def process_client_script_launched(self, data):
fd = open ('/tmp/EjecutarScript-jobid', 'w')
fd.write (data['job_id'])
fd.close()
return True
#return {'op': 'launched', 'jobid': jobid} ## TODO obtain jobid generated at the client (can it be done?)
return {'op': 'launched'}
@execution_level('full')
@check_secret
def process_terminatescript(self, path, get_params, post_params, server):
job_id = post_params.get('job_id', None)
logger.debug('Processing terminate_script request, job_id "{}"'.format (job_id))
if job_id is None:
jobid = post_params.get('jobid', None)
logger.debug('Processing terminate_script request, jobid "{}"'.format (jobid))
if jobid is None:
return {}
self.sendClientMessage('terminatescript', {'job_id': job_id})
self.jobmgr.terminate_job (job_id)
self.sendClientMessage('terminatescript', {'jobid': jobid})
self.jobmgr.terminate_job (jobid)
return {}
@execution_level('full')

View File

@ -108,12 +108,10 @@ class OGAgentSvc(win32serviceutil.ServiceFramework, CommonService):
while self.isAlive:
client_died=False
if os.path.exists ('/windows/temp/ogagentuser_died'):
with open ('/windows/temp/ogagentuser_died', 'rb') as fd:
u = fd.read()
os.unlink ('/windows/temp/ogagentuser_died')
client_died=True
if client_died:
self.notifyLogout (u)
self.notifyLogout (b'')
# Pumps & processes any waiting messages
pythoncom.PumpWaitingMessages()

View File

@ -35,7 +35,6 @@ import re
import time
try: import dbus ## don't fail on windows (the worker will later refuse to load anyway)
except: pass
import select
import random
import subprocess
import threading
@ -210,11 +209,6 @@ class ogLiveWorker(ServerWorker):
def mon (self):
n = 0
while True:
if self.browser_process is not None:
try:
self.browser_process.wait (0.05)
except subprocess.TimeoutExpired:
pass
with self.thread_lock:
for k in self.thread_list:
elem = self.thread_list[k]
@ -298,26 +292,22 @@ class ogLiveWorker(ServerWorker):
pass
sout = serr = ''
finished = False
while True:
try:
p.wait (0.05)
finished = True
except subprocess.TimeoutExpired:
pass
ready_to_read, _, _ = select.select ([p.stdout, p.stderr], [], [], 0.2)
if p.stdout in ready_to_read:
l = p.stdout.readline()
poll_iterations = 1
while p.poll() is None:
for l in iter (p.stdout.readline, b''):
partial = l.decode ('utf-8', 'ignore')
if self.stdout_q: self.stdout_q.put (partial)
sout += partial
if p.stderr in ready_to_read:
l = p.stderr.readline()
for l in iter (p.stderr.readline, b''):
partial = l.decode ('utf-8', 'ignore')
serr += partial
if finished: break
## poll quickly at first, then poll less frequently
if poll_iterations > 15: sleep_time = 1
elif poll_iterations > 10: sleep_time = 0.2
else: sleep_time = 0.1
time.sleep (sleep_time)
poll_iterations += 1
sout = sout.strip()
serr = serr.strip()
@ -404,7 +394,7 @@ class ogLiveWorker(ServerWorker):
if 'ServiceUnknown' in str(e):
logger.warning ('browser is not running, launching a new one')
browser_log_fd = open ('/var/log/launch_browser.log', 'a')
self.browser_process = subprocess.Popen (['/usr/bin/launch_browser', url], stdout=browser_log_fd, stderr=subprocess.STDOUT)
subprocess.Popen (['/usr/bin/launch_browser', url], stdout=browser_log_fd, stderr=subprocess.STDOUT)
browser_log_fd.close()
else:
logger.error (f'Error al cambiar URL: ({e})')
@ -464,7 +454,6 @@ class ogLiveWorker(ServerWorker):
self.pid_q = None ## for passing PIDs around
self.stdout_q = None ## for passing stdout
self.progress_jobs = {}
self.browser_process = None
ogcore_scheme = os.environ.get ('OGAGENTCFG_OGCORE_SCHEME', 'https')
ogcore_ip = os.environ.get ('OGAGENTCFG_OGCORE_IP', '192.168.2.1')
@ -508,7 +497,7 @@ class ogLiveWorker(ServerWorker):
break
if any_job_running:
logger.info ('some job is already running, refusing to launch another one')
raise Exception ({ '_httpcode': 409, '_msg': 'some job is already running, refusing to launch another one' })
return { 'job_id': None, 'message': 'some job is already running, refusing to launch another one' }
job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
import queue

View File

@ -1 +0,0 @@
get-process -name ogagentuser|stop-process