Initial version of the API server

ogrepository-integration
Vadim vtroshchinskiy 2025-03-25 09:45:19 +01:00
parent d4ce9c3ee3
commit e67b08cea5
3 changed files with 745 additions and 0 deletions

View File

@ -0,0 +1,139 @@
#!/usr/bin/env python3
import os
import sys
sys.path.insert(0, "/usr/share/opengnsys-modules/python3/dist-packages")
import importlib
import logging
import uuid
import argparse
from flask import Flask, request
from flask_executor import Executor
from flask_restx import Api
from werkzeug.exceptions import HTTPException
from systemd.journal import JournalHandler
parser = argparse.ArgumentParser(
prog="api_server.py",
description="OpenGnsys Repository API Server",
)
debug_enabled = False
listen_host = '0.0.0.0'
parser.add_argument('--debug', action='store_true', help="Enable debug output")
parser.add_argument('--listen', metavar="HOST", help="Listen address")
parser.add_argument("-v", "--verbose", action="store_true", help = "Verbose console output")
args = parser.parse_args()
log = logging.getLogger('api_server')
log.addHandler(JournalHandler())
if args.verbose:
log.addHandler(logging.StreamHandler(stream=sys.stderr))
log.setLevel(logging.DEBUG)
else:
log.setLevel(logging.INFO)
if args.listen:
listen_host = args.listen
if args.debug:
debug_enabled = True
api_base_dir = os.path.dirname(os.path.realpath(__file__))
blueprints_dir = os.path.join(api_base_dir, 'blueprints')
installer_dir = os.path.join(api_base_dir, '../installer')
sys.path.insert(0, installer_dir)
# Create an instance of the Flask class
app = Flask(__name__)
api = Api(app,
version='0.50',
title = "OpenGnsys Git API",
description = "API for managing disk images stored in Git",
doc = "/swagger/")
executor = Executor(app)
log.info("Loading blueprints from %s", blueprints_dir)
sys.path.insert(0, blueprints_dir)
for filename in os.listdir(blueprints_dir):
if filename.endswith('.py'):
log.info("Loading %s/%s", blueprints_dir, filename)
module_name = filename.replace(".py", "")
log.info("Importing %s", module_name)
importlib.invalidate_caches()
module = importlib.import_module(module_name)
log.debug("Returned: %s", module)
app.register_blueprint(module.blueprint)
@app.errorhandler(HTTPException)
def handle_exception(e):
"""Return JSON for HTTP errors.
We create and log an error UUID for each error, and use journald's additional fields for easier searching.
"""
# start with the correct headers and status code from the error
response = e.get_response()
errid = uuid.uuid4().hex
if debug_enabled:
response = {
"errcode": e.code,
"errname": e.name,
"description": e.description,
}
else:
response = {
"errcode" : 500,
"errname" : "Internal error",
"description": f"Please see the log for error {errid}",
"error_id" : errid
}
log.error("Error ID %s: code %i, name %s, description %s", errid, e.code, e.name, e.description, extra = { "error_id" : errid, "errcode" : e.code, "errname" : e.name, "description" : e.description })
return response, 500
@app.after_request
def after_request(response):
log.info("Request from %s: %s %s %s %s", request.remote_addr, request.method, request.scheme, request.full_path, response.status,
extra = {"remote_addr" : request.remote_addr, "method" : request.method, "scheme" : request.scheme, "full_path" : request.full_path, "status" : response.status})
if debug_enabled:
log.debug("Response: %s", response.data, extra = {"response" : response.data})
return response
# Run the Flask app
if __name__ == '__main__':
print(f"Map: {app.url_map}")
app.run(debug=debug_enabled, host=listen_host)

View File

