refs #1461 keep track of RestaurarImagen unicast

pull/15/head
Natalia Serrano 2025-02-11 09:36:27 +01:00
parent b5b09ec132
commit f8d6706897
3 changed files with 105 additions and 22 deletions

View File

@ -1,4 +1,11 @@
ogagent (1.4.6-1) UNRELEASED; urgency=medium
ogagent (1.4.7-1) UNRELEASED; urgency=medium
* Merge server modules
* Track the progress of children
-- OpenGnsys developers <info@opengnsys.es> Tue, 04 Feb 2025 14:12:19 +0100
ogagent (1.4.6-1) stable; urgency=medium
* Point to the new menu browser

View File

@ -1 +1 @@
1.4.6
1.4.7

View File

@ -31,6 +31,7 @@
# pylint: disable=unused-wildcard-import,wildcard-import
import os
import re
import time
import random
import subprocess
@ -49,15 +50,16 @@ class ThreadWithResult (threading.Thread):
self.result = None
if self._target is not None:
## the first arg in self._args is the queue
self.q = self._args[0]
self._args = self._args[1:]
self.pid_q = self._args[0]
self.stdout_q = self._args[1]
self._args = self._args[2:]
try:
self.result = self._target (*self._args, **self._kwargs)
except Exception as e:
self.result = { 'res': 2, 'der': f'got exception: ({e})' } ## res=2 as defined in ogAdmClient.c:2048
finally:
# Avoid a refcycle if the thread is running a function with an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs, self.q
del self._target, self._args, self._kwargs, self.pid_q, self.stdout_q
class ogLiveWorker(ServerWorker):
thread_list = {}
@ -143,7 +145,6 @@ class ogLiveWorker(ServerWorker):
]
def notifier (self, job_id, result):
    """Report a finished job's result back to ogcore.

    Tags the payload with its job id — note this mutates the caller's
    *result* dict in place — and POSTs it to the 'clients/status/webhook'
    REST endpoint.

    job_id -- identifier of the finished job, e.g. 'EjecutarScript-333feb3f'
    result -- dict with the job outcome (e.g. keys 'res'/'der'); extended
              here with 'job_id'
    """
    logger.debug (f'notifier() called, job_id ({job_id}) result ({result})')
    result['job_id'] = job_id
    self.REST.sendMessage ('clients/status/webhook', result)
