Initial package contents

fixes
Vadim vtroshchinskiy 2024-11-12 13:36:01 +01:00
parent 1472ccbce6
commit 5daeb8200f
43 changed files with 3348 additions and 0 deletions

@@ -0,0 +1 @@
version.py export-subst

@@ -0,0 +1 @@
liberapay: Changaco

@@ -0,0 +1,36 @@
name: CI
on:
# Trigger the workflow on push or pull request events but only for the master branch
push:
branches: [ master ]
pull_request:
branches: [ master ]
# Allow running this workflow manually from the Actions tab
workflow_dispatch:
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install libarchive
run: sudo apt-get install -y libarchive13
- name: Install Python 3.11
uses: actions/setup-python@v2
with:
python-version: '3.11'
- name: Install Python 3.10
uses: actions/setup-python@v2
with:
python-version: '3.10'
- name: Install Python 3.9
uses: actions/setup-python@v2
with:
python-version: '3.9'
- name: Install Python 3.8
uses: actions/setup-python@v2
with:
python-version: '3.8'
- name: Install tox
run: pip install tox
- name: Run the tests
run: tox

@@ -0,0 +1,8 @@
*.egg-info/
/build/
/dist/
/env/
/htmlcov/
.coverage
*.pyc
.tox/

@@ -0,0 +1 @@
https://creativecommons.org/publicdomain/zero/1.0/

@@ -0,0 +1 @@
include version.py

@@ -0,0 +1,147 @@
Metadata-Version: 2.1
Name: libarchive-c
Version: 5.1
Summary: Python interface to libarchive
Home-page: https://github.com/Changaco/python-libarchive-c
Author: Changaco
Author-email: changaco@changaco.oy.lc
License: CC0
Keywords: archive libarchive 7z tar bz2 zip gz
Description-Content-Type: text/x-rst
License-File: LICENSE.md
A Python interface to libarchive. It uses the standard ctypes_ module to
dynamically load and access the C library.
.. _ctypes: https://docs.python.org/3/library/ctypes.html
Installation
============

::

    pip install libarchive-c
Compatibility
=============
python
------
python-libarchive-c is currently tested with Python 3.8, 3.9, 3.10 and 3.11.
If you find an incompatibility with older versions, you can send us a small patch,
but we won't accept big changes.
libarchive
----------
python-libarchive-c may not work properly with obsolete versions of libarchive such as the ones included in macOS. In that case you can install a recent version of libarchive (e.g. with ``brew install libarchive`` on macOS) and use the ``LIBARCHIVE`` environment variable to point python-libarchive-c to it::

    export LIBARCHIVE=/usr/local/Cellar/libarchive/3.3.3/lib/libarchive.13.dylib
Usage
=====
Import::

    import libarchive
Extracting archives
-------------------
To extract an archive, use the ``extract_file`` function::

    os.chdir('/path/to/target/directory')
    libarchive.extract_file('test.zip')
Alternatively, the ``extract_memory`` function can be used to extract from a buffer,
and ``extract_fd`` from a file descriptor.
The ``extract_*`` functions all have an integer ``flags`` argument which is passed
directly to the C function ``archive_write_disk_set_options()``. You can import
the ``EXTRACT_*`` constants from the ``libarchive.extract`` module and see the
official description of each flag in the ``archive_write_disk(3)`` man page.
By default, when the ``flags`` argument is ``None``, the ``SECURE_NODOTDOT``,
``SECURE_NOABSOLUTEPATHS`` and ``SECURE_SYMLINKS`` flags are passed to
libarchive, unless the current directory is the root (``/``).
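The default behaviour described above can be sketched with plain integers, since the flags are simple bit masks (a simplified illustration, not the library's exact code path; the numeric values mirror the ``EXTRACT_*`` constants in ``libarchive.extract``):

```python
# Values mirror the EXTRACT_* constants in libarchive.extract.
EXTRACT_SECURE_SYMLINKS = 0x0100
EXTRACT_SECURE_NODOTDOT = 0x0200
EXTRACT_SECURE_NOABSOLUTEPATHS = 0x10000

def default_flags(cwd):
    """Return the default extraction flags for the given working directory."""
    if cwd == '/':
        # Preventing escapes from the filesystem root is meaningless.
        return 0
    return (EXTRACT_SECURE_NOABSOLUTEPATHS |
            EXTRACT_SECURE_NODOTDOT |
            EXTRACT_SECURE_SYMLINKS)

assert default_flags('/') == 0
assert default_flags('/tmp') == 0x10300
```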
Reading archives
----------------
To read an archive, use the ``file_reader`` function::

    with libarchive.file_reader('test.7z') as archive:
        for entry in archive:
            for block in entry.get_blocks():
                ...
Alternatively, the ``memory_reader`` function can be used to read from a buffer,
``fd_reader`` from a file descriptor, ``stream_reader`` from a stream object
(which must support the standard ``readinto`` method), and ``custom_reader``
from anywhere using callbacks.
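Any object exposing a buffer-filling ``readinto`` method qualifies as a stream here; the stdlib's ``io.BytesIO`` is a quick way to see the contract (illustration only, no libarchive involved):

```python
import io

# readinto(buffer) fills the given writable buffer in place and
# returns the number of bytes read -- the contract stream_reader relies on.
buf = io.BytesIO(b'archive bytes')
dest = bytearray(7)
n = buf.readinto(dest)
assert n == 7
assert bytes(dest) == b'archive'
```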
To learn about the attributes of the ``entry`` object, see the ``libarchive/entry.py``
source code or run ``help(libarchive.entry.ArchiveEntry)`` in a Python shell.
Displaying progress
~~~~~~~~~~~~~~~~~~~
If your program processes large archives, you can keep track of its progress
with the ``bytes_read`` attribute. Here's an example of a progress bar using
`tqdm <https://pypi.org/project/tqdm/>`_::

    with tqdm(total=os.stat(archive_path).st_size, unit='bytes') as pbar, \
            libarchive.file_reader(archive_path) as archive:
        for entry in archive:
            ...
            pbar.update(archive.bytes_read - pbar.n)
Creating archives
-----------------
To create an archive, use the ``file_writer`` function::

    from libarchive.entry import FileType

    with libarchive.file_writer('test.tar.gz', 'ustar', 'gzip') as archive:
        # Add the `libarchive/` directory and everything in it (recursively),
        # then the `README.rst` file.
        archive.add_files('libarchive/', 'README.rst')
        # Add a regular file defined from scratch.
        data = b'foobar'
        archive.add_file_from_memory('../escape-test', len(data), data)
        # Add a directory defined from scratch.
        early_epoch = (42, 42)  # 1970-01-01 00:00:42.000000042
        archive.add_file_from_memory(
            'metadata-test', 0, b'',
            filetype=FileType.DIRECTORY, permission=0o755, uid=4242, gid=4242,
            atime=early_epoch, mtime=early_epoch, ctime=early_epoch,
            birthtime=early_epoch,
        )
Alternatively, the ``memory_writer`` function can be used to write to a memory buffer,
``fd_writer`` to a file descriptor, and ``custom_writer`` to a callback function.
For each of those functions, the mandatory second argument is the archive format,
and the optional third argument is the compression format (called “filter” in
libarchive). The acceptable values are listed in ``libarchive.ffi.WRITE_FORMATS``
and ``libarchive.ffi.WRITE_FILTERS``.
File metadata codecs
--------------------
By default, UTF-8 is used to read and write file attributes from and to archives.
A different codec can be specified through the ``header_codec`` arguments of the
``*_reader`` and ``*_writer`` functions. Example::

    with libarchive.file_writer('test.tar', 'ustar', header_codec='cp037') as archive:
        ...
    with file_reader('test.tar', header_codec='cp037') as archive:
        ...
In addition to file paths (``pathname`` and ``linkpath``), the specified codec is
used to encode and decode user and group names (``uname`` and ``gname``).
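``cp037`` is an EBCDIC codec, so the header bytes it produces differ from the UTF-8 default; a quick stdlib check of the round trip (illustration only):

```python
name = 'test.tar'
# EBCDIC bytes differ from the UTF-8 (ASCII-compatible) encoding...
assert name.encode('cp037') != name.encode('utf-8')
# ...but the codec round-trips cleanly, which is all the header needs.
assert name.encode('cp037').decode('cp037') == name
```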
License
=======
`CC0 Public Domain Dedication <http://creativecommons.org/publicdomain/zero/1.0/>`_


@@ -0,0 +1,17 @@
from .entry import ArchiveEntry
from .exception import ArchiveError
from .extract import extract_fd, extract_file, extract_memory
from .read import (
custom_reader, fd_reader, file_reader, memory_reader, stream_reader,
seekable_stream_reader
)
from .write import custom_writer, fd_writer, file_writer, memory_writer
__all__ = [x.__name__ for x in (
ArchiveEntry,
ArchiveError,
extract_fd, extract_file, extract_memory,
custom_reader, fd_reader, file_reader, memory_reader, stream_reader,
seekable_stream_reader,
custom_writer, fd_writer, file_writer, memory_writer
)]

@@ -0,0 +1,450 @@
from contextlib import contextmanager
from ctypes import create_string_buffer
from enum import IntEnum
import math
from . import ffi
class FileType(IntEnum):
NAMED_PIPE = AE_IFIFO = 0o010000 # noqa: E221
CHAR_DEVICE = AE_IFCHR = 0o020000 # noqa: E221
DIRECTORY = AE_IFDIR = 0o040000 # noqa: E221
BLOCK_DEVICE = AE_IFBLK = 0o060000 # noqa: E221
REGULAR_FILE = AE_IFREG = 0o100000 # noqa: E221
SYMBOLIC_LINK = AE_IFLNK = 0o120000 # noqa: E221
SOCKET = AE_IFSOCK = 0o140000 # noqa: E221
@contextmanager
def new_archive_entry():
entry_p = ffi.entry_new()
try:
yield entry_p
finally:
ffi.entry_free(entry_p)
def format_time(seconds, nanos):
""" return float of seconds.nanos when nanos set, or seconds when not """
if nanos:
return float(seconds) + float(nanos) / 1000000000.0
return int(seconds)
class ArchiveEntry:
__slots__ = ('_archive_p', '_entry_p', 'header_codec')
def __init__(self, archive_p=None, header_codec='utf-8', **attributes):
"""Allocate memory for an `archive_entry` struct.
The `header_codec` is used to decode and encode file paths and other
attributes.
The `**attributes` are passed to the `modify` method.
"""
self._archive_p = archive_p
self._entry_p = ffi.entry_new()
self.header_codec = header_codec
if attributes:
self.modify(**attributes)
def __del__(self):
"""Free the C struct"""
ffi.entry_free(self._entry_p)
def __str__(self):
"""Returns the file's path"""
return self.pathname
def modify(self, header_codec=None, **attributes):
"""Convenience method to modify the entry's attributes.
Args:
filetype (int): the file's type, see the `FileType` class for values
pathname (str): the file's path
linkpath (str): the other path of the file, if the file is a link
size (int | None): the file's size, in bytes
perm (int): the file's permissions in standard Unix format, e.g. 0o640
uid (int): the file owner's numerical identifier
gid (int): the file group's numerical identifier
uname (str | bytes): the file owner's name
gname (str | bytes): the file group's name
atime (int | Tuple[int, int] | float | None):
the file's most recent access time,
either in seconds or as a tuple (seconds, nanoseconds)
mtime (int | Tuple[int, int] | float | None):
the file's most recent modification time,
either in seconds or as a tuple (seconds, nanoseconds)
ctime (int | Tuple[int, int] | float | None):
the file's most recent metadata change time,
either in seconds or as a tuple (seconds, nanoseconds)
birthtime (int | Tuple[int, int] | float | None):
the file's creation time (for archive formats that support it),
either in seconds or as a tuple (seconds, nanoseconds)
rdev (int | Tuple[int, int]): device number, if the file is a device
rdevmajor (int): major part of the device number
rdevminor (int): minor part of the device number
"""
if header_codec:
self.header_codec = header_codec
for name, value in attributes.items():
setattr(self, name, value)
@property
def filetype(self):
return ffi.entry_filetype(self._entry_p)
@filetype.setter
def filetype(self, value):
ffi.entry_set_filetype(self._entry_p, value)
@property
def uid(self):
return ffi.entry_uid(self._entry_p)
@uid.setter
def uid(self, uid):
ffi.entry_set_uid(self._entry_p, uid)
@property
def gid(self):
return ffi.entry_gid(self._entry_p)
@gid.setter
def gid(self, gid):
ffi.entry_set_gid(self._entry_p, gid)
@property
def uname(self):
uname = ffi.entry_uname_w(self._entry_p)
if not uname:
uname = ffi.entry_uname(self._entry_p)
if uname is not None:
try:
uname = uname.decode(self.header_codec)
except UnicodeError:
pass
return uname
@uname.setter
def uname(self, value):
if not isinstance(value, bytes):
value = value.encode(self.header_codec)
if self.header_codec == 'utf-8':
ffi.entry_update_uname_utf8(self._entry_p, value)
else:
ffi.entry_copy_uname(self._entry_p, value)
@property
def gname(self):
gname = ffi.entry_gname_w(self._entry_p)
if not gname:
gname = ffi.entry_gname(self._entry_p)
if gname is not None:
try:
gname = gname.decode(self.header_codec)
except UnicodeError:
pass
return gname
@gname.setter
def gname(self, value):
if not isinstance(value, bytes):
value = value.encode(self.header_codec)
if self.header_codec == 'utf-8':
ffi.entry_update_gname_utf8(self._entry_p, value)
else:
ffi.entry_copy_gname(self._entry_p, value)
def get_blocks(self, block_size=ffi.page_size):
"""Read the file's content, keeping only one chunk in memory at a time.
Don't do anything like `list(entry.get_blocks())`; it would silently fail.
Args:
block_size (int): the buffer's size, in bytes
"""
archive_p = self._archive_p
if not archive_p:
raise TypeError("this entry isn't linked to any content")
buf = create_string_buffer(block_size)
read = ffi.read_data
while 1:
r = read(archive_p, buf, block_size)
if r == 0:
break
yield buf.raw[0:r]
self.__class__ = ConsumedArchiveEntry
@property
def isblk(self):
return self.filetype & 0o170000 == 0o060000
@property
def ischr(self):
return self.filetype & 0o170000 == 0o020000
@property
def isdir(self):
return self.filetype & 0o170000 == 0o040000
@property
def isfifo(self):
return self.filetype & 0o170000 == 0o010000
@property
def islnk(self):
return bool(ffi.entry_hardlink_w(self._entry_p) or
ffi.entry_hardlink(self._entry_p))
@property
def issym(self):
return self.filetype & 0o170000 == 0o120000
@property
def isreg(self):
return self.filetype & 0o170000 == 0o100000
@property
def isfile(self):
return self.isreg
@property
def issock(self):
return self.filetype & 0o170000 == 0o140000
@property
def isdev(self):
return self.ischr or self.isblk or self.isfifo or self.issock
@property
def atime(self):
if not ffi.entry_atime_is_set(self._entry_p):
return None
sec_val = ffi.entry_atime(self._entry_p)
nsec_val = ffi.entry_atime_nsec(self._entry_p)
return format_time(sec_val, nsec_val)
@atime.setter
def atime(self, value):
if value is None:
ffi.entry_unset_atime(self._entry_p)
elif isinstance(value, int):
self.set_atime(value)
elif isinstance(value, tuple):
self.set_atime(*value)
else:
fraction, seconds = math.modf(value)
self.set_atime(int(seconds), int(fraction * 1_000_000_000))
def set_atime(self, timestamp_sec, timestamp_nsec=0):
"Kept for backward compatibility. `entry.atime = ...` is supported now."
return ffi.entry_set_atime(self._entry_p, timestamp_sec, timestamp_nsec)
@property
def mtime(self):
if not ffi.entry_mtime_is_set(self._entry_p):
return None
sec_val = ffi.entry_mtime(self._entry_p)
nsec_val = ffi.entry_mtime_nsec(self._entry_p)
return format_time(sec_val, nsec_val)
@mtime.setter
def mtime(self, value):
if value is None:
ffi.entry_unset_mtime(self._entry_p)
elif isinstance(value, int):
self.set_mtime(value)
elif isinstance(value, tuple):
self.set_mtime(*value)
else:
fraction, seconds = math.modf(value)
self.set_mtime(int(seconds), int(fraction * 1_000_000_000))
def set_mtime(self, timestamp_sec, timestamp_nsec=0):
"Kept for backward compatibility. `entry.mtime = ...` is supported now."
return ffi.entry_set_mtime(self._entry_p, timestamp_sec, timestamp_nsec)
@property
def ctime(self):
if not ffi.entry_ctime_is_set(self._entry_p):
return None
sec_val = ffi.entry_ctime(self._entry_p)
nsec_val = ffi.entry_ctime_nsec(self._entry_p)
return format_time(sec_val, nsec_val)
@ctime.setter
def ctime(self, value):
if value is None:
ffi.entry_unset_ctime(self._entry_p)
elif isinstance(value, int):
self.set_ctime(value)
elif isinstance(value, tuple):
self.set_ctime(*value)
else:
fraction, seconds = math.modf(value)
self.set_ctime(int(seconds), int(fraction * 1_000_000_000))
def set_ctime(self, timestamp_sec, timestamp_nsec=0):
"Kept for backward compatibility. `entry.ctime = ...` is supported now."
return ffi.entry_set_ctime(self._entry_p, timestamp_sec, timestamp_nsec)
@property
def birthtime(self):
if not ffi.entry_birthtime_is_set(self._entry_p):
return None
sec_val = ffi.entry_birthtime(self._entry_p)
nsec_val = ffi.entry_birthtime_nsec(self._entry_p)
return format_time(sec_val, nsec_val)
@birthtime.setter
def birthtime(self, value):
if value is None:
ffi.entry_unset_birthtime(self._entry_p)
elif isinstance(value, int):
self.set_birthtime(value)
elif isinstance(value, tuple):
self.set_birthtime(*value)
else:
fraction, seconds = math.modf(value)
self.set_birthtime(int(seconds), int(fraction * 1_000_000_000))
def set_birthtime(self, timestamp_sec, timestamp_nsec=0):
"Kept for backward compatibility. `entry.birthtime = ...` is supported now."
return ffi.entry_set_birthtime(
self._entry_p, timestamp_sec, timestamp_nsec
)
@property
def pathname(self):
path = ffi.entry_pathname_w(self._entry_p)
if not path:
path = ffi.entry_pathname(self._entry_p)
if path is not None:
try:
path = path.decode(self.header_codec)
except UnicodeError:
pass
return path
@pathname.setter
def pathname(self, value):
if not isinstance(value, bytes):
value = value.encode(self.header_codec)
if self.header_codec == 'utf-8':
ffi.entry_update_pathname_utf8(self._entry_p, value)
else:
ffi.entry_copy_pathname(self._entry_p, value)
@property
def linkpath(self):
path = (
(
ffi.entry_symlink_w(self._entry_p) or
ffi.entry_symlink(self._entry_p)
) if self.issym else (
ffi.entry_hardlink_w(self._entry_p) or
ffi.entry_hardlink(self._entry_p)
)
)
if isinstance(path, bytes):
try:
path = path.decode(self.header_codec)
except UnicodeError:
pass
return path
@linkpath.setter
def linkpath(self, value):
if not isinstance(value, bytes):
value = value.encode(self.header_codec)
if self.header_codec == 'utf-8':
ffi.entry_update_link_utf8(self._entry_p, value)
else:
ffi.entry_copy_link(self._entry_p, value)
# aliases for compatibility with the standard `tarfile` module
path = property(pathname.fget, pathname.fset, doc="alias of pathname")
name = path
linkname = property(linkpath.fget, linkpath.fset, doc="alias of linkpath")
@property
def size(self):
if ffi.entry_size_is_set(self._entry_p):
return ffi.entry_size(self._entry_p)
@size.setter
def size(self, value):
if value is None:
ffi.entry_unset_size(self._entry_p)
else:
ffi.entry_set_size(self._entry_p, value)
@property
def mode(self):
return ffi.entry_mode(self._entry_p)
@mode.setter
def mode(self, value):
ffi.entry_set_mode(self._entry_p, value)
@property
def strmode(self):
"""The file's mode as a string, e.g. '?rwxrwx---'"""
# note we strip the mode because archive_entry_strmode
# returns a trailing space: strcpy(bp, "?rwxrwxrwx ");
return ffi.entry_strmode(self._entry_p).strip()
@property
def perm(self):
return ffi.entry_perm(self._entry_p)
@perm.setter
def perm(self, value):
ffi.entry_set_perm(self._entry_p, value)
@property
def rdev(self):
return ffi.entry_rdev(self._entry_p)
@rdev.setter
def rdev(self, value):
if isinstance(value, tuple):
ffi.entry_set_rdevmajor(self._entry_p, value[0])
ffi.entry_set_rdevminor(self._entry_p, value[1])
else:
ffi.entry_set_rdev(self._entry_p, value)
@property
def rdevmajor(self):
return ffi.entry_rdevmajor(self._entry_p)
@rdevmajor.setter
def rdevmajor(self, value):
ffi.entry_set_rdevmajor(self._entry_p, value)
@property
def rdevminor(self):
return ffi.entry_rdevminor(self._entry_p)
@rdevminor.setter
def rdevminor(self, value):
ffi.entry_set_rdevminor(self._entry_p, value)
class ConsumedArchiveEntry(ArchiveEntry):
__slots__ = ()
def get_blocks(self, **kw):
raise TypeError("the content of this entry has already been read")
class PassedArchiveEntry(ArchiveEntry):
__slots__ = ()
def get_blocks(self, **kw):
raise TypeError("this entry is passed, it's too late to read its content")

@@ -0,0 +1,12 @@
class ArchiveError(Exception):
def __init__(self, msg, errno=None, retcode=None, archive_p=None):
self.msg = msg
self.errno = errno
self.retcode = retcode
self.archive_p = archive_p
def __str__(self):
p = '%s (errno=%s, retcode=%s, archive_p=%s)'
return p % (self.msg, self.errno, self.retcode, self.archive_p)
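The resulting message interpolates all four fields in a fixed format; for example, with hypothetical values:

```python
# Same format string as ArchiveError.__str__ above, with made-up values.
p = '%s (errno=%s, retcode=%s, archive_p=%s)'
msg = p % ('Unrecognized archive format', 84, -30, None)
assert msg == 'Unrecognized archive format (errno=84, retcode=-30, archive_p=None)'
```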

@@ -0,0 +1,88 @@
from contextlib import contextmanager
from ctypes import byref, c_longlong, c_size_t, c_void_p
import os
from .ffi import (
write_disk_new, write_disk_set_options, write_free, write_header,
read_data_block, write_data_block, write_finish_entry, ARCHIVE_EOF
)
from .read import fd_reader, file_reader, memory_reader
EXTRACT_OWNER = 0x0001
EXTRACT_PERM = 0x0002
EXTRACT_TIME = 0x0004
EXTRACT_NO_OVERWRITE = 0x0008
EXTRACT_UNLINK = 0x0010
EXTRACT_ACL = 0x0020
EXTRACT_FFLAGS = 0x0040
EXTRACT_XATTR = 0x0080
EXTRACT_SECURE_SYMLINKS = 0x0100
EXTRACT_SECURE_NODOTDOT = 0x0200
EXTRACT_NO_AUTODIR = 0x0400
EXTRACT_NO_OVERWRITE_NEWER = 0x0800
EXTRACT_SPARSE = 0x1000
EXTRACT_MAC_METADATA = 0x2000
EXTRACT_NO_HFS_COMPRESSION = 0x4000
EXTRACT_HFS_COMPRESSION_FORCED = 0x8000
EXTRACT_SECURE_NOABSOLUTEPATHS = 0x10000
EXTRACT_CLEAR_NOCHANGE_FFLAGS = 0x20000
PREVENT_ESCAPE = (
EXTRACT_SECURE_NOABSOLUTEPATHS |
EXTRACT_SECURE_NODOTDOT |
EXTRACT_SECURE_SYMLINKS
)
@contextmanager
def new_archive_write_disk(flags):
archive_p = write_disk_new()
write_disk_set_options(archive_p, flags)
try:
yield archive_p
finally:
write_free(archive_p)
def extract_entries(entries, flags=None):
"""Extracts the given archive entries into the current directory.
"""
if flags is None:
if os.getcwd() == '/':
# If the current directory is the root, then trying to prevent
# escaping is probably undesirable.
flags = 0
else:
flags = PREVENT_ESCAPE
buff, size, offset = c_void_p(), c_size_t(), c_longlong()
buff_p, size_p, offset_p = byref(buff), byref(size), byref(offset)
with new_archive_write_disk(flags) as write_p:
for entry in entries:
write_header(write_p, entry._entry_p)
read_p = entry._archive_p
while 1:
r = read_data_block(read_p, buff_p, size_p, offset_p)
if r == ARCHIVE_EOF:
break
write_data_block(write_p, buff, size, offset)
write_finish_entry(write_p)
def extract_fd(fd, flags=None):
"""Extracts an archive from a file descriptor into the current directory.
"""
with fd_reader(fd) as archive:
extract_entries(archive, flags)
def extract_file(filepath, flags=None):
"""Extracts an archive from a file into the current directory."""
with file_reader(filepath) as archive:
extract_entries(archive, flags)
def extract_memory(buffer_, flags=None):
"""Extracts an archive from memory into the current directory."""
with memory_reader(buffer_) as archive:
extract_entries(archive, flags)

@@ -0,0 +1,364 @@
from ctypes import (
c_char_p, c_int, c_uint, c_long, c_longlong, c_size_t, c_int64,
c_void_p, c_wchar_p, CFUNCTYPE, POINTER,
)
try:
from ctypes import c_ssize_t
except ImportError:
from ctypes import c_longlong as c_ssize_t
import ctypes
from ctypes.util import find_library
import logging
import mmap
import os
import sysconfig
from .exception import ArchiveError
logger = logging.getLogger('libarchive')
page_size = mmap.PAGESIZE
libarchive_path = os.environ.get('LIBARCHIVE') or find_library('archive')
libarchive = ctypes.cdll.LoadLibrary(libarchive_path)
# Constants
ARCHIVE_EOF = 1 # Found end of archive.
ARCHIVE_OK = 0 # Operation was successful.
ARCHIVE_RETRY = -10 # Retry might succeed.
ARCHIVE_WARN = -20 # Partial success.
ARCHIVE_FAILED = -25 # Current operation cannot complete.
ARCHIVE_FATAL = -30 # No more operations are possible.
# Callback types
WRITE_CALLBACK = CFUNCTYPE(
c_ssize_t, c_void_p, c_void_p, POINTER(c_void_p), c_size_t
)
READ_CALLBACK = CFUNCTYPE(
c_ssize_t, c_void_p, c_void_p, POINTER(c_void_p)
)
SEEK_CALLBACK = CFUNCTYPE(
c_longlong, c_void_p, c_void_p, c_longlong, c_int
)
OPEN_CALLBACK = CFUNCTYPE(c_int, c_void_p, c_void_p)
CLOSE_CALLBACK = CFUNCTYPE(c_int, c_void_p, c_void_p)
NO_OPEN_CB = ctypes.cast(None, OPEN_CALLBACK)
NO_CLOSE_CB = ctypes.cast(None, CLOSE_CALLBACK)
# Type aliases, for readability
c_archive_p = c_void_p
c_archive_entry_p = c_void_p
if sysconfig.get_config_var('SIZEOF_TIME_T') == 8:
c_time_t = c_int64
else:
c_time_t = c_long
# Helper functions
def _error_string(archive_p):
msg = error_string(archive_p)
if msg is None:
return
try:
return msg.decode('ascii')
except UnicodeDecodeError:
return msg
def archive_error(archive_p, retcode):
msg = _error_string(archive_p)
return ArchiveError(msg, errno(archive_p), retcode, archive_p)
def check_null(ret, func, args):
if ret is None:
raise ArchiveError(func.__name__+' returned NULL')
return ret
def check_int(retcode, func, args):
if retcode >= 0:
return retcode
elif retcode == ARCHIVE_WARN:
logger.warning(_error_string(args[0]))
return retcode
else:
raise archive_error(args[0], retcode)
def ffi(name, argtypes, restype, errcheck=None):
f = getattr(libarchive, 'archive_'+name)
f.argtypes = argtypes
f.restype = restype
if errcheck:
f.errcheck = errcheck
globals()[name] = f
return f
def get_read_format_function(format_name):
function_name = 'read_support_format_' + format_name
func = globals().get(function_name)
if func:
return func
try:
return ffi(function_name, [c_archive_p], c_int, check_int)
except AttributeError:
raise ValueError('the read format %r is not available' % format_name)
def get_read_filter_function(filter_name):
function_name = 'read_support_filter_' + filter_name
func = globals().get(function_name)
if func:
return func
try:
return ffi(function_name, [c_archive_p], c_int, check_int)
except AttributeError:
raise ValueError('the read filter %r is not available' % filter_name)
def get_write_format_function(format_name):
function_name = 'write_set_format_' + format_name
func = globals().get(function_name)
if func:
return func
try:
return ffi(function_name, [c_archive_p], c_int, check_int)
except AttributeError:
raise ValueError('the write format %r is not available' % format_name)
def get_write_filter_function(filter_name):
function_name = 'write_add_filter_' + filter_name
func = globals().get(function_name)
if func:
return func
try:
return ffi(function_name, [c_archive_p], c_int, check_int)
except AttributeError:
raise ValueError('the write filter %r is not available' % filter_name)
# FFI declarations
# library version
version_number = ffi('version_number', [], c_int, check_int)
# archive_util
errno = ffi('errno', [c_archive_p], c_int)
error_string = ffi('error_string', [c_archive_p], c_char_p)
ffi('filter_bytes', [c_archive_p, c_int], c_longlong)
ffi('filter_count', [c_archive_p], c_int)
ffi('filter_name', [c_archive_p, c_int], c_char_p)
ffi('format_name', [c_archive_p], c_char_p)
# archive_entry
ffi('entry_new', [], c_archive_entry_p, check_null)
ffi('entry_filetype', [c_archive_entry_p], c_int)
ffi('entry_atime', [c_archive_entry_p], c_time_t)
ffi('entry_birthtime', [c_archive_entry_p], c_time_t)
ffi('entry_mtime', [c_archive_entry_p], c_time_t)
ffi('entry_ctime', [c_archive_entry_p], c_time_t)
ffi('entry_atime_nsec', [c_archive_entry_p], c_long)
ffi('entry_birthtime_nsec', [c_archive_entry_p], c_long)
ffi('entry_mtime_nsec', [c_archive_entry_p], c_long)
ffi('entry_ctime_nsec', [c_archive_entry_p], c_long)
ffi('entry_atime_is_set', [c_archive_entry_p], c_int)
ffi('entry_birthtime_is_set', [c_archive_entry_p], c_int)
ffi('entry_mtime_is_set', [c_archive_entry_p], c_int)
ffi('entry_ctime_is_set', [c_archive_entry_p], c_int)
ffi('entry_pathname', [c_archive_entry_p], c_char_p)
ffi('entry_pathname_w', [c_archive_entry_p], c_wchar_p)
ffi('entry_sourcepath', [c_archive_entry_p], c_char_p)
ffi('entry_size', [c_archive_entry_p], c_longlong)
ffi('entry_size_is_set', [c_archive_entry_p], c_int)
ffi('entry_mode', [c_archive_entry_p], c_int)
ffi('entry_strmode', [c_archive_entry_p], c_char_p)
ffi('entry_perm', [c_archive_entry_p], c_int)
ffi('entry_hardlink', [c_archive_entry_p], c_char_p)
ffi('entry_hardlink_w', [c_archive_entry_p], c_wchar_p)
ffi('entry_symlink', [c_archive_entry_p], c_char_p)
ffi('entry_symlink_w', [c_archive_entry_p], c_wchar_p)
ffi('entry_rdev', [c_archive_entry_p], c_uint)
ffi('entry_rdevmajor', [c_archive_entry_p], c_uint)
ffi('entry_rdevminor', [c_archive_entry_p], c_uint)
ffi('entry_uid', [c_archive_entry_p], c_longlong)
ffi('entry_gid', [c_archive_entry_p], c_longlong)
ffi('entry_uname', [c_archive_entry_p], c_char_p)
ffi('entry_gname', [c_archive_entry_p], c_char_p)
ffi('entry_uname_w', [c_archive_entry_p], c_wchar_p)
ffi('entry_gname_w', [c_archive_entry_p], c_wchar_p)
ffi('entry_set_size', [c_archive_entry_p, c_longlong], None)
ffi('entry_set_filetype', [c_archive_entry_p, c_uint], None)
ffi('entry_set_uid', [c_archive_entry_p, c_longlong], None)
ffi('entry_set_gid', [c_archive_entry_p, c_longlong], None)
ffi('entry_set_mode', [c_archive_entry_p, c_int], None)
ffi('entry_set_perm', [c_archive_entry_p, c_int], None)
ffi('entry_set_atime', [c_archive_entry_p, c_time_t, c_long], None)
ffi('entry_set_mtime', [c_archive_entry_p, c_time_t, c_long], None)
ffi('entry_set_ctime', [c_archive_entry_p, c_time_t, c_long], None)
ffi('entry_set_birthtime', [c_archive_entry_p, c_time_t, c_long], None)
ffi('entry_set_rdev', [c_archive_entry_p, c_uint], None)
ffi('entry_set_rdevmajor', [c_archive_entry_p, c_uint], None)
ffi('entry_set_rdevminor', [c_archive_entry_p, c_uint], None)
ffi('entry_unset_size', [c_archive_entry_p], None)
ffi('entry_unset_atime', [c_archive_entry_p], None)
ffi('entry_unset_mtime', [c_archive_entry_p], None)
ffi('entry_unset_ctime', [c_archive_entry_p], None)
ffi('entry_unset_birthtime', [c_archive_entry_p], None)
ffi('entry_copy_pathname', [c_archive_entry_p, c_char_p], None)
ffi('entry_update_pathname_utf8', [c_archive_entry_p, c_char_p], c_int, check_int)
ffi('entry_copy_link', [c_archive_entry_p, c_char_p], None)
ffi('entry_update_link_utf8', [c_archive_entry_p, c_char_p], c_int, check_int)
ffi('entry_copy_uname', [c_archive_entry_p, c_char_p], None)
ffi('entry_update_uname_utf8', [c_archive_entry_p, c_char_p], c_int, check_int)
ffi('entry_copy_gname', [c_archive_entry_p, c_char_p], None)
ffi('entry_update_gname_utf8', [c_archive_entry_p, c_char_p], c_int, check_int)
ffi('entry_clear', [c_archive_entry_p], c_archive_entry_p)
ffi('entry_free', [c_archive_entry_p], None)
# archive_read
ffi('read_new', [], c_archive_p, check_null)
READ_FORMATS = set((
'7zip', 'all', 'ar', 'cab', 'cpio', 'empty', 'iso9660', 'lha', 'mtree',
'rar', 'raw', 'tar', 'xar', 'zip', 'warc'
))
for f_name in list(READ_FORMATS):
try:
get_read_format_function(f_name)
except ValueError as e: # pragma: no cover
logger.info(str(e))
READ_FORMATS.remove(f_name)
READ_FILTERS = set((
'all', 'bzip2', 'compress', 'grzip', 'gzip', 'lrzip', 'lzip', 'lzma',
'lzop', 'none', 'rpm', 'uu', 'xz', 'lz4', 'zstd'
))
for f_name in list(READ_FILTERS):
try:
get_read_filter_function(f_name)
except ValueError as e: # pragma: no cover
logger.info(str(e))
READ_FILTERS.remove(f_name)
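The two loops above probe the loaded libarchive for every candidate format and filter name, and silently drop the ones this particular build doesn't support. The same probe-and-prune pattern can be sketched in isolation; the `probe` function and the `available` set below are hypothetical stand-ins for `get_read_format_function` and the real library:

```python
import logging

logger = logging.getLogger(__name__)

def prune_unsupported(names, probe):
    """Return the subset of `names` for which `probe(name)` succeeds.

    Iterates over a copy of the set so that removal during iteration
    is safe, mirroring the READ_FORMATS / READ_FILTERS loops above.
    """
    supported = set(names)
    for name in list(supported):
        try:
            probe(name)
        except ValueError as e:
            logger.info(str(e))
            supported.remove(name)
    return supported

# Hypothetical probe: only names on a fixed allow-list are "supported".
available = {'tar', 'zip'}

def probe(name):
    if name not in available:
        raise ValueError(f'format {name!r} is not supported')

print(prune_unsupported({'tar', 'zip', 'rar'}, probe))
```

The real loops mutate the module-level sets in place; the sketch returns a new set instead, which is the more idiomatic shape outside module initialization.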
ffi('read_set_seek_callback', [c_archive_p, SEEK_CALLBACK], c_int, check_int)
ffi('read_open',
[c_archive_p, c_void_p, OPEN_CALLBACK, READ_CALLBACK, CLOSE_CALLBACK],
c_int, check_int)
ffi('read_open_fd', [c_archive_p, c_int, c_size_t], c_int, check_int)
ffi('read_open_filename_w', [c_archive_p, c_wchar_p, c_size_t],
c_int, check_int)
ffi('read_open_memory', [c_archive_p, c_void_p, c_size_t], c_int, check_int)
ffi('read_next_header', [c_archive_p, POINTER(c_void_p)], c_int, check_int)
ffi('read_next_header2', [c_archive_p, c_void_p], c_int, check_int)
ffi('read_close', [c_archive_p], c_int, check_int)
ffi('read_free', [c_archive_p], c_int, check_int)
# archive_read_disk
ffi('read_disk_new', [], c_archive_p, check_null)
ffi('read_disk_set_behavior', [c_archive_p, c_int], c_int, check_int)
ffi('read_disk_set_standard_lookup', [c_archive_p], c_int, check_int)
ffi('read_disk_open', [c_archive_p, c_char_p], c_int, check_int)
ffi('read_disk_open_w', [c_archive_p, c_wchar_p], c_int, check_int)
ffi('read_disk_descend', [c_archive_p], c_int, check_int)
# archive_read_data
ffi('read_data_block',
[c_archive_p, POINTER(c_void_p), POINTER(c_size_t), POINTER(c_longlong)],
c_int, check_int)
ffi('read_data', [c_archive_p, c_void_p, c_size_t], c_ssize_t, check_int)
ffi('read_data_skip', [c_archive_p], c_int, check_int)
# archive_write
ffi('write_new', [], c_archive_p, check_null)
ffi('write_set_options', [c_archive_p, c_char_p], c_int, check_int)
ffi('write_disk_new', [], c_archive_p, check_null)
ffi('write_disk_set_options', [c_archive_p, c_int], c_int, check_int)
WRITE_FORMATS = set((
'7zip', 'ar_bsd', 'ar_svr4', 'cpio', 'cpio_newc', 'gnutar', 'iso9660',
'mtree', 'mtree_classic', 'pax', 'pax_restricted', 'shar', 'shar_dump',
'ustar', 'v7tar', 'xar', 'zip', 'warc'
))
for f_name in list(WRITE_FORMATS):
try:
get_write_format_function(f_name)
except ValueError as e: # pragma: no cover
logger.info(str(e))
WRITE_FORMATS.remove(f_name)
WRITE_FILTERS = set((
'b64encode', 'bzip2', 'compress', 'grzip', 'gzip', 'lrzip', 'lzip', 'lzma',
'lzop', 'uuencode', 'xz', 'lz4', 'zstd'
))
for f_name in list(WRITE_FILTERS):
try:
get_write_filter_function(f_name)
except ValueError as e: # pragma: no cover
logger.info(str(e))
WRITE_FILTERS.remove(f_name)
ffi('write_open',
[c_archive_p, c_void_p, OPEN_CALLBACK, WRITE_CALLBACK, CLOSE_CALLBACK],
c_int, check_int)
ffi('write_open_fd', [c_archive_p, c_int], c_int, check_int)
ffi('write_open_filename', [c_archive_p, c_char_p], c_int, check_int)
ffi('write_open_filename_w', [c_archive_p, c_wchar_p], c_int, check_int)
ffi('write_open_memory',
[c_archive_p, c_void_p, c_size_t, POINTER(c_size_t)],
c_int, check_int)
ffi('write_get_bytes_in_last_block', [c_archive_p], c_int, check_int)
ffi('write_get_bytes_per_block', [c_archive_p], c_int, check_int)
ffi('write_set_bytes_in_last_block', [c_archive_p, c_int], c_int, check_int)
ffi('write_set_bytes_per_block', [c_archive_p, c_int], c_int, check_int)
ffi('write_header', [c_archive_p, c_void_p], c_int, check_int)
ffi('write_data', [c_archive_p, c_void_p, c_size_t], c_ssize_t, check_int)
ffi('write_data_block', [c_archive_p, c_void_p, c_size_t, c_longlong],
c_int, check_int)
ffi('write_finish_entry', [c_archive_p], c_int, check_int)
ffi('write_fail', [c_archive_p], c_int, check_int)
ffi('write_close', [c_archive_p], c_int, check_int)
ffi('write_free', [c_archive_p], c_int, check_int)
# archive encryption
try:
ffi('read_add_passphrase', [c_archive_p, c_char_p], c_int, check_int)
ffi('write_set_passphrase', [c_archive_p, c_char_p], c_int, check_int)
except AttributeError:
logger.info(
f"the libarchive being used (version {version_number()}, "
f"path {libarchive_path}) doesn't support encryption"
)

View File

@ -0,0 +1,7 @@
READDISK_RESTORE_ATIME = 0x0001
READDISK_HONOR_NODUMP = 0x0002
READDISK_MAC_COPYFILE = 0x0004
READDISK_NO_TRAVERSE_MOUNTS = 0x0008
READDISK_NO_XATTR = 0x0010
READDISK_NO_ACL = 0x0020
READDISK_NO_FFLAGS = 0x0040
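These constants mirror libarchive's `ARCHIVE_READDISK_*` behavior flags. They are bit flags, so several can be combined with bitwise OR before being passed as the `flags` argument of the read-disk functions:

```python
# Values copied from the flags module above.
READDISK_RESTORE_ATIME = 0x0001
READDISK_HONOR_NODUMP = 0x0002
READDISK_NO_XATTR = 0x0010
READDISK_NO_ACL = 0x0020

# Skip extended attributes and ACLs when reading files from disk.
flags = READDISK_NO_XATTR | READDISK_NO_ACL
assert flags == 0x0030
assert flags & READDISK_NO_ACL          # an individual flag is set
assert not flags & READDISK_HONOR_NODUMP  # this one is not
```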

View File

@ -0,0 +1,176 @@
from contextlib import contextmanager
from ctypes import cast, c_void_p, POINTER, create_string_buffer
from os import fstat, stat
from . import ffi
from .ffi import (
ARCHIVE_EOF, OPEN_CALLBACK, READ_CALLBACK, CLOSE_CALLBACK, SEEK_CALLBACK,
NO_OPEN_CB, NO_CLOSE_CB, page_size,
)
from .entry import ArchiveEntry, PassedArchiveEntry
class ArchiveRead:
def __init__(self, archive_p, header_codec='utf-8'):
self._pointer = archive_p
self.header_codec = header_codec
def __iter__(self):
"""Iterates through an archive's entries.
"""
archive_p = self._pointer
header_codec = self.header_codec
read_next_header2 = ffi.read_next_header2
while 1:
entry = ArchiveEntry(archive_p, header_codec)
r = read_next_header2(archive_p, entry._entry_p)
if r == ARCHIVE_EOF:
return
yield entry
entry.__class__ = PassedArchiveEntry
@property
def bytes_read(self):
return ffi.filter_bytes(self._pointer, -1)
@property
def filter_names(self):
count = ffi.filter_count(self._pointer)
return [ffi.filter_name(self._pointer, i) for i in range(count - 1)]
@property
def format_name(self):
return ffi.format_name(self._pointer)
@contextmanager
def new_archive_read(format_name='all', filter_name='all', passphrase=None):
"""Creates an archive struct suitable for reading from an archive.
Returns a pointer if successful. Raises ArchiveError on error.
"""
archive_p = ffi.read_new()
try:
if passphrase:
if not isinstance(passphrase, bytes):
passphrase = passphrase.encode('utf-8')
try:
ffi.read_add_passphrase(archive_p, passphrase)
except AttributeError:
raise NotImplementedError(
f"the libarchive being used (version {ffi.version_number()}, "
f"path {ffi.libarchive_path}) doesn't support encryption"
)
ffi.get_read_filter_function(filter_name)(archive_p)
ffi.get_read_format_function(format_name)(archive_p)
yield archive_p
finally:
ffi.read_free(archive_p)
@contextmanager
def custom_reader(
read_func, format_name='all', filter_name='all',
open_func=None, seek_func=None, close_func=None,
block_size=page_size, archive_read_class=ArchiveRead, passphrase=None,
header_codec='utf-8',
):
"""Read an archive using a custom function.
"""
open_cb = OPEN_CALLBACK(open_func) if open_func else NO_OPEN_CB
read_cb = READ_CALLBACK(read_func)
close_cb = CLOSE_CALLBACK(close_func) if close_func else NO_CLOSE_CB
    seek_cb = SEEK_CALLBACK(seek_func) if seek_func else None  # SEEK_CALLBACK(None) raises TypeError
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
if seek_func:
ffi.read_set_seek_callback(archive_p, seek_cb)
ffi.read_open(archive_p, None, open_cb, read_cb, close_cb)
yield archive_read_class(archive_p, header_codec)
@contextmanager
def fd_reader(
fd, format_name='all', filter_name='all', block_size=4096, passphrase=None,
header_codec='utf-8',
):
"""Read an archive from a file descriptor.
"""
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
try:
block_size = fstat(fd).st_blksize
except (OSError, AttributeError): # pragma: no cover
pass
ffi.read_open_fd(archive_p, fd, block_size)
yield ArchiveRead(archive_p, header_codec)
@contextmanager
def file_reader(
path, format_name='all', filter_name='all', block_size=4096, passphrase=None,
header_codec='utf-8',
):
"""Read an archive from a file.
"""
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
try:
block_size = stat(path).st_blksize
except (OSError, AttributeError): # pragma: no cover
pass
ffi.read_open_filename_w(archive_p, path, block_size)
yield ArchiveRead(archive_p, header_codec)
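Both `fd_reader` and `file_reader` above try to replace the caller's default block size with the filesystem's preferred I/O size (`st_blksize`), falling back silently where the attribute is missing (e.g. on Windows). That lookup can be sketched on its own; `preferred_block_size` is an illustrative helper, not part of the library:

```python
import os

def preferred_block_size(path, default=4096):
    """Return st_blksize for `path`, or `default` if it is unavailable.

    OSError covers a missing path; AttributeError covers platforms
    whose stat results have no st_blksize field.
    """
    try:
        return os.stat(path).st_blksize
    except (OSError, AttributeError):
        return default

print(preferred_block_size('.'))
```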
@contextmanager
def memory_reader(
buf, format_name='all', filter_name='all', passphrase=None,
header_codec='utf-8',
):
"""Read an archive from memory.
"""
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
ffi.read_open_memory(archive_p, cast(buf, c_void_p), len(buf))
yield ArchiveRead(archive_p, header_codec)
@contextmanager
def stream_reader(
stream, format_name='all', filter_name='all', block_size=page_size,
passphrase=None, header_codec='utf-8',
):
"""Read an archive from a stream.
The `stream` object must support the standard `readinto` method.
If `stream.seekable()` returns `True`, then an appropriate seek callback is
passed to libarchive.
"""
buf = create_string_buffer(block_size)
buf_p = cast(buf, c_void_p)
def read_func(archive_p, context, ptrptr):
# readinto the buffer, returns number of bytes read
length = stream.readinto(buf)
# write the address of the buffer into the pointer
ptrptr = cast(ptrptr, POINTER(c_void_p))
ptrptr[0] = buf_p
# tell libarchive how much data was written into the buffer
return length
def seek_func(archive_p, context, offset, whence):
stream.seek(offset, whence)
# tell libarchive the current position
return stream.tell()
open_cb = NO_OPEN_CB
read_cb = READ_CALLBACK(read_func)
close_cb = NO_CLOSE_CB
seek_cb = SEEK_CALLBACK(seek_func)
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
if stream.seekable():
ffi.read_set_seek_callback(archive_p, seek_cb)
ffi.read_open(archive_p, None, open_cb, read_cb, close_cb)
yield ArchiveRead(archive_p, header_codec)
seekable_stream_reader = stream_reader
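The `read_func` in `stream_reader` follows libarchive's read-callback contract: fill a preallocated buffer, publish the buffer's address through the pointer argument, and return the number of bytes read (0 signals end of stream). The buffer-filling half of that contract can be exercised without libarchive, using any object that supports `readinto`:

```python
import io
from ctypes import create_string_buffer

stream = io.BytesIO(b'hello world')
buf = create_string_buffer(4)  # tiny block size to force several reads

chunks = []
while True:
    # Fills `buf` in place and returns the number of bytes read,
    # exactly what a libarchive read callback must report.
    length = stream.readinto(buf)
    if not length:
        break  # 0 bytes read means end of stream
    chunks.append(buf.raw[:length])

print(b''.join(chunks))
```

The pointer-publishing half (`ptrptr[0] = buf_p`) only makes sense inside the real callback, where libarchive owns the `void **` argument.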

View File

@ -0,0 +1,279 @@
from contextlib import contextmanager
from ctypes import byref, cast, c_char, c_size_t, c_void_p, POINTER
from posixpath import join
import warnings
from . import ffi
from .entry import ArchiveEntry, FileType
from .ffi import (
OPEN_CALLBACK, WRITE_CALLBACK, CLOSE_CALLBACK, NO_OPEN_CB, NO_CLOSE_CB,
ARCHIVE_EOF,
page_size, entry_sourcepath, entry_clear, read_disk_new, read_disk_open_w,
read_next_header2, read_disk_descend, read_free, write_header, write_data,
write_finish_entry,
read_disk_set_behavior
)
@contextmanager
def new_archive_read_disk(path, flags=0, lookup=False):
archive_p = read_disk_new()
read_disk_set_behavior(archive_p, flags)
if lookup:
ffi.read_disk_set_standard_lookup(archive_p)
read_disk_open_w(archive_p, path)
try:
yield archive_p
finally:
read_free(archive_p)
class ArchiveWrite:
def __init__(self, archive_p, header_codec='utf-8'):
self._pointer = archive_p
self.header_codec = header_codec
def add_entries(self, entries):
"""Add the given entries to the archive.
"""
write_p = self._pointer
for entry in entries:
write_header(write_p, entry._entry_p)
for block in entry.get_blocks():
write_data(write_p, block, len(block))
write_finish_entry(write_p)
def add_files(
self, *paths, flags=0, lookup=False, pathname=None, recursive=True,
**attributes
):
"""Read files through the OS and add them to the archive.
Args:
paths (str): the paths of the files to add to the archive
flags (int):
passed to the C function `archive_read_disk_set_behavior`;
use the `libarchive.flags.READDISK_*` constants
lookup (bool):
when True, the C function `archive_read_disk_set_standard_lookup`
is called to enable the lookup of user and group names
pathname (str | None):
the path of the file in the archive, defaults to the source path
recursive (bool):
when False, if a path in `paths` is a directory,
only the directory itself is added.
attributes (dict): passed to `ArchiveEntry.modify()`
Raises:
ArchiveError: if a file doesn't exist or can't be accessed, or if
adding it to the archive fails
"""
write_p = self._pointer
block_size = ffi.write_get_bytes_per_block(write_p)
if block_size <= 0:
block_size = 10240 # pragma: no cover
entry = ArchiveEntry(header_codec=self.header_codec)
entry_p = entry._entry_p
        destination_path = pathname  # bound by the keyword-only parameter, never present in **attributes
for path in paths:
with new_archive_read_disk(path, flags, lookup) as read_p:
while 1:
r = read_next_header2(read_p, entry_p)
if r == ARCHIVE_EOF:
break
entry_path = entry.pathname
if destination_path:
if entry_path == path:
entry_path = destination_path
else:
assert entry_path.startswith(path)
entry_path = join(
destination_path,
entry_path[len(path):].lstrip('/')
)
entry.pathname = entry_path.lstrip('/')
if attributes:
entry.modify(**attributes)
read_disk_descend(read_p)
write_header(write_p, entry_p)
if entry.isreg:
with open(entry_sourcepath(entry_p), 'rb') as f:
while 1:
data = f.read(block_size)
if not data:
break
write_data(write_p, data, len(data))
write_finish_entry(write_p)
entry_clear(entry_p)
if not recursive:
break
def add_file(self, path, **kw):
"Single-path alias of `add_files()`"
return self.add_files(path, **kw)
def add_file_from_memory(
self, entry_path, entry_size, entry_data,
filetype=FileType.REGULAR_FILE, permission=0o664,
**other_attributes
):
""""Add file from memory to archive.
Args:
entry_path (str | bytes): the file's path
entry_size (int): the file's size, in bytes
entry_data (bytes | Iterable[bytes]): the file's content
filetype (int): see `libarchive.entry.ArchiveEntry.modify()`
permission (int): see `libarchive.entry.ArchiveEntry.modify()`
other_attributes: see `libarchive.entry.ArchiveEntry.modify()`
"""
archive_pointer = self._pointer
if isinstance(entry_data, bytes):
entry_data = (entry_data,)
elif isinstance(entry_data, str):
raise TypeError(
"entry_data: expected bytes, got %r" % type(entry_data)
)
entry = ArchiveEntry(
pathname=entry_path, size=entry_size, filetype=filetype,
perm=permission, header_codec=self.header_codec,
**other_attributes
)
write_header(archive_pointer, entry._entry_p)
for chunk in entry_data:
if not chunk:
break
write_data(archive_pointer, chunk, len(chunk))
write_finish_entry(archive_pointer)
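`add_file_from_memory` accepts either a single `bytes` object or an iterable of chunks, and writes chunks until an empty one is seen. When the payload starts out as one large buffer, a small helper can split it into fixed-size blocks; `iter_blocks` below is an illustrative sketch, not part of the library:

```python
def iter_blocks(data: bytes, block_size: int = 4096):
    """Yield successive `block_size`-byte slices of `data`.

    Every yielded chunk is non-empty, so it never trips the
    empty-chunk early exit in add_file_from_memory.
    """
    for offset in range(0, len(data), block_size):
        yield data[offset:offset + block_size]

blocks = list(iter_blocks(b'abcdefghij', 4))
print(blocks)  # three chunks: two full blocks and a 2-byte tail
```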
@contextmanager
def new_archive_write(format_name, filter_name=None, options='', passphrase=None):
archive_p = ffi.write_new()
try:
ffi.get_write_format_function(format_name)(archive_p)
if filter_name:
ffi.get_write_filter_function(filter_name)(archive_p)
if passphrase and 'encryption' not in options:
if format_name == 'zip':
warnings.warn(
"The default encryption scheme of zip archives is weak. "
"Use `options='encryption=$type'` to specify the encryption "
"type you want to use. The supported values are 'zipcrypt' "
"(the weak default), 'aes128' and 'aes256'."
)
options += ',encryption' if options else 'encryption'
if options:
if not isinstance(options, bytes):
options = options.encode('utf-8')
ffi.write_set_options(archive_p, options)
if passphrase:
if not isinstance(passphrase, bytes):
passphrase = passphrase.encode('utf-8')
try:
ffi.write_set_passphrase(archive_p, passphrase)
except AttributeError:
raise NotImplementedError(
f"the libarchive being used (version {ffi.version_number()}, "
f"path {ffi.libarchive_path}) doesn't support encryption"
)
yield archive_p
ffi.write_close(archive_p)
ffi.write_free(archive_p)
except Exception:
ffi.write_fail(archive_p)
ffi.write_free(archive_p)
raise
@property
def bytes_written(self):
return ffi.filter_bytes(self._pointer, -1)
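`new_archive_write` assembles libarchive's comma-separated option string incrementally (`options += ',encryption' if options else 'encryption'`) and encodes it to bytes before handing it to `write_set_options`. That string-building step in isolation, with `append_option` as a hypothetical helper name:

```python
def append_option(options: str, option: str) -> str:
    """Append `option` to a comma-separated libarchive option string."""
    return options + ',' + option if options else option

opts = ''
opts = append_option(opts, 'compression-level=9')
opts = append_option(opts, 'encryption')
# libarchive expects a single comma-separated byte string.
print(opts.encode('utf-8'))
```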
@contextmanager
def custom_writer(
write_func, format_name, filter_name=None,
open_func=None, close_func=None, block_size=page_size,
archive_write_class=ArchiveWrite, options='', passphrase=None,
header_codec='utf-8',
):
"""Create an archive and send it in chunks to the `write_func` function.
For formats and filters, see `WRITE_FORMATS` and `WRITE_FILTERS` in the
`libarchive.ffi` module.
"""
def write_cb_internal(archive_p, context, buffer_, length):
data = cast(buffer_, POINTER(c_char * length))[0]
return write_func(data)
open_cb = OPEN_CALLBACK(open_func) if open_func else NO_OPEN_CB
write_cb = WRITE_CALLBACK(write_cb_internal)
close_cb = CLOSE_CALLBACK(close_func) if close_func else NO_CLOSE_CB
with new_archive_write(format_name, filter_name, options,
passphrase) as archive_p:
ffi.write_set_bytes_in_last_block(archive_p, 1)
ffi.write_set_bytes_per_block(archive_p, block_size)
ffi.write_open(archive_p, None, open_cb, write_cb, close_cb)
yield archive_write_class(archive_p, header_codec)
@contextmanager
def fd_writer(
fd, format_name, filter_name=None,
archive_write_class=ArchiveWrite, options='', passphrase=None,
header_codec='utf-8',
):
"""Create an archive and write it into a file descriptor.
For formats and filters, see `WRITE_FORMATS` and `WRITE_FILTERS` in the
`libarchive.ffi` module.
"""
with new_archive_write(format_name, filter_name, options,
passphrase) as archive_p:
ffi.write_open_fd(archive_p, fd)
yield archive_write_class(archive_p, header_codec)
@contextmanager
def file_writer(
filepath, format_name, filter_name=None,
archive_write_class=ArchiveWrite, options='', passphrase=None,
header_codec='utf-8',
):
"""Create an archive and write it into a file.
For formats and filters, see `WRITE_FORMATS` and `WRITE_FILTERS` in the
`libarchive.ffi` module.
"""
with new_archive_write(format_name, filter_name, options,
passphrase) as archive_p:
ffi.write_open_filename_w(archive_p, filepath)
yield archive_write_class(archive_p, header_codec)
@contextmanager
def memory_writer(
buf, format_name, filter_name=None,
archive_write_class=ArchiveWrite, options='', passphrase=None,
header_codec='utf-8',
):
"""Create an archive and write it into a buffer.
For formats and filters, see `WRITE_FORMATS` and `WRITE_FILTERS` in the
`libarchive.ffi` module.
"""
with new_archive_write(format_name, filter_name, options,
passphrase) as archive_p:
used = byref(c_size_t())
buf_p = cast(buf, c_void_p)
ffi.write_open_memory(archive_p, buf_p, len(buf), used)
yield archive_write_class(archive_p, header_codec)

View File

@ -0,0 +1,12 @@
[wheel]
universal = 1
[flake8]
exclude = .?*,env*/
ignore = E226,E731,W504
max-line-length = 85
[egg_info]
tag_build =
tag_date = 0

View File

@ -0,0 +1,25 @@
import os
from os.path import join, dirname
from setuptools import setup, find_packages
from version import get_version
os.umask(0o022)
with open(join(dirname(__file__), 'README.rst'), encoding="utf-8") as f:
README = f.read()
setup(
name='libarchive-c',
version=get_version(),
description='Python interface to libarchive',
author='Changaco',
author_email='changaco@changaco.oy.lc',
url='https://github.com/Changaco/python-libarchive-c',
license='CC0',
packages=find_packages(exclude=['tests']),
long_description=README,
long_description_content_type='text/x-rst',
keywords='archive libarchive 7z tar bz2 zip gz',
)

View File

@ -0,0 +1,136 @@
from contextlib import closing, contextmanager
from copy import copy
from os import chdir, getcwd, stat, walk
from os.path import abspath, dirname, join
from stat import S_ISREG
import tarfile
try:
from stat import filemode
except ImportError: # Python 2
filemode = tarfile.filemode
from libarchive import file_reader
data_dir = join(dirname(__file__), 'data')
def check_archive(archive, tree):
tree2 = copy(tree)
for e in archive:
epath = str(e).rstrip('/')
assert epath in tree2
estat = tree2.pop(epath)
assert e.mtime == int(estat['mtime'])
if not e.isdir:
size = e.size
if size is not None:
assert size == estat['size']
with open(epath, 'rb') as f:
for block in e.get_blocks():
assert f.read(len(block)) == block
leftover = f.read()
assert not leftover
# Check that there are no missing directories or files
assert len(tree2) == 0
def get_entries(location):
"""
    Using the archive file at `location`, return an iterable of name->value
    mappings for each libarchive.ArchiveEntry object's essential attributes.
    Paths are decoded with the `surrogateescape` error handler because JSON
    is UTF-8 and cannot represent arbitrary binary path data.
"""
with file_reader(location) as arch:
for entry in arch:
# libarchive introduces prefixes such as h prefix for
# hardlinks: tarfile does not, so we ignore the first char
mode = entry.strmode[1:].decode('ascii')
yield {
'path': surrogate_decode(entry.pathname),
'mtime': entry.mtime,
'size': entry.size,
'mode': mode,
'isreg': entry.isreg,
'isdir': entry.isdir,
'islnk': entry.islnk,
'issym': entry.issym,
'linkpath': surrogate_decode(entry.linkpath),
'isblk': entry.isblk,
'ischr': entry.ischr,
'isfifo': entry.isfifo,
'isdev': entry.isdev,
'uid': entry.uid,
'gid': entry.gid
}
def get_tarinfos(location):
"""
    Using the tar archive file at `location`, return an iterable of
    name->value mappings for each tarfile.TarInfo object's essential
    attributes.
    Paths are decoded with the `surrogateescape` error handler because JSON
    is UTF-8 and cannot represent arbitrary binary path data.
"""
with closing(tarfile.open(location)) as tar:
for entry in tar:
path = surrogate_decode(entry.path or '')
if entry.isdir() and not path.endswith('/'):
path += '/'
# libarchive introduces prefixes such as h prefix for
# hardlinks: tarfile does not, so we ignore the first char
mode = filemode(entry.mode)[1:]
yield {
'path': path,
'mtime': entry.mtime,
'size': entry.size,
'mode': mode,
'isreg': entry.isreg(),
'isdir': entry.isdir(),
'islnk': entry.islnk(),
'issym': entry.issym(),
'linkpath': surrogate_decode(entry.linkpath or None),
'isblk': entry.isblk(),
'ischr': entry.ischr(),
'isfifo': entry.isfifo(),
'isdev': entry.isdev(),
'uid': entry.uid,
'gid': entry.gid
}
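`get_tarinfos` builds the same attribute mapping from the standard-library `tarfile` module, so the test suite can compare tarfile's view of an archive against libarchive's. A self-contained round trip of that idea, building a one-file tar in memory and extracting the mapping:

```python
import io
import tarfile
from stat import filemode

# Build a one-file tar archive in memory.
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode='w') as tar:
    info = tarfile.TarInfo('hello.txt')
    payload = b'hi'
    info.size = len(payload)
    info.mode = 0o644
    tar.addfile(info, io.BytesIO(payload))
buf.seek(0)

# Read it back and build the same kind of record as get_tarinfos.
with tarfile.open(fileobj=buf) as tar:
    entry = tar.getmembers()[0]
    record = {
        'path': entry.path,
        'size': entry.size,
        'mode': filemode(entry.mode)[1:],  # drop the leading filetype char
        'isreg': entry.isreg(),
    }
print(record)
```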
@contextmanager
def in_dir(dirpath):
prev = abspath(getcwd())
chdir(dirpath)
try:
yield
finally:
chdir(prev)
def stat_dict(path):
keys = set(('uid', 'gid', 'mtime'))
mode, _, _, _, uid, gid, size, _, mtime, _ = stat(path)
if S_ISREG(mode):
keys.add('size')
return {k: v for k, v in locals().items() if k in keys}
def treestat(d, stat_dict=stat_dict):
r = {}
for dirpath, dirnames, filenames in walk(d):
r[dirpath] = stat_dict(dirpath)
for fname in filenames:
fpath = join(dirpath, fname)
r[fpath] = stat_dict(fpath)
return r
def surrogate_decode(o):
if isinstance(o, bytes):
return o.decode('utf8', errors='surrogateescape')
return o
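`surrogate_decode` relies on the `surrogateescape` error handler, which maps each undecodable byte to a lone surrogate code point (this is where the `\udcc4`-style paths in the JSON fixtures come from), so any byte string survives a decode/encode round trip:

```python
def surrogate_decode(o):
    # Same helper as above, reproduced so the example is self-contained.
    if isinstance(o, bytes):
        return o.decode('utf8', errors='surrogateescape')
    return o

raw = b'umlauts-\xc4\xd6'     # latin-1 bytes, invalid as UTF-8
text = surrogate_decode(raw)  # undecodable bytes become \udcc4, \udcd6
assert text == 'umlauts-\udcc4\udcd6'
# Encoding with the same handler restores the original bytes exactly.
assert text.encode('utf8', errors='surrogateescape') == raw
```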

View File

@ -0,0 +1,3 @@
This test file is borrowed from the Python codebase and test suite.
It is a tricky tar archive with several weird and malformed entries:
https://hg.python.org/cpython/file/bff88c866886/Lib/test/testtar.tar

View File

@ -0,0 +1,665 @@
[
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/conttype",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/regtype",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": true,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rwxr-xr-x",
"mtime": 1041808783,
"path": "ustar/dirtype/",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": true,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rwxr-xr-x",
"mtime": 1041808783,
"path": "ustar/dirtype-with-size/",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": true,
"isreg": false,
"issym": false,
"linkpath": "ustar/regtype",
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/lnktype",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": true,
"linkpath": "regtype",
"mode": "rwxrwxrwx",
"mtime": 1041808783,
"path": "ustar/symtype",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": true,
"ischr": false,
"isdev": true,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rw-rw----",
"mtime": 1041808783,
"path": "ustar/blktype",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": true,
"isdev": true,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rw-rw-rw-",
"mtime": 1041808783,
"path": "ustar/chrtype",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": true,
"isdir": false,
"isfifo": true,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/fifotype",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/sparse",
"size": 86016,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/umlauts-\udcc4\udcd6\udcdc\udce4\udcf6\udcfc\udcdf",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/1234567/longname",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": true,
"linkpath": "../linktest1/regtype",
"mode": "rwxrwxrwx",
"mtime": 1041808783,
"path": "./ustar/linktest2/symtype",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/linktest1/regtype",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": true,
"isreg": false,
"issym": false,
"linkpath": "./ustar/linktest1/regtype",
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "./ustar/linktest2/lnktype",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": true,
"linkpath": "ustar/regtype",
"mode": "rwxrwxrwx",
"mtime": 1041808783,
"path": "symtype2",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "gnu/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/longname",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": true,
"isreg": false,
"issym": false,
"linkpath": "gnu/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/longname",
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "gnu/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/longlink",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "gnu/sparse",
"size": 86016,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "gnu/sparse-0.0",
"size": 86016,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "gnu/sparse-0.1",
"size": 86016,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "gnu/sparse-1.0",
"size": 86016,
"uid": 1000
},
{
"gid": 4294967295,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "gnu/regtype-gnu-uid",
"size": 7011,
"uid": 4294967295
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "misc/regtype-old-v7",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "misc/regtype-hpux-signed-chksum-\udcc4\udcd6\udcdc\udce4\udcf6\udcfc\udcdf",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "misc/regtype-old-v7-signed-chksum-\udcc4\udcd6\udcdc\udce4\udcf6\udcfc\udcdf",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": true,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rwxr-xr-x",
"mtime": 1041808783,
"path": "misc/dirtype-old-v7/",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "misc/regtype-suntar",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "misc/regtype-xstar",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/longname",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": true,
"isreg": false,
"issym": false,
"linkpath": "pax/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/longname",
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/longlink",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/umlauts-\u00c4\u00d6\u00dc\u00e4\u00f6\u00fc\u00df",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/regtype1",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/regtype2",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/regtype3",
"size": 7011,
"uid": 1000
},
{
"gid": 123,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/regtype4",
"size": 7011,
"uid": 123
},
{
"gid": 1000,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/bad-pax-\udce4\udcf6\udcfc",
"size": 7011,
"uid": 1000
},
{
"gid": 0,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/hdrcharset-\udce4\udcf6\udcfc",
"size": 7011,
"uid": 0
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "misc/eof",
"size": 0,
"uid": 1000
}
]

View File

@ -0,0 +1,53 @@
[
{
"gid": 513,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": true,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rwx------",
"mtime": 1319027321,
"path": "2859/",
"size": 0,
"uid": 500
},
{
"gid": 513,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rwx------",
"mtime": 1319027194,
"path": "2859/Copy of h\u00e0nz\u00ec-somefile.txt",
"size": 0,
"uid": 500
},
{
"gid": 513,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rwx------",
"mtime": 1319027194,
"path": "2859/h\u00e0nz\u00ec?-somefile.txt ",
"size": 0,
"uid": 500
}
]

View File

@ -0,0 +1,36 @@
[
{
"gid": 1000,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": true,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rwxr-xr-x",
"mtime": 1268678396,
"path": "a/",
"size": 0,
"uid": 1000
},
{
"gid": 1000,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1268678259,
"path": "a/gr\u00fcn.png",
"size": 362,
"uid": 1000
}
]

View File

@ -0,0 +1,36 @@
[
{
"gid": 0,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": true,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rwxrwxr-x",
"mtime": 1381752672,
"path": "a/",
"size": 0,
"uid": 0
},
{
"gid": 0,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-rw-r--",
"mtime": 1268681860,
"path": "a/gru\u0308n.png",
"size": 362,
"uid": 0
}
]

View File

@ -0,0 +1,3 @@
Test file borrowed from
https://github.com/libarchive/libarchive/issues/459
http://libarchive.github.io/google-code/issue-350/comment-0/%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%A8.zip

View File

@ -0,0 +1,36 @@
[
{
"gid": 502,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-rw-r--",
"mtime": 1390485689,
"path": "hello.txt",
"size": 14,
"uid": 502
},
{
"gid": 502,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-rw-r--",
"mtime": 1390485651,
"path": "\ud504\ub85c\uadf8\ub7a8.txt",
"size": 13,
"uid": 502
}
]

View File

@ -0,0 +1,127 @@
from copy import copy
from os import stat

from libarchive import (file_reader, file_writer, memory_reader, memory_writer)

import pytest

from . import treestat


# NOTE: zip does not support high resolution time data, but pax and others do
def check_atime_ctime(archive, tree, timefmt=int):
    tree2 = copy(tree)
    for entry in archive:
        epath = str(entry).rstrip('/')
        assert epath in tree2
        estat = tree2.pop(epath)
        assert entry.atime == timefmt(estat.st_atime)
        assert entry.ctime == timefmt(estat.st_ctime)


def stat_dict(path):
    # return the raw stat output, the tuple output only returns ints
    return stat(path)


def time_check(time_tuple, timefmt):
    seconds, nanos = time_tuple
    maths = float(seconds) + float(nanos) / 1000000000.0
    return timefmt(maths)


@pytest.mark.parametrize('archfmt,timefmt', [('zip', int), ('pax', float)])
def test_memory_atime_ctime(archfmt, timefmt):
    # Collect information on what should be in the archive
    tree = treestat('libarchive', stat_dict)

    # Create an archive of our libarchive/ directory
    buf = bytes(bytearray(1000000))
    with memory_writer(buf, archfmt) as archive1:
        archive1.add_files('libarchive/')

    # Check the data
    with memory_reader(buf) as archive2:
        check_atime_ctime(archive2, tree, timefmt=timefmt)


@pytest.mark.parametrize('archfmt,timefmt', [('zip', int), ('pax', float)])
def test_file_atime_ctime(archfmt, timefmt, tmpdir):
    archive_path = "{0}/test.{1}".format(tmpdir.strpath, archfmt)

    # Collect information on what should be in the archive
    tree = treestat('libarchive', stat_dict)

    # Create an archive of our libarchive/ directory
    with file_writer(archive_path, archfmt) as archive:
        archive.add_files('libarchive/')

    # Read the archive and check that the data is correct
    with file_reader(archive_path) as archive:
        check_atime_ctime(archive, tree, timefmt=timefmt)


@pytest.mark.parametrize('archfmt,timefmt', [('zip', int), ('pax', float)])
def test_memory_time_setters(archfmt, timefmt):
    has_birthtime = archfmt != 'zip'

    # Create an archive of our libarchive/ directory
    buf = bytes(bytearray(1000000))
    with memory_writer(buf, archfmt) as archive1:
        archive1.add_files('libarchive/')

    atimestamp = (1482144741, 495628118)
    mtimestamp = (1482155417, 659017086)
    ctimestamp = (1482145211, 536858081)
    btimestamp = (1482144740, 495628118)
    buf2 = bytes(bytearray(1000000))
    with memory_reader(buf) as archive1:
        with memory_writer(buf2, archfmt) as archive2:
            for entry in archive1:
                entry.set_atime(*atimestamp)
                entry.set_mtime(*mtimestamp)
                entry.set_ctime(*ctimestamp)
                if has_birthtime:
                    entry.set_birthtime(*btimestamp)
                archive2.add_entries([entry])

    with memory_reader(buf2) as archive2:
        for entry in archive2:
            assert entry.atime == time_check(atimestamp, timefmt)
            assert entry.mtime == time_check(mtimestamp, timefmt)
            assert entry.ctime == time_check(ctimestamp, timefmt)
            if has_birthtime:
                assert entry.birthtime == time_check(btimestamp, timefmt)


@pytest.mark.parametrize('archfmt,timefmt', [('zip', int), ('pax', float)])
def test_file_time_setters(archfmt, timefmt, tmpdir):
    has_birthtime = archfmt != 'zip'

    # Create an archive of our libarchive/ directory
    archive_path = tmpdir.join('/test.{0}'.format(archfmt)).strpath
    archive2_path = tmpdir.join('/test2.{0}'.format(archfmt)).strpath
    with file_writer(archive_path, archfmt) as archive1:
        archive1.add_files('libarchive/')

    atimestamp = (1482144741, 495628118)
    mtimestamp = (1482155417, 659017086)
    ctimestamp = (1482145211, 536858081)
    btimestamp = (1482144740, 495628118)
    with file_reader(archive_path) as archive1:
        with file_writer(archive2_path, archfmt) as archive2:
            for entry in archive1:
                entry.set_atime(*atimestamp)
                entry.set_mtime(*mtimestamp)
                entry.set_ctime(*ctimestamp)
                if has_birthtime:
                    entry.set_birthtime(*btimestamp)
                archive2.add_entries([entry])

    with file_reader(archive2_path) as archive2:
        for entry in archive2:
            assert entry.atime == time_check(atimestamp, timefmt)
            assert entry.mtime == time_check(mtimestamp, timefmt)
            assert entry.ctime == time_check(ctimestamp, timefmt)
            if has_birthtime:
                assert entry.birthtime == time_check(btimestamp, timefmt)
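The `time_check` helper in the test module above collapses a `(seconds, nanoseconds)` pair into a single timestamp before comparison. A standalone sketch of the same arithmetic (reproduced here outside the test module, with no libarchive dependency):

```python
def time_check(time_tuple, timefmt):
    # Combine whole seconds and nanoseconds into one number, then narrow it
    # with the requested format: int for zip (second resolution only),
    # float for pax (sub-second resolution preserved).
    seconds, nanos = time_tuple
    maths = float(seconds) + float(nanos) / 1000000000.0
    return timefmt(maths)


# zip archives keep only whole seconds:
assert time_check((1482144741, 495628118), int) == 1482144741
# pax keeps the fractional part (to float precision):
assert abs(time_check((1482144741, 495628118), float) - 1482144741.495628118) < 1e-6
```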

View File

@ -0,0 +1,24 @@
from libarchive import memory_reader, memory_writer

from . import check_archive, treestat


def test_convert():

    # Collect information on what should be in the archive
    tree = treestat('libarchive')

    # Create an archive of our libarchive/ directory
    buf = bytes(bytearray(1000000))
    with memory_writer(buf, 'gnutar', 'xz') as archive1:
        archive1.add_files('libarchive/')

    # Convert the archive to another format
    buf2 = bytes(bytearray(1000000))
    with memory_reader(buf) as archive1:
        with memory_writer(buf2, 'zip') as archive2:
            archive2.add_entries(archive1)

    # Check the data
    with memory_reader(buf2) as archive2:
        check_archive(archive2, tree)

View File

@ -0,0 +1,151 @@
# -*- coding: utf-8 -*-

from codecs import open
import json
import locale
from os import environ, stat
from os.path import join
import unicodedata

import pytest

from libarchive import memory_reader, memory_writer
from libarchive.entry import ArchiveEntry, ConsumedArchiveEntry, PassedArchiveEntry

from . import data_dir, get_entries, get_tarinfos

text_type = unicode if str is bytes else str  # noqa: F821

locale.setlocale(locale.LC_ALL, '')

# needed for sane time stamp comparison
environ['TZ'] = 'UTC'


def test_entry_properties():

    buf = bytes(bytearray(1000000))
    with memory_writer(buf, 'gnutar') as archive:
        archive.add_files('README.rst')
    readme_stat = stat('README.rst')

    with memory_reader(buf) as archive:
        for entry in archive:
            assert entry.uid == readme_stat.st_uid
            assert entry.gid == readme_stat.st_gid
            assert entry.mode == readme_stat.st_mode
            assert not entry.isblk
            assert not entry.ischr
            assert not entry.isdir
            assert not entry.isfifo
            assert not entry.islnk
            assert not entry.issym
            assert not entry.linkpath
            assert entry.linkpath == entry.linkname
            assert entry.isreg
            assert entry.isfile
            assert not entry.issock
            assert not entry.isdev
            assert b'rw' in entry.strmode
            assert entry.pathname == entry.path
            assert entry.pathname == entry.name


def test_check_ArchiveEntry_against_TarInfo():
    for name in ('special.tar', 'tar_relative.tar'):
        path = join(data_dir, name)
        tarinfos = list(get_tarinfos(path))
        entries = list(get_entries(path))
        for tarinfo, entry in zip(tarinfos, entries):
            assert tarinfo == entry
        assert len(tarinfos) == len(entries)


def test_check_archiveentry_using_python_testtar():
    check_entries(join(data_dir, 'testtar.tar'))


def test_check_archiveentry_with_unicode_and_binary_entries_tar():
    check_entries(join(data_dir, 'unicode.tar'))


def test_check_archiveentry_with_unicode_and_binary_entries_zip():
    check_entries(join(data_dir, 'unicode.zip'))


def test_check_archiveentry_with_unicode_and_binary_entries_zip2():
    check_entries(join(data_dir, 'unicode2.zip'), ignore='mode')


def test_check_archiveentry_with_unicode_entries_and_name_zip():
    check_entries(join(data_dir, '\ud504\ub85c\uadf8\ub7a8.zip'))


def check_entries(test_file, regen=False, ignore=''):
    ignore = ignore.split()
    fixture_file = test_file + '.json'
    if regen:
        entries = list(get_entries(test_file))
        with open(fixture_file, 'w', encoding='UTF-8') as ex:
            json.dump(entries, ex, indent=2, sort_keys=True)
    with open(fixture_file, encoding='UTF-8') as ex:
        expected = json.load(ex)
    actual = list(get_entries(test_file))
    for e1, e2 in zip(actual, expected):
        for key in ignore:
            e1.pop(key)
            e2.pop(key)
        # Normalize all unicode (can vary depending on the system)
        for d in (e1, e2):
            for key in d:
                if isinstance(d[key], text_type):
                    d[key] = unicodedata.normalize('NFC', d[key])
        assert e1 == e2


def test_the_life_cycle_of_archive_entries():
    """Check that `get_blocks` only works on the current entry, and only once.
    """
    # Create a test archive in memory
    buf = bytes(bytearray(10_000_000))
    with memory_writer(buf, 'gnutar') as archive:
        archive.add_files(
            'README.rst',
            'libarchive/__init__.py',
            'libarchive/entry.py',
        )
    # Read multiple entries of the test archive and check how they evolve
    with memory_reader(buf) as archive:
        archive_iter = iter(archive)
        entry1 = next(archive_iter)
        assert type(entry1) is ArchiveEntry
        for block in entry1.get_blocks():
            pass
        assert type(entry1) is ConsumedArchiveEntry
        with pytest.raises(TypeError):
            entry1.get_blocks()
        entry2 = next(archive_iter)
        assert type(entry2) is ArchiveEntry
        assert type(entry1) is PassedArchiveEntry
        with pytest.raises(TypeError):
            entry1.get_blocks()
        entry3 = next(archive_iter)
        assert type(entry3) is ArchiveEntry
        assert type(entry2) is PassedArchiveEntry
        assert type(entry1) is PassedArchiveEntry


def test_non_ASCII_encoding_of_file_metadata():
    buf = bytes(bytearray(100_000))
    file_name = 'README.rst'
    encoded_file_name = 'README.rst'.encode('cp037')
    with memory_writer(buf, 'ustar', header_codec='cp037') as archive:
        archive.add_file(file_name)
    with memory_reader(buf) as archive:
        entry = next(iter(archive))
        assert entry.pathname == encoded_file_name
    with memory_reader(buf, header_codec='cp037') as archive:
        entry = next(iter(archive))
        assert entry.pathname == file_name
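The `header_codec='cp037'` round-trip exercised by the last test above rests on plain codec behaviour that can be checked with the standard library alone, without libarchive:

```python
file_name = 'README.rst'

# Encoding with EBCDIC cp037 produces bytes that differ from the ASCII ones,
# so a reader that assumes ASCII headers sees the raw encoded name...
encoded = file_name.encode('cp037')
assert encoded != file_name.encode('ascii')

# ...while decoding with the matching codec restores the original name,
# which is why reading with the same header_codec yields `file_name` again.
assert encoded.decode('cp037') == file_name
```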

View File

@ -0,0 +1,40 @@
from errno import ENOENT

import pytest

from libarchive import ArchiveError, ffi, memory_writer


def test_add_files_nonexistent():
    with memory_writer(bytes(bytearray(4096)), 'zip') as archive:
        with pytest.raises(ArchiveError) as e:
            archive.add_files('nonexistent')
    assert e.value.msg
    assert e.value.errno == ENOENT
    assert e.value.retcode == -25


def test_check_int_logs_warnings(monkeypatch):
    calls = []
    monkeypatch.setattr(ffi.logger, 'warning', lambda *_: calls.append(1))
    archive_p = ffi.write_new()
    ffi.check_int(ffi.ARCHIVE_WARN, print, [archive_p])
    assert calls == [1]


def test_check_null():
    with pytest.raises(ArchiveError) as e:
        ffi.check_null(None, print, [])
    assert str(e)


def test_error_string_decoding(monkeypatch):
    monkeypatch.setattr(ffi, 'error_string', lambda *_: None)
    r = ffi._error_string(None)
    assert r is None
    monkeypatch.setattr(ffi, 'error_string', lambda *_: b'a')
    r = ffi._error_string(None)
    assert isinstance(r, type(''))
    monkeypatch.setattr(ffi, 'error_string', lambda *_: '\xe9'.encode('utf8'))
    r = ffi._error_string(None)
    assert isinstance(r, bytes)
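The decoding contract that `test_error_string_decoding` asserts can be sketched as a plain function. This is only an illustration of the behaviour the test demands (None passes through, decodable messages become text, anything else stays bytes), not the library's actual `_error_string` implementation:

```python
def decode_error_string(raw):
    # None (no error message available) passes through untouched.
    if raw is None:
        return None
    try:
        # Messages that decode cleanly come back as text...
        return raw.decode('ascii')
    except UnicodeDecodeError:
        # ...anything else is returned as raw bytes rather than guessed at.
        return raw


assert decode_error_string(None) is None
assert isinstance(decode_error_string(b'a'), str)
assert isinstance(decode_error_string('\xe9'.encode('utf8')), bytes)
```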

View File

@ -0,0 +1,183 @@
"""Test reading, writing and extracting archives."""

import io
import json

import libarchive
from libarchive.entry import format_time
from libarchive.extract import EXTRACT_OWNER, EXTRACT_PERM, EXTRACT_TIME
from libarchive.write import memory_writer
from unittest.mock import patch
import pytest

from . import check_archive, in_dir, treestat


def test_buffers(tmpdir):

    # Collect information on what should be in the archive
    tree = treestat('libarchive')

    # Create an archive of our libarchive/ directory
    buf = bytes(bytearray(1000000))
    with libarchive.memory_writer(buf, 'gnutar', 'xz') as archive:
        archive.add_files('libarchive/')

    # Read the archive and check that the data is correct
    with libarchive.memory_reader(buf) as archive:
        check_archive(archive, tree)
        assert archive.format_name == b'GNU tar format'
        assert archive.filter_names == [b'xz']

    # Extract the archive in tmpdir and check that the data is intact
    with in_dir(tmpdir.strpath):
        flags = EXTRACT_OWNER | EXTRACT_PERM | EXTRACT_TIME
        libarchive.extract_memory(buf, flags)
        tree2 = treestat('libarchive')
        assert tree2 == tree


def test_fd(tmpdir):
    archive_file = open(tmpdir.strpath + '/test.tar.bz2', 'w+b')
    fd = archive_file.fileno()

    # Collect information on what should be in the archive
    tree = treestat('libarchive')

    # Create an archive of our libarchive/ directory
    with libarchive.fd_writer(fd, 'gnutar', 'bzip2') as archive:
        archive.add_files('libarchive/')

    # Read the archive and check that the data is correct
    archive_file.seek(0)
    with libarchive.fd_reader(fd) as archive:
        check_archive(archive, tree)
        assert archive.format_name == b'GNU tar format'
        assert archive.filter_names == [b'bzip2']

    # Extract the archive in tmpdir and check that the data is intact
    archive_file.seek(0)
    with in_dir(tmpdir.strpath):
        flags = EXTRACT_OWNER | EXTRACT_PERM | EXTRACT_TIME
        libarchive.extract_fd(fd, flags)
        tree2 = treestat('libarchive')
        assert tree2 == tree


def test_files(tmpdir):
    archive_path = tmpdir.strpath + '/test.tar.gz'

    # Collect information on what should be in the archive
    tree = treestat('libarchive')

    # Create an archive of our libarchive/ directory
    with libarchive.file_writer(archive_path, 'ustar', 'gzip') as archive:
        archive.add_files('libarchive/')

    # Read the archive and check that the data is correct
    with libarchive.file_reader(archive_path) as archive:
        check_archive(archive, tree)
        assert archive.format_name == b'POSIX ustar format'
        assert archive.filter_names == [b'gzip']

    # Extract the archive in tmpdir and check that the data is intact
    with in_dir(tmpdir.strpath):
        flags = EXTRACT_OWNER | EXTRACT_PERM | EXTRACT_TIME
        libarchive.extract_file(archive_path, flags)
        tree2 = treestat('libarchive')
        assert tree2 == tree


def test_custom_writer_and_stream_reader():

    # Collect information on what should be in the archive
    tree = treestat('libarchive')

    # Create an archive of our libarchive/ directory
    stream = io.BytesIO()
    with libarchive.custom_writer(stream.write, 'zip') as archive:
        archive.add_files('libarchive/')
    stream.seek(0)

    # Read the archive and check that the data is correct
    with libarchive.stream_reader(stream, 'zip') as archive:
        check_archive(archive, tree)
        assert archive.format_name == b'ZIP 2.0 (deflation)'
        assert archive.filter_names == []


@patch('libarchive.ffi.write_fail')
def test_write_fail(write_fail_mock):
    buf = bytes(bytearray(1000000))
    try:
        with memory_writer(buf, 'gnutar', 'xz') as archive:
            archive.add_files('libarchive/')
            raise TypeError
    except TypeError:
        pass
    assert write_fail_mock.called


@patch('libarchive.ffi.write_fail')
def test_write_not_fail(write_fail_mock):
    buf = bytes(bytearray(1000000))
    with memory_writer(buf, 'gnutar', 'xz') as archive:
        archive.add_files('libarchive/')
    assert not write_fail_mock.called


def test_adding_nonexistent_file_to_archive():
    stream = io.BytesIO()
    with libarchive.custom_writer(stream.write, 'zip') as archive:
        with pytest.raises(libarchive.ArchiveError):
            archive.add_files('nonexistent')
        archive.add_files('libarchive/')


@pytest.mark.parametrize(
    'archfmt,data_bytes',
    [('zip', b'content'),
     ('gnutar', b''),
     ('pax', json.dumps({'a': 1, 'b': 2, 'c': 3}).encode()),
     ('7zip', b'lorem\0ipsum')])
def test_adding_entry_from_memory(archfmt, data_bytes):
    entry_path = 'testfile.data'
    entry_data = data_bytes
    entry_size = len(data_bytes)

    blocks = []

    archfmt = 'zip'
    has_birthtime = archfmt != 'zip'

    atime = (1482144741, 495628118)
    mtime = (1482155417, 659017086)
    ctime = (1482145211, 536858081)
    btime = (1482144740, 495628118) if has_birthtime else None

    def write_callback(data):
        blocks.append(data[:])
        return len(data)

    with libarchive.custom_writer(write_callback, archfmt) as archive:
        archive.add_file_from_memory(
            entry_path, entry_size, entry_data,
            atime=atime, mtime=mtime, ctime=ctime, birthtime=btime,
            uid=1000, gid=1000,
        )

    buf = b''.join(blocks)
    with libarchive.memory_reader(buf) as memory_archive:
        for archive_entry in memory_archive:
            expected = entry_data
            actual = b''.join(archive_entry.get_blocks())
            assert expected == actual
            assert archive_entry.path == entry_path
            assert archive_entry.atime in (atime[0], format_time(*atime))
            assert archive_entry.mtime in (mtime[0], format_time(*mtime))
            assert archive_entry.ctime in (ctime[0], format_time(*ctime))
            if has_birthtime:
                assert archive_entry.birthtime in (
                    btime[0], format_time(*btime)
                )
            assert archive_entry.uid == 1000
            assert archive_entry.gid == 1000

View File

@ -0,0 +1,36 @@
"""Test security-related extraction flags."""

import pytest
import os

from libarchive import extract_file, file_reader
from libarchive.extract import (
    EXTRACT_SECURE_NOABSOLUTEPATHS, EXTRACT_SECURE_NODOTDOT,
)
from libarchive.exception import ArchiveError

from . import data_dir


def run_test(flags):
    archive_path = os.path.join(data_dir, 'flags.tar')
    try:
        extract_file(archive_path, 0)
        with pytest.raises(ArchiveError):
            extract_file(archive_path, flags)
    finally:
        with file_reader(archive_path) as archive:
            for entry in archive:
                if os.path.exists(entry.pathname):
                    os.remove(entry.pathname)


def test_extraction_is_secure_by_default():
    run_test(None)


def test_explicit_no_dot_dot():
    run_test(EXTRACT_SECURE_NODOTDOT)


def test_explicit_no_absolute_paths():
    run_test(EXTRACT_SECURE_NOABSOLUTEPATHS)

View File

@ -0,0 +1,14 @@
[tox]
envlist=py38,py39,py310,py311
skipsdist=True

[testenv]
passenv = LIBARCHIVE
commands=
    python -m pytest -Wd -vv --forked --cov libarchive --cov-report term-missing {toxinidir}/tests {posargs}
    flake8 {toxinidir}
deps=
    flake8
    pytest
    pytest-cov
    pytest-forked

View File

@ -0,0 +1,45 @@
# Source: https://github.com/Changaco/version.py

from os.path import dirname, isdir, join
import re
from subprocess import CalledProcessError, check_output


PREFIX = ''

tag_re = re.compile(r'\btag: %s([0-9][^,]*)\b' % PREFIX)
version_re = re.compile('^Version: (.+)$', re.M)


def get_version():
    # Return the version if it has been injected into the file by git-archive
    version = tag_re.search('$Format:%D$')
    if version:
        return version.group(1)

    d = dirname(__file__)

    if isdir(join(d, '.git')):
        # Get the version using "git describe".
        cmd = 'git describe --tags --match %s[0-9]* --dirty' % PREFIX
        try:
            version = check_output(cmd.split()).decode().strip()[len(PREFIX):]
        except CalledProcessError:
            raise RuntimeError('Unable to get version number from git tags')

        # PEP 440 compatibility
        if '-' in version:
            if version.endswith('-dirty'):
                raise RuntimeError('The working tree is dirty')
            version = '.post'.join(version.split('-')[:2])

    else:
        # Extract the version from the PKG-INFO file.
        with open(join(d, 'PKG-INFO'), encoding='utf-8', errors='replace') as f:
            version = version_re.search(f.read()).group(1)

    return version


if __name__ == '__main__':
    print(get_version())
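The two regular expressions in version.py can be exercised on their own: `version_re` pulls the version out of PKG-INFO-style metadata, and `tag_re` (with the empty `PREFIX`) picks a numeric tag out of `git log` ref decoration injected by `git-archive`. The sample strings below are illustrative inputs, not taken from the repository:

```python
import re

# Same patterns as in version.py, with PREFIX = ''
version_re = re.compile('^Version: (.+)$', re.M)
tag_re = re.compile(r'\btag: ([0-9][^,]*)\b')

# A PKG-INFO fragment like the one shipped in an sdist:
metadata = 'Metadata-Version: 2.1\nName: libarchive-c\nVersion: 5.1\n'
assert version_re.search(metadata).group(1) == '5.1'

# A `%D` ref-decoration string as substituted by git-archive:
decoration = 'HEAD -> master, tag: 5.1, origin/master'
assert tag_re.search(decoration).group(1) == '5.1'
```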