Compare commits

...

12 Commits

Author SHA1 Message Date
Natalia Serrano 33f65a45b7 Merge pull request 'jobid-wait-zombies' (#51) from jobid-wait-zombies into main
Reviewed-on: #51
2025-07-31 10:54:26 +02:00
Natalia Serrano 75d222c425 refs #2554 wait for zombies 2025-07-31 10:36:00 +02:00
Natalia Serrano e783d7c1fa refs #2554 wait for zombies 2025-07-31 10:34:01 +02:00
Natalia Serrano ee183f6ad3 refs #2556 change "jobid" for "job_id" 2025-07-31 10:18:19 +02:00
Natalia Serrano 054071a5ab Merge pull request 'refs #2547 add missing file stop-agent.ps1' (#50) from add-file into main
Reviewed-on: #50
2025-07-30 10:21:44 +02:00
Natalia Serrano 95360a244e refs #2547 add missing file stop-agent.ps1 2025-07-30 10:21:02 +02:00
Natalia Serrano cbfcc22d8a Merge pull request 'refs #2509 use setsid, send signals to the pgrp' (#49) from setsid into main
Reviewed-on: #49
2025-07-29 14:13:56 +02:00
Natalia Serrano 9fe1b5d1d5 refs #2509 use setsid, send signals to the pgrp 2025-07-29 14:12:56 +02:00
Natalia Serrano 565299c7c0 Merge pull request 'log-inoglive' (#48) from log-inoglive into main
Reviewed-on: #48
2025-07-28 15:18:25 +02:00
Natalia Serrano c53be3f3ec refs #2537 log whether we are in ogLive or not 2025-07-28 15:17:40 +02:00
Natalia Serrano 198353b214 refs #2537 log whether we are in ogLive or not 2025-07-28 13:56:56 +02:00
Natalia Serrano 4a8fc2b469 Merge pull request 'refs #2520 don't pass the tag parameter to CrearImagenGit' (#47) from crearimagengit-no-tag into main
Reviewed-on: #47
2025-07-24 15:33:19 +02:00
10 changed files with 114 additions and 48 deletions

View File

@ -6,6 +6,34 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [7.3.0] - 2025-07-31
### Fixed
- Wait for zombies
### Changed
- Rename "jobid" to "job_id" for consistency
## [7.2.2] - 2025-07-30
### Added
- Add missing file stop-agent.ps1
## [7.2.1] - 2025-07-29
### Changed
- Run process in a new POSIX session and group, send termination signals to the whole process group
## [7.2.0] - 2025-07-28
### Added
- Log whether we are in ogLive or not
## [7.1.0] - 2025-07-24
### Changed

View File

@ -1,3 +1,28 @@
ogagent (7.3.0-1) stable; urgency=medium
* Wait for zombies
* Rename "jobid" to "job_id" for consistency
-- OpenGnsys developers <info@opengnsys.es> Thu, 31 Jul 2025 10:35:16 +0200
ogagent (7.2.2-1) stable; urgency=medium
* Add missing file stop-agent.ps1
-- OpenGnsys developers <info@opengnsys.es> Wed, 30 Jul 2025 10:20:21 +0200
ogagent (7.2.1-1) stable; urgency=medium
* Run process in a new POSIX session and group, send termination signals to the whole process group
-- OpenGnsys developers <info@opengnsys.es> Tue, 29 Jul 2025 12:52:08 +0200
ogagent (7.2.0-1) stable; urgency=medium
* Log whether we are in ogLive or not
-- OpenGnsys developers <info@opengnsys.es> Mon, 28 Jul 2025 13:55:28 +0200
ogagent (7.1.0-1) stable; urgency=medium
* Don't pass the "tag" parameter to CrearImagenGit

View File

@ -1 +1 @@
7.1.0
7.3.0

View File

@ -23,32 +23,32 @@ class JobMgr():
logger.debug ('args "{}"'.format (args))
now = datetime.now (tz=timezone.utc)
ts = now.strftime ('%Y-%m-%d %H:%M:%S.%f%z') ## '%s' doesn't work on windows
jobid = hashlib.sha256 (now.isoformat().encode('UTF-8') + script.encode ('UTF-8')).hexdigest()[0:12]
job_id = hashlib.sha256 (now.isoformat().encode('UTF-8') + script.encode ('UTF-8')).hexdigest()[0:12]
p = subprocess.Popen (args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.jobs[jobid] = { 'p': p, 'pid': p.pid, 'starttime': ts, 'script': script, 'client': is_client, 'status': 'running', 'stdout': '', 'stderr': '' }
self.jobs[jobid]['t1'] = threading.Thread (target=job_readstdout, args=(self.jobs[jobid],))
self.jobs[jobid]['t2'] = threading.Thread (target=job_readstderr, args=(self.jobs[jobid],))
self.jobs[jobid]['t1'].start()
self.jobs[jobid]['t2'].start()
self.jobs[job_id] = { 'p': p, 'pid': p.pid, 'starttime': ts, 'script': script, 'client': is_client, 'status': 'running', 'stdout': '', 'stderr': '' }
self.jobs[job_id]['t1'] = threading.Thread (target=job_readstdout, args=(self.jobs[job_id],))
self.jobs[job_id]['t2'] = threading.Thread (target=job_readstderr, args=(self.jobs[job_id],))
self.jobs[job_id]['t1'].start()
self.jobs[job_id]['t2'].start()
logger.debug ('jobs "{}"'.format (self.jobs))
return jobid
return job_id
def prepare_jobs(self):
## can't return self.jobs because the Popen object at self.jobs[id]['p'] is not serializable. So, we need to build a new list of plain dicts to return
st = []
for jobid in self.jobs:
j = self.jobs[jobid]
for job_id in self.jobs:
j = self.jobs[job_id]
entry = dict ((k, j[k]) for k in ['pid', 'starttime', 'script', 'client', 'status', 'stdout', 'stderr'])
entry['jobid'] = jobid
entry['job_id'] = job_id
if j['p'].poll() is not None: ## process finished
entry['rc'] = j['p'].returncode
entry['status'] = 'finished'
st.append (entry)
return st
def terminate_job(self, jobid):
if jobid not in self.jobs: return {}
p = self.jobs[jobid]['p']
def terminate_job(self, job_id):
if job_id not in self.jobs: return {}
p = self.jobs[job_id]['p']
p.terminate()
time.sleep (1)
if p.poll() is not None:

View File

@ -42,6 +42,8 @@ OTHER, DEBUG, INFO, WARN, ERROR, FATAL = (10000 * (x + 1) for x in range(6))
class LocalLogger(object):
def __init__(self):
self.extra = { 'in_oglive': None }
# tempdir is different for "user application" and "service"
# service will get c:\windows\temp, while user will get c:\users\XXX\temp
# Try to open logger at /var/log path
@ -51,13 +53,13 @@ class LocalLogger(object):
for logDir in ('/var/log', os.path.expanduser('~'), tempfile.gettempdir()):
try:
fname1 = os.path.join (logDir, 'opengnsys.log')
fmt1 = logging.Formatter (fmt='%(levelname)s %(asctime)s (%(threadName)s) (%(funcName)s) %(message)s')
fmt1 = logging.Formatter (fmt='%(levelname)s %(asctime)s in_oglive=%(in_oglive)s (%(threadName)s) (%(funcName)s) %(message)s')
fh1 = logging.FileHandler (filename=fname1, mode='a')
fh1.setFormatter (fmt1)
fh1.setLevel (logging.DEBUG)
fname2 = os.path.join (logDir, 'opengnsys.json.log')
fmt2 = JsonFormatter ({"timestamp": "asctime", "severity": "levelname", "threadName": "threadName", "function": "funcName", "message": "message"}, time_format='%Y-%m-%d %H:%M:%S', msec_format='')
fmt2 = JsonFormatter ({"timestamp": "asctime", "severity": "levelname", "in_oglive": "in_oglive", "threadName": "threadName", "function": "funcName", "message": "message"}, time_format='%Y-%m-%d %H:%M:%S', msec_format='')
fh2 = logging.FileHandler (filename=fname2, mode='a')
fh2.setFormatter (fmt2)
fh2.setLevel (logging.DEBUG)
@ -77,11 +79,14 @@ class LocalLogger(object):
self.logger = None
def log(self, level, message):
if self.extra['in_oglive'] is None:
self.extra['in_oglive'] = os.path.exists ('/scripts/functions')
# Debug messages are logged to a file
# our loglevels are 10000 (other), 20000 (debug), ....
# logging levels are 10 (debug), 20 (info)
# OTHER = logging.NOTSET
self.logger.log(int(level / 1000) - 10, message, stacklevel=4)
self.logger.log(int(level / 1000) - 10, message, stacklevel=4, extra=self.extra)
def isWindows(self):
return False

View File

@ -54,9 +54,9 @@ class OpenGnSysWorker(ClientWorker):
#self.sendServerMessage('script', {'op', 'launched'})
def process_terminatescript(self, json_params):
jobid = json_params['jobid']
logger.debug('Processing terminatescript request, jobid "{}"'.format (jobid))
self.jobmgr.terminate_job (jobid)
job_id = json_params['job_id']
logger.debug('Processing terminatescript request, job_id "{}"'.format (job_id))
self.jobmgr.terminate_job (job_id)
def process_preparescripts(self, json_params):
logger.debug('Processing preparescripts request')

View File

@ -369,24 +369,24 @@ class OpenGnSysWorker(ServerWorker):
logger.debug('received script "{}"'.format(script))
if post_params.get('client', 'false') == 'false':
jobid = self.jobmgr.launch_job (script, False)
return {'op': 'launched', 'jobid': jobid}
job_id = self.jobmgr.launch_job (script, False)
return {'op': 'launched', 'job_id': job_id}
else: ## post_params.get('client') is not 'false'
## send script as-is
self.sendClientMessage('script', {'code': script})
#return {'op': 'launched', 'jobid': jobid} ## TODO obtain jobid generated at the client (can it be done?)
#return {'op': 'launched', 'job_id': job_id} ## TODO obtain job_id generated at the client (can it be done?)
return {'op': 'launched'}
@execution_level('full')
@check_secret
def process_terminatescript(self, path, get_params, post_params, server):
jobid = post_params.get('jobid', None)
logger.debug('Processing terminate_script request, jobid "{}"'.format (jobid))
if jobid is None:
job_id = post_params.get('job_id', None)
logger.debug('Processing terminate_script request, job_id "{}"'.format (job_id))
if job_id is None:
return {}
self.sendClientMessage('terminatescript', {'jobid': jobid})
self.jobmgr.terminate_job (jobid)
self.sendClientMessage('terminatescript', {'job_id': job_id})
self.jobmgr.terminate_job (job_id)
return {}
@execution_level('full')

View File

@ -44,17 +44,19 @@ OTHER, DEBUG, INFO, WARN, ERROR, FATAL = (10000 * (x + 1) for x in range(6))
class LocalLogger(object):
def __init__(self):
self.extra = { 'in_oglive': False }
# tempdir is different for "user application" and "service"
# service will get c:\windows\temp, while user will get c:\users\XXX\appdata\local\temp
fname1 = os.path.join (tempfile.gettempdir(), 'opengnsys.log')
fmt1 = logging.Formatter (fmt='%(levelname)s %(asctime)s (%(threadName)s) (%(funcName)s) %(message)s')
fmt1 = logging.Formatter (fmt='%(levelname)s %(asctime)s in_oglive=%(in_oglive)s (%(threadName)s) (%(funcName)s) %(message)s')
fh1 = logging.FileHandler (filename=fname1, mode='a')
fh1.setFormatter (fmt1)
fh1.setLevel (logging.DEBUG)
fname2 = os.path.join (tempfile.gettempdir(), 'opengnsys.json.log')
fmt2 = JsonFormatter ({"timestamp": "asctime", "severity": "levelname", "threadName": "threadName", "function": "funcName", "message": "message"}, time_format='%Y-%m-%d %H:%M:%S', msec_format='')
fmt2 = JsonFormatter ({"timestamp": "asctime", "severity": "levelname", "in_oglive": "in_oglive", "threadName": "threadName", "function": "funcName", "message": "message"}, time_format='%Y-%m-%d %H:%M:%S', msec_format='')
fh2 = logging.FileHandler (filename=fname2, mode='a')
fh2.setFormatter (fmt2)
fh2.setLevel (logging.DEBUG)
@ -71,7 +73,7 @@ class LocalLogger(object):
# our loglevels are 10000 (other), 20000 (debug), ....
# logging levels are 10 (debug), 20 (info)
# OTHER = logging.NOTSET
self.logger.log(int(level / 1000 - 10), message, stacklevel=4)
self.logger.log(int(level / 1000 - 10), message, stacklevel=4, extra=self.extra)
if level < INFO or self.serviceLogger is False: # Only information and above will be on event log
return

View File

@ -35,6 +35,7 @@ import re
import time
try: import dbus ## don't fail on windows (the worker will later refuse to load anyway)
except: pass
import select
import random
import subprocess
import threading
@ -158,10 +159,10 @@ class ogLiveWorker(ServerWorker):
if 'thread' not in self.thread_list[job_id]: return { 'res': 2, 'der': 'Job is not running' }
t = self.thread_list[job_id]['thread']
pid = self.thread_list[job_id]['child_pid']
logger.debug (f'pid ({pid})')
logger.debug (f'pid/pgid/sid ({pid})')
try_times = 8
sig = signal.SIGTERM
msg = f'could not kill pid ({pid}) after ({try_times}) tries'
msg = f'could not killpg pid ({pid}) after ({try_times}) tries'
success = 2 ## mimic cmd['res'] in respuestaEjecucionComando(): "1" means success, "2" means failed
while True:
t.join (0.05)
@ -175,10 +176,10 @@ class ogLiveWorker(ServerWorker):
## this is fine in the first iteration of the loop, before we send any signals. In the rest of iterations, after some signals were sent, msg should be 'job terminated' instead.
if pid:
if os.path.exists (f'/proc/{pid}'):
logger.debug (f'sending signal ({sig}) to pid ({pid})')
logger.debug (f'sending signal ({sig}) to process group ({pid})')
## if the process finishes just here, nothing happens: the signal is sent to the void
os.kill (pid, sig)
#subprocess.run (['kill', '--signal', str(sig), str(pid)])
os.killpg (pid, sig)
#subprocess.run (['kill', '--signal', str(sig), f'-{pid}']) ## negative PID is used for sending signals to the process group
else:
msg = f'pid ({pid}) is gone, nothing to kill'
success = 1
@ -283,31 +284,35 @@ class ogLiveWorker(ServerWorker):
proc = ['bash', '-c', '{} {}'.format (devel_bash_prefix, exe)]
logger.debug ('subprocess.run ("{}")'.format (' '.join (proc)))
p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=True)
if self.pid_q:
self.pid_q.put (p.pid)
self.pid_q.put (p.pid) ## p.pid is also a session ID and a process group ID--we'll use it later to send signals to the whole group
else:
## this happens, for example, when the agent starts up: we are in interfaceAdmin() on the same thread, with neither _long_running_job nor a separate thread
#logger.debug ('no queue--not writing any PID to it')
pass
sout = serr = ''
poll_iterations = 1
while p.poll() is None:
for l in iter (p.stdout.readline, b''):
finished = False
while True:
try:
p.wait (0.05)
finished = True
except subprocess.TimeoutExpired:
pass
ready_to_read, _, _ = select.select ([p.stdout, p.stderr], [], [], 0.2)
if p.stdout in ready_to_read:
l = p.stdout.readline()
partial = l.decode ('utf-8', 'ignore')
if self.stdout_q: self.stdout_q.put (partial)
sout += partial
for l in iter (p.stderr.readline, b''):
if p.stderr in ready_to_read:
l = p.stderr.readline()
partial = l.decode ('utf-8', 'ignore')
serr += partial
## poll quickly at first, then poll less frequently
if poll_iterations > 15: sleep_time = 1
elif poll_iterations > 10: sleep_time = 0.2
else: sleep_time = 0.1
time.sleep (sleep_time)
poll_iterations += 1
if finished: break
sout = sout.strip()
serr = serr.strip()

View File

@ -0,0 +1 @@
get-process -name ogagentuser|stop-process