Compare commits

..

1 Commits

Author SHA1 Message Date
Vadim Trochinsky 60913ff51a refs #2506 modificarimagen con branch y parametros 2025-07-23 08:07:47 +02:00
15 changed files with 64 additions and 200 deletions

View File

@ -6,53 +6,6 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [7.3.2] - 2025-08-04
### Fixed
- Fix syntax
## [7.3.1] - 2025-08-04
### Fixed
- On user logout, write the user to the log
- On EjecutarScript with client=true, return a job ID
## [7.3.0] - 2025-07-31
### Fixed
- Wait for zombies
### Changed
- Change "jobid" for "job_id" for consistency
## [7.2.2] - 2025-07-30
### Added
- Add missing file stop-agent.ps1
## [7.2.1] - 2025-07-29
### Changed
- Run process in a new POSIX session and group, send termination signals to the whole process group
## [7.2.0] - 2025-07-28
### Added
- Log whether we are in ogLive or not
## [7.1.0] - 2025-07-24
### Changed
- Don't pass the "tag" parameter to CrearImagenGit
## [7.0.0] - 2025-07-18
### Changed

View File

@ -1,47 +1,3 @@
ogagent (7.3.2-1) stable; urgency=medium
* Fix syntax
-- OpenGnsys developers <info@opengnsys.es> Mon, 04 Aug 2025 14:36:47 +0200
ogagent (7.3.1-1) stable; urgency=medium
* On user logout, write the user to the log
* On EjecutarScript with client=true, return a job ID
-- OpenGnsys developers <info@opengnsys.es> Mon, 04 Aug 2025 14:22:52 +0200
ogagent (7.3.0-1) stable; urgency=medium
* Wait for zombies
* Change "jobid" for "job_id" for consistency
-- OpenGnsys developers <info@opengnsys.es> Thu, 31 Jul 2025 10:35:16 +0200
ogagent (7.2.2-1) stable; urgency=medium
* Add missing file stop-agent.ps1
-- OpenGnsys developers <info@opengnsys.es> Wed, 30 Jul 2025 10:20:21 +0200
ogagent (7.2.1-1) stable; urgency=medium
* Run process in a new POSIX session and group, send termination signals to the whole process group
-- OpenGnsys developers <info@opengnsys.es> Tue, 29 Jul 2025 12:52:08 +0200
ogagent (7.2.0-1) stable; urgency=medium
* Log whether we are in ogLive or not
-- OpenGnsys developers <info@opengnsys.es> Mon, 28 Jul 2025 13:55:28 +0200
ogagent (7.1.0-1) stable; urgency=medium
* Don't pass the "tag" parameter to CrearImagenGit
-- OpenGnsys developers <info@opengnsys.es> Thu, 24 Jul 2025 15:31:59 +0200
ogagent (7.0.0-1) stable; urgency=medium
* Run the new extension-less scripts from the cloning engine

View File

@ -1 +1 @@
7.3.2
7.0.0

View File

@ -187,6 +187,7 @@ class REST(object):
if self.verify_tls:
r = requests.get(url, cert=(self.crt_file, self.key_file), verify=self.ca_file, timeout=TIMEOUT)
else:
logger.warning ('using insecure TLS for GET')
r = requests.get(url, verify=False, timeout=TIMEOUT)
else:
r = requests.get(url, timeout=TIMEOUT)
@ -199,6 +200,7 @@ class REST(object):
if self.verify_tls:
r = requests.post(url, data=data, headers={'content-type': 'application/json'}, cert=(self.crt_file, self.key_file), verify=self.ca_file, timeout=TIMEOUT)
else:
logger.warning ('using insecure TLS for POST')
r = requests.post(url, data=data, headers={'content-type': 'application/json'}, verify=False, timeout=TIMEOUT)
else:
r = requests.post(url, data=data, headers={'content-type': 'application/json'}, timeout=TIMEOUT)

View File

