#!/usr/bin/python3 import shutil import subprocess import os import os.path import re from pathlib import Path import DiskLib import FileSystemLib import SystemLib import ogGlobals ## ProtocolLib.ogUcastSyntax() calls ogCreateImageSyntax() ## in ogCreateImageSyntax(), param2 may contain a pipe or may be empty ## if param2 is empty, then ogUcastSyntax(): ## - does a .split() ## - accesses the third element of the resulting array (ie. does "parts[2]") ## - promptly gets an IndexError exception ## ## param2 in ogCreateImageSyntax() only contains a pipe if 'mbuffer' is installed ## therefore, a hard dependency on mbuffer is created ## ## raise an Exception at import time if mbuffer is not present if not shutil.which ('mbuffer'): raise FileNotFoundError ('"mbuffer" utility must be present') #/** #@file ImageLib.py #@brief Librería o clase Image #@class Image #@brief Funciones para creación, restauración y clonación de imágenes de sistemas. #@warning License: GNU GPLv3+ #*/ #/** # ogCreateImageSyntax path_device path_filename [str_tool] [str_compressionlevel] #@brief Genera una cadena de texto con la instrucción para crear un fichero imagen #@param path_device dispositivo Linux del sistema de archivos #@param path_fileneme path absoluto del fichero imagen #@param [opcional] str_tool herrmaienta de clonacion [partimage, partclone, ntfsclone] #@param [opcional] str_compressionlevel nivel de compresion. [0 -none-, 1-lzop-, 2-gzip] #@return str_command - cadena con el comando que se debe ejecutar. #@warning Salida nula si se producen errores. #@TODO introducir las herramientas fsarchiver, dd #*/ ## #ogCreateImageSyntax /dev/sda1 /opt/opengnsys/images/prueba.img partclone lzop #ogCreateImageSyntax /dev/sda1 /opt/opengnsys/images/prueba.img def ogCreateImageSyntax (dev, imgfile, tool='partclone', level='gzip'): if 'ntfsclone' == tool: param1 = f'ntfsclone --force --save-image -O - {dev}' elif 'partimage' == tool or 'default' == tool: param1 = f'partimage -M -f3 -o -d -B gui=no -c -z0 --volume=0 save {dev} stdout' elif 'partclone' == tool: disk, part = DiskLib.ogDevToDisk (dev).split() fs = FileSystemLib.ogGetFsType (disk, part) param1 = { 'EXT2': 'partclone.extfs', 'EXT3': 'partclone.extfs', 'EXT4': 'partclone.extfs', 'BTRFS': 'partclone.btrfs', 'REISERFS': 'partclone.reiserfs', 'REISER4': 'partclone.reiser4', 'JFS': 'partclone.jfs', 'XFS': 'partclone.xfs', 'F2FS': 'partclone.f2fs', 'NILFS2': 'partclone.nilfs2', 'NTFS': 'partclone.ntfs', 'EXFAT': 'partclone.exfat', 'FAT16': 'partclone.fat', 'FAT32': 'partclone.fat', 'HFS': 'partclone.hfsp', 'HFSPLUS': 'partclone.hfsp', 'UFS': 'partclone.ufs', 'VMFS': 'partclone.vmfs', }.get (fs, 'partclone.imager') dash_c = '-c' if not shutil.which (param1): param1 = 'partclone.dd' # Algunas versiones de partclone.dd no tienen opción "-c". out = subprocess.run (['partclone.dd', '--help'], capture_output=True, text=True).stdout if ' -c' not in out: dash_c = '' param1=f'{param1} -d0 -F {dash_c} -s {dev}' else: raise Exception (f'unknown tool "{tool}"') param2 = '| mbuffer -q -m 40M ' if shutil.which ('mbuffer') else ' ' param3 = { 0: ' > ', 'none': ' > ', 1: ' | lzop > ', 'lzop': ' | lzop > ', 2: ' | gzip -c > ', 'gzip': ' | gzip -c > ', 3: ' | bzip -c > ', 'bzip': ' | bzip -c > ', }.get (level, ' > ') print (f'param1 ({param1}) param2 ({param2}) param3 ({param3}) imgfile ({imgfile})') if param1: return f'{param1} {param2} {param3} {imgfile}' #/** # ogRestoreImageSyntax path_filename path_device [str_tools] [str_compressionlevel] #@brief Genera una cadena de texto con la instrucción para crear un fichero imagen #@param path_device dispositivo Linux del sistema de archivos #@param path_fileneme path absoluto del fichero imagen #@param [opcional] str_tools herrmaienta de clonacion [partimage, partclone, ntfsclone] #@param [opcional] str_compressionlevel nivel de compresion. [0 -none-, 1-lzop-, 2-gzip] #@return cadena con el comando que se debe ejecutar. #@exception OG_ERR_FORMAT formato incorrecto. #@warning En pruebas iniciales #@TODO introducir las herramientas fsarchiver, dd #@TODO introducir el nivel de compresion gzip #*/ ## #ogRestoreImageSyntax /opt/opengnsys/images/prueba.img /dev/sda1 [partclone] [lzop] def ogRestoreImageSyntax (imgfile, part, tool=None, level=None): if not os.path.exists (imgfile): SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_NOTFOUND, imgfile) ## original bash code is broken: 'return' is never called #return if tool is None or level is None: infoimg = ogGetImageInfo (imgfile) if not infoimg: SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_NOTFOUND, f'no image {imgfile}') ## original bash code is broken: 'return' is never called #return try: tool, level = infoimg.split (':')[0:2] except: tool, level = '', '' return ogRestoreImageSyntax (imgfile, part, tool, level) tool = tool.lower() level = level.lower() compressor = { 0: ' ', 'none': ' ', 1: ' lzop -dc ', 'lzop': ' lzop -dc ', 2: ' gzip -dc ', 'gzip': ' gzip -dc ', 3: ' bzip -dc ', 'bzip': ' bzip -dc ', }.get (level, '') print (f'tool ({tool}) level ({level}) compressor ({compressor})') if compressor is '': SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_NOTFOUND, f'Compressor no valid {level}') ## original bash code is broken: 'return' is never called #return mbuffer = '| mbuffer -q -m 40M ' if shutil.which ('mbuffer') else ' ' print (f'mbuffer ({mbuffer})') if 'ntfsclone' == tool: tool = f'| ntfsclone --restore-image --overwrite {part} -' elif 'partimage' == tool: tool = f'| partimage -f3 -B gui=no restore {part} stdin' elif 'partclone' in tool: # -C para que no compruebe tamaños tool = f'| partclone.restore -d0 -C -I -o {part}' elif 'dd' == tool: tool = f'| pv | dd conv=sync,noerror bs=1M of={part}' else: SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_NOTFOUND, f'Tools imaging no valid {tool}') ## original bash code is broken: 'return' is never called #return print (f'tool ({tool})') return f'{compressor} {imgfile} {mbuffer} {tool}'.strip() #/** # ogGetImageInfo filename #@brief muestra información sobre la imagen monolitica. #@param 1 filename path absoluto del fichero imagen #@return cadena compuesta por clonacion:compresor:sistemaarchivos:tamañoKB #@exception OG_ERR_FORMAT formato incorrecto. #@exception OG_ERR_NOTFOUND fichero no encontrado. #@exception OG_ERR_IMAGE "Image format is not valid $IMGFILE" #@warning En pruebas iniciales #@TODO Definir sintaxis de salida (herramienta y compresor en minuscula) #@TODO Arreglar loop para ntfsclone #@TODO insertar parametros entrada tipo OG #*/ ## #ogGetImageInfo /opt/opengnsys/images/prueba.img ==> PARTCLONE:LZOP:NTFS:5642158" def ogGetImageInfo (imgfile): if not os.path.exists (imgfile): SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_NOTFOUND, imgfile) return imgdetect = False filehead = f'/tmp/{os.path.basename (imgfile)}.infohead' compressor = subprocess.run (['file', imgfile], capture_output=True, text=True).stdout.split()[1] if compressor not in ['gzip', 'lzop']: SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_IMAGE, f'Image format is not valid {imgfile}') return ## original bash idiom is: $($COMPRESSOR -dc $IMGFILE 2>/dev/null | head -n 40 > $FILEHEAD) || ogRaiseError ## the purpose of which I can't fully comprehend print (f'shelling out "{compressor} -dc {imgfile} |head -n 40 > {filehead}"') if subprocess.run (f'{compressor} -dc {imgfile} |head -n 40 > {filehead}', shell=True).returncode: SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_IMAGE, f'Image format is not valid {imgfile}') return tools = fs = size = None if False == imgdetect: lc_all = os.getenv ('LC_ALL') os.environ['LC_ALL'] = 'C' partclone_info = subprocess.run (['partclone.info', filehead], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True).stdout if lc_all is not None: os.environ["LC_ALL"] = lc_all else: del os.environ["LC_ALL"] if 'size' in partclone_info: tools = 'PARTCLONE' sizefactor = 1000000 if 'GB' in partclone_info else 1024 fs_lines = list (filter (lambda l: 'File system' in l, partclone_info.splitlines())) fs = fs_lines[0].split (':')[1].strip() if fs in ['HFS', 'HFSPLUS', 'FAT32']: ## TODO #FSPLUS=$(echo $PARTCLONEINFO | awk '{gsub(/\: /,"\n"); print toupper($9);}') #echo $PARTCLONEINFO | grep GB > /dev/null && SIZEFACTOR=1000000 || SIZEFACTOR=1024 fsplus = 'PLUS' if fsplus: fs += fsplus size = 42 else: size = 42 ## /TODO size = int (size * sizefactor) else: m = re.search (r'Device size *: *(\S+)', partclone_info) size = float (m.group(1)) if m else 0 size = int (size * sizefactor) imgdetect = True if False == imgdetect and not os.path.exists ('/dev/loop2'): filehead_contents = Path (filehead).read_bytes() if b'ntfsclone-image' in filehead_contents: print (f'shelling out "cat {filenead} | ntfsclone --restore --overwrite /dev/loop2 - 2>&1"') ntfscloneinfo = subprocess.run (f'cat {filenead} | ntfsclone --restore --overwrite /dev/loop2 - 2>&1', shell=True, capture_output=True, text=True).stdout if 'ntfsclone' in ntfscloneinfo: tools = 'NTFSCLONE' size_lines = list (filter (lambda l: '__TODO__' in l, ntfscloneinfo.splitlines())) ## TODO size = 42 #int (size_lines[0].split (':')[1].strip()) ## TODO fs = 'NTFS' imgdetect = True if False == imgdetect: partimageinfo = subprocess.run (['partimage', '-B', 'gui=no', 'imginfo', filehead], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True).stdout if 'Partition' in partimageinfo: tools = 'TOOLS=PARTIMAGE' fs_lines = list (filter (lambda l: '__TODO__' in l, partimageinfo.splitlines())) ## TODO fs = 'EXTFS' #fs_lines[0].split (':')[1].strip() ## TODO size_lines = list (filter (lambda l: '__TODO__' in l, partimageinfo.splitlines())) ## TODO size = 42 #int (size_lines[0].split (':')[1].strip()) ## TODO imgdetect = True if 'boot sector' in subprocess.run (['file', filehead], capture_output=True, text=True).stdout: tools = 'partclone.dd' fs = '' size = 0 imgdetect = True if not tools or not compressor or False == imgdetect: SystemLib.ogRaiseError ([], ogGlobals.OG_ERR_IMAGE, f'Image format is not valid {imgfile}') return compressor = compressor.lower() return ':'.join ([tools, compressor, fs, str (size)])