Improve task management, cleanup when there are too many
parent
09baf6d1e8
commit
1f2095ce1a
|
@ -77,7 +77,7 @@ REPOSITORIES_BASE_PATH = "/opt/opengnsys/ogrepository/oggit/git/oggit/"
|
||||||
|
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
tasks = {}
|
tasks = {}
|
||||||
|
tasks_max = 1024
|
||||||
|
|
||||||
# Create an instance of the Flask class
|
# Create an instance of the Flask class
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
|
@ -93,6 +93,24 @@ executor = Executor(app)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def add_task(future):
|
||||||
|
task_id = uuid.uuid4().hex
|
||||||
|
task_data = {
|
||||||
|
"future" : future,
|
||||||
|
"start_time" : time.time()
|
||||||
|
}
|
||||||
|
|
||||||
|
while len(tasks) >= tasks_max:
|
||||||
|
oldest_task_id = min(tasks, key=lambda k: tasks[k]['start_time'])
|
||||||
|
task = tasks[task_id]["future"]
|
||||||
|
if task.running():
|
||||||
|
log.error("Cancelling still running task %s, maximum task limit of %i reached", task_id, tasks_max)
|
||||||
|
task.cancel()
|
||||||
|
|
||||||
|
del tasks[oldest_task_id]
|
||||||
|
|
||||||
|
tasks[task_id] = task_data
|
||||||
|
return task_id
|
||||||
|
|
||||||
def do_repo_backup(repo, params):
|
def do_repo_backup(repo, params):
|
||||||
"""
|
"""
|
||||||
|
@ -325,15 +343,16 @@ class GitRepoSync(Resource):
|
||||||
data = request.json
|
data = request.json
|
||||||
|
|
||||||
if data is None:
|
if data is None:
|
||||||
|
log.error("Can't create repository, JSON post data missing")
|
||||||
return {"error" : "Parameters missing"}, 400
|
return {"error" : "Parameters missing"}, 400
|
||||||
|
|
||||||
if not "remote_repository" in data:
|
if not "remote_repository" in data:
|
||||||
|
log.error("Can't create repository, parameter 'remote_repository' missing")
|
||||||
return {"error" : "Parameter 'remote_repository' missing"}, 400
|
return {"error" : "Parameter 'remote_repository' missing"}, 400
|
||||||
|
|
||||||
|
|
||||||
future = executor.submit(do_repo_sync, repo, data)
|
future = executor.submit(do_repo_sync, repo, data)
|
||||||
task_id = str(uuid.uuid4())
|
task_id = add_task(future)
|
||||||
tasks[task_id] = future
|
|
||||||
|
|
||||||
log.info("Starting synchronization of repository %s, task %s", repo, task_id, extra = {"repository" : repo, "task_id" : task_id})
|
log.info("Starting synchronization of repository %s, task %s", repo, task_id, extra = {"repository" : repo, "task_id" : task_id})
|
||||||
return {"status": "started", "task_id" : task_id}, 200
|
return {"status": "started", "task_id" : task_id}, 200
|
||||||
|
@ -381,8 +400,7 @@ class GitRepoBackup(Resource):
|
||||||
|
|
||||||
|
|
||||||
future = executor.submit(do_repo_backup, repo, data)
|
future = executor.submit(do_repo_backup, repo, data)
|
||||||
task_id = str(uuid.uuid4())
|
task_id = add_task(future)
|
||||||
tasks[task_id] = future
|
|
||||||
|
|
||||||
log.info("Starting backup of repository %s, task %s", repo, task_id, extra = {"repository" : repo, "task_id" : task_id})
|
log.info("Starting backup of repository %s, task %s", repo, task_id, extra = {"repository" : repo, "task_id" : task_id})
|
||||||
return {"status": "started", "task_id" : task_id}, 200
|
return {"status": "started", "task_id" : task_id}, 200
|
||||||
|
@ -411,8 +429,7 @@ class GitRepoCompact(Resource):
|
||||||
return {"error": "Repository not found"}, 404
|
return {"error": "Repository not found"}, 404
|
||||||
|
|
||||||
future = executor.submit(do_repo_gc, repo)
|
future = executor.submit(do_repo_gc, repo)
|
||||||
task_id = str(uuid.uuid4())
|
task_id = add_task(future)
|
||||||
tasks[task_id] = future
|
|
||||||
|
|
||||||
log.info("Starting compaction of repository %s, task %s", repo, task_id, extra = {"repository" : repo, "task_id" : task_id})
|
log.info("Starting compaction of repository %s, task %s", repo, task_id, extra = {"repository" : repo, "task_id" : task_id})
|
||||||
return {"status": "started", "task_id" : task_id}, 200
|
return {"status": "started", "task_id" : task_id}, 200
|
||||||
|
@ -437,7 +454,7 @@ class GitTaskStatus(Resource):
|
||||||
log.error("Task %s was not found", task_id, extra = {"task_id" : task_id})
|
log.error("Task %s was not found", task_id, extra = {"task_id" : task_id})
|
||||||
return {"error": "Task not found"}, 404
|
return {"error": "Task not found"}, 404
|
||||||
|
|
||||||
future = tasks[task_id]
|
future = tasks[task_id]["future"]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if future.done():
|
if future.done():
|
||||||
|
|
Loading…
Reference in New Issue