Merge pull request 'refs #1378 - Add API tests and modify API' (#17) from add_python_scripts into main
Reviewed-on: #17deb-packages 0.5.16
commit
a1d4773c7e
|
@ -403,7 +403,20 @@ def recall_ogcore(data):
|
|||
journal.send(f"HTTP Status Code: {response.status_code}", PRIORITY=journal.LOG_INFO, SYSLOG_IDENTIFIER="ogrepo-api_DEBUG")
|
||||
#journal.send(f"HTTP Response: {response.text}", PRIORITY=journal.LOG_INFO, SYSLOG_IDENTIFIER="ogrepo-api_DEBUG")
|
||||
journal.send(f"{{'component':'ogRepo', 'severity':'INFO', 'operation':'Run function recall_ogcore', 'desc':'HTTP Status Code response: {response.status_code}'}}", PRIORITY=journal.LOG_INFO, SYSLOG_IDENTIFIER="ogrepo-api")
|
||||
|
||||
|
||||
|
||||
# ---------------------------------------------------------
|
||||
|
||||
|
||||
def check_file_exists(file_path):
|
||||
""" Comprueba la existencia del archivo cuya ruta recibe como parámetro.
|
||||
Si el archivo existe devuelve "True", y si no devuelve "False".
|
||||
"""
|
||||
if os.path.exists(file_path):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------------------------
|
||||
|
@ -873,7 +886,7 @@ def import_image():
|
|||
}), 400
|
||||
|
||||
# Construimos la llamada al script:
|
||||
cmd = ['sudo', 'python3', f"{script_path}/importImage.py", image_file_path, remote_ip, remote_user]
|
||||
cmd = ['python3', f"{script_path}/importImage.py", image_file_path, remote_ip, remote_user]
|
||||
|
||||
try:
|
||||
journal.send("Running script 'importImage.py'...", PRIORITY=journal.LOG_INFO, SYSLOG_IDENTIFIER="ogrepo-api_DEBUG")
|
||||
|
@ -947,8 +960,12 @@ def export_image():
|
|||
# Evaluamos los parámetros obtenidos, para construir la ruta de la imagen, o para devover un error si no se ha encontrado la imagen (o si está bloqueada):
|
||||
if param_dict:
|
||||
image_file_path = f"{param_dict['name']}.{param_dict['extension']}"
|
||||
|
||||
# Comprobamos si el archivo que bloquea la imagen existe, llamando a la función "check_file_exists":
|
||||
image_lock_exists = check_file_exists(f"{repo_path}{image_file_path}.lock")
|
||||
|
||||
# Si la imagen existe pero está bloqueada, devolvemos un error:
|
||||
if os.path.exists(f"{repo_path}{image_file_path}.lock"):
|
||||
if image_lock_exists == True:
|
||||
journal.send("Image is locked", PRIORITY=journal.LOG_WARNING, SYSLOG_IDENTIFIER="ogrepo-api_DEBUG")
|
||||
journal.send("{'component':'ogRepo', 'severity':'WARNING', 'http_code':'400', 'operation':'Run endpoint export_image', 'desc':'Warning: Image is locked'}", PRIORITY=journal.LOG_WARNING, SYSLOG_IDENTIFIER="ogrepo-api")
|
||||
return jsonify({
|
||||
|
@ -975,7 +992,7 @@ def export_image():
|
|||
}), 400
|
||||
|
||||
# Construimos la llamada al script:
|
||||
cmd = ['sudo', 'python3', f"{script_path}/exportImage.py", image_file_path, remote_ip, remote_user]
|
||||
cmd = ['python3', f"{script_path}/exportImage.py", image_file_path, remote_ip, remote_user]
|
||||
|
||||
try:
|
||||
journal.send("Running script 'exportImage.py'...", PRIORITY=journal.LOG_INFO, SYSLOG_IDENTIFIER="ogrepo-api_DEBUG")
|
||||
|
@ -1047,8 +1064,11 @@ def create_torrent_sum():
|
|||
json_data = json.loads(request.data)
|
||||
image_name = json_data.get("image")
|
||||
|
||||
# Comprobamos si la imagen existe, llamando a la función "check_file_exists":
|
||||
image_exists = check_file_exists(f"{repo_path}{image_name}")
|
||||
|
||||
# Si la imagen no existe, retornamos un error y salimos del endpoint:
|
||||
if not os.path.exists(f"{repo_path}{image_name}"):
|
||||
if image_exists == False:
|
||||
journal.send("Image not found", PRIORITY=journal.LOG_WARNING, SYSLOG_IDENTIFIER="ogrepo-api_DEBUG")
|
||||
journal.send("{'component':'ogRepo', 'severity':'WARNING', 'http_code':'400', 'operation':'Run endpoint create_torrent_sum', 'desc':'Warning: Image not found'}", PRIORITY=journal.LOG_WARNING, SYSLOG_IDENTIFIER="ogrepo-api")
|
||||
return jsonify({
|
||||
|
|
|
@ -6,14 +6,15 @@
|
|||
----------------------------------------------------------------------------------
|
||||
|
||||
Define un método "setUp", que configura el cliente de prueba de Flask y habilita el modo de prueba
|
||||
(y que crea una imagen para realizar los tests).
|
||||
(y que crea una imagen para realizar cada uno de los tests).
|
||||
Esto permite simular peticiones HTTP a la API sin necesidad de un servidor en ejecución.
|
||||
|
||||
También define un método de prueba por cada uno de los endpoints de la API, y decora cada uno con "@patch",
|
||||
para reemplazar las llamadas a "subprocess.run", "subprocess.Popen" y otras funciones con objetos "MagicMock".
|
||||
Esto permite simular diferentes respuestas y comportamientos sin ejecutar realmente los comandos del sistema.
|
||||
|
||||
Finalmente, define un método "tearDown" que limpia el entorno de pruebas (eliminando la imagen creado por "setUp").
|
||||
Finalmente, define un método "tearDown" que limpia el entorno de pruebas, eliminando la imagen creada por el método "setUp"
|
||||
(al finalizar cada una de los tests).
|
||||
|
||||
NOTA: Se debe ejecutar como "root", o dará errores de permisos con la imagen de prueba.
|
||||
"""
|
||||
|
@ -26,7 +27,7 @@ import unittest
|
|||
from unittest.mock import patch, mock_open, MagicMock, AsyncMock
|
||||
from flask import json
|
||||
import os
|
||||
from repo_api import app, get_image_params, search_process, check_remote_connection, check_remote_image, check_lock_remote, check_lock_local, check_aux_files
|
||||
from repo_api import app, get_image_params, search_process, check_remote_connection, check_remote_image, check_lock_remote, check_lock_local, check_aux_files, check_file_exists
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------------------------
|
||||
|
@ -48,8 +49,11 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
para reemplazar las llamadas a "subprocess.run", "subprocess.Popen" y otras funciones con objetos "MagicMock".
|
||||
"""
|
||||
|
||||
# ------------------------------------------------------------------- FUNCTIONS
|
||||
|
||||
|
||||
def setUp(self):
|
||||
""" Configura el cliente de prueba de Flask y habilita el modo de prueba.
|
||||
""" Configura el cliente de prueba de Flask y habilita el modo de prueba (antes de cada test).
|
||||
Esto permite simular peticiones HTTP a la API sin necesidad de un servidor en ejecución.
|
||||
Además, crea el archivo "test4unittest.img", para realizar las pruebas.
|
||||
"""
|
||||
|
@ -60,6 +64,14 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
test_image.write(' ')
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
""" Limpia el entorno de pruebas (después de cada test).
|
||||
En este caso, elimina el archivo "test4unittest.img" (imagen de prueba).
|
||||
"""
|
||||
if os.path.exists(f"{repo_path}/test4unittest.img"):
|
||||
os.remove(f"{repo_path}/test4unittest.img")
|
||||
|
||||
|
||||
def mock_search_process(process, string_to_search):
|
||||
""" Esta función simula la respuesta de la función "search_process" de la API.
|
||||
Es necesaria para los métodos "test_send_udpcast" y "test_send_uftp", porque de alguna forma se asocia el valor esperado ("True")
|
||||
|
@ -70,6 +82,17 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
return True
|
||||
|
||||
|
||||
def mock_search_process_error(process, string_to_search):
|
||||
""" Esta función simula la respuesta de la función "search_process" de la API.
|
||||
Es necesaria para el método "test_send_p2p_error500", porque de alguna forma se asocia el valor esperado ("False")
|
||||
a los parámetros que se pasan al script al que se llama desde el endpoint a testear.
|
||||
"""
|
||||
return False
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Obtener Información de Estado de ogRepository"
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_get_repo_status(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Obtener Información de Estado de ogRepository".
|
||||
|
@ -81,6 +104,21 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('cpu', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_get_repo_status_error(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Obtener Información de Estado de ogRepository"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Obtener Información de Estado de ogRepository' (error 500)...")
|
||||
mock_subprocess.side_effect = Exception("Error al obtener el estado del repositorio")
|
||||
response = self.app.get('/ogrepository/v1/status')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al obtener el estado del repositorio', response.data.decode())
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Obtener Información de todas las Imágenes"
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_get_repo_info(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Obtener Información de todas las Imágenes".
|
||||
|
@ -92,6 +130,21 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('images', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_get_repo_info_error(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Obtener Información de todas las Imágenes"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Obtener Información de todas las Imágenes' (error 500)...")
|
||||
mock_subprocess.side_effect = Exception("Error al obtener informacion de todas las imagenes")
|
||||
response = self.app.get('/ogrepository/v1/images')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al obtener informacion de todas las imagenes', response.data.decode())
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Obtener Información de una Imagen concreta"
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_get_repo_image_info(self, mock_subprocess, mock_get_image_params):
|
||||
|
@ -105,6 +158,36 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('image', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_get_repo_image_info_error500(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Obtener Información de una Imagen concreta"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Obtener Información de una Imagen concreta' (error 500)...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_subprocess.side_effect = Exception("Error al obtener informacion de una imagen concreta")
|
||||
response = self.app.get('/ogrepository/v1/images/test_image_id')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al obtener informacion de una imagen concreta', response.data.decode())
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_get_repo_image_info_error400(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Obtener Información de una Imagen concreta"
|
||||
(en caso de error "400").
|
||||
"""
|
||||
print("Testing endpoint 'Obtener Información de una Imagen concreta' (error 400)...")
|
||||
mock_get_image_params.return_value = None
|
||||
response = self.app.get('/ogrepository/v1/images/test_image_id')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('Image not found', json.loads(response.data)['error'])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Actualizar Información del Repositorio"
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_update_repo_info(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Actualizar Información del Repositorio".
|
||||
|
@ -116,6 +199,21 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('Repository info updated successfully', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_update_repo_info_error(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Actualizar Información del Repositorio"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Actualizar Información del Repositorio' (error 500)...")
|
||||
mock_subprocess.side_effect = Exception("Error al actualizar la informacion del repositorio")
|
||||
response = self.app.put('/ogrepository/v1/images')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al actualizar la informacion del repositorio', response.data.decode())
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Chequear Integridad de Imagen"
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_check_image(self, mock_subprocess, mock_get_image_params):
|
||||
|
@ -123,10 +221,43 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
"""
|
||||
print("Testing endpoint 'Chequear Integridad de Imagen'...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stdout='Image file passed the Integrity Check correctly')
|
||||
mock_subprocess.return_value = MagicMock(returncode=0, stdout=["Image file passed the Integrity Check correctly", "Image file didn't pass the Integrity Check"])
|
||||
response = self.app.get('/ogrepository/v1/status/images/test_image_id')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
self.assertIn('Image file passed the Integrity Check correctly', json.loads(response.data)['output'])
|
||||
self.assertTrue(
|
||||
"Image file passed the Integrity Check correctly" in json.loads(response.data)['output'] or
|
||||
"Image file didn't pass the Integrity Check" in json.loads(response.data)['output']
|
||||
)
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_check_image_error500(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Chequear Integridad de Imagen"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Chequear Integridad de Imagen' (error 500)...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_subprocess.side_effect = Exception("Error al chequear la integridad de la imagen")
|
||||
response = self.app.get('/ogrepository/v1/status/images/test_image_id')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al chequear la integridad de la imagen', response.data.decode())
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_check_image_error400(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Chequear Integridad de Imagen"
|
||||
(en caso de error "400").
|
||||
"""
|
||||
print("Testing endpoint 'Chequear Integridad de Imagen' (error 400)...")
|
||||
mock_get_image_params.return_value = None
|
||||
response = self.app.get('/ogrepository/v1/status/images/test_image_id')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('Image not found (inexistent or deleted)', json.loads(response.data)['error'])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Eliminar una Imagen"
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
|
@ -142,6 +273,47 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('Image deleted successfully', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_delete_image_error500(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Eliminar una Imagen"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Eliminar una Imagen' (error 500)...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_subprocess.side_effect = Exception("Error al eliminar la imagen")
|
||||
response = self.app.delete('/ogrepository/v1/images/test_image_id?method=trash')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al eliminar la imagen', response.data.decode())
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
def test_delete_image_error400_image(self, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Eliminar una Imagen"
|
||||
(en caso de error "400", por imagen inexistente).
|
||||
"""
|
||||
print("Testing endpoint 'Eliminar una Imagen' (error 400, no image)...")
|
||||
mock_get_image_params.return_value = None
|
||||
response = self.app.delete('/ogrepository/v1/images/test_image_id?method=trash')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn("Image not found (inexistent or deleted)", json.loads(response.data)['error'])
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
def test_delete_image_error400_incorrect_method(self, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Eliminar una Imagen"
|
||||
(en caso de error "400", por método incorrecto).
|
||||
"""
|
||||
print("Testing endpoint 'Eliminar una Imagen' (error 400, incorrect method)...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
response = self.app.delete('/ogrepository/v1/images/test_image_id?method=incorrect')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn("Incorrect method (must be 'permanent' or 'trash')", json.loads(response.data)['error'])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Recuperar una Imagen"
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_recover_image(self, mock_subprocess, mock_get_image_params):
|
||||
|
@ -155,6 +327,36 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('Image recovered successfully', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_recover_image_error500(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Recuperar una Imagen"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Recuperar una Imagen' (error 500)...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_subprocess.side_effect = Exception("Error al recuperar la imagen")
|
||||
response = self.app.post('/ogrepository/v1/trash/images', data=json.dumps({"ID_img": "test_image_id"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al recuperar la imagen', response.data.decode())
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_recover_image_error400(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Recuperar una Imagen"
|
||||
(en caso de error "400").
|
||||
"""
|
||||
print("Testing endpoint 'Recuperar una Imagen' (error 400)...")
|
||||
mock_get_image_params.return_value = None
|
||||
response = self.app.post('/ogrepository/v1/trash/images', data=json.dumps({"ID_img": "test_image_id"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('Image not found (inexistent or recovered previously)', json.loads(response.data)['error'])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Eliminar una Imagen de la Papelera"
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_delete_trash_image(self, mock_subprocess, mock_get_image_params):
|
||||
|
@ -168,6 +370,36 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('Image deleted successfully', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_delete_trash_image_error500(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Eliminar una Imagen de la Papelera"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Eliminar una Imagen de la Papelera' (error 500)...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_subprocess.side_effect = Exception("Error al eliminar la imagen de la papelera")
|
||||
response = self.app.delete('/ogrepository/v1/trash/images/test_image_id')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al eliminar la imagen de la papelera', response.data.decode())
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_delete_trash_image_error400(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Eliminar una Imagen de la Papelera"
|
||||
(en caso de error "400").
|
||||
"""
|
||||
print("Testing endpoint 'Eliminar una Imagen de la Papelera' (error 400)...")
|
||||
mock_get_image_params.return_value = None
|
||||
response = self.app.delete('/ogrepository/v1/trash/images/test_image_id')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('Image not found at trash', json.loads(response.data)['error'])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Importar una Imagen"
|
||||
|
||||
|
||||
@patch('repo_api.check_remote_connection')
|
||||
@patch('repo_api.check_lock_local')
|
||||
@patch('repo_api.check_remote_image')
|
||||
|
@ -185,6 +417,56 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('Importing image...', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.check_remote_connection')
|
||||
@patch('repo_api.check_lock_local')
|
||||
@patch('repo_api.check_remote_image')
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
def test_import_image_error500(self, mock_popen, mock_check_remote_image, mock_check_remote_connection, mock_check_lock_local):
|
||||
""" Método de prueba del endpoint "Importar una Imagen"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Importar una Imagen' (error 500)...")
|
||||
mock_check_remote_connection.return_value = True
|
||||
mock_check_remote_image.return_value = None
|
||||
mock_popen.side_effect = Exception("Error al importar la imagen")
|
||||
mock_check_lock_local.return_value = AsyncMock(returncode=None)
|
||||
response = self.app.post('/ogrepository/v1/repo/images', data=json.dumps({"image": "test4unittest.img", "repo_ip": "127.0.0.1", "user": "test_user"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al importar la imagen', response.data.decode())
|
||||
|
||||
|
||||
@patch('repo_api.check_remote_connection')
|
||||
def test_import_image_error400_connection(self, mock_check_remote_connection):
|
||||
""" Método de prueba del endpoint "Importar una Imagen"
|
||||
(en caso de error "400", por fallo en la conexión).
|
||||
"""
|
||||
print("Testing endpoint 'Importar una Imagen' (error 400, no connection)...")
|
||||
mock_check_remote_connection.return_value = False
|
||||
response = self.app.post('/ogrepository/v1/repo/images', data=json.dumps({"image": "test4unittest.img", "repo_ip": "127.0.0.1", "user": "test_user"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn("Can't connect to remote server", json.loads(response.data)['exception'])
|
||||
|
||||
|
||||
@patch('repo_api.check_remote_connection')
|
||||
@patch('repo_api.check_remote_image')
|
||||
def test_import_image_error400_image(self, mock_check_remote_image, mock_check_remote_connection):
|
||||
""" Método de prueba del endpoint "Importar una Imagen"
|
||||
(en caso de error "400", por imagen inexistente o bloqueada).
|
||||
"""
|
||||
print("Testing endpoint 'Importar una Imagen' (error 400, no image or image locked)...")
|
||||
mock_check_remote_connection.return_value = True
|
||||
mock_check_remote_image.side_effect = ["Remote image not found", "Remote image is locked"]
|
||||
response = self.app.post('/ogrepository/v1/repo/images', data=json.dumps({"image": "test4unittest.img", "repo_ip": "127.0.0.1", "user": "test_user"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertTrue(
|
||||
'Remote image not found' in json.loads(response.data)['exception'] or
|
||||
'Remote image is locked' in json.loads(response.data)['exception']
|
||||
)
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Exportar una Imagen"
|
||||
|
||||
|
||||
@patch('repo_api.check_remote_connection')
|
||||
@patch('repo_api.check_lock_remote')
|
||||
@patch('repo_api.get_image_params')
|
||||
|
@ -202,6 +484,83 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('Exporting image...', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.check_remote_connection')
|
||||
@patch('repo_api.check_lock_remote')
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
def test_export_image_error500(self, mock_popen, mock_get_image_params, mock_check_remote_connection, mock_check_lock_remote):
|
||||
""" Método de prueba del endpoint "Exportar una Imagen"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Exportar una Imagen' (error 500)...")
|
||||
mock_check_remote_connection.return_value = True
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_popen.side_effect = Exception("Error al exportar la imagen")
|
||||
mock_check_lock_remote.return_value = AsyncMock(returncode=None)
|
||||
response = self.app.put('/ogrepository/v1/repo/images', data=json.dumps({"ID_img": "test_image_id", "repo_ip": "127.0.0.1", "user": "test_user"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al exportar la imagen', response.data.decode())
|
||||
|
||||
|
||||
@patch('repo_api.check_remote_connection')
|
||||
@patch('repo_api.get_image_params')
|
||||
def test_export_image_error400_connection(self, mock_get_image_params, mock_check_remote_connection):
|
||||
""" Método de prueba del endpoint "Exportar una Imagen"
|
||||
(en caso de error "400", por fallo en la conexión).
|
||||
"""
|
||||
print("Testing endpoint 'Exportar una Imagen' (error 400, no connection)...")
|
||||
mock_check_remote_connection.return_value = False
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
response = self.app.put('/ogrepository/v1/repo/images', data=json.dumps({"ID_img": "test_image_id", "repo_ip": "127.0.0.1", "user": "test_user"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn("Can't connect to remote server", json.loads(response.data)['exception'])
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
def test_export_image_error400_image_inexistent(self, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Exportar una Imagen"
|
||||
(en caso de error "400", por imagen inexistente).
|
||||
"""
|
||||
print("Testing endpoint 'Exportar una Imagen' (error 400, no image)...")
|
||||
mock_get_image_params.return_value = None
|
||||
response = self.app.put('/ogrepository/v1/repo/images', data=json.dumps({"ID_img": "test_image_id", "repo_ip": "127.0.0.1", "user": "test_user"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn("Image not found", json.loads(response.data)['error'])
|
||||
|
||||
|
||||
@patch('repo_api.check_file_exists')
|
||||
@patch('repo_api.get_image_params')
|
||||
def test_export_image_error400_image_locked(self, mock_get_image_params, mock_check_file_exists):
|
||||
""" Método de prueba del endpoint "Exportar una Imagen"
|
||||
(en caso de error "400", por imagen bloqueada).
|
||||
"""
|
||||
print("Testing endpoint 'Exportar una Imagen' (error 400, image locked)...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_check_file_exists.return_value = True
|
||||
response = self.app.put('/ogrepository/v1/repo/images', data=json.dumps({"ID_img": "test_image_id", "repo_ip": "127.0.0.1", "user": "test_user"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn("Image is locked", json.loads(response.data)['exception'])
|
||||
|
||||
|
||||
@patch('repo_api.check_remote_connection')
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
def test_export_image_error400_remote_image_exists(self, mock_popen, mock_get_image_params, mock_check_remote_connection):
|
||||
""" Método de prueba del endpoint "Exportar una Imagen"
|
||||
(en caso de error "400", por imagen remota ya existente).
|
||||
"""
|
||||
print("Testing endpoint 'Exportar una Imagen' (error 400, remote image exists)...")
|
||||
mock_check_remote_connection.return_value = True
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_popen.side_effect = Exception("exit status 5")
|
||||
response = self.app.put('/ogrepository/v1/repo/images', data=json.dumps({"ID_img": "test_image_id", "repo_ip": "127.0.0.1", "user": "test_user"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn("Image already exists on remote server", json.loads(response.data)['exception'])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Crear archivos auxiliares"
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
@patch('repo_api.check_aux_files')
|
||||
def test_create_torrent_sum(self, mock_popen, mock_check_aux_files):
|
||||
|
@ -215,6 +574,45 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('Creating auxiliar files...', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
def test_create_torrent_sum_error500(self, mock_popen):
|
||||
""" Método de prueba del endpoint "Crear archivos auxiliares"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Crear archivos auxiliares' (error 500)...")
|
||||
mock_popen.side_effect = Exception("Error al crear los archivos auxiliares")
|
||||
response = self.app.post('/ogrepository/v1/images/torrentsum', data=json.dumps({"image": "test4unittest.img"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al crear los archivos auxiliares', response.data.decode())
|
||||
|
||||
|
||||
@patch('repo_api.check_file_exists')
|
||||
def test_create_torrent_sum_error400_no_image(self, mock_check_file_exists):
|
||||
""" Método de prueba del endpoint "Crear archivos auxiliares"
|
||||
(en caso de error "400", por imagen inexistente).
|
||||
"""
|
||||
print("Testing endpoint 'Crear archivos auxiliares' (error 400, no image)...")
|
||||
mock_check_file_exists.return_value = False
|
||||
response = self.app.post('/ogrepository/v1/images/torrentsum', data=json.dumps({"image": "test4unittest.img"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('Image not found', json.loads(response.data)['exception'])
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
def test_create_torrent_sum_error400_image_locked(self, mock_popen):
|
||||
""" Método de prueba del endpoint "Crear archivos auxiliares"
|
||||
(en caso de error "400", por imagen bloqueada).
|
||||
"""
|
||||
print("Testing endpoint 'Crear archivos auxiliares' (error 400, image locked)...")
|
||||
mock_popen.side_effect = Exception("exit status 3")
|
||||
response = self.app.post('/ogrepository/v1/images/torrentsum', data=json.dumps({"image": "test4unittest.img"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('Image is locked', json.loads(response.data)['exception'])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Enviar paquete Wake On Lan"
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_send_wakeonlan(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Enviar paquete Wake On Lan".
|
||||
|
@ -226,6 +624,21 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('Wake On Lan packet sended successfully', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_send_wakeonlan_error500(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Enviar paquete Wake On Lan"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Enviar paquete Wake On Lan' (error 500)...")
|
||||
mock_subprocess.side_effect = Exception("Error al enviar el paquete Wake On Lan")
|
||||
response = self.app.post('/ogrepository/v1/wol', data=json.dumps({"broadcast_ip": "192.168.1.255", "mac": "00:11:22:33:44:55"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al enviar el paquete Wake On Lan', response.data.decode())
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Enviar una Imagen mediante UDPcast"
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.search_process', side_effect=mock_search_process) # Esto hace que pille el retorno de la función "search_process" de la API llamando al método "mock_search_process" de esta clase.)
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
|
@ -240,6 +653,36 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('Sending image...', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
def test_send_udpcast_error500(self, mock_popen, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Enviar una Imagen mediante UDPcast"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Enviar una Imagen mediante UDPcast' (error 500)...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_popen.side_effect = Exception("Error al enviar la imagen mediante UDPcast")
|
||||
response = self.app.post('/ogrepository/v1/udpcast', data=json.dumps({"ID_img": "test_image_id", "port": "9000", "method": "mcast", "ip": "224.0.0.1", "bitrate": "10M", "nclients": "5", "maxtime": "60"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al enviar la imagen mediante UDPcast', response.data.decode())
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
def test_send_udpcast_error400(self, mock_popen, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Enviar una Imagen mediante UDPcast"
|
||||
(en caso de error "400").
|
||||
"""
|
||||
print("Testing endpoint 'Enviar una Imagen mediante UDPcast' (error 400)...")
|
||||
mock_get_image_params.return_value = None
|
||||
response = self.app.post('/ogrepository/v1/udpcast', data=json.dumps({"ID_img": "test_image_id", "port": "9000", "method": "mcast", "ip": "224.0.0.1", "bitrate": "10M", "nclients": "5", "maxtime": "60"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('Image not found', json.loads(response.data)['error'])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Enviar una Imagen mediante UFTP"
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.search_process', side_effect=mock_search_process) # Esto hace que pille el retorno de la función "search_process" de la API llamando al método "mock_search_process" de esta clase.
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
|
@ -254,6 +697,36 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('Sending image...', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
def test_send_uftp_error500(self, mock_popen, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Enviar una Imagen mediante UFTP"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Enviar una Imagen mediante UFTP' (error 500)...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_popen.side_effect = Exception("Error al enviar la imagen mediante UFTP")
|
||||
response = self.app.post('/ogrepository/v1/uftp', data=json.dumps({"ID_img": "test_image_id", "port": "9000", "ip": "224.0.0.1", "bitrate": "10M"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al enviar la imagen mediante UFTP', response.data.decode())
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
def test_send_uftp_error400(self, mock_popen, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Enviar una Imagen mediante UFTP"
|
||||
(en caso de error "400").
|
||||
"""
|
||||
print("Testing endpoint 'Enviar una Imagen mediante UFTP' (error 400)...")
|
||||
mock_get_image_params.return_value = None
|
||||
response = self.app.post('/ogrepository/v1/uftp', data=json.dumps({"ID_img": "test_image_id", "port": "9000", "ip": "224.0.0.1", "bitrate": "10M"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('Image not found', json.loads(response.data)['error'])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Enviar una Imagen mediante P2P"
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.search_process')
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
|
@ -269,6 +742,37 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('Tracker and Seeder serving image correctly', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.search_process', side_effect=mock_search_process_error)
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
def test_send_p2p_error500(self, mock_popen, mock_get_image_params, mock_search_process):
|
||||
""" Método de prueba del endpoint "Enviar una Imagen mediante P2P"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Enviar una Imagen mediante P2P' (error 500)...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_popen.return_value = MagicMock(returncode=None)
|
||||
response = self.app.post('/ogrepository/v1/p2p', data=json.dumps({"ID_img": "test_image_id"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn("Tracker or Seeder (or both) not running", json.loads(response.data)['error'])
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.Popen')
|
||||
def test_send_p2p_error400(self, mock_popen, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Enviar una Imagen mediante P2P"
|
||||
(en caso de error "400").
|
||||
"""
|
||||
print("Testing endpoint 'Enviar una Imagen mediante P2P' (error 400)...")
|
||||
mock_get_image_params.return_value = None
|
||||
response = self.app.post('/ogrepository/v1/p2p', data=json.dumps({"ID_img": "test_image_id"}), content_type='application/json')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('Image not found', json.loads(response.data)['error'])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Ver Estado de Transmisiones UDPcast"
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_get_udpcast_info(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Ver Estado de Transmisiones UDPcast".
|
||||
|
@ -280,6 +784,33 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('udpcast', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_get_udpcast_info_error500(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Ver Estado de Transmisiones UDPcast"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Ver Estado de Transmisiones UDPcast' (error 500)...")
|
||||
mock_subprocess.side_effect = Exception("Error al comprobar las transmisiones UDPcast")
|
||||
response = self.app.get('/ogrepository/v1/udpcast')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al comprobar las transmisiones UDPcast', response.data.decode())
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_get_udpcast_info_error400(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Ver Estado de Transmisiones UDPcast"
|
||||
(en caso de error "400").
|
||||
"""
|
||||
print("Testing endpoint 'Ver Estado de Transmisiones UDPcast' (error 400)...")
|
||||
mock_subprocess.side_effect = Exception("exit status 1")
|
||||
response = self.app.get('/ogrepository/v1/udpcast')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('No UDPCast active transmissions', json.loads(response.data)['exception'])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Ver Estado de Transmisiones UFTP"
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_get_uftp_info(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Ver Estado de Transmisiones UFTP".
|
||||
|
@ -291,6 +822,33 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('uftp', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_get_uftp_info_error500(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Ver Estado de Transmisiones UFTP"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Ver Estado de Transmisiones UFTP' (error 500)...")
|
||||
mock_subprocess.side_effect = Exception("Error al comprobar las transmisiones UFTP")
|
||||
response = self.app.get('/ogrepository/v1/uftp')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al comprobar las transmisiones UFTP', response.data.decode())
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_get_uftp_info_error400(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Ver Estado de Transmisiones UFTP"
|
||||
(en caso de error "400").
|
||||
"""
|
||||
print("Testing endpoint 'Ver Estado de Transmisiones UFTP' (error 400)...")
|
||||
mock_subprocess.side_effect = Exception("exit status 1")
|
||||
response = self.app.get('/ogrepository/v1/uftp')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('No UFTP active transmissions', json.loads(response.data)['exception'])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Cancelar Transmisión UDPcast"
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_stop_udpcast(self, mock_subprocess, mock_get_image_params):
|
||||
|
@ -304,6 +862,50 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('Image transmission canceled successfully', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_stop_udpcast_error500(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Cancelar Transmisión UDPcast"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Cancelar Transmisión UDPcast' (error 500)...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_subprocess.side_effect = Exception("Error al cancelar la transmision UDPcast")
|
||||
response = self.app.delete('/ogrepository/v1/udpcast/images/test_image_id')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al cancelar la transmision UDPcast', response.data.decode())
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_stop_udpcast_error400_image(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Cancelar Transmisión UDPcast"
|
||||
(en caso de error "400", por imagen inexistente).
|
||||
"""
|
||||
print("Testing endpoint 'Cancelar Transmisión UDPcast' (error 400, no image)...")
|
||||
mock_get_image_params.return_value = None
|
||||
response = self.app.delete('/ogrepository/v1/udpcast/images/test_image_id')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('Image not found', json.loads(response.data)['error'])
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_stop_udpcast_error400_transmission(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Cancelar Transmisión UDPcast"
|
||||
(en caso de error "400", por transmisión inexistente).
|
||||
"""
|
||||
print("Testing endpoint 'Cancelar Transmisión UDPcast' (error 400, no transmission)...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_subprocess.side_effect = Exception("exit status 3")
|
||||
response = self.app.delete('/ogrepository/v1/udpcast/images/test_image_id')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('No UDPCast active transmissions for specified image', json.loads(response.data)['exception'])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Cancelar Transmisión UFTP"
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_stop_uftp(self, mock_subprocess, mock_get_image_params):
|
||||
|
@ -317,6 +919,50 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('Image transmission canceled successfully', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_stop_uftp_error500(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Cancelar Transmisión UFTP"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
print("Testing endpoint 'Cancelar Transmisión UFTP' (error 500)...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_subprocess.side_effect = Exception("Error al cancelar la transmision UFTP")
|
||||
response = self.app.delete('/ogrepository/v1/uftp/images/test_image_id')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al cancelar la transmision UFTP', response.data.decode())
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_stop_uftp_error400_image(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Cancelar Transmisión UFTP"
|
||||
(en caso de error "400", por imagen inexistente).
|
||||
"""
|
||||
print("Testing endpoint 'Cancelar Transmisión UFTP' (error 400, no image)...")
|
||||
mock_get_image_params.return_value = None
|
||||
response = self.app.delete('/ogrepository/v1/uftp/images/test_image_id')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('Image not found', json.loads(response.data)['error'])
|
||||
|
||||
|
||||
@patch('repo_api.get_image_params')
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_stop_uftp_error400_transmission(self, mock_subprocess, mock_get_image_params):
|
||||
""" Método de prueba del endpoint "Cancelar Transmisión UFTP"
|
||||
(en caso de error "400", por transmisión inexistente).
|
||||
"""
|
||||
print("Testing endpoint 'Cancelar Transmisión UFTP' (error 400, no transmission)...")
|
||||
mock_get_image_params.return_value = {'name': 'test4unittest', 'extension': 'img'}
|
||||
mock_subprocess.side_effect = Exception("exit status 3")
|
||||
response = self.app.delete('/ogrepository/v1/uftp/images/test_image_id')
|
||||
self.assertEqual(response.status_code, 400)
|
||||
self.assertIn('No UFTP active transmissions for specified image', json.loads(response.data)['exception'])
|
||||
|
||||
|
||||
# ------------------------------------------------------------------- Tests "Cancelar Transmisiones P2P"
|
||||
|
||||
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_stop_p2p(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Cancelar Transmisiones P2P".
|
||||
|
@ -328,14 +974,18 @@ class RepoApiTestCase(unittest.TestCase):
|
|||
self.assertIn('P2P transmissions canceled successfully', json.loads(response.data)['output'])
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
""" Método para limpiar el entorno de pruebas.
|
||||
En este caso, elimina el archivo "test4unittest.img" (imagen de prueba).
|
||||
@patch('repo_api.subprocess.run')
|
||||
def test_stop_p2p_error500(self, mock_subprocess):
|
||||
""" Método de prueba del endpoint "Cancelar Transmisiones P2P"
|
||||
(en caso de error "500").
|
||||
"""
|
||||
if os.path.exists(f"{repo_path}/test4unittest.img"):
|
||||
os.remove(f"{repo_path}/test4unittest.img")
|
||||
print("Testing endpoint 'Cancelar Transmisiones P2P' (error 500)...")
|
||||
mock_subprocess.side_effect = Exception("Error al cancelar las transmisiones P2P")
|
||||
response = self.app.delete('/ogrepository/v1/p2p')
|
||||
self.assertEqual(response.status_code, 500)
|
||||
self.assertIn('Error al cancelar las transmisiones P2P', response.data.decode())
|
||||
|
||||
|
||||
|
||||
|
||||
# --------------------------------------------------------------------------------------------
|
||||
|
||||
|
|
Loading…
Reference in New Issue