@ -0,0 +1,573 @@
#!/usr/bin/env python3
"""
This module provides a Flask-based API for managing Git repositories in the OpenGnsys system.
It includes endpoints for creating, deleting, synchronizing, backing up, and performing garbage
collection on Git repositories. The API also provides endpoints for retrieving repository
information such as the list of repositories and branches, as well as checking the status of
asynchronous tasks.
Classes:
None
Functions:
do_repo_backup(repo, params)
do_repo_sync(repo, params)
do_repo_gc(repo)
home()
get_repositories()
create_repo(repo)
sync_repo(repo)
backup_repository(repo)
gc_repo(repo)
tasks_status(task_id)
delete_repo(repo)
get_repository_branches(repo)
health_check()
Constants:
REPOSITORIES_BASE_PATH (str): The base path where Git repositories are stored.
Global Variables:
app (Flask): The Flask application instance.
executor (Executor): The Flask-Executor instance for managing asynchronous tasks.
tasks (dict): A dictionary to store the status of asynchronous tasks.
"""
# pylint: disable=locally-disabled, line-too-long
import os.path
import os
import shutil
import uuid
import time
import logging
import traceback
import git
from opengnsys_git_installer import OpengnsysGitInstaller
from flask import Blueprint, request
from flask_restx import Resource, Api
import paramiko
from systemd.journal import JournalHandler
debug_enabled = False
log = logging.getLogger('gitapi')
log.addHandler(JournalHandler())
log.setLevel(logging.INFO)
log.info("Started")
REPOSITORIES_BASE_PATH = "/opt/opengnsys/ogrepository/oggit/git/oggit/"
start_time = time.time()
tasks = {}
tasks_max = 1024
blueprint = Blueprint('git_api', __name__, template_folder='templates', url_prefix = '/oggit/v1')
api = Api(blueprint)
git_ns = api
def add_task(future):
task_id = uuid.uuid4().hex
task_data = {
"future" : future,
"start_time" : time.time()
}
while len(tasks) >= tasks_max:
oldest_task_id = min(tasks, key=lambda k: tasks[k]['start_time'])
task = tasks[task_id]["future"]
if task.running():
log.error("Cancelling still running task %s, maximum task limit of %i reached", task_id, tasks_max)
task.cancel()
del tasks[oldest_task_id]
tasks[task_id] = task_data
return task_id
def do_repo_backup(repo, params):
"""
Creates a backup of the specified Git repository and uploads it to a remote server via SFTP.
Args:
repo (str): The name of the repository to back up.
params (dict): A dictionary containing the following keys:
- ssh_server (str): The SSH server address.
- ssh_port (int): The SSH server port.
- ssh_user (str): The SSH username.
- filename (str): The remote filename where the backup will be stored.
Returns:
bool: True if the backup was successful.
"""
git_repo_path = f"{REPOSITORIES_BASE_PATH}/{repo}.git"
git_repo = git.Repo(git_repo_path)
git_repo.git.config('--global', '--add', 'safe.directory', git_repo_path)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(params["ssh_server"], params["ssh_port"], params["ssh_user"])
sftp = ssh.open_sftp()
with sftp.file(params["filename"], mode='wb+') as remote_file:
git_repo.archive(remote_file, format="tar.gz")
return True
def do_repo_sync(repo, params):
"""
Synchronizes a local Git repository with a remote repository.
Args:
repo (str): The name of the local repository to synchronize.
params (dict): A dictionary containing the remote repository URL with the key "remote_repository".
Returns:
list: A list of dictionaries, each containing:
- "local_ref" (str): The name of the local reference.
- "remote_ref" (str): The name of the remote reference.
- "summary" (str): A summary of the push operation for the reference.
"""
git_repo_path = f"{REPOSITORIES_BASE_PATH}/{repo}.git"
git_repo = git.Repo(git_repo_path)
git_repo.git.config('--global', '--add', 'safe.directory', git_repo_path)
# Recreate the remote every time, it might change
if "backup" in git_repo.remotes:
git_repo.delete_remote("backup")
backup_repo = git_repo.create_remote("backup", params["remote_repository"])
pushed_references = backup_repo.push("*:*")
results = []
# This gets returned to the API
for ref in pushed_references:
results = results + [ {"local_ref" : ref.local_ref.name, "remote_ref" : ref.remote_ref.name, "summary" : ref.summary }]
return results
def do_repo_gc(repo):
"""
Perform garbage collection on the specified Git repository.
Args:
repo (str): The name of the repository to perform garbage collection on.
Returns:
bool: True if the garbage collection command was executed successfully.
"""
git_repo_path = os.path.join(REPOSITORIES_BASE_PATH, repo + ".git")
git_repo = git.Repo(git_repo_path)
git_repo.git.config('--global', '--add', 'safe.directory', git_repo_path)
git_repo.git.gc()
# Define a route for the root URL
@api.route('/')
class GitLib(Resource):
#@api.doc('home')
def get(self):
"""
Home route that returns a JSON response with a welcome message for the OpenGnsys Git API.
Returns:
Response: A Flask JSON response containing a welcome message.
"""
log.info("Root URL accessed")
return {
"message": "OpenGnsys Git API"
}
@git_ns.route('/repositories')
class GitRepositories(Resource):
def get(self):
"""
Retrieve a list of Git repositories.
This endpoint scans the OpenGnsys image path for directories that
appear to be Git repositories (i.e., they contain a "HEAD" file).
It returns a JSON response containing the names of these repositories.
Returns:
Response: A JSON response with a list of repository names or an
error message if the repository storage is not found.
- 200 OK: When the repositories are successfully retrieved.
- 500 Internal Server Error: When the repository storage is not found.
Example JSON response:
{
"repositories": ["repo1", "repo2"]
}
"""
if not os.path.isdir(REPOSITORIES_BASE_PATH):
log.error("Can't list repositories. Repository storage at %s not found", REPOSITORIES_BASE_PATH, extra = {"path" : REPOSITORIES_BASE_PATH})
return {"error": "Repository storage not found, git functionality may not be installed."}, 500
repos = []
for entry in os.scandir(REPOSITORIES_BASE_PATH):
if entry.is_dir(follow_symlinks=False) and os.path.isfile(os.path.join(entry.path, "HEAD")):
name = entry.name
if name.endswith(".git"):
name = name[:-4]
repos = repos + [name]
log.info("Returning %i repositories", len(repos))
return {
"repositories": repos
}
def post(self):
"""
Create a new Git repository.
This endpoint creates a new Git repository with the specified name.
If the repository already exists, it returns a status message indicating so.
Args:
repo (str): The name of the repository to be created.
Returns:
Response: A JSON response with a status message and HTTP status code.
- 200: If the repository already exists.
- 201: If the repository is successfully created.
"""
data = request.json
if data is None:
log.error("Can't create repository, JSON post data missing")
return {"error" : "Parameters missing"}, 400
repo = data["name"]
repo_path = os.path.join(REPOSITORIES_BASE_PATH, repo + ".git")
if os.path.isdir(repo_path):
log.error("Can't create repository %s, already exists at %s", repo, repo_path, extra = {"repository" : repo, "path" : repo_path})
return {"status": "Repository already exists"}, 200
installer = OpengnsysGitInstaller()
installer.add_forgejo_repo(repo)
#installer.init_git_repo(repo + ".git")
log.info("Repository %s created", repo, extra = {"repository" : repo})
return {"status": "Repository created"}, 201
@git_ns.route('/repositories/<repo>/sync')
class GitRepoSync(Resource):
def post(self, repo):
"""
Synchronize a repository with a remote repository.
This endpoint triggers the synchronization process for a specified repository.
It expects a JSON payload with the remote repository details.
Args:
repo (str): The name of the repository to be synchronized.
Returns:
Response: A JSON response indicating the status of the synchronization process.
- 200: If the synchronization process has started successfully.
- 400: If the request payload is missing or invalid.
- 404: If the specified repository is not found.
"""
repo_path = os.path.join(REPOSITORIES_BASE_PATH, repo + ".git")
if not os.path.isdir(repo_path):
log.error("Can't sync repository %s, not found. Looked in %s", repo, repo_path, extra = {"repository" : repo, "path" : repo_path })
return {"error": "Repository not found"}, 404
data = request.json
if data is None:
log.error("Can't create repository, JSON post data missing")
return {"error" : "Parameters missing"}, 400
if not "remote_repository" in data:
log.error("Can't create repository, parameter 'remote_repository' missing")
return {"error" : "Parameter 'remote_repository' missing"}, 400
future = executor.submit(do_repo_sync, repo, data)
task_id = add_task(future)
log.info("Starting synchronization of repository %s, task %s", repo, task_id, extra = {"repository" : repo, "task_id" : task_id})
return {"status": "started", "task_id" : task_id}, 200
@git_ns.route('/repositories/<repo>/backup')
class GitRepoBackup(Resource):
def backup_repository(self, repo):
"""
Backup a specified repository.
Endpoint: POST /repositories/<repo>/backup
Args:
repo (str): The name of the repository to back up.
Request Body (JSON):
ssh_port (int, optional): The SSH port to use for the backup. Defaults to 22.
Returns:
Response: A JSON response indicating the status of the backup operation.
- If the repository is not found, returns a 404 error with a message.
- If the request body is missing, returns a 400 error with a message.
- If the backup process starts successfully, returns a 200 status with the task ID.
Notes:
- The repository path is constructed by appending ".git" to the repository name.
- The backup operation is performed asynchronously using a thread pool executor.
- The task ID of the backup operation is generated using UUID and stored in a global tasks dictionary.
"""
repo_path = os.path.join(REPOSITORIES_BASE_PATH, repo + ".git")
if not os.path.isdir(repo_path):
log.error("Can't backup repository %s, not found. Looked in %s", repo, repo_path, extra = {"repository" : repo, "path" : repo_path })
return {"error": "Repository not found"}, 404
data = request.json
if data is None:
log.error("Can't create repository, JSON post data missing")
return {"error" : "Parameters missing"}, 400
if not "ssh_port" in data:
data["ssh_port"] = 22
future = executor.submit(do_repo_backup, repo, data)
task_id = add_task(future)
log.info("Starting backup of repository %s, task %s", repo, task_id, extra = {"repository" : repo, "task_id" : task_id})
return {"status": "started", "task_id" : task_id}, 200
@git_ns.route('/repositories/<repo>/compact', methods=['POST'])
class GitRepoCompact(Resource):
def post(self, repo):
"""
Initiates a garbage collection (GC) process for a specified Git repository.
This endpoint triggers an asynchronous GC task for the given repository.
The task is submitted to an executor, and a unique task ID is generated
and returned to the client.
Args:
repo (str): The name of the repository to perform GC on.
Returns:
Response: A JSON response containing the status of the request and
a unique task ID if the repository is found, or an error
message if the repository is not found.
"""
repo_path = os.path.join(REPOSITORIES_BASE_PATH, repo + ".git")
if not os.path.isdir(repo_path):
log.error("Can't compact repository %s, not found. Looked in %s", repo, repo_path, extra = {"repository" : repo, "path" : repo_path })
return {"error": "Repository not found"}, 404
future = executor.submit(do_repo_gc, repo)
task_id = add_task(future)
log.info("Starting compaction of repository %s, task %s", repo, task_id, extra = {"repository" : repo, "task_id" : task_id})
return {"status": "started", "task_id" : task_id}, 200
@git_ns.route('/tasks/<task_id>/status')
class GitTaskStatus(Resource):
def get(self, task_id):
"""
Endpoint to check the status of a specific task.
Args:
task_id (str): The unique identifier of the task.
Returns:
Response: A JSON response containing the status of the task.
- If the task is not found, returns a 404 error with an error message.
- If the task is completed, returns a 200 status with the result.
- If the task is still in progress, returns a 202 status indicating the task is in progress.
"""
if not task_id in tasks:
log.error("Task %s was not found", task_id, extra = {"task_id" : task_id})
return {"error": "Task not found"}, 404
future = tasks[task_id]["future"]
try:
if future.done():
result = future.result()
log.info("Returning completion of task %s", task_id, extra = {"task_id" : task_id})
return {"status" : "completed", "result" : result}, 200
else:
log.info("Task %s is still in progress", task_id, extra = {"task_id" : task_id})
return {"status" : "in progress"}, 202
except Exception as e:
errid = uuid.uuid4().hex
log.error("Task %s failed with exception %s, UUID %s", task_id, traceback.format_exception(e), errid, extra = {"task_id" : task_id, "exception" : traceback.format_exception(e), "error_id" : errid})
return {"status" : "internal error", "error_id" : errid }, 500
@git_ns.route('/repositories/<repo>', methods=['DELETE'])
class GitRepo(Resource):
def delete(self, repo):
"""
Deletes a Git repository.
This endpoint deletes a Git repository specified by the `repo` parameter.
If the repository does not exist, it returns a 404 error with a message
indicating that the repository was not found. If the repository is successfully
deleted, it returns a 200 status with a message indicating that the repository
was deleted.
Args:
repo (str): The name of the repository to delete.
Returns:
Response: A JSON response with a status message and the appropriate HTTP status code.
"""
repo_path = os.path.join(REPOSITORIES_BASE_PATH, repo + ".git")
if not os.path.isdir(repo_path):
log.error("Can't delete repository %s, not found. Looked in %s", repo, repo_path, extra = {"repository" : repo, "path" : repo_path })
return {"error": "Repository not found"}, 404
shutil.rmtree(repo_path)
log.info("Deleted repository %s", repo, extra = {"repository" : repo})
return {"status": "Repository deleted"}, 200
@git_ns.route('/repositories/<repo>/branches')
class GitRepoBranches(Resource):
def get(self, repo):
"""
Retrieve the list of branches for a given repository.
Args:
repo (str): The name of the repository.
Returns:
Response: A JSON response containing a list of branch names or an error message if the repository is not found.
- 200: A JSON object with a "branches" key containing a list of branch names.
- 404: A JSON object with an "error" key containing the message "Repository not found" if the repository does not exist.
"""
repo_path = os.path.join(REPOSITORIES_BASE_PATH, repo + ".git")
if not os.path.isdir(repo_path):
log.error("Can't get branches of repository repository %s, not found. Looked in %s", repo, repo_path, extra = {"repository" : repo, "path" : repo_path })
return {"error": "Repository not found"}, 404
git_repo = git.Repo(repo_path)
git_repo.git.config('--global', '--add', 'safe.directory', repo_path)
branches = []
for branch in git_repo.branches:
branches = branches + [branch.name]
log.info("Returning %i branches", len(branches))
return {
"branches": branches
}
@git_ns.route('/repositories/<repo>/branches/<branch>')
class GitRepoBranchesDeleter(Resource):
def delete(self, repo, branch):
"""Delete a given branch in a given repository
Args:
repo (str): The name of the repository.
Returns:
Response: A JSON response containing a list of branch names or an error message if the repository is not found.
- 200: A JSON object with a "status" key containing "deleted"
- 404: A JSON object with an "error" key containing the message "Repository not found" or "Branch not found"
"""
repo_path = os.path.join(REPOSITORIES_BASE_PATH, repo + ".git")
if not os.path.isdir(repo_path):
log.error("Can't get branches of repository repository %s, not found. Looked in %s", repo, repo_path, extra = {"repository" : repo, "path" : repo_path })
return {"error": "Repository not found"}, 404
git_repo = git.Repo(repo_path)
git_repo.git.config('--global', '--add', 'safe.directory', repo_path)
if not branch in git_repo.branches:
log.error("Can't delete branch %s, not found in repository %s", branch, repo, extra = {"repository" : repo, "branch" : branch})
return {"error": "Branch not found"}, 404
git_repo.delete_head(branch)
log.info("Branch %s of repository %s deleted", branch, repo, extra = {"repository" : repo, "branch" : branch})
return {"status": "deleted"}, 200
@git_ns.route('/health')
class GitHealth(Resource):
def get(self):
"""
Health check endpoint.
This endpoint returns a JSON response indicating the health status of the application.
Returns:
Response: A JSON response with a status key set to "OK". Currently it always returns
a successful value, but this endpoint can still be used to check that the API is
active and functional.
"""
log.info("Health check endpoint called")
return {
"status": "OK"
}
@git_ns.route('/status')
class GitStatus(Resource):
def get(self):
"""
Status check endpoint.
This endpoint returns a JSON response indicating the status of the application.
Returns:
Response: A JSON response with status information
"""
log.info("Status endpoint called")
return {
"uptime" : time.time() - start_time,
"active_tasks" : len(tasks)
}

