oggit/packages/libarchive-c/opengnsys-libarchive-c-5.1/libarchive/extract.py

89 lines
2.6 KiB
Python

from contextlib import contextmanager
from ctypes import byref, c_longlong, c_size_t, c_void_p
import os
from .ffi import (
write_disk_new, write_disk_set_options, write_free, write_header,
read_data_block, write_data_block, write_finish_entry, ARCHIVE_EOF
)
from .read import fd_reader, file_reader, memory_reader
EXTRACT_OWNER = 0x0001
EXTRACT_PERM = 0x0002
EXTRACT_TIME = 0x0004
EXTRACT_NO_OVERWRITE = 0x0008
EXTRACT_UNLINK = 0x0010
EXTRACT_ACL = 0x0020
EXTRACT_FFLAGS = 0x0040
EXTRACT_XATTR = 0x0080
EXTRACT_SECURE_SYMLINKS = 0x0100
EXTRACT_SECURE_NODOTDOT = 0x0200
EXTRACT_NO_AUTODIR = 0x0400
EXTRACT_NO_OVERWRITE_NEWER = 0x0800
EXTRACT_SPARSE = 0x1000
EXTRACT_MAC_METADATA = 0x2000
EXTRACT_NO_HFS_COMPRESSION = 0x4000
EXTRACT_HFS_COMPRESSION_FORCED = 0x8000
EXTRACT_SECURE_NOABSOLUTEPATHS = 0x10000
EXTRACT_CLEAR_NOCHANGE_FFLAGS = 0x20000
PREVENT_ESCAPE = (
EXTRACT_SECURE_NOABSOLUTEPATHS |
EXTRACT_SECURE_NODOTDOT |
EXTRACT_SECURE_SYMLINKS
)
@contextmanager
def new_archive_write_disk(flags):
archive_p = write_disk_new()
write_disk_set_options(archive_p, flags)
try:
yield archive_p
finally:
write_free(archive_p)
def extract_entries(entries, flags=None):
"""Extracts the given archive entries into the current directory.
"""
if flags is None:
if os.getcwd() == '/':
# If the current directory is the root, then trying to prevent
# escaping is probably undesirable.
flags = 0
else:
flags = PREVENT_ESCAPE
buff, size, offset = c_void_p(), c_size_t(), c_longlong()
buff_p, size_p, offset_p = byref(buff), byref(size), byref(offset)
with new_archive_write_disk(flags) as write_p:
for entry in entries:
write_header(write_p, entry._entry_p)
read_p = entry._archive_p
while 1:
r = read_data_block(read_p, buff_p, size_p, offset_p)
if r == ARCHIVE_EOF:
break
write_data_block(write_p, buff, size, offset)
write_finish_entry(write_p)
def extract_fd(fd, flags=None):
"""Extracts an archive from a file descriptor into the current directory.
"""
with fd_reader(fd) as archive:
extract_entries(archive, flags)
def extract_file(filepath, flags=None):
"""Extracts an archive from a file into the current directory."""
with file_reader(filepath) as archive:
extract_entries(archive, flags)
def extract_memory(buffer_, flags=None):
"""Extracts an archive from memory into the current directory."""
with memory_reader(buffer_) as archive:
extract_entries(archive, flags)