280 lines
10 KiB
Python
280 lines
10 KiB
Python
from contextlib import contextmanager
|
|
from ctypes import byref, cast, c_char, c_size_t, c_void_p, POINTER
|
|
from posixpath import join
|
|
import warnings
|
|
|
|
from . import ffi
|
|
from .entry import ArchiveEntry, FileType
|
|
from .ffi import (
|
|
OPEN_CALLBACK, WRITE_CALLBACK, CLOSE_CALLBACK, NO_OPEN_CB, NO_CLOSE_CB,
|
|
ARCHIVE_EOF,
|
|
page_size, entry_sourcepath, entry_clear, read_disk_new, read_disk_open_w,
|
|
read_next_header2, read_disk_descend, read_free, write_header, write_data,
|
|
write_finish_entry,
|
|
read_disk_set_behavior
|
|
)
|
|
|
|
|
|
@contextmanager
|
|
def new_archive_read_disk(path, flags=0, lookup=False):
|
|
archive_p = read_disk_new()
|
|
read_disk_set_behavior(archive_p, flags)
|
|
if lookup:
|
|
ffi.read_disk_set_standard_lookup(archive_p)
|
|
read_disk_open_w(archive_p, path)
|
|
try:
|
|
yield archive_p
|
|
finally:
|
|
read_free(archive_p)
|
|
|
|
|
|
class ArchiveWrite:
|
|
|
|
def __init__(self, archive_p, header_codec='utf-8'):
|
|
self._pointer = archive_p
|
|
self.header_codec = header_codec
|
|
|
|
def add_entries(self, entries):
|
|
"""Add the given entries to the archive.
|
|
"""
|
|
write_p = self._pointer
|
|
for entry in entries:
|
|
write_header(write_p, entry._entry_p)
|
|
for block in entry.get_blocks():
|
|
write_data(write_p, block, len(block))
|
|
write_finish_entry(write_p)
|
|
|
|
def add_files(
|
|
self, *paths, flags=0, lookup=False, pathname=None, recursive=True,
|
|
**attributes
|
|
):
|
|
"""Read files through the OS and add them to the archive.
|
|
|
|
Args:
|
|
paths (str): the paths of the files to add to the archive
|
|
flags (int):
|
|
passed to the C function `archive_read_disk_set_behavior`;
|
|
use the `libarchive.flags.READDISK_*` constants
|
|
lookup (bool):
|
|
when True, the C function `archive_read_disk_set_standard_lookup`
|
|
is called to enable the lookup of user and group names
|
|
pathname (str | None):
|
|
the path of the file in the archive, defaults to the source path
|
|
recursive (bool):
|
|
when False, if a path in `paths` is a directory,
|
|
only the directory itself is added.
|
|
attributes (dict): passed to `ArchiveEntry.modify()`
|
|
|
|
Raises:
|
|
ArchiveError: if a file doesn't exist or can't be accessed, or if
|
|
adding it to the archive fails
|
|
"""
|
|
write_p = self._pointer
|
|
|
|
block_size = ffi.write_get_bytes_per_block(write_p)
|
|
if block_size <= 0:
|
|
block_size = 10240 # pragma: no cover
|
|
|
|
entry = ArchiveEntry(header_codec=self.header_codec)
|
|
entry_p = entry._entry_p
|
|
destination_path = attributes.pop('pathname', None)
|
|
for path in paths:
|
|
with new_archive_read_disk(path, flags, lookup) as read_p:
|
|
while 1:
|
|
r = read_next_header2(read_p, entry_p)
|
|
if r == ARCHIVE_EOF:
|
|
break
|
|
entry_path = entry.pathname
|
|
if destination_path:
|
|
if entry_path == path:
|
|
entry_path = destination_path
|
|
else:
|
|
assert entry_path.startswith(path)
|
|
entry_path = join(
|
|
destination_path,
|
|
entry_path[len(path):].lstrip('/')
|
|
)
|
|
entry.pathname = entry_path.lstrip('/')
|
|
if attributes:
|
|
entry.modify(**attributes)
|
|
read_disk_descend(read_p)
|
|
write_header(write_p, entry_p)
|
|
if entry.isreg:
|
|
with open(entry_sourcepath(entry_p), 'rb') as f:
|
|
while 1:
|
|
data = f.read(block_size)
|
|
if not data:
|
|
break
|
|
write_data(write_p, data, len(data))
|
|
write_finish_entry(write_p)
|
|
entry_clear(entry_p)
|
|
if not recursive:
|
|
break
|
|
|
|
def add_file(self, path, **kw):
|
|
"Single-path alias of `add_files()`"
|
|
return self.add_files(path, **kw)
|
|
|
|
def add_file_from_memory(
|
|
self, entry_path, entry_size, entry_data,
|
|
filetype=FileType.REGULAR_FILE, permission=0o664,
|
|
**other_attributes
|
|
):
|
|
""""Add file from memory to archive.
|
|
|
|
Args:
|
|
entry_path (str | bytes): the file's path
|
|
entry_size (int): the file's size, in bytes
|
|
entry_data (bytes | Iterable[bytes]): the file's content
|
|
filetype (int): see `libarchive.entry.ArchiveEntry.modify()`
|
|
permission (int): see `libarchive.entry.ArchiveEntry.modify()`
|
|
other_attributes: see `libarchive.entry.ArchiveEntry.modify()`
|
|
"""
|
|
archive_pointer = self._pointer
|
|
|
|
if isinstance(entry_data, bytes):
|
|
entry_data = (entry_data,)
|
|
elif isinstance(entry_data, str):
|
|
raise TypeError(
|
|
"entry_data: expected bytes, got %r" % type(entry_data)
|
|
)
|
|
|
|
entry = ArchiveEntry(
|
|
pathname=entry_path, size=entry_size, filetype=filetype,
|
|
perm=permission, header_codec=self.header_codec,
|
|
**other_attributes
|
|
)
|
|
write_header(archive_pointer, entry._entry_p)
|
|
|
|
for chunk in entry_data:
|
|
if not chunk:
|
|
break
|
|
write_data(archive_pointer, chunk, len(chunk))
|
|
|
|
write_finish_entry(archive_pointer)
|
|
|
|
|
|
@contextmanager
|
|
def new_archive_write(format_name, filter_name=None, options='', passphrase=None):
|
|
archive_p = ffi.write_new()
|
|
try:
|
|
ffi.get_write_format_function(format_name)(archive_p)
|
|
if filter_name:
|
|
ffi.get_write_filter_function(filter_name)(archive_p)
|
|
if passphrase and 'encryption' not in options:
|
|
if format_name == 'zip':
|
|
warnings.warn(
|
|
"The default encryption scheme of zip archives is weak. "
|
|
"Use `options='encryption=$type'` to specify the encryption "
|
|
"type you want to use. The supported values are 'zipcrypt' "
|
|
"(the weak default), 'aes128' and 'aes256'."
|
|
)
|
|
options += ',encryption' if options else 'encryption'
|
|
if options:
|
|
if not isinstance(options, bytes):
|
|
options = options.encode('utf-8')
|
|
ffi.write_set_options(archive_p, options)
|
|
if passphrase:
|
|
if not isinstance(passphrase, bytes):
|
|
passphrase = passphrase.encode('utf-8')
|
|
try:
|
|
ffi.write_set_passphrase(archive_p, passphrase)
|
|
except AttributeError:
|
|
raise NotImplementedError(
|
|
f"the libarchive being used (version {ffi.version_number()}, "
|
|
f"path {ffi.libarchive_path}) doesn't support encryption"
|
|
)
|
|
yield archive_p
|
|
ffi.write_close(archive_p)
|
|
ffi.write_free(archive_p)
|
|
except Exception:
|
|
ffi.write_fail(archive_p)
|
|
ffi.write_free(archive_p)
|
|
raise
|
|
|
|
@property
|
|
def bytes_written(self):
|
|
return ffi.filter_bytes(self._pointer, -1)
|
|
|
|
|
|
@contextmanager
|
|
def custom_writer(
|
|
write_func, format_name, filter_name=None,
|
|
open_func=None, close_func=None, block_size=page_size,
|
|
archive_write_class=ArchiveWrite, options='', passphrase=None,
|
|
header_codec='utf-8',
|
|
):
|
|
"""Create an archive and send it in chunks to the `write_func` function.
|
|
|
|
For formats and filters, see `WRITE_FORMATS` and `WRITE_FILTERS` in the
|
|
`libarchive.ffi` module.
|
|
"""
|
|
|
|
def write_cb_internal(archive_p, context, buffer_, length):
|
|
data = cast(buffer_, POINTER(c_char * length))[0]
|
|
return write_func(data)
|
|
|
|
open_cb = OPEN_CALLBACK(open_func) if open_func else NO_OPEN_CB
|
|
write_cb = WRITE_CALLBACK(write_cb_internal)
|
|
close_cb = CLOSE_CALLBACK(close_func) if close_func else NO_CLOSE_CB
|
|
|
|
with new_archive_write(format_name, filter_name, options,
|
|
passphrase) as archive_p:
|
|
ffi.write_set_bytes_in_last_block(archive_p, 1)
|
|
ffi.write_set_bytes_per_block(archive_p, block_size)
|
|
ffi.write_open(archive_p, None, open_cb, write_cb, close_cb)
|
|
yield archive_write_class(archive_p, header_codec)
|
|
|
|
|
|
@contextmanager
|
|
def fd_writer(
|
|
fd, format_name, filter_name=None,
|
|
archive_write_class=ArchiveWrite, options='', passphrase=None,
|
|
header_codec='utf-8',
|
|
):
|
|
"""Create an archive and write it into a file descriptor.
|
|
|
|
For formats and filters, see `WRITE_FORMATS` and `WRITE_FILTERS` in the
|
|
`libarchive.ffi` module.
|
|
"""
|
|
with new_archive_write(format_name, filter_name, options,
|
|
passphrase) as archive_p:
|
|
ffi.write_open_fd(archive_p, fd)
|
|
yield archive_write_class(archive_p, header_codec)
|
|
|
|
|
|
@contextmanager
|
|
def file_writer(
|
|
filepath, format_name, filter_name=None,
|
|
archive_write_class=ArchiveWrite, options='', passphrase=None,
|
|
header_codec='utf-8',
|
|
):
|
|
"""Create an archive and write it into a file.
|
|
|
|
For formats and filters, see `WRITE_FORMATS` and `WRITE_FILTERS` in the
|
|
`libarchive.ffi` module.
|
|
"""
|
|
with new_archive_write(format_name, filter_name, options,
|
|
passphrase) as archive_p:
|
|
ffi.write_open_filename_w(archive_p, filepath)
|
|
yield archive_write_class(archive_p, header_codec)
|
|
|
|
|
|
@contextmanager
|
|
def memory_writer(
|
|
buf, format_name, filter_name=None,
|
|
archive_write_class=ArchiveWrite, options='', passphrase=None,
|
|
header_codec='utf-8',
|
|
):
|
|
"""Create an archive and write it into a buffer.
|
|
|
|
For formats and filters, see `WRITE_FORMATS` and `WRITE_FILTERS` in the
|
|
`libarchive.ffi` module.
|
|
"""
|
|
with new_archive_write(format_name, filter_name, options,
|
|
passphrase) as archive_p:
|
|
used = byref(c_size_t())
|
|
buf_p = cast(buf, c_void_p)
|
|
ffi.write_open_memory(archive_p, buf_p, len(buf), used)
|
|
yield archive_write_class(archive_p, header_codec)
|