@ -110,11 +110,6 @@ class ClientProcessor(threading.Thread):
logger.debug('Got Client message {}={}'.format(msg, REV_DICT.get(msg)))
if self.parent.clientMessageProcessor is not None:
self.parent.clientMessageProcessor(msg, data)
if msg == REQ_LOGIN:
if b',' in data:
self.user = data.split (b',')[0]
else:
self.user = data
def run(self):
self.running = True
@ -202,14 +197,8 @@ class ClientProcessor(threading.Thread):
logger.error('Invalid message in queue: {}'.format(e))
logger.debug('Client processor stopped')
if os.path.exists ('/windows/temp'):
fd = open ('/windows/temp/ogagentuser_died', 'wb')
fd.write (self.user)
fd.close()
else:
fd = open ('/tmp/ogagentuser_died', 'wb')
fd.write (self.user)
fd.close()
if os.path.exists ('/windows/temp'): open ('/windows/temp/ogagentuser_died', 'w').close()
else: open ( '/tmp/ogagentuser_died', 'w').close()
try:
self.clientSocket.close()
except Exception:

View File

@ -23,32 +23,32 @@ class JobMgr():
logger.debug ('args "{}"'.format (args))
now = datetime.now (tz=timezone.utc)
ts = now.strftime ('%Y-%m-%d %H:%M:%S.%f%z') ## '%s' doesn't work on windows
job_id = hashlib.sha256 (now.isoformat().encode('UTF-8') + script.encode ('UTF-8')).hexdigest()[0:12]
jobid = hashlib.sha256 (now.isoformat().encode('UTF-8') + script.encode ('UTF-8')).hexdigest()[0:12]
p = subprocess.Popen (args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.jobs[job_id] = { 'p': p, 'pid': p.pid, 'starttime': ts, 'script': script, 'client': is_client, 'status': 'running', 'stdout': '', 'stderr': '' }
self.jobs[job_id]['t1'] = threading.Thread (target=job_readstdout, args=(self.jobs[job_id],))
self.jobs[job_id]['t2'] = threading.Thread (target=job_readstderr, args=(self.jobs[job_id],))
self.jobs[job_id]['t1'].start()
self.jobs[job_id]['t2'].start()
self.jobs[jobid] = { 'p': p, 'pid': p.pid, 'starttime': ts, 'script': script, 'client': is_client, 'status': 'running', 'stdout': '', 'stderr': '' }
self.jobs[jobid]['t1'] = threading.Thread (target=job_readstdout, args=(self.jobs[jobid],))
self.jobs[jobid]['t2'] = threading.Thread (target=job_readstderr, args=(self.jobs[jobid],))
self.jobs[jobid]['t1'].start()
self.jobs[jobid]['t2'].start()
logger.debug ('jobs "{}"'.format (self.jobs))
return job_id
return jobid
def prepare_jobs(self):
## can't return self.jobs because the Popen object at self.jobs[id]['p'] is not serializable. So, need to create a new dict to return
st = []
for job_id in self.jobs:
j = self.jobs[job_id]
for jobid in self.jobs:
j = self.jobs[jobid]
entry = dict ((k, j[k]) for k in ['pid', 'starttime', 'script', 'client', 'status', 'stdout', 'stderr'])
entry['job_id'] = job_id
entry['jobid'] = jobid
if j['p'].poll() is not None: ## process finished
entry['rc'] = j['p'].returncode
entry['status'] = 'finished'
st.append (entry)
return st
def terminate_job(self, job_id):
if job_id not in self.jobs: return {}
p = self.jobs[job_id]['p']
def terminate_job(self, jobid):
if jobid not in self.jobs: return {}
p = self.jobs[jobid]['p']
p.terminate()
time.sleep (1)
if p.poll() is not None:

View File

@ -74,12 +74,10 @@ class OGAgentSvc(Daemon, CommonService):
while self.isAlive:
client_died=False
if os.path.exists ('/tmp/ogagentuser_died'):
with open ('/tmp/ogagentuser_died', 'rb') as fd:
u = fd.read()
os.unlink ('/tmp/ogagentuser_died')
client_died=True
if client_died:
self.notifyLogout (u)
self.notifyLogout (b'')
# In milliseconds, will break
self.doWait(1000)

View File

@ -42,8 +42,6 @@ OTHER, DEBUG, INFO, WARN, ERROR, FATAL = (10000 * (x + 1) for x in range(6))
class LocalLogger(object):
def __init__(self):
self.extra = { 'in_oglive': None }
# tempdir is different for "user application" and "service"
# service wil get c:\windows\temp, while user will get c:\users\XXX\temp
# Try to open logger at /var/log path
@ -53,13 +51,13 @@ class LocalLogger(object):
for logDir in ('/var/log', os.path.expanduser('~'), tempfile.gettempdir()):
try:
fname1 = os.path.join (logDir, 'opengnsys.log')
fmt1 = logging.Formatter (fmt='%(levelname)s %(asctime)s in_oglive=%(in_oglive)s (%(threadName)s) (%(funcName)s) %(message)s')
fmt1 = logging.Formatter (fmt='%(levelname)s %(asctime)s (%(threadName)s) (%(funcName)s) %(message)s')
fh1 = logging.FileHandler (filename=fname1, mode='a')
fh1.setFormatter (fmt1)
fh1.setLevel (logging.DEBUG)
fname2 = os.path.join (logDir, 'opengnsys.json.log')
fmt2 = JsonFormatter ({"timestamp": "asctime", "severity": "levelname", "in_oglive": "in_oglive", "threadName": "threadName", "function": "funcName", "message": "message"}, time_format='%Y-%m-%d %H:%M:%S', msec_format='')
fmt2 = JsonFormatter ({"timestamp": "asctime", "severity": "levelname", "threadName": "threadName", "function": "funcName", "message": "message"}, time_format='%Y-%m-%d %H:%M:%S', msec_format='')
fh2 = logging.FileHandler (filename=fname2, mode='a')
fh2.setFormatter (fmt2)
fh2.setLevel (logging.DEBUG)
@ -79,14 +77,11 @@ class LocalLogger(object):
self.logger = None
def log(self, level, message):
if self.extra['in_oglive'] is None:
self.extra['in_oglive'] = os.path.exists ('/scripts/functions')
# Debug messages are logged to a file
# our loglevels are 10000 (other), 20000 (debug), ....
# logging levels are 10 (debug), 20 (info)
# OTHER = logging.NOTSET
self.logger.log(int(level / 1000) - 10, message, stacklevel=4, extra=self.extra)
self.logger.log(int(level / 1000) - 10, message, stacklevel=4)
def isWindows(self):
return False

View File

@ -50,13 +50,13 @@ class OpenGnSysWorker(ClientWorker):
def process_script(self, json_params):
script = json_params['code']
logger.debug('Processing message: script({})'.format(script))
job_id = self.jobmgr.launch_job (script, True)
self.sendServerMessage('script_launched', {'op': 'launched', 'job_id': job_id})
self.jobmgr.launch_job (script, True)
#self.sendServerMessage('script', {'op', 'launched'})
def process_terminatescript(self, json_params):
job_id = json_params['job_id']
logger.debug('Processing terminatescript request, job_id "{}"'.format (job_id))
self.jobmgr.terminate_job (job_id)
jobid = json_params['jobid']
logger.debug('Processing terminatescript request, jobid "{}"'.format (jobid))
self.jobmgr.terminate_job (jobid)
def process_preparescripts(self, json_params):
logger.debug('Processing preparescripts request')

View File

@ -369,46 +369,24 @@ class OpenGnSysWorker(ServerWorker):
logger.debug('received script "{}"'.format(script))
if post_params.get('client', 'false') == 'false':
job_id = self.jobmgr.launch_job (script, False)
return {'op': 'launched', 'job_id': job_id}
jobid = self.jobmgr.launch_job (script, False)
return {'op': 'launched', 'jobid': jobid}
else: ## post_params.get('client') is not 'false'
## send script as-is
self.sendClientMessage('script', {'code': script})
## wait for job_id generated at the client
job_id = None
iters = 0
while True:
time.sleep (0.2)
if os.path.exists ('/tmp/EjecutarScript-jobid'):
with open ('/tmp/EjecutarScript-jobid', 'r') as fd:
job_id = fd.read()
break
iters += 1
if iters >= 10: break
try: os.unlink ('/tmp/EjecutarScript-jobid')
except: pass
if job_id is None: return {'op': 'launched'}
else: return {'op': 'launched', 'job_id': job_id}
def process_client_script_launched(self, data):
fd = open ('/tmp/EjecutarScript-jobid', 'w')
fd.write (data['job_id'])
fd.close()
return True
#return {'op': 'launched', 'jobid': jobid} ## TODO obtain jobid generated at the client (can it be done?)
return {'op': 'launched'}
@execution_level('full')
@check_secret
def process_terminatescript(self, path, get_params, post_params, server):
job_id = post_params.get('job_id', None)
logger.debug('Processing terminate_script request, job_id "{}"'.format (job_id))
if job_id is None:
jobid = post_params.get('jobid', None)
logger.debug('Processing terminate_script request, jobid "{}"'.format (jobid))
if jobid is None:
return {}
self.sendClientMessage('terminatescript', {'job_id': job_id})
self.jobmgr.terminate_job (job_id)
self.sendClientMessage('terminatescript', {'jobid': jobid})
self.jobmgr.terminate_job (jobid)
return {}
@execution_level('full')

View File

@ -372,6 +372,7 @@ class ogAdmClientWorker (ogLiveWorker):
ipr = post_params['ipr'] ## Ip del repositorio
nfn = post_params['nfn']
ids = post_params['ids']
tag = post_params['tag'] ## Tag a crear en git una vez hecho el commit
self.muestraMensaje (7)
@ -385,7 +386,7 @@ class ogAdmClientWorker (ogLiveWorker):
self.muestraMensaje (2)
inv_sft = res['contents']
try:
self.interfaceAdmin (nfn, [dsk, par, nci, ipr])
self.interfaceAdmin (nfn, [dsk, par, nci, ipr, tag])
self.muestraMensaje (9)
herror = 0
except:
@ -410,17 +411,19 @@ class ogAdmClientWorker (ogLiveWorker):
def do_ModificarImagenGit (self, post_params):
for k in ['dsk', 'par', 'nci', 'ipr', 'nfn', 'ids', 'msg']:
for k in ['dsk', 'par', 'ipr', 'nfn', 'ids', 'branch', 'options', 'msg']:
if k not in post_params:
logger.error (f'required parameter ({k}) not in POST params')
return {}
dsk = post_params['dsk'] ## Disco
par = post_params['par'] ## Número de partición
nci = post_params['nci'] ## Nombre canónico de la imagen
ipr = post_params['ipr'] ## Ip del repositorio
nfn = post_params['nfn']
ids = post_params['ids']
brn = post_params['branch'] # Rama nueva a crear
opt = post_params['options'] # Rama nueva a crear
msg = post_params['msg'] ## Mensaje de commit
self.muestraMensaje (7)
@ -435,7 +438,7 @@ class ogAdmClientWorker (ogLiveWorker):
self.muestraMensaje (2)
inv_sft = res['contents']
try:
self.interfaceAdmin (nfn, [dsk, par, nci, msg])
self.interfaceAdmin (nfn, [dsk, par, ipr, brn, opt, msg])
self.muestraMensaje (9)
herror = 0
except:

View File

@ -108,12 +108,10 @@ class OGAgentSvc(win32serviceutil.ServiceFramework, CommonService):
while self.isAlive:
client_died=False
if os.path.exists ('/windows/temp/ogagentuser_died'):
with open ('/windows/temp/ogagentuser_died', 'rb') as fd:
u = fd.read()
os.unlink ('/windows/temp/ogagentuser_died')
client_died=True
if client_died:
self.notifyLogout (u)
self.notifyLogout (b'')
# Pumps & processes any waiting messages
pythoncom.PumpWaitingMessages()

View File

@ -44,19 +44,17 @@ OTHER, DEBUG, INFO, WARN, ERROR, FATAL = (10000 * (x + 1) for x in range(6))
class LocalLogger(object):
def __init__(self):
self.extra = { 'in_oglive': False }
# tempdir is different for "user application" and "service"
# service wil get c:\windows\temp, while user will get c:\users\XXX\appdata\local\temp
fname1 = os.path.join (tempfile.gettempdir(), 'opengnsys.log')
fmt1 = logging.Formatter (fmt='%(levelname)s %(asctime)s in_oglive=%(in_oglive)s (%(threadName)s) (%(funcName)s) %(message)s')
fmt1 = logging.Formatter (fmt='%(levelname)s %(asctime)s (%(threadName)s) (%(funcName)s) %(message)s')
fh1 = logging.FileHandler (filename=fname1, mode='a')
fh1.setFormatter (fmt1)
fh1.setLevel (logging.DEBUG)
fname2 = os.path.join (tempfile.gettempdir(), 'opengnsys.json.log')
fmt2 = JsonFormatter ({"timestamp": "asctime", "severity": "levelname", "in_oglive": "in_oglive", "threadName": "threadName", "function": "funcName", "message": "message"}, time_format='%Y-%m-%d %H:%M:%S', msec_format='')
fmt2 = JsonFormatter ({"timestamp": "asctime", "severity": "levelname", "threadName": "threadName", "function": "funcName", "message": "message"}, time_format='%Y-%m-%d %H:%M:%S', msec_format='')
fh2 = logging.FileHandler (filename=fname2, mode='a')
fh2.setFormatter (fmt2)
fh2.setLevel (logging.DEBUG)
@ -73,7 +71,7 @@ class LocalLogger(object):
# our loglevels are 10000 (other), 20000 (debug), ....
# logging levels are 10 (debug), 20 (info)
# OTHER = logging.NOTSET
self.logger.log(int(level / 1000 - 10), message, stacklevel=4, extra=self.extra)
self.logger.log(int(level / 1000 - 10), message, stacklevel=4)
if level < INFO or self.serviceLogger is False: # Only information and above will be on event log
return

View File

@ -35,7 +35,6 @@ import re
import time
try: import dbus ## don't fail on windows (the worker will later refuse to load anyway)
except: pass
import select
import random
import subprocess
import threading
@ -159,10 +158,10 @@ class ogLiveWorker(ServerWorker):
if 'thread' not in self.thread_list[job_id]: return { 'res': 2, 'der': 'Job is not running' }
t = self.thread_list[job_id]['thread']
pid = self.thread_list[job_id]['child_pid']
logger.debug (f'pid/pgid/sid ({pid})')
logger.debug (f'pid ({pid})')
try_times = 8
sig = signal.SIGTERM
msg = f'could not killpg pid ({pid}) after ({try_times}) tries'
msg = f'could not kill pid ({pid}) after ({try_times}) tries'
success = 2 ## mimic cmd['res'] in respuestaEjecucionComando(): "1" means success, "2" means failed
while True:
t.join (0.05)
@ -176,10 +175,10 @@ class ogLiveWorker(ServerWorker):
## this is fine in the first iteration of the loop, before we send any signals. In the rest of iterations, after some signals were sent, msg should be 'job terminated' instead.
if pid:
if os.path.exists (f'/proc/{pid}'):
logger.debug (f'sending signal ({sig}) to process group ({pid})')
logger.debug (f'sending signal ({sig}) to pid ({pid})')
## if the process finishes just here, nothing happens: the signal is sent to the void
os.killpg (pid, sig)
#subprocess.run (['kill', '--signal', str(sig), f'-{pid}']) ## negative PID is used for sending signals to the process group
os.kill (pid, sig)
#subprocess.run (['kill', '--signal', str(sig), str(pid)])
else:
msg = f'pid ({pid}) is gone, nothing to kill'
success = 1
@ -284,35 +283,31 @@ class ogLiveWorker(ServerWorker):
proc = ['bash', '-c', '{} {}'.format (devel_bash_prefix, exe)]
logger.debug ('subprocess.run ("{}")'.format (' '.join (proc)))
p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=True)
p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if self.pid_q:
self.pid_q.put (p.pid) ## p.pid is also a session ID and a process group ID--we'll use it later to send signals to the whole group
self.pid_q.put (p.pid)
else:
## esto sucede por ejemplo cuando arranca el agente, que estamos en interfaceAdmin() en el mismo hilo, sin _long_running_job ni hilo separado
#logger.debug ('no queue--not writing any PID to it')
pass
sout = serr = ''
finished = False
while True:
try:
p.wait (0.05)
finished = True
except subprocess.TimeoutExpired:
pass
ready_to_read, _, _ = select.select ([p.stdout, p.stderr], [], [], 0.2)
if p.stdout in ready_to_read:
l = p.stdout.readline()
poll_iterations = 1
while p.poll() is None:
for l in iter (p.stdout.readline, b''):
partial = l.decode ('utf-8', 'ignore')
if self.stdout_q: self.stdout_q.put (partial)
sout += partial
if p.stderr in ready_to_read:
l = p.stderr.readline()
for l in iter (p.stderr.readline, b''):
partial = l.decode ('utf-8', 'ignore')
serr += partial
if finished: break
## poll quickly at first, then poll less frequently
if poll_iterations > 15: sleep_time = 1
elif poll_iterations > 10: sleep_time = 0.2
else: sleep_time = 0.1
time.sleep (sleep_time)
poll_iterations += 1
sout = sout.strip()
serr = serr.strip()

View File

@ -1 +0,0 @@
get-process -name ogagentuser|stop-process