@ -193,6 +194,57 @@ class ogLiveWorker(ServerWorker):
return { 'res':success, 'der':msg }
def _extract_progress (self, job_id, ary=[]):
is_create = is_restore = False
if job_id not in self.progress_jobs: self.progress_jobs[job_id] = { 'restore_with_cache': False }
if 'CrearImagen-' in job_id:
is_create = True
elif 'RestaurarImagen-' in job_id:
is_restore = True
else:
return None
progress = None
for i in ary:
if is_restore:
## RestaurarImagen:
## - si nos traemos la imagen a cache por unicast, se llama en última instancia a rsync:
## - ' 32,77K 0% 0,00kB/s 0:00:00'
## - ' 11,57M 0% 8,77MB/s 0:06:16'
## - ' 1,03G 30% 11,22MB/s 0:03:25'
## - ' 3,39G 100% 11,22MB/s 0:04:48 (xfr#1, to-chk=0/1)'
if m := re.search (r'\d+,\d+[KMGT]\s+(\d+)%.*[kM]B/s', i):
progress = float (m.groups()[0]) / 100
progress /= 2
self.progress_jobs[job_id]['restore_with_cache'] = True
## - si nos traemos la imagen a cache por multicast:
#elif regex:
#TODO
#progress =/ 2
#self.progress_jobs[job_id]['restore_with_cache'] = True
## - si nos traemos la imagen a cache por torrent:
#elif regex:
#TODO
#progress =/ 2
#self.progress_jobs[job_id]['restore_with_cache'] = True
## - tanto si nos la hemos traído a cache como si no, pasamos la imagen a la partición:
## - 'Current block: 720646, Total block: 1750078, Complete: 41,18%'
## - 'Elapsed: 00:00:20, Remaining: 00:00:15, Completed: 57,06%, 9,81GB/min,'
## - 'Current block: 1606658, Total block: 1750078, Complete: 100.00%'
## - 'Elapsed: 00:00:36, Remaining: 00:00:00, Completed: 100.00%, Rate: 9,56GB/min,'
elif m := re.search (r'Current block:.*Complete:\s+(\d+[,.]\d+)%', i):
progress = float (m.groups()[0].replace (',', '.')) / 100
if self.progress_jobs[job_id]['restore_with_cache']:
progress /= 2
progress += 0.5
elif is_create:
pass
if progress and progress > 1: progress = 1
return progress
def mon (self):
while True:
with self.thread_lock:
@ -201,12 +253,22 @@ class ogLiveWorker(ServerWorker):
if 'thread' not in elem: continue
logger.debug (f'considering thread ({k})')
if self.q:
if not self.q.empty():
elem['child_pid'] = self.q.get()
if self.pid_q:
if not self.pid_q.empty():
elem['child_pid'] = self.pid_q.get()
logger.debug (f'queue not empty, got pid ({elem["child_pid"]})')
else:
logger.debug (f'queue empty')
if self.stdout_q:
partial = ''
while not self.stdout_q.empty():
partial += self.stdout_q.get()
lines = partial.splitlines()
if len (lines):
p = self._extract_progress (k, lines)
if p:
m = { "job_id": k, "progress": p }
logger.debug (f'would sendMessage ({m})')
#self.REST.sendMessage ('clients/status/webhook', { "job_id": "EjecutarScript-333feb3f", "progress": 0.91337824 })
elem['thread'].join (0.05)
if not elem['thread'].is_alive():
@ -234,15 +296,21 @@ class ogLiveWorker(ServerWorker):
logger.debug ('subprocess.run ("{}", capture_output=True)'.format (proc))
p = subprocess.Popen (proc, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if self.q:
self.q.put (p.pid)
if self.pid_q:
self.pid_q.put (p.pid)
else:
## esto sucede por ejemplo cuando arranca el agente, que estamos en interfaceAdmin() en el mismo hilo, sin _long_running_job ni hilo separado
logger.debug ('no queue--not writing any PID to it')
sout = serr = ''
while p.poll() is None:
for l in iter (p.stdout.readline, b''): sout += l.decode ('utf-8', 'ignore')
for l in iter (p.stderr.readline, b''): serr += l.decode ('utf-8', 'ignore')
for l in iter (p.stdout.readline, b''):
partial = l.decode ('utf-8', 'ignore')
if self.stdout_q: self.stdout_q.put (partial)
sout += partial
for l in iter (p.stderr.readline, b''):
partial = l.decode ('utf-8', 'ignore')
serr += partial
time.sleep (1)
sout = sout.strip()
serr = serr.strip()
@ -376,7 +444,9 @@ class ogLiveWorker(ServerWorker):
self.idproautoexec = None
self.idcentro = None ## Identificador del centro
self.idaula = None ## Identificador del aula
self.q = None ## for passing PIDs around
self.pid_q = None ## for passing PIDs around
self.stdout_q = None ## for passing stdout
self.progress_jobs = {}
ogcore_scheme = os.environ.get ('OGAGENTCFG_OGCORE_SCHEME', 'https')
ogcore_ip = os.environ.get ('OGAGENTCFG_OGCORE_IP', '192.168.2.1')
@ -421,9 +491,10 @@ class ogLiveWorker(ServerWorker):
job_id = '{}-{}'.format (name, ''.join (random.choice ('0123456789abcdef') for _ in range (8)))
import queue
self.q = queue.Queue() ## a single queue works for us because we never have more than one long_running_job at the same time
self.pid_q = queue.Queue() ## a single queue works for us because we never have more than one long_running_job at the same time
self.stdout_q = queue.Queue()
self.thread_list[job_id] = {
'thread': ThreadWithResult (target=f, args=(self.q,) + args),
'thread': ThreadWithResult (target=f, args=(self.pid_q, self.stdout_q) + args),
'starttime': time.time(),
'child_pid': None,
'running': True,
@ -433,10 +504,10 @@ class ogLiveWorker(ServerWorker):
return { 'job_id': job_id }
## para matar threads tengo lo siguiente:
## - aqui en _long_running_job meto una cola en self.q
## - (self.q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'q'")
## - aqui en _long_running_job meto una cola en self.pid_q
## - (self.pid_q fue inicializado a None al instanciar el objeto, para evitar error "objeto no tiene 'pid_q'")
## - en el thread_list también tengo un child_pid para almacenar el pid de los procesos hijos
## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.q del thread
## - al crear el ThreadWithResult le paso la cola, y luego en run() la recojo y la meto en el self.pid_q del thread
## - en interfaceAdmin() al hacer subprocess.Popen(), recojo el pid y lo escribo en la queue
## - en mon() recojo pids de la queue y los meto en thread_list 'child_pid'
## - algunas funciones llaman a interfaceAdmin más de una vez, y escriben más de un pid en la cola, y en mon() voy recogiendo y actualizando
@ -447,6 +518,11 @@ class ogLiveWorker(ServerWorker):
##
## $ curl --insecure -X POST --data '{"nfn":"EjecutarScript","scp":"cd /usr; sleep 30; pwd; ls","ids":"0"}' https://192.168.2.199:8000/ogAdmClient/EjecutarScript
## {"job_id": "EjecutarScript-333feb3f"}
## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/CloningEngine/KillJob
## $ curl --insecure -X POST --data '{"job_id":"EjecutarScript-333feb3f"}' https://192.168.2.199:8000/ogAdmClient/KillJob
##
## funciona bien, excepto que el PID no muere xD, ni siquiera haciendo subprocess.run('kill')
## para mostrar el progreso de los jobs reutilizo la misma infra
## una cola self.stdout_q
## en interfaceAdmin escribo la stdout parcial que ya venia recogiendo
## mon() lo recoge y le hace un POST a ogcore