View File

@ -0,0 +1,33 @@
#!/bin/bash
#!/bin/bash
set -e
if [ ! -f "/etc/apt/sources.list.d/opengnsys.sources" ] ; then
cat > /etc/apt/sources.list.d/opengnsys.sources <<HERE
Types: deb
URIs: https://ognproject.evlt.uma.es/debian-opengnsys/
Suites: noble
Components: main
Signed-By:
-----BEGIN PGP PUBLIC KEY BLOCK-----
.
mDMEZzx/SxYJKwYBBAHaRw8BAQdAa83CuAJ5/+7Pn9LHT/k34EAGpx5FnT/ExHSj
XZG1JES0Ik9wZW5HbnN5cyA8b3Blbmduc3lzQG9wZW5nbnN5cy5lcz6ImQQTFgoA
QRYhBC+J38Xsso227ZbDVt2S5xJQRhKDBQJnPH9LAhsDBQkFo5qABQsJCAcCAiIC
BhUKCQgLAgQWAgMBAh4HAheAAAoJEN2S5xJQRhKDW/MBAO6swnpwdrbm48ypMyPh
NboxvF7rCqBqHWwRHvkvrq7pAP9zd98r7z2AvqVXZxnaCsLTUNMEL12+DVZAUZ1G
EquRBbg4BGc8f0sSCisGAQQBl1UBBQEBB0B6D6tkrwXSHi7ebGYsiMPntqwdkQ/S
84SFTlSxRqdXfgMBCAeIfgQYFgoAJhYhBC+J38Xsso227ZbDVt2S5xJQRhKDBQJn
PH9LAhsMBQkFo5qAAAoJEN2S5xJQRhKDJ+cBAM9jYbeq5VXkHLfODeVztgSXnSUe
yklJ18oQmpeK5eWeAQDKYk/P0R+1ZJDItxkeP6pw62bCDYGQDvdDGPMAaIT6CA==
=xcNc
-----END PGP PUBLIC KEY BLOCK-----
HERE
fi
apt update
apt install -y python3-git opengnsys-libarchive-c python3-termcolor python3-requests python3-tqdm bsdextrautils python3-paramiko python3-aniso8601 opengnsys-flask-restx opengnsys-flask-executor python3-flask