Compare commits

...

2 Commits

Author SHA1 Message Date
Vadim vtroshchinskiy fc2cf5cd45 Add Debian packaging 2024-11-12 13:37:20 +01:00
Vadim vtroshchinskiy 5daeb8200f Initial package contents 2024-11-12 13:36:01 +01:00
53 changed files with 3636 additions and 0 deletions

View File

@@ -0,0 +1 @@
version.py export-subst

View File

@@ -0,0 +1 @@
liberapay: Changaco

View File

@@ -0,0 +1,36 @@
name: CI

on:
  # Trigger the workflow on push or pull request events but only for the master branch
  push:
    branches: [ master ]
  pull_request:
    branches: [ master ]
  # Allow running this workflow manually from the Actions tab
  workflow_dispatch:

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Install libarchive
        run: sudo apt-get install -y libarchive13
      - name: Install Python 3.11
        uses: actions/setup-python@v2
        with:
          python-version: '3.11'
      - name: Install Python 3.10
        uses: actions/setup-python@v2
        with:
          python-version: '3.10'
      - name: Install Python 3.9
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'
      - name: Install Python 3.8
        uses: actions/setup-python@v2
        with:
          python-version: '3.8'
      - name: Install tox
        run: pip install tox
      - name: Run the tests
        run: tox

View File

@@ -0,0 +1,8 @@
*.egg-info/
/build/
/dist/
/env/
/htmlcov/
.coverage
*.pyc
.tox/

View File

@@ -0,0 +1 @@
https://creativecommons.org/publicdomain/zero/1.0/

View File

@@ -0,0 +1 @@
include version.py

View File

@@ -0,0 +1,147 @@
Metadata-Version: 2.1
Name: libarchive-c
Version: 5.1
Summary: Python interface to libarchive
Home-page: https://github.com/Changaco/python-libarchive-c
Author: Changaco
Author-email: changaco@changaco.oy.lc
License: CC0
Keywords: archive libarchive 7z tar bz2 zip gz
Description-Content-Type: text/x-rst
License-File: LICENSE.md

A Python interface to libarchive. It uses the standard ctypes_ module to
dynamically load and access the C library.

.. _ctypes: https://docs.python.org/3/library/ctypes.html

Installation
============

    pip install libarchive-c

Compatibility
=============

python
------

python-libarchive-c is currently tested with python 3.8, 3.9, 3.10 and 3.11.

If you find an incompatibility with older versions you can send us a small
patch, but we won't accept big changes.

libarchive
----------

python-libarchive-c may not work properly with obsolete versions of libarchive
such as the ones included in macOS. In that case you can install a recent
version of libarchive (e.g. with ``brew install libarchive`` on macOS) and use
the ``LIBARCHIVE`` environment variable to point python-libarchive-c to it::

    export LIBARCHIVE=/usr/local/Cellar/libarchive/3.3.3/lib/libarchive.13.dylib

Usage
=====

Import::

    import libarchive

Extracting archives
-------------------

To extract an archive, use the ``extract_file`` function::

    os.chdir('/path/to/target/directory')
    libarchive.extract_file('test.zip')

Alternatively, the ``extract_memory`` function can be used to extract from a
buffer, and ``extract_fd`` from a file descriptor.

The ``extract_*`` functions all have an integer ``flags`` argument which is
passed directly to the C function ``archive_write_disk_set_options()``. You
can import the ``EXTRACT_*`` constants from the ``libarchive.extract`` module
and see the official description of each flag in the
``archive_write_disk(3)`` man page.

By default, when the ``flags`` argument is ``None``, the ``SECURE_NODOTDOT``,
``SECURE_NOABSOLUTEPATHS`` and ``SECURE_SYMLINKS`` flags are passed to
libarchive, unless the current directory is the root (``/``).
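
For example, here is a minimal sketch (``test.zip`` is a stand-in archive
name) that also restores permissions and timestamps while keeping the default
protections against path escapes::

    from libarchive.extract import (
        EXTRACT_PERM, EXTRACT_SECURE_NOABSOLUTEPATHS, EXTRACT_SECURE_NODOTDOT,
        EXTRACT_SECURE_SYMLINKS, EXTRACT_TIME,
    )

    flags = (EXTRACT_PERM | EXTRACT_TIME | EXTRACT_SECURE_NODOTDOT |
             EXTRACT_SECURE_NOABSOLUTEPATHS | EXTRACT_SECURE_SYMLINKS)
    libarchive.extract_file('test.zip', flags)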

Reading archives
----------------

To read an archive, use the ``file_reader`` function::

    with libarchive.file_reader('test.7z') as archive:
        for entry in archive:
            for block in entry.get_blocks():
                ...

Alternatively, the ``memory_reader`` function can be used to read from a
buffer, ``fd_reader`` from a file descriptor, ``stream_reader`` from a stream
object (which must support the standard ``readinto`` method), and
``custom_reader`` from anywhere using callbacks.

To learn about the attributes of the ``entry`` object, see the
``libarchive/entry.py`` source code or run
``help(libarchive.entry.ArchiveEntry)`` in a Python shell.

Displaying progress
~~~~~~~~~~~~~~~~~~~

If your program processes large archives, you can keep track of its progress
with the ``bytes_read`` attribute. Here's an example of a progress bar using
`tqdm <https://pypi.org/project/tqdm/>`_::

    with tqdm(total=os.stat(archive_path).st_size, unit='bytes') as pbar, \
            libarchive.file_reader(archive_path) as archive:
        for entry in archive:
            ...
            pbar.update(archive.bytes_read - pbar.n)

Creating archives
-----------------

To create an archive, use the ``file_writer`` function::

    from libarchive.entry import FileType

    with libarchive.file_writer('test.tar.gz', 'ustar', 'gzip') as archive:
        # Add the `libarchive/` directory and everything in it (recursively),
        # then the `README.rst` file.
        archive.add_files('libarchive/', 'README.rst')
        # Add a regular file defined from scratch.
        data = b'foobar'
        archive.add_file_from_memory('../escape-test', len(data), data)
        # Add a directory defined from scratch.
        early_epoch = (42, 42)  # 1970-01-01 00:00:42.000000042
        archive.add_file_from_memory(
            'metadata-test', 0, b'',
            filetype=FileType.DIRECTORY, permission=0o755, uid=4242, gid=4242,
            atime=early_epoch, mtime=early_epoch, ctime=early_epoch,
            birthtime=early_epoch,
        )

Alternatively, the ``memory_writer`` function can be used to write to a memory
buffer, ``fd_writer`` to a file descriptor, and ``custom_writer`` to a
callback function. For each of those functions, the mandatory second argument
is the archive format, and the optional third argument is the compression
format (called “filter” in libarchive). The acceptable values are listed in
``libarchive.ffi.WRITE_FORMATS`` and ``libarchive.ffi.WRITE_FILTERS``.

File metadata codecs
--------------------

By default, UTF-8 is used to read and write file attributes from and to
archives. A different codec can be specified through the ``header_codec``
arguments of the ``*_reader`` and ``*_writer`` functions. Example::

    with libarchive.file_writer('test.tar', 'ustar', header_codec='cp037') as archive:
        ...
    with file_reader('test.tar', header_codec='cp037') as archive:
        ...

In addition to file paths (``pathname`` and ``linkpath``), the specified
codec is used to encode and decode user and group names (``uname`` and
``gname``).

License
=======

`CC0 Public Domain Dedication <http://creativecommons.org/publicdomain/zero/1.0/>`_

View File

@@ -0,0 +1,135 @@
A Python interface to libarchive. It uses the standard ctypes_ module to
dynamically load and access the C library.

.. _ctypes: https://docs.python.org/3/library/ctypes.html

Installation
============

    pip install libarchive-c

Compatibility
=============

python
------

python-libarchive-c is currently tested with python 3.8, 3.9, 3.10 and 3.11.

If you find an incompatibility with older versions you can send us a small
patch, but we won't accept big changes.

libarchive
----------

python-libarchive-c may not work properly with obsolete versions of libarchive
such as the ones included in macOS. In that case you can install a recent
version of libarchive (e.g. with ``brew install libarchive`` on macOS) and use
the ``LIBARCHIVE`` environment variable to point python-libarchive-c to it::

    export LIBARCHIVE=/usr/local/Cellar/libarchive/3.3.3/lib/libarchive.13.dylib

Usage
=====

Import::

    import libarchive

Extracting archives
-------------------

To extract an archive, use the ``extract_file`` function::

    os.chdir('/path/to/target/directory')
    libarchive.extract_file('test.zip')

Alternatively, the ``extract_memory`` function can be used to extract from a
buffer, and ``extract_fd`` from a file descriptor.

The ``extract_*`` functions all have an integer ``flags`` argument which is
passed directly to the C function ``archive_write_disk_set_options()``. You
can import the ``EXTRACT_*`` constants from the ``libarchive.extract`` module
and see the official description of each flag in the
``archive_write_disk(3)`` man page.

By default, when the ``flags`` argument is ``None``, the ``SECURE_NODOTDOT``,
``SECURE_NOABSOLUTEPATHS`` and ``SECURE_SYMLINKS`` flags are passed to
libarchive, unless the current directory is the root (``/``).

Reading archives
----------------

To read an archive, use the ``file_reader`` function::

    with libarchive.file_reader('test.7z') as archive:
        for entry in archive:
            for block in entry.get_blocks():
                ...

Alternatively, the ``memory_reader`` function can be used to read from a
buffer, ``fd_reader`` from a file descriptor, ``stream_reader`` from a stream
object (which must support the standard ``readinto`` method), and
``custom_reader`` from anywhere using callbacks.
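
For instance, a minimal sketch (``data`` stands in for the raw bytes of an
archive) that reads the same content from an in-memory buffer and from a
stream::

    import io

    with libarchive.memory_reader(data) as archive:
        for entry in archive:
            print(entry.pathname)

    with libarchive.stream_reader(io.BytesIO(data)) as archive:
        for entry in archive:
            print(entry.pathname)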

To learn about the attributes of the ``entry`` object, see the
``libarchive/entry.py`` source code or run
``help(libarchive.entry.ArchiveEntry)`` in a Python shell.

Displaying progress
~~~~~~~~~~~~~~~~~~~

If your program processes large archives, you can keep track of its progress
with the ``bytes_read`` attribute. Here's an example of a progress bar using
`tqdm <https://pypi.org/project/tqdm/>`_::

    with tqdm(total=os.stat(archive_path).st_size, unit='bytes') as pbar, \
            libarchive.file_reader(archive_path) as archive:
        for entry in archive:
            ...
            pbar.update(archive.bytes_read - pbar.n)

Creating archives
-----------------

To create an archive, use the ``file_writer`` function::

    from libarchive.entry import FileType

    with libarchive.file_writer('test.tar.gz', 'ustar', 'gzip') as archive:
        # Add the `libarchive/` directory and everything in it (recursively),
        # then the `README.rst` file.
        archive.add_files('libarchive/', 'README.rst')
        # Add a regular file defined from scratch.
        data = b'foobar'
        archive.add_file_from_memory('../escape-test', len(data), data)
        # Add a directory defined from scratch.
        early_epoch = (42, 42)  # 1970-01-01 00:00:42.000000042
        archive.add_file_from_memory(
            'metadata-test', 0, b'',
            filetype=FileType.DIRECTORY, permission=0o755, uid=4242, gid=4242,
            atime=early_epoch, mtime=early_epoch, ctime=early_epoch,
            birthtime=early_epoch,
        )

Alternatively, the ``memory_writer`` function can be used to write to a memory
buffer, ``fd_writer`` to a file descriptor, and ``custom_writer`` to a
callback function. For each of those functions, the mandatory second argument
is the archive format, and the optional third argument is the compression
format (called “filter” in libarchive). The acceptable values are listed in
``libarchive.ffi.WRITE_FORMATS`` and ``libarchive.ffi.WRITE_FILTERS``.
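
As a sketch (assuming the ``xz`` filter is available in your libarchive build
and that the preallocated buffer is large enough for the result)::

    buf = bytes(16 * 4096)
    with libarchive.memory_writer(buf, 'pax', 'xz') as archive:
        archive.add_files('README.rst')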

File metadata codecs
--------------------

By default, UTF-8 is used to read and write file attributes from and to
archives. A different codec can be specified through the ``header_codec``
arguments of the ``*_reader`` and ``*_writer`` functions. Example::

    with libarchive.file_writer('test.tar', 'ustar', header_codec='cp037') as archive:
        ...
    with file_reader('test.tar', header_codec='cp037') as archive:
        ...

In addition to file paths (``pathname`` and ``linkpath``), the specified
codec is used to encode and decode user and group names (``uname`` and
``gname``).

License
=======

`CC0 Public Domain Dedication <http://creativecommons.org/publicdomain/zero/1.0/>`_

View File

@@ -0,0 +1,5 @@
opengnsys-libarchive-c (5.1) UNRELEASED; urgency=medium

  * Initial release. (Closes: #XXXXXX)

 -- root <opengnsys@opengnsys.com>  Mon, 11 Nov 2024 17:11:16 +0000

View File

@@ -0,0 +1,29 @@
Source: opengnsys-libarchive-c
Maintainer: OpenGnsys <opengnsys@opengnsys.org>
XSBC-Original-Maintainer: Jérémy Bobbio <lunar@debian.org>
Section: python
Priority: optional
Build-Depends: debhelper-compat (= 12),
               dh-python,
               libarchive-dev,
               python3-all,
               python3-mock,
               python3-pytest,
               python3-setuptools
Standards-Version: 4.5.0
Rules-Requires-Root: no
Homepage: https://github.com/Changaco/python-libarchive-c
Vcs-Browser: https://salsa.debian.org/debian/python-libarchive-c
Vcs-Git: https://salsa.debian.org/debian/python-libarchive-c.git

Package: opengnsys-libarchive-c
Architecture: all
Depends: ${lib:Depends}, ${misc:Depends}, ${python3:Depends}
Description: Python3 interface to libarchive
 The libarchive library provides a flexible interface for reading and writing
 archives in various formats such as tar and cpio. libarchive also supports
 reading and writing archives compressed using various compression filters
 such as gzip and bzip2.
 .
 This package contains a Python3 interface to libarchive written using the
 standard ctypes module to dynamically load and access the C library.

View File

@@ -0,0 +1,208 @@
Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Name: python-libarchive-c
Source: https://github.com/Changaco/python-libarchive-c
Files: *
Copyright: 2014-2018 Changaco <changaco@changaco.oy.lc>
License: CC-0
Files: tests/surrogateescape.py
Copyright: 2015 Changaco <changaco@changaco.oy.lc>
2011-2013 Victor Stinner <victor.stinner@gmail.com>
License: BSD-2-clause or PSF-2
Files: debian/*
Copyright: 2015 Jérémy Bobbio <lunar@debian.org>
2019 Mattia Rizzolo <mattia@debian.org>
License: permissive
Copying and distribution of this package, with or without
modification, are permitted in any medium without royalty
provided the copyright notice and this notice are
preserved.
License: BSD-2-clause
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in
the documentation and/or other materials provided with the
distribution.
.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
License: PSF-2
1. This LICENSE AGREEMENT is between the Python Software Foundation ("PSF"),
and the Individual or Organization ("Licensee") accessing and otherwise using
this software ("Python") in source or binary form and its associated
documentation.
.
2. Subject to the terms and conditions of this License Agreement, PSF hereby
grants Licensee a nonexclusive, royalty-free, world-wide license to
reproduce, analyze, test, perform and/or display publicly, prepare derivative
works, distribute, and otherwise use Python alone or in any derivative
version, provided, however, that PSF's License Agreement and PSF's notice of
copyright, i.e., "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006 Python
Software Foundation; All Rights Reserved" are retained in Python alone or in
any derivative version prepared by Licensee.
.
3. In the event Licensee prepares a derivative work that is based on or
incorporates Python or any part thereof, and wants to make the derivative
work available to others as provided herein, then Licensee hereby agrees to
include in any such work a brief summary of the changes made to Python.
.
4. PSF is making Python available to Licensee on an "AS IS" basis. PSF MAKES
NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED. BY WAY OF EXAMPLE, BUT
NOT LIMITATION, PSF MAKES NO AND DISCLAIMS ANY REPRESENTATION OR WARRANTY OF
MERCHANTABILITY OR FITNESS FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF
PYTHON WILL NOT INFRINGE ANY THIRD PARTY RIGHTS.
.
5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON FOR ANY
INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A RESULT OF
MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON, OR ANY DERIVATIVE
THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
.
6. This License Agreement will automatically terminate upon a material breach
of its terms and conditions.
.
7. Nothing in this License Agreement shall be deemed to create any
relationship of agency, partnership, or joint venture between PSF and
Licensee. This License Agreement does not grant permission to use PSF
trademarks or trade name in a trademark sense to endorse or promote products
or services of Licensee, or any third party.
.
8. By copying, installing or otherwise using Python, Licensee agrees to be
bound by the terms and conditions of this License Agreement.
License: CC-0
Statement of Purpose
.
The laws of most jurisdictions throughout the world automatically
confer exclusive Copyright and Related Rights (defined below) upon
the creator and subsequent owner(s) (each and all, an "owner") of an
original work of authorship and/or a database (each, a "Work").
.
Certain owners wish to permanently relinquish those rights to a Work
for the purpose of contributing to a commons of creative, cultural
and scientific works ("Commons") that the public can reliably and
without fear of later claims of infringement build upon, modify,
incorporate in other works, reuse and redistribute as freely as
possible in any form whatsoever and for any purposes, including
without limitation commercial purposes. These owners may contribute
to the Commons to promote the ideal of a free culture and the further
production of creative, cultural and scientific works, or to gain
reputation or greater distribution for their Work in part through the
use and efforts of others.
.
For these and/or other purposes and motivations, and without any
expectation of additional consideration or compensation, the person
associating CC0 with a Work (the "Affirmer"), to the extent that he
or she is an owner of Copyright and Related Rights in the Work,
voluntarily elects to apply CC0 to the Work and publicly distribute
the Work under its terms, with knowledge of his or her Copyright and
Related Rights in the Work and the meaning and intended legal effect
of CC0 on those rights.
.
1. Copyright and Related Rights. A Work made available under CC0 may
be protected by copyright and related or neighboring rights
("Copyright and Related Rights"). Copyright and Related Rights
include, but are not limited to, the following:
.
i. the right to reproduce, adapt, distribute, perform, display,
communicate, and translate a Work;
ii. moral rights retained by the original author(s) and/or
performer(s);
iii. publicity and privacy rights pertaining to a person's image
or likeness depicted in a Work;
iv. rights protecting against unfair competition in regards to a
Work, subject to the limitations in paragraph 4(a), below;
v. rights protecting the extraction, dissemination, use and
reuse of data in a Work;
vi. database rights (such as those arising under Directive
96/9/EC of the European Parliament and of the Council of 11
March 1996 on the legal protection of databases, and under
any national implementation thereof, including any amended or
successor version of such directive); and
vii. other similar, equivalent or corresponding rights throughout
the world based on applicable law or treaty, and any national
implementations thereof.
.
2. Waiver. To the greatest extent permitted by, but not in
contravention of, applicable law, Affirmer hereby overtly, fully,
permanently, irrevocably and unconditionally waives, abandons, and
surrenders all of Affirmer's Copyright and Related Rights and
associated claims and causes of action, whether now known or
unknown (including existing as well as future claims and causes of
action), in the Work (i) in all territories worldwide, (ii) for
the maximum duration provided by applicable law or treaty
(including future time extensions), (iii) in any current or future
medium and for any number of copies, and (iv) for any purpose
whatsoever, including without limitation commercial, advertising
or promotional purposes (the "Waiver"). Affirmer makes the Waiver
for the benefit of each member of the public at large and to the
detriment of Affirmer's heirs and successors, fully intending that
such Waiver shall not be subject to revocation, rescission,
cancellation, termination, or any other legal or equitable action
to disrupt the quiet enjoyment of the Work by the public as
contemplated by Affirmer's express Statement of Purpose.
.
3. Public License Fallback. Should any part of the Waiver for any
reason be judged legally invalid or ineffective under applicable law,
then the Waiver shall be preserved to the maximum extent permitted
taking into account Affirmer's express Statement of Purpose. In
addition, to the extent the Waiver is so judged Affirmer hereby
grants to each affected person a royalty-free, non transferable, non
sublicensable, non exclusive, irrevocable and unconditional license
to exercise Affirmer's Copyright and Related Rights in the Work (i)
in all territories worldwide, (ii) for the maximum duration provided
by applicable law or treaty (including future time extensions), (iii)
in any current or future medium and for any number of copies, and
(iv) for any purpose whatsoever, including without limitation
commercial, advertising or promotional purposes (the "License"). The
License shall be deemed effective as of the date CC0 was applied by
Affirmer to the Work. Should any part of the License for any reason
be judged legally invalid or ineffective under applicable law, such
partial invalidity or ineffectiveness shall not invalidate the
remainder of the License, and in such case Affirmer hereby affirms
that he or she will not (i) exercise any of his or her remaining
Copyright and Related Rights in the Work or (ii) assert any
associated claims and causes of action with respect to the Work, in
either case contrary to Affirmer's express Statement of Purpose.
.
4. Limitations and Disclaimers.
.
a. No trademark or patent rights held by Affirmer are waived,
abandoned, surrendered, licensed or otherwise affected by
this document.
b. Affirmer offers the Work as-is and makes no representations
or warranties of any kind concerning the Work, express,
implied, statutory or otherwise, including without limitation
warranties of title, merchantability, fitness for a
particular purpose, non infringement, or the absence of
latent or other defects, accuracy, or the present or absence
of errors, whether or not discoverable, all to the greatest
extent permissible under applicable law.
c. Affirmer disclaims responsibility for clearing rights of
other persons that may apply to the Work or any use thereof,
including without limitation any person's Copyright and
Related Rights in the Work. Further, Affirmer disclaims
responsibility for obtaining any necessary consents,
permissions or other rights required for any use of the
Work.
d. Affirmer understands and acknowledges that Creative Commons
is not a party to this document and has no duty or obligation
with respect to this CC0 or use of the Work.

View File

@@ -0,0 +1,2 @@
opengnsys-libarchive-c_5.1_all.deb python optional
opengnsys-libarchive-c_5.1_amd64.buildinfo python optional

View File

@@ -0,0 +1,2 @@
misc:Depends=
misc:Pre-Depends=

View File

@@ -0,0 +1,22 @@
#!/usr/bin/make -f

export LC_ALL=C.UTF-8
export PYBUILD_NAME = libarchive-c
export PYBUILD_BEFORE_TEST = cp -av README.rst {build_dir}
export PYBUILD_TEST_ARGS = -vv -s
export PYBUILD_AFTER_TEST = rm -v {build_dir}/README.rst
# ./usr/lib/python3/dist-packages/libarchive/
export PYBUILD_INSTALL_ARGS=--install-lib=/opt/opengnsys/python3/dist-packages/

%:
	dh $@ --with python3 --buildsystem=pybuild

override_dh_gencontrol:
	dh_gencontrol -- \
		-Vlib:Depends=$(shell dpkg-query -W -f '$${Depends}' libarchive-dev \
			| sed -E 's/.*(libarchive[[:alnum:].-]+).*/\1/')

override_dh_installdocs:
	# Nothing, we don't want docs

override_dh_installchangelogs:
	# Nothing, we don't want the changelog

View File

@@ -0,0 +1 @@
3.0 (quilt)

View File

@@ -0,0 +1,2 @@
Tests: upstream-tests
Depends: @, python3-mock, python3-pytest

View File

@@ -0,0 +1,14 @@
#!/bin/sh

set -e

if ! [ -d "$AUTOPKGTEST_TMP" ]; then
	echo "AUTOPKGTEST_TMP not set." >&2
	exit 1
fi

cp -rv tests "$AUTOPKGTEST_TMP"
cd "$AUTOPKGTEST_TMP"
mkdir -v libarchive
touch README.rst

py.test-3 tests -vv -l -r a

View File

@@ -0,0 +1,3 @@
version=3
https://pypi.python.org/simple/libarchive-c \
.*/libarchive-c-(.+)\.tar\.gz#.*

View File

@@ -0,0 +1,17 @@
from .entry import ArchiveEntry
from .exception import ArchiveError
from .extract import extract_fd, extract_file, extract_memory
from .read import (
    custom_reader, fd_reader, file_reader, memory_reader, stream_reader,
    seekable_stream_reader
)
from .write import custom_writer, fd_writer, file_writer, memory_writer

__all__ = [x.__name__ for x in (
    ArchiveEntry,
    ArchiveError,
    extract_fd, extract_file, extract_memory,
    custom_reader, fd_reader, file_reader, memory_reader, stream_reader,
    seekable_stream_reader,
    custom_writer, fd_writer, file_writer, memory_writer
)]

View File

@@ -0,0 +1,450 @@
from contextlib import contextmanager
from ctypes import create_string_buffer
from enum import IntEnum
import math
from . import ffi
class FileType(IntEnum):
NAMED_PIPE = AE_IFIFO = 0o010000 # noqa: E221
CHAR_DEVICE = AE_IFCHR = 0o020000 # noqa: E221
DIRECTORY = AE_IFDIR = 0o040000 # noqa: E221
BLOCK_DEVICE = AE_IFBLK = 0o060000 # noqa: E221
REGULAR_FILE = AE_IFREG = 0o100000 # noqa: E221
SYMBOLINK_LINK = AE_IFLNK = 0o120000 # noqa: E221
SOCKET = AE_IFSOCK = 0o140000 # noqa: E221
@contextmanager
def new_archive_entry():
entry_p = ffi.entry_new()
try:
yield entry_p
finally:
ffi.entry_free(entry_p)
def format_time(seconds, nanos):
""" return float of seconds.nanos when nanos set, or seconds when not """
if nanos:
return float(seconds) + float(nanos) / 1000000000.0
return int(seconds)
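# For illustration: format_time(1, 500000000) == 1.5, while
# format_time(1, 0) == 1 (an int, meaning no sub-second component).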
class ArchiveEntry:
__slots__ = ('_archive_p', '_entry_p', 'header_codec')
def __init__(self, archive_p=None, header_codec='utf-8', **attributes):
"""Allocate memory for an `archive_entry` struct.
The `header_codec` is used to decode and encode file paths and other
attributes.
The `**attributes` are passed to the `modify` method.
"""
self._archive_p = archive_p
self._entry_p = ffi.entry_new()
self.header_codec = header_codec
if attributes:
self.modify(**attributes)
def __del__(self):
"""Free the C struct"""
ffi.entry_free(self._entry_p)
def __str__(self):
"""Returns the file's path"""
return self.pathname
def modify(self, header_codec=None, **attributes):
"""Convenience method to modify the entry's attributes.
Args:
filetype (int): the file's type, see the `FileType` class for values
pathname (str): the file's path
linkpath (str): the other path of the file, if the file is a link
size (int | None): the file's size, in bytes
perm (int): the file's permissions in standard Unix format, e.g. 0o640
uid (int): the file owner's numerical identifier
gid (int): the file group's numerical identifier
uname (str | bytes): the file owner's name
gname (str | bytes): the file group's name
atime (int | Tuple[int, int] | float | None):
the file's most recent access time,
either in seconds or as a tuple (seconds, nanoseconds)
mtime (int | Tuple[int, int] | float | None):
the file's most recent modification time,
either in seconds or as a tuple (seconds, nanoseconds)
ctime (int | Tuple[int, int] | float | None):
the file's most recent metadata change time,
either in seconds or as a tuple (seconds, nanoseconds)
birthtime (int | Tuple[int, int] | float | None):
the file's creation time (for archive formats that support it),
either in seconds or as a tuple (seconds, nanoseconds)
rdev (int | Tuple[int, int]): device number, if the file is a device
rdevmajor (int): major part of the device number
rdevminor (int): minor part of the device number
"""
if header_codec:
self.header_codec = header_codec
for name, value in attributes.items():
setattr(self, name, value)
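# Usage sketch (hypothetical values):
#     entry.modify(pathname='a.txt', size=3, perm=0o644, mtime=(1700000000, 0))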
@property
def filetype(self):
return ffi.entry_filetype(self._entry_p)
@filetype.setter
def filetype(self, value):
ffi.entry_set_filetype(self._entry_p, value)
@property
def uid(self):
return ffi.entry_uid(self._entry_p)
@uid.setter
def uid(self, uid):
ffi.entry_set_uid(self._entry_p, uid)
@property
def gid(self):
return ffi.entry_gid(self._entry_p)
@gid.setter
def gid(self, gid):
ffi.entry_set_gid(self._entry_p, gid)
@property
def uname(self):
uname = ffi.entry_uname_w(self._entry_p)
if not uname:
uname = ffi.entry_uname(self._entry_p)
if uname is not None:
try:
uname = uname.decode(self.header_codec)
except UnicodeError:
pass
return uname
@uname.setter
def uname(self, value):
if not isinstance(value, bytes):
value = value.encode(self.header_codec)
if self.header_codec == 'utf-8':
ffi.entry_update_uname_utf8(self._entry_p, value)
else:
ffi.entry_copy_uname(self._entry_p, value)
@property
def gname(self):
gname = ffi.entry_gname_w(self._entry_p)
if not gname:
gname = ffi.entry_gname(self._entry_p)
if gname is not None:
try:
gname = gname.decode(self.header_codec)
except UnicodeError:
pass
return gname
@gname.setter
def gname(self, value):
if not isinstance(value, bytes):
value = value.encode(self.header_codec)
if self.header_codec == 'utf-8':
ffi.entry_update_gname_utf8(self._entry_p, value)
else:
ffi.entry_copy_gname(self._entry_p, value)
def get_blocks(self, block_size=ffi.page_size):
"""Read the file's content, keeping only one chunk in memory at a time.
Don't do anything like `list(entry.get_blocks())`, it would silently fail.
Args:
block_size (int): the buffer's size, in bytes
"""
archive_p = self._archive_p
if not archive_p:
raise TypeError("this entry isn't linked to any content")
buf = create_string_buffer(block_size)
read = ffi.read_data
while 1:
r = read(archive_p, buf, block_size)
if r == 0:
break
yield buf.raw[0:r]
self.__class__ = ConsumedArchiveEntry
@property
def isblk(self):
return self.filetype & 0o170000 == 0o060000
@property
def ischr(self):
return self.filetype & 0o170000 == 0o020000
@property
def isdir(self):
return self.filetype & 0o170000 == 0o040000
@property
def isfifo(self):
return self.filetype & 0o170000 == 0o010000
@property
def islnk(self):
return bool(ffi.entry_hardlink_w(self._entry_p) or
ffi.entry_hardlink(self._entry_p))
@property
def issym(self):
return self.filetype & 0o170000 == 0o120000
@property
def isreg(self):
return self.filetype & 0o170000 == 0o100000
@property
def isfile(self):
return self.isreg
@property
def issock(self):
return self.filetype & 0o170000 == 0o140000
@property
def isdev(self):
return self.ischr or self.isblk or self.isfifo or self.issock
@property
def atime(self):
if not ffi.entry_atime_is_set(self._entry_p):
return None
sec_val = ffi.entry_atime(self._entry_p)
nsec_val = ffi.entry_atime_nsec(self._entry_p)
return format_time(sec_val, nsec_val)
@atime.setter
def atime(self, value):
if value is None:
ffi.entry_unset_atime(self._entry_p)
elif isinstance(value, int):
self.set_atime(value)
elif isinstance(value, tuple):
self.set_atime(*value)
else:
fraction, seconds = math.modf(value)  # math.modf returns (fraction, whole)
self.set_atime(int(seconds), int(fraction * 1_000_000_000))
def set_atime(self, timestamp_sec, timestamp_nsec=0):
"Kept for backward compatibility. `entry.atime = ...` is supported now."
return ffi.entry_set_atime(self._entry_p, timestamp_sec, timestamp_nsec)
@property
def mtime(self):
if not ffi.entry_mtime_is_set(self._entry_p):
return None
sec_val = ffi.entry_mtime(self._entry_p)
nsec_val = ffi.entry_mtime_nsec(self._entry_p)
return format_time(sec_val, nsec_val)
@mtime.setter
def mtime(self, value):
if value is None:
ffi.entry_unset_mtime(self._entry_p)
elif isinstance(value, int):
self.set_mtime(value)
elif isinstance(value, tuple):
self.set_mtime(*value)
else:
fraction, seconds = math.modf(value)  # math.modf returns (fraction, whole)
self.set_mtime(int(seconds), int(fraction * 1_000_000_000))
def set_mtime(self, timestamp_sec, timestamp_nsec=0):
"Kept for backward compatibility. `entry.mtime = ...` is supported now."
return ffi.entry_set_mtime(self._entry_p, timestamp_sec, timestamp_nsec)
@property
def ctime(self):
if not ffi.entry_ctime_is_set(self._entry_p):
return None
sec_val = ffi.entry_ctime(self._entry_p)
nsec_val = ffi.entry_ctime_nsec(self._entry_p)
return format_time(sec_val, nsec_val)
@ctime.setter
def ctime(self, value):
if value is None:
ffi.entry_unset_ctime(self._entry_p)
elif isinstance(value, int):
self.set_ctime(value)
elif isinstance(value, tuple):
self.set_ctime(*value)
else:
fraction, seconds = math.modf(value)  # math.modf returns (fraction, whole)
self.set_ctime(int(seconds), int(fraction * 1_000_000_000))
def set_ctime(self, timestamp_sec, timestamp_nsec=0):
"Kept for backward compatibility. `entry.ctime = ...` is supported now."
return ffi.entry_set_ctime(self._entry_p, timestamp_sec, timestamp_nsec)
@property
def birthtime(self):
if not ffi.entry_birthtime_is_set(self._entry_p):
return None
sec_val = ffi.entry_birthtime(self._entry_p)
nsec_val = ffi.entry_birthtime_nsec(self._entry_p)
return format_time(sec_val, nsec_val)
@birthtime.setter
def birthtime(self, value):
if value is None:
ffi.entry_unset_birthtime(self._entry_p)
elif isinstance(value, int):
self.set_birthtime(value)
elif isinstance(value, tuple):
self.set_birthtime(*value)
else:
fraction, seconds = math.modf(value)  # math.modf returns (fraction, whole)
self.set_birthtime(int(seconds), int(fraction * 1_000_000_000))
def set_birthtime(self, timestamp_sec, timestamp_nsec=0):
"Kept for backward compatibility. `entry.birthtime = ...` is supported now."
return ffi.entry_set_birthtime(
self._entry_p, timestamp_sec, timestamp_nsec
)
@property
def pathname(self):
path = ffi.entry_pathname_w(self._entry_p)
if not path:
path = ffi.entry_pathname(self._entry_p)
if path is not None:
try:
path = path.decode(self.header_codec)
except UnicodeError:
pass
return path
@pathname.setter
def pathname(self, value):
if not isinstance(value, bytes):
value = value.encode(self.header_codec)
if self.header_codec == 'utf-8':
ffi.entry_update_pathname_utf8(self._entry_p, value)
else:
ffi.entry_copy_pathname(self._entry_p, value)
@property
def linkpath(self):
path = (
(
ffi.entry_symlink_w(self._entry_p) or
ffi.entry_symlink(self._entry_p)
) if self.issym else (
ffi.entry_hardlink_w(self._entry_p) or
ffi.entry_hardlink(self._entry_p)
)
)
if isinstance(path, bytes):
try:
path = path.decode(self.header_codec)
except UnicodeError:
pass
return path
@linkpath.setter
def linkpath(self, value):
if not isinstance(value, bytes):
value = value.encode(self.header_codec)
if self.header_codec == 'utf-8':
ffi.entry_update_link_utf8(self._entry_p, value)
else:
ffi.entry_copy_link(self._entry_p, value)
# aliases for compatibility with the standard `tarfile` module
path = property(pathname.fget, pathname.fset, doc="alias of pathname")
name = path
linkname = property(linkpath.fget, linkpath.fset, doc="alias of linkpath")
@property
def size(self):
if ffi.entry_size_is_set(self._entry_p):
return ffi.entry_size(self._entry_p)
@size.setter
def size(self, value):
if value is None:
ffi.entry_unset_size(self._entry_p)
else:
ffi.entry_set_size(self._entry_p, value)
@property
def mode(self):
return ffi.entry_mode(self._entry_p)
@mode.setter
def mode(self, value):
ffi.entry_set_mode(self._entry_p, value)
@property
def strmode(self):
"""The file's mode as a string, e.g. '?rwxrwx---'"""
# note we strip the mode because archive_entry_strmode
# returns a trailing space: strcpy(bp, "?rwxrwxrwx ");
return ffi.entry_strmode(self._entry_p).strip()
@property
def perm(self):
return ffi.entry_perm(self._entry_p)
@perm.setter
def perm(self, value):
ffi.entry_set_perm(self._entry_p, value)
@property
def rdev(self):
return ffi.entry_rdev(self._entry_p)
@rdev.setter
def rdev(self, value):
if isinstance(value, tuple):
ffi.entry_set_rdevmajor(self._entry_p, value[0])
ffi.entry_set_rdevminor(self._entry_p, value[1])
else:
ffi.entry_set_rdev(self._entry_p, value)
@property
def rdevmajor(self):
return ffi.entry_rdevmajor(self._entry_p)
@rdevmajor.setter
def rdevmajor(self, value):
ffi.entry_set_rdevmajor(self._entry_p, value)
@property
def rdevminor(self):
return ffi.entry_rdevminor(self._entry_p)
@rdevminor.setter
def rdevminor(self, value):
ffi.entry_set_rdevminor(self._entry_p, value)
class ConsumedArchiveEntry(ArchiveEntry):
__slots__ = ()
def get_blocks(self, **kw):
raise TypeError("the content of this entry has already been read")
class PassedArchiveEntry(ArchiveEntry):
__slots__ = ()
def get_blocks(self, **kw):
raise TypeError("this entry is passed, it's too late to read its content")

View File

@@ -0,0 +1,12 @@
class ArchiveError(Exception):

    def __init__(self, msg, errno=None, retcode=None, archive_p=None):
        self.msg = msg
        self.errno = errno
        self.retcode = retcode
        self.archive_p = archive_p

    def __str__(self):
        p = '%s (errno=%s, retcode=%s, archive_p=%s)'
        return p % (self.msg, self.errno, self.retcode, self.archive_p)
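
# Usage sketch (hypothetical path): every failing libarchive call raises this
# exception via the ffi error-checking helpers, e.g.:
#
#     try:
#         libarchive.extract_file('missing.tar')
#     except ArchiveError as exc:
#         print(exc.msg, exc.errno, exc.retcode)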

View File

@@ -0,0 +1,88 @@
from contextlib import contextmanager
from ctypes import byref, c_longlong, c_size_t, c_void_p
import os

from .ffi import (
    write_disk_new, write_disk_set_options, write_free, write_header,
    read_data_block, write_data_block, write_finish_entry, ARCHIVE_EOF
)
from .read import fd_reader, file_reader, memory_reader

EXTRACT_OWNER = 0x0001
EXTRACT_PERM = 0x0002
EXTRACT_TIME = 0x0004
EXTRACT_NO_OVERWRITE = 0x0008
EXTRACT_UNLINK = 0x0010
EXTRACT_ACL = 0x0020
EXTRACT_FFLAGS = 0x0040
EXTRACT_XATTR = 0x0080
EXTRACT_SECURE_SYMLINKS = 0x0100
EXTRACT_SECURE_NODOTDOT = 0x0200
EXTRACT_NO_AUTODIR = 0x0400
EXTRACT_NO_OVERWRITE_NEWER = 0x0800
EXTRACT_SPARSE = 0x1000
EXTRACT_MAC_METADATA = 0x2000
EXTRACT_NO_HFS_COMPRESSION = 0x4000
EXTRACT_HFS_COMPRESSION_FORCED = 0x8000
EXTRACT_SECURE_NOABSOLUTEPATHS = 0x10000
EXTRACT_CLEAR_NOCHANGE_FFLAGS = 0x20000

PREVENT_ESCAPE = (
    EXTRACT_SECURE_NOABSOLUTEPATHS |
    EXTRACT_SECURE_NODOTDOT |
    EXTRACT_SECURE_SYMLINKS
)


@contextmanager
def new_archive_write_disk(flags):
    archive_p = write_disk_new()
    write_disk_set_options(archive_p, flags)
    try:
        yield archive_p
    finally:
        write_free(archive_p)


def extract_entries(entries, flags=None):
    """Extracts the given archive entries into the current directory."""
    if flags is None:
        if os.getcwd() == '/':
            # If the current directory is the root, then trying to prevent
            # escaping is probably undesirable.
            flags = 0
        else:
            flags = PREVENT_ESCAPE
    buff, size, offset = c_void_p(), c_size_t(), c_longlong()
    buff_p, size_p, offset_p = byref(buff), byref(size), byref(offset)
    with new_archive_write_disk(flags) as write_p:
        for entry in entries:
            write_header(write_p, entry._entry_p)
            read_p = entry._archive_p
            while 1:
                r = read_data_block(read_p, buff_p, size_p, offset_p)
                if r == ARCHIVE_EOF:
                    break
                write_data_block(write_p, buff, size, offset)
            write_finish_entry(write_p)


def extract_fd(fd, flags=None):
    """Extracts an archive from a file descriptor into the current directory."""
    with fd_reader(fd) as archive:
        extract_entries(archive, flags)


def extract_file(filepath, flags=None):
    """Extracts an archive from a file into the current directory."""
    with file_reader(filepath) as archive:
        extract_entries(archive, flags)


def extract_memory(buffer_, flags=None):
    """Extracts an archive from memory into the current directory."""
    with memory_reader(buffer_) as archive:
        extract_entries(archive, flags)
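
# Usage sketch (hypothetical archive name): `extract_entries` accepts any
# iterable of entries, so extraction can be filtered, e.g. regular files only:
#
#     with file_reader('backup.tar.gz') as archive:
#         extract_entries(e for e in archive if e.isreg)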

View File

@@ -0,0 +1,364 @@
from ctypes import (
c_char_p, c_int, c_uint, c_long, c_longlong, c_size_t, c_int64,
c_void_p, c_wchar_p, CFUNCTYPE, POINTER,
)
try:
from ctypes import c_ssize_t
except ImportError:
from ctypes import c_longlong as c_ssize_t
import ctypes
from ctypes.util import find_library
import logging
import mmap
import os
import sysconfig
from .exception import ArchiveError
logger = logging.getLogger('libarchive')
page_size = mmap.PAGESIZE
libarchive_path = os.environ.get('LIBARCHIVE') or find_library('archive')
libarchive = ctypes.cdll.LoadLibrary(libarchive_path)
# Constants
ARCHIVE_EOF = 1 # Found end of archive.
ARCHIVE_OK = 0 # Operation was successful.
ARCHIVE_RETRY = -10 # Retry might succeed.
ARCHIVE_WARN = -20 # Partial success.
ARCHIVE_FAILED = -25 # Current operation cannot complete.
ARCHIVE_FATAL = -30 # No more operations are possible.
# Callback types
WRITE_CALLBACK = CFUNCTYPE(
c_ssize_t, c_void_p, c_void_p, POINTER(c_void_p), c_size_t
)
READ_CALLBACK = CFUNCTYPE(
c_ssize_t, c_void_p, c_void_p, POINTER(c_void_p)
)
SEEK_CALLBACK = CFUNCTYPE(
c_longlong, c_void_p, c_void_p, c_longlong, c_int
)
OPEN_CALLBACK = CFUNCTYPE(c_int, c_void_p, c_void_p)
CLOSE_CALLBACK = CFUNCTYPE(c_int, c_void_p, c_void_p)
NO_OPEN_CB = ctypes.cast(None, OPEN_CALLBACK)
NO_CLOSE_CB = ctypes.cast(None, CLOSE_CALLBACK)
# Type aliases, for readability
c_archive_p = c_void_p
c_archive_entry_p = c_void_p
if sysconfig.get_config_var('SIZEOF_TIME_T') == 8:
c_time_t = c_int64
else:
c_time_t = c_long
# Helper functions
def _error_string(archive_p):
msg = error_string(archive_p)
if msg is None:
return
try:
return msg.decode('ascii')
except UnicodeDecodeError:
return msg
def archive_error(archive_p, retcode):
msg = _error_string(archive_p)
return ArchiveError(msg, errno(archive_p), retcode, archive_p)
def check_null(ret, func, args):
if ret is None:
raise ArchiveError(func.__name__+' returned NULL')
return ret
def check_int(retcode, func, args):
if retcode >= 0:
return retcode
elif retcode == ARCHIVE_WARN:
logger.warning(_error_string(args[0]))
return retcode
else:
raise archive_error(args[0], retcode)
def ffi(name, argtypes, restype, errcheck=None):
f = getattr(libarchive, 'archive_'+name)
f.argtypes = argtypes
f.restype = restype
if errcheck:
f.errcheck = errcheck
globals()[name] = f
return f
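# For illustration: ffi('read_new', [], c_archive_p, check_null) binds the
# C function archive_read_new() and exposes it as the module-level `read_new`.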
def get_read_format_function(format_name):
function_name = 'read_support_format_' + format_name
func = globals().get(function_name)
if func:
return func
try:
return ffi(function_name, [c_archive_p], c_int, check_int)
except AttributeError:
raise ValueError('the read format %r is not available' % format_name)
def get_read_filter_function(filter_name):
function_name = 'read_support_filter_' + filter_name
func = globals().get(function_name)
if func:
return func
try:
return ffi(function_name, [c_archive_p], c_int, check_int)
except AttributeError:
raise ValueError('the read filter %r is not available' % filter_name)
def get_write_format_function(format_name):
function_name = 'write_set_format_' + format_name
func = globals().get(function_name)
if func:
return func
try:
return ffi(function_name, [c_archive_p], c_int, check_int)
except AttributeError:
raise ValueError('the write format %r is not available' % format_name)
def get_write_filter_function(filter_name):
function_name = 'write_add_filter_' + filter_name
func = globals().get(function_name)
if func:
return func
try:
return ffi(function_name, [c_archive_p], c_int, check_int)
except AttributeError:
raise ValueError('the write filter %r is not available' % filter_name)
# FFI declarations
# library version
version_number = ffi('version_number', [], c_int, check_int)
# archive_util
errno = ffi('errno', [c_archive_p], c_int)
error_string = ffi('error_string', [c_archive_p], c_char_p)
ffi('filter_bytes', [c_archive_p, c_int], c_longlong)
ffi('filter_count', [c_archive_p], c_int)
ffi('filter_name', [c_archive_p, c_int], c_char_p)
ffi('format_name', [c_archive_p], c_char_p)
# archive_entry
ffi('entry_new', [], c_archive_entry_p, check_null)
ffi('entry_filetype', [c_archive_entry_p], c_int)
ffi('entry_atime', [c_archive_entry_p], c_time_t)
ffi('entry_birthtime', [c_archive_entry_p], c_time_t)
ffi('entry_mtime', [c_archive_entry_p], c_time_t)
ffi('entry_ctime', [c_archive_entry_p], c_time_t)
ffi('entry_atime_nsec', [c_archive_entry_p], c_long)
ffi('entry_birthtime_nsec', [c_archive_entry_p], c_long)
ffi('entry_mtime_nsec', [c_archive_entry_p], c_long)
ffi('entry_ctime_nsec', [c_archive_entry_p], c_long)
ffi('entry_atime_is_set', [c_archive_entry_p], c_int)
ffi('entry_birthtime_is_set', [c_archive_entry_p], c_int)
ffi('entry_mtime_is_set', [c_archive_entry_p], c_int)
ffi('entry_ctime_is_set', [c_archive_entry_p], c_int)
ffi('entry_pathname', [c_archive_entry_p], c_char_p)
ffi('entry_pathname_w', [c_archive_entry_p], c_wchar_p)
ffi('entry_sourcepath', [c_archive_entry_p], c_char_p)
ffi('entry_size', [c_archive_entry_p], c_longlong)
ffi('entry_size_is_set', [c_archive_entry_p], c_int)
ffi('entry_mode', [c_archive_entry_p], c_int)
ffi('entry_strmode', [c_archive_entry_p], c_char_p)
ffi('entry_perm', [c_archive_entry_p], c_int)
ffi('entry_hardlink', [c_archive_entry_p], c_char_p)
ffi('entry_hardlink_w', [c_archive_entry_p], c_wchar_p)
ffi('entry_symlink', [c_archive_entry_p], c_char_p)
ffi('entry_symlink_w', [c_archive_entry_p], c_wchar_p)
ffi('entry_rdev', [c_archive_entry_p], c_uint)
ffi('entry_rdevmajor', [c_archive_entry_p], c_uint)
ffi('entry_rdevminor', [c_archive_entry_p], c_uint)
ffi('entry_uid', [c_archive_entry_p], c_longlong)
ffi('entry_gid', [c_archive_entry_p], c_longlong)
ffi('entry_uname', [c_archive_entry_p], c_char_p)
ffi('entry_gname', [c_archive_entry_p], c_char_p)
ffi('entry_uname_w', [c_archive_entry_p], c_wchar_p)
ffi('entry_gname_w', [c_archive_entry_p], c_wchar_p)
ffi('entry_set_size', [c_archive_entry_p, c_longlong], None)
ffi('entry_set_filetype', [c_archive_entry_p, c_uint], None)
ffi('entry_set_uid', [c_archive_entry_p, c_longlong], None)
ffi('entry_set_gid', [c_archive_entry_p, c_longlong], None)
ffi('entry_set_mode', [c_archive_entry_p, c_int], None)
ffi('entry_set_perm', [c_archive_entry_p, c_int], None)
ffi('entry_set_atime', [c_archive_entry_p, c_time_t, c_long], None)
ffi('entry_set_mtime', [c_archive_entry_p, c_time_t, c_long], None)
ffi('entry_set_ctime', [c_archive_entry_p, c_time_t, c_long], None)
ffi('entry_set_birthtime', [c_archive_entry_p, c_time_t, c_long], None)
ffi('entry_set_rdev', [c_archive_entry_p, c_uint], None)
ffi('entry_set_rdevmajor', [c_archive_entry_p, c_uint], None)
ffi('entry_set_rdevminor', [c_archive_entry_p, c_uint], None)
ffi('entry_unset_size', [c_archive_entry_p], None)
ffi('entry_unset_atime', [c_archive_entry_p], None)
ffi('entry_unset_mtime', [c_archive_entry_p], None)
ffi('entry_unset_ctime', [c_archive_entry_p], None)
ffi('entry_unset_birthtime', [c_archive_entry_p], None)
ffi('entry_copy_pathname', [c_archive_entry_p, c_char_p], None)
ffi('entry_update_pathname_utf8', [c_archive_entry_p, c_char_p], c_int, check_int)
ffi('entry_copy_link', [c_archive_entry_p, c_char_p], None)
ffi('entry_update_link_utf8', [c_archive_entry_p, c_char_p], c_int, check_int)
ffi('entry_copy_uname', [c_archive_entry_p, c_char_p], None)
ffi('entry_update_uname_utf8', [c_archive_entry_p, c_char_p], c_int, check_int)
ffi('entry_copy_gname', [c_archive_entry_p, c_char_p], None)
ffi('entry_update_gname_utf8', [c_archive_entry_p, c_char_p], c_int, check_int)
ffi('entry_clear', [c_archive_entry_p], c_archive_entry_p)
ffi('entry_free', [c_archive_entry_p], None)
# archive_read
ffi('read_new', [], c_archive_p, check_null)
READ_FORMATS = set((
'7zip', 'all', 'ar', 'cab', 'cpio', 'empty', 'iso9660', 'lha', 'mtree',
'rar', 'raw', 'tar', 'xar', 'zip', 'warc'
))
for f_name in list(READ_FORMATS):
try:
get_read_format_function(f_name)
except ValueError as e: # pragma: no cover
logger.info(str(e))
READ_FORMATS.remove(f_name)
READ_FILTERS = set((
'all', 'bzip2', 'compress', 'grzip', 'gzip', 'lrzip', 'lzip', 'lzma',
'lzop', 'none', 'rpm', 'uu', 'xz', 'lz4', 'zstd'
))
for f_name in list(READ_FILTERS):
try:
get_read_filter_function(f_name)
except ValueError as e: # pragma: no cover
logger.info(str(e))
READ_FILTERS.remove(f_name)
ffi('read_set_seek_callback', [c_archive_p, SEEK_CALLBACK], c_int, check_int)
ffi('read_open',
[c_archive_p, c_void_p, OPEN_CALLBACK, READ_CALLBACK, CLOSE_CALLBACK],
c_int, check_int)
ffi('read_open_fd', [c_archive_p, c_int, c_size_t], c_int, check_int)
ffi('read_open_filename_w', [c_archive_p, c_wchar_p, c_size_t],
c_int, check_int)
ffi('read_open_memory', [c_archive_p, c_void_p, c_size_t], c_int, check_int)
ffi('read_next_header', [c_archive_p, POINTER(c_void_p)], c_int, check_int)
ffi('read_next_header2', [c_archive_p, c_void_p], c_int, check_int)
ffi('read_close', [c_archive_p], c_int, check_int)
ffi('read_free', [c_archive_p], c_int, check_int)
# archive_read_disk
ffi('read_disk_new', [], c_archive_p, check_null)
ffi('read_disk_set_behavior', [c_archive_p, c_int], c_int, check_int)
ffi('read_disk_set_standard_lookup', [c_archive_p], c_int, check_int)
ffi('read_disk_open', [c_archive_p, c_char_p], c_int, check_int)
ffi('read_disk_open_w', [c_archive_p, c_wchar_p], c_int, check_int)
ffi('read_disk_descend', [c_archive_p], c_int, check_int)
# archive_read_data
ffi('read_data_block',
[c_archive_p, POINTER(c_void_p), POINTER(c_size_t), POINTER(c_longlong)],
c_int, check_int)
ffi('read_data', [c_archive_p, c_void_p, c_size_t], c_ssize_t, check_int)
ffi('read_data_skip', [c_archive_p], c_int, check_int)
# archive_write
ffi('write_new', [], c_archive_p, check_null)
ffi('write_set_options', [c_archive_p, c_char_p], c_int, check_int)
ffi('write_disk_new', [], c_archive_p, check_null)
ffi('write_disk_set_options', [c_archive_p, c_int], c_int, check_int)
WRITE_FORMATS = set((
'7zip', 'ar_bsd', 'ar_svr4', 'cpio', 'cpio_newc', 'gnutar', 'iso9660',
'mtree', 'mtree_classic', 'pax', 'pax_restricted', 'shar', 'shar_dump',
'ustar', 'v7tar', 'xar', 'zip', 'warc'
))
for f_name in list(WRITE_FORMATS):
try:
get_write_format_function(f_name)
except ValueError as e: # pragma: no cover
logger.info(str(e))
WRITE_FORMATS.remove(f_name)
WRITE_FILTERS = set((
'b64encode', 'bzip2', 'compress', 'grzip', 'gzip', 'lrzip', 'lzip', 'lzma',
'lzop', 'uuencode', 'xz', 'lz4', 'zstd'
))
for f_name in list(WRITE_FILTERS):
try:
get_write_filter_function(f_name)
except ValueError as e: # pragma: no cover
logger.info(str(e))
WRITE_FILTERS.remove(f_name)
ffi('write_open',
[c_archive_p, c_void_p, OPEN_CALLBACK, WRITE_CALLBACK, CLOSE_CALLBACK],
c_int, check_int)
ffi('write_open_fd', [c_archive_p, c_int], c_int, check_int)
ffi('write_open_filename', [c_archive_p, c_char_p], c_int, check_int)
ffi('write_open_filename_w', [c_archive_p, c_wchar_p], c_int, check_int)
ffi('write_open_memory',
[c_archive_p, c_void_p, c_size_t, POINTER(c_size_t)],
c_int, check_int)
ffi('write_get_bytes_in_last_block', [c_archive_p], c_int, check_int)
ffi('write_get_bytes_per_block', [c_archive_p], c_int, check_int)
ffi('write_set_bytes_in_last_block', [c_archive_p, c_int], c_int, check_int)
ffi('write_set_bytes_per_block', [c_archive_p, c_int], c_int, check_int)
ffi('write_header', [c_archive_p, c_void_p], c_int, check_int)
ffi('write_data', [c_archive_p, c_void_p, c_size_t], c_ssize_t, check_int)
ffi('write_data_block', [c_archive_p, c_void_p, c_size_t, c_longlong],
c_int, check_int)
ffi('write_finish_entry', [c_archive_p], c_int, check_int)
ffi('write_fail', [c_archive_p], c_int, check_int)
ffi('write_close', [c_archive_p], c_int, check_int)
ffi('write_free', [c_archive_p], c_int, check_int)
# archive encryption
try:
ffi('read_add_passphrase', [c_archive_p, c_char_p], c_int, check_int)
ffi('write_set_passphrase', [c_archive_p, c_char_p], c_int, check_int)
except AttributeError:
logger.info(
f"the libarchive being used (version {version_number()}, "
f"path {libarchive_path}) doesn't support encryption"
)

View File

@@ -0,0 +1,7 @@
READDISK_RESTORE_ATIME = 0x0001
READDISK_HONOR_NODUMP = 0x0002
READDISK_MAC_COPYFILE = 0x0004
READDISK_NO_TRAVERSE_MOUNTS = 0x0008
READDISK_NO_XATTR = 0x0010
READDISK_NO_ACL = 0x0020
READDISK_NO_FFLAGS = 0x0040
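
# Usage sketch (hypothetical paths): these READDISK_* flags are passed through
# `add_files(..., flags=...)` to the C function archive_read_disk_set_behavior:
#
#     with libarchive.file_writer('out.tar', 'ustar') as archive:
#         archive.add_files('some/dir', flags=READDISK_NO_XATTR | READDISK_NO_ACL)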

View File

@@ -0,0 +1,176 @@
from contextlib import contextmanager
from ctypes import cast, c_void_p, POINTER, create_string_buffer
from os import fstat, stat
from . import ffi
from .ffi import (
ARCHIVE_EOF, OPEN_CALLBACK, READ_CALLBACK, CLOSE_CALLBACK, SEEK_CALLBACK,
NO_OPEN_CB, NO_CLOSE_CB, page_size,
)
from .entry import ArchiveEntry, PassedArchiveEntry
class ArchiveRead:
def __init__(self, archive_p, header_codec='utf-8'):
self._pointer = archive_p
self.header_codec = header_codec
def __iter__(self):
"""Iterates through an archive's entries.
"""
archive_p = self._pointer
header_codec = self.header_codec
read_next_header2 = ffi.read_next_header2
while 1:
entry = ArchiveEntry(archive_p, header_codec)
r = read_next_header2(archive_p, entry._entry_p)
if r == ARCHIVE_EOF:
return
yield entry
entry.__class__ = PassedArchiveEntry
@property
def bytes_read(self):
return ffi.filter_bytes(self._pointer, -1)
@property
def filter_names(self):
count = ffi.filter_count(self._pointer)
return [ffi.filter_name(self._pointer, i) for i in range(count - 1)]
@property
def format_name(self):
return ffi.format_name(self._pointer)
@contextmanager
def new_archive_read(format_name='all', filter_name='all', passphrase=None):
"""Creates an archive struct suitable for reading from an archive.
Returns a pointer if successful. Raises ArchiveError on error.
"""
archive_p = ffi.read_new()
try:
if passphrase:
if not isinstance(passphrase, bytes):
passphrase = passphrase.encode('utf-8')
try:
ffi.read_add_passphrase(archive_p, passphrase)
except AttributeError:
raise NotImplementedError(
f"the libarchive being used (version {ffi.version_number()}, "
f"path {ffi.libarchive_path}) doesn't support encryption"
)
ffi.get_read_filter_function(filter_name)(archive_p)
ffi.get_read_format_function(format_name)(archive_p)
yield archive_p
finally:
ffi.read_free(archive_p)
@contextmanager
def custom_reader(
read_func, format_name='all', filter_name='all',
open_func=None, seek_func=None, close_func=None,
block_size=page_size, archive_read_class=ArchiveRead, passphrase=None,
header_codec='utf-8',
):
"""Read an archive using a custom function.
"""
open_cb = OPEN_CALLBACK(open_func) if open_func else NO_OPEN_CB
read_cb = READ_CALLBACK(read_func)
close_cb = CLOSE_CALLBACK(close_func) if close_func else NO_CLOSE_CB
seek_cb = SEEK_CALLBACK(seek_func)
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
if seek_func:
ffi.read_set_seek_callback(archive_p, seek_cb)
ffi.read_open(archive_p, None, open_cb, read_cb, close_cb)
yield archive_read_class(archive_p, header_codec)
@contextmanager
def fd_reader(
fd, format_name='all', filter_name='all', block_size=4096, passphrase=None,
header_codec='utf-8',
):
"""Read an archive from a file descriptor.
"""
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
try:
block_size = fstat(fd).st_blksize
except (OSError, AttributeError): # pragma: no cover
pass
ffi.read_open_fd(archive_p, fd, block_size)
yield ArchiveRead(archive_p, header_codec)
@contextmanager
def file_reader(
path, format_name='all', filter_name='all', block_size=4096, passphrase=None,
header_codec='utf-8',
):
"""Read an archive from a file.
"""
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
try:
block_size = stat(path).st_blksize
except (OSError, AttributeError): # pragma: no cover
pass
ffi.read_open_filename_w(archive_p, path, block_size)
yield ArchiveRead(archive_p, header_codec)
@contextmanager
def memory_reader(
buf, format_name='all', filter_name='all', passphrase=None,
header_codec='utf-8',
):
"""Read an archive from memory.
"""
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
ffi.read_open_memory(archive_p, cast(buf, c_void_p), len(buf))
yield ArchiveRead(archive_p, header_codec)
@contextmanager
def stream_reader(
stream, format_name='all', filter_name='all', block_size=page_size,
passphrase=None, header_codec='utf-8',
):
"""Read an archive from a stream.
The `stream` object must support the standard `readinto` method.
If `stream.seekable()` returns `True`, then an appropriate seek callback is
passed to libarchive.
"""
buf = create_string_buffer(block_size)
buf_p = cast(buf, c_void_p)
def read_func(archive_p, context, ptrptr):
# readinto the buffer, returns number of bytes read
length = stream.readinto(buf)
# write the address of the buffer into the pointer
ptrptr = cast(ptrptr, POINTER(c_void_p))
ptrptr[0] = buf_p
# tell libarchive how much data was written into the buffer
return length
def seek_func(archive_p, context, offset, whence):
stream.seek(offset, whence)
# tell libarchive the current position
return stream.tell()
open_cb = NO_OPEN_CB
read_cb = READ_CALLBACK(read_func)
close_cb = NO_CLOSE_CB
seek_cb = SEEK_CALLBACK(seek_func)
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
if stream.seekable():
ffi.read_set_seek_callback(archive_p, seek_cb)
ffi.read_open(archive_p, None, open_cb, read_cb, close_cb)
yield ArchiveRead(archive_p, header_codec)
seekable_stream_reader = stream_reader
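
# Usage sketch (hypothetical path): any seekable binary file object provides
# `readinto`, so an ordinary open file works with stream_reader:
#
#     with open('test.7z', 'rb') as f, stream_reader(f) as archive:
#         for entry in archive:
#             ...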

View File

@@ -0,0 +1,279 @@
from contextlib import contextmanager
from ctypes import byref, cast, c_char, c_size_t, c_void_p, POINTER
from posixpath import join
import warnings
from . import ffi
from .entry import ArchiveEntry, FileType
from .ffi import (
OPEN_CALLBACK, WRITE_CALLBACK, CLOSE_CALLBACK, NO_OPEN_CB, NO_CLOSE_CB,
ARCHIVE_EOF,
page_size, entry_sourcepath, entry_clear, read_disk_new, read_disk_open_w,
read_next_header2, read_disk_descend, read_free, write_header, write_data,
write_finish_entry,
read_disk_set_behavior
)
@contextmanager
def new_archive_read_disk(path, flags=0, lookup=False):
archive_p = read_disk_new()
read_disk_set_behavior(archive_p, flags)
if lookup:
ffi.read_disk_set_standard_lookup(archive_p)
read_disk_open_w(archive_p, path)
try:
yield archive_p
finally:
read_free(archive_p)
class ArchiveWrite:
def __init__(self, archive_p, header_codec='utf-8'):
self._pointer = archive_p
self.header_codec = header_codec
def add_entries(self, entries):
"""Add the given entries to the archive.
"""
write_p = self._pointer
for entry in entries:
write_header(write_p, entry._entry_p)
for block in entry.get_blocks():
write_data(write_p, block, len(block))
write_finish_entry(write_p)
def add_files(
self, *paths, flags=0, lookup=False, pathname=None, recursive=True,
**attributes
):
"""Read files through the OS and add them to the archive.
Args:
paths (str): the paths of the files to add to the archive
flags (int):
passed to the C function `archive_read_disk_set_behavior`;
use the `libarchive.flags.READDISK_*` constants
lookup (bool):
when True, the C function `archive_read_disk_set_standard_lookup`
is called to enable the lookup of user and group names
pathname (str | None):
the path of the file in the archive, defaults to the source path
recursive (bool):
when False, if a path in `paths` is a directory,
only the directory itself is added.
attributes (dict): passed to `ArchiveEntry.modify()`
Raises:
ArchiveError: if a file doesn't exist or can't be accessed, or if
adding it to the archive fails
"""
write_p = self._pointer
block_size = ffi.write_get_bytes_per_block(write_p)
if block_size <= 0:
block_size = 10240 # pragma: no cover
entry = ArchiveEntry(header_codec=self.header_codec)
entry_p = entry._entry_p
destination_path = pathname
for path in paths:
with new_archive_read_disk(path, flags, lookup) as read_p:
while 1:
r = read_next_header2(read_p, entry_p)
if r == ARCHIVE_EOF:
break
entry_path = entry.pathname
if destination_path:
if entry_path == path:
entry_path = destination_path
else:
assert entry_path.startswith(path)
entry_path = join(
destination_path,
entry_path[len(path):].lstrip('/')
)
entry.pathname = entry_path.lstrip('/')
if attributes:
entry.modify(**attributes)
read_disk_descend(read_p)
write_header(write_p, entry_p)
if entry.isreg:
with open(entry_sourcepath(entry_p), 'rb') as f:
while 1:
data = f.read(block_size)
if not data:
break
write_data(write_p, data, len(data))
write_finish_entry(write_p)
entry_clear(entry_p)
if not recursive:
break
def add_file(self, path, **kw):
"Single-path alias of `add_files()`"
return self.add_files(path, **kw)
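# Illustrative sketch (not part of the class): renaming files on the fly with
# the `pathname` argument while archiving a directory tree.
#
#     with libarchive.file_writer('backup.tar', 'pax') as archive:
#         archive.add_files('libarchive/', pathname='backup/libarchive/')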
def add_file_from_memory(
self, entry_path, entry_size, entry_data,
filetype=FileType.REGULAR_FILE, permission=0o664,
**other_attributes
):
""""Add file from memory to archive.
Args:
entry_path (str | bytes): the file's path
entry_size (int): the file's size, in bytes
entry_data (bytes | Iterable[bytes]): the file's content
filetype (int): see `libarchive.entry.ArchiveEntry.modify()`
permission (int): see `libarchive.entry.ArchiveEntry.modify()`
other_attributes: see `libarchive.entry.ArchiveEntry.modify()`
"""
archive_pointer = self._pointer
if isinstance(entry_data, bytes):
entry_data = (entry_data,)
elif isinstance(entry_data, str):
raise TypeError(
"entry_data: expected bytes, got %r" % type(entry_data)
)
entry = ArchiveEntry(
pathname=entry_path, size=entry_size, filetype=filetype,
perm=permission, header_codec=self.header_codec,
**other_attributes
)
write_header(archive_pointer, entry._entry_p)
for chunk in entry_data:
if not chunk:
break
write_data(archive_pointer, chunk, len(chunk))
write_finish_entry(archive_pointer)
@property
def bytes_written(self):
return ffi.filter_bytes(self._pointer, -1)
@contextmanager
def new_archive_write(format_name, filter_name=None, options='', passphrase=None):
archive_p = ffi.write_new()
try:
ffi.get_write_format_function(format_name)(archive_p)
if filter_name:
ffi.get_write_filter_function(filter_name)(archive_p)
if passphrase and 'encryption' not in options:
if format_name == 'zip':
warnings.warn(
"The default encryption scheme of zip archives is weak. "
"Use `options='encryption=$type'` to specify the encryption "
"type you want to use. The supported values are 'zipcrypt' "
"(the weak default), 'aes128' and 'aes256'."
)
options += ',encryption' if options else 'encryption'
if options:
if not isinstance(options, bytes):
options = options.encode('utf-8')
ffi.write_set_options(archive_p, options)
if passphrase:
if not isinstance(passphrase, bytes):
passphrase = passphrase.encode('utf-8')
try:
ffi.write_set_passphrase(archive_p, passphrase)
except AttributeError:
raise NotImplementedError(
f"the libarchive being used (version {ffi.version_number()}, "
f"path {ffi.libarchive_path}) doesn't support encryption"
)
yield archive_p
ffi.write_close(archive_p)
ffi.write_free(archive_p)
except Exception:
ffi.write_fail(archive_p)
ffi.write_free(archive_p)
raise
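# Illustrative sketch (not part of the module): the encryption machinery above
# is reachable through the public writers, e.g. an AES-256 encrypted zip:
#
#     with libarchive.file_writer('secret.zip', 'zip',
#                                 options='encryption=aes256',
#                                 passphrase='hunter2') as archive:
#         archive.add_files('README.rst')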
@contextmanager
def custom_writer(
write_func, format_name, filter_name=None,
open_func=None, close_func=None, block_size=page_size,
archive_write_class=ArchiveWrite, options='', passphrase=None,
header_codec='utf-8',
):
"""Create an archive and send it in chunks to the `write_func` function.
For formats and filters, see `WRITE_FORMATS` and `WRITE_FILTERS` in the
`libarchive.ffi` module.
"""
def write_cb_internal(archive_p, context, buffer_, length):
data = cast(buffer_, POINTER(c_char * length))[0]
return write_func(data)
open_cb = OPEN_CALLBACK(open_func) if open_func else NO_OPEN_CB
write_cb = WRITE_CALLBACK(write_cb_internal)
close_cb = CLOSE_CALLBACK(close_func) if close_func else NO_CLOSE_CB
with new_archive_write(format_name, filter_name, options,
passphrase) as archive_p:
ffi.write_set_bytes_in_last_block(archive_p, 1)
ffi.write_set_bytes_per_block(archive_p, block_size)
ffi.write_open(archive_p, None, open_cb, write_cb, close_cb)
yield archive_write_class(archive_p, header_codec)
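# Illustrative sketch (not part of the module), mirroring the test suite:
# collect the archive as chunks through a plain callable.
#
#     blocks = []
#
#     def write_cb(data):
#         blocks.append(data[:])
#         return len(data)
#
#     with custom_writer(write_cb, 'zip') as archive:
#         archive.add_files('libarchive/')
#     zip_bytes = b''.join(blocks)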
@contextmanager
def fd_writer(
fd, format_name, filter_name=None,
archive_write_class=ArchiveWrite, options='', passphrase=None,
header_codec='utf-8',
):
"""Create an archive and write it into a file descriptor.
For formats and filters, see `WRITE_FORMATS` and `WRITE_FILTERS` in the
`libarchive.ffi` module.
"""
with new_archive_write(format_name, filter_name, options,
passphrase) as archive_p:
ffi.write_open_fd(archive_p, fd)
yield archive_write_class(archive_p, header_codec)
@contextmanager
def file_writer(
filepath, format_name, filter_name=None,
archive_write_class=ArchiveWrite, options='', passphrase=None,
header_codec='utf-8',
):
"""Create an archive and write it into a file.
For formats and filters, see `WRITE_FORMATS` and `WRITE_FILTERS` in the
`libarchive.ffi` module.
"""
with new_archive_write(format_name, filter_name, options,
passphrase) as archive_p:
ffi.write_open_filename_w(archive_p, filepath)
yield archive_write_class(archive_p, header_codec)
@contextmanager
def memory_writer(
buf, format_name, filter_name=None,
archive_write_class=ArchiveWrite, options='', passphrase=None,
header_codec='utf-8',
):
"""Create an archive and write it into a buffer.
For formats and filters, see `WRITE_FORMATS` and `WRITE_FILTERS` in the
`libarchive.ffi` module.
"""
with new_archive_write(format_name, filter_name, options,
passphrase) as archive_p:
used = byref(c_size_t())
buf_p = cast(buf, c_void_p)
ffi.write_open_memory(archive_p, buf_p, len(buf), used)
yield archive_write_class(archive_p, header_codec)
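# Illustrative sketch (not part of the module): `memory_writer` needs a
# pre-allocated buffer large enough for the whole archive, and pairs naturally
# with `add_file_from_memory` for fully in-memory round trips.
#
#     buf = bytes(bytearray(1_000_000))
#     with memory_writer(buf, 'gnutar') as archive:
#         data = b'hello, world\n'
#         archive.add_file_from_memory('hello.txt', len(data), data)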

View File

@ -0,0 +1,12 @@
[wheel]
universal = 1
[flake8]
exclude = .?*,env*/
ignore = E226,E731,W504
max-line-length = 85
[egg_info]
tag_build =
tag_date = 0

View File

@ -0,0 +1,25 @@
import os
from os.path import join, dirname
from setuptools import setup, find_packages
from version import get_version
os.umask(0o022)
with open(join(dirname(__file__), 'README.rst'), encoding="utf-8") as f:
README = f.read()
setup(
name='libarchive-c',
version=get_version(),
description='Python interface to libarchive',
author='Changaco',
author_email='changaco@changaco.oy.lc',
url='https://github.com/Changaco/python-libarchive-c',
license='CC0',
packages=find_packages(exclude=['tests']),
long_description=README,
long_description_content_type='text/x-rst',
keywords='archive libarchive 7z tar bz2 zip gz',
)

View File

@ -0,0 +1,136 @@
from contextlib import closing, contextmanager
from copy import copy
from os import chdir, getcwd, stat, walk
from os.path import abspath, dirname, join
from stat import S_ISREG
import tarfile
try:
from stat import filemode
except ImportError: # Python 2
filemode = tarfile.filemode
from libarchive import file_reader
data_dir = join(dirname(__file__), 'data')
def check_archive(archive, tree):
tree2 = copy(tree)
for e in archive:
epath = str(e).rstrip('/')
assert epath in tree2
estat = tree2.pop(epath)
assert e.mtime == int(estat['mtime'])
if not e.isdir:
size = e.size
if size is not None:
assert size == estat['size']
with open(epath, 'rb') as f:
for block in e.get_blocks():
assert f.read(len(block)) == block
leftover = f.read()
assert not leftover
# Check that there are no missing directories or files
assert len(tree2) == 0
def get_entries(location):
"""
Using the archive file at `location`, return an iterable of name->value
mappings of each libarchive.ArchiveEntry object's essential attributes.
Paths are decoded with the surrogateescape error handler because JSON is
UTF-8 and cannot represent arbitrary binary path data.
"""
with file_reader(location) as arch:
for entry in arch:
# libarchive introduces prefixes, such as 'h' for hardlinks;
# tarfile does not, so we ignore the first character
mode = entry.strmode[1:].decode('ascii')
yield {
'path': surrogate_decode(entry.pathname),
'mtime': entry.mtime,
'size': entry.size,
'mode': mode,
'isreg': entry.isreg,
'isdir': entry.isdir,
'islnk': entry.islnk,
'issym': entry.issym,
'linkpath': surrogate_decode(entry.linkpath),
'isblk': entry.isblk,
'ischr': entry.ischr,
'isfifo': entry.isfifo,
'isdev': entry.isdev,
'uid': entry.uid,
'gid': entry.gid
}
def get_tarinfos(location):
"""
Using the tar archive file at `location`, return an iterable of
name->value mappings of each tarfile.TarInfo object's essential
attributes.
Paths are decoded with the surrogateescape error handler because JSON is
UTF-8 and cannot represent arbitrary binary path data.
"""
with closing(tarfile.open(location)) as tar:
for entry in tar:
path = surrogate_decode(entry.path or '')
if entry.isdir() and not path.endswith('/'):
path += '/'
# libarchive introduces prefixes, such as 'h' for hardlinks;
# tarfile does not, so we ignore the first character
mode = filemode(entry.mode)[1:]
yield {
'path': path,
'mtime': entry.mtime,
'size': entry.size,
'mode': mode,
'isreg': entry.isreg(),
'isdir': entry.isdir(),
'islnk': entry.islnk(),
'issym': entry.issym(),
'linkpath': surrogate_decode(entry.linkpath or None),
'isblk': entry.isblk(),
'ischr': entry.ischr(),
'isfifo': entry.isfifo(),
'isdev': entry.isdev(),
'uid': entry.uid,
'gid': entry.gid
}
@contextmanager
def in_dir(dirpath):
prev = abspath(getcwd())
chdir(dirpath)
try:
yield
finally:
chdir(prev)
def stat_dict(path):
keys = set(('uid', 'gid', 'mtime'))
mode, _, _, _, uid, gid, size, _, mtime, _ = stat(path)
if S_ISREG(mode):
keys.add('size')
return {k: v for k, v in locals().items() if k in keys}  # pick the stat fields captured above by name
def treestat(d, stat_dict=stat_dict):
r = {}
for dirpath, dirnames, filenames in walk(d):
r[dirpath] = stat_dict(dirpath)
for fname in filenames:
fpath = join(dirpath, fname)
r[fpath] = stat_dict(fpath)
return r
def surrogate_decode(o):
if isinstance(o, bytes):
return o.decode('utf8', errors='surrogateescape')
return o

View File

@ -0,0 +1,3 @@
This test file is borrowed from the Python codebase and test suite.
It is a tricky tar archive with several weird and malformed entries:
https://hg.python.org/cpython/file/bff88c866886/Lib/test/testtar.tar

View File

@ -0,0 +1,665 @@
[
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/conttype",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/regtype",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": true,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rwxr-xr-x",
"mtime": 1041808783,
"path": "ustar/dirtype/",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": true,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rwxr-xr-x",
"mtime": 1041808783,
"path": "ustar/dirtype-with-size/",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": true,
"isreg": false,
"issym": false,
"linkpath": "ustar/regtype",
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/lnktype",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": true,
"linkpath": "regtype",
"mode": "rwxrwxrwx",
"mtime": 1041808783,
"path": "ustar/symtype",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": true,
"ischr": false,
"isdev": true,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rw-rw----",
"mtime": 1041808783,
"path": "ustar/blktype",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": true,
"isdev": true,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rw-rw-rw-",
"mtime": 1041808783,
"path": "ustar/chrtype",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": true,
"isdir": false,
"isfifo": true,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/fifotype",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/sparse",
"size": 86016,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/umlauts-\udcc4\udcd6\udcdc\udce4\udcf6\udcfc\udcdf",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/12345/1234567/longname",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": true,
"linkpath": "../linktest1/regtype",
"mode": "rwxrwxrwx",
"mtime": 1041808783,
"path": "./ustar/linktest2/symtype",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "ustar/linktest1/regtype",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": true,
"isreg": false,
"issym": false,
"linkpath": "./ustar/linktest1/regtype",
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "./ustar/linktest2/lnktype",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": true,
"linkpath": "ustar/regtype",
"mode": "rwxrwxrwx",
"mtime": 1041808783,
"path": "symtype2",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "gnu/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/longname",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": true,
"isreg": false,
"issym": false,
"linkpath": "gnu/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/longname",
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "gnu/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/longlink",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "gnu/sparse",
"size": 86016,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "gnu/sparse-0.0",
"size": 86016,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "gnu/sparse-0.1",
"size": 86016,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "gnu/sparse-1.0",
"size": 86016,
"uid": 1000
},
{
"gid": 4294967295,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "gnu/regtype-gnu-uid",
"size": 7011,
"uid": 4294967295
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "misc/regtype-old-v7",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "misc/regtype-hpux-signed-chksum-\udcc4\udcd6\udcdc\udce4\udcf6\udcfc\udcdf",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "misc/regtype-old-v7-signed-chksum-\udcc4\udcd6\udcdc\udce4\udcf6\udcfc\udcdf",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": true,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rwxr-xr-x",
"mtime": 1041808783,
"path": "misc/dirtype-old-v7/",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "misc/regtype-suntar",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "misc/regtype-xstar",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/longname",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": true,
"isreg": false,
"issym": false,
"linkpath": "pax/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/longname",
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/123/longlink",
"size": 0,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/umlauts-\u00c4\u00d6\u00dc\u00e4\u00f6\u00fc\u00df",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/regtype1",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/regtype2",
"size": 7011,
"uid": 1000
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/regtype3",
"size": 7011,
"uid": 1000
},
{
"gid": 123,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/regtype4",
"size": 7011,
"uid": 123
},
{
"gid": 1000,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/bad-pax-\udce4\udcf6\udcfc",
"size": 7011,
"uid": 1000
},
{
"gid": 0,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "pax/hdrcharset-\udce4\udcf6\udcfc",
"size": 7011,
"uid": 0
},
{
"gid": 100,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1041808783,
"path": "misc/eof",
"size": 0,
"uid": 1000
}
]

View File

@ -0,0 +1,53 @@
[
{
"gid": 513,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": true,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rwx------",
"mtime": 1319027321,
"path": "2859/",
"size": 0,
"uid": 500
},
{
"gid": 513,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rwx------",
"mtime": 1319027194,
"path": "2859/Copy of h\u00e0nz\u00ec-somefile.txt",
"size": 0,
"uid": 500
},
{
"gid": 513,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rwx------",
"mtime": 1319027194,
"path": "2859/h\u00e0nz\u00ec?-somefile.txt ",
"size": 0,
"uid": 500
}
]

View File

@ -0,0 +1,36 @@
[
{
"gid": 1000,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": true,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rwxr-xr-x",
"mtime": 1268678396,
"path": "a/",
"size": 0,
"uid": 1000
},
{
"gid": 1000,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-r--r--",
"mtime": 1268678259,
"path": "a/gr\u00fcn.png",
"size": 362,
"uid": 1000
}
]

View File

@ -0,0 +1,36 @@
[
{
"gid": 0,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": true,
"isfifo": false,
"islnk": false,
"isreg": false,
"issym": false,
"linkpath": null,
"mode": "rwxrwxr-x",
"mtime": 1381752672,
"path": "a/",
"size": 0,
"uid": 0
},
{
"gid": 0,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-rw-r--",
"mtime": 1268681860,
"path": "a/gru\u0308n.png",
"size": 362,
"uid": 0
}
]

View File

@ -0,0 +1,3 @@
Test file borrowed from
https://github.com/libarchive/libarchive/issues/459
http://libarchive.github.io/google-code/issue-350/comment-0/%ED%94%84%EB%A1%9C%EA%B7%B8%EB%9E%A8.zip

View File

@ -0,0 +1,36 @@
[
{
"gid": 502,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-rw-r--",
"mtime": 1390485689,
"path": "hello.txt",
"size": 14,
"uid": 502
},
{
"gid": 502,
"isblk": false,
"ischr": false,
"isdev": false,
"isdir": false,
"isfifo": false,
"islnk": false,
"isreg": true,
"issym": false,
"linkpath": null,
"mode": "rw-rw-r--",
"mtime": 1390485651,
"path": "\ud504\ub85c\uadf8\ub7a8.txt",
"size": 13,
"uid": 502
}
]

View File

@ -0,0 +1,127 @@
from copy import copy
from os import stat
from libarchive import (file_reader, file_writer, memory_reader, memory_writer)
import pytest
from . import treestat
# NOTE: zip does not support high resolution time data, but pax and others do
def check_atime_ctime(archive, tree, timefmt=int):
tree2 = copy(tree)
for entry in archive:
epath = str(entry).rstrip('/')
assert epath in tree2
estat = tree2.pop(epath)
assert entry.atime == timefmt(estat.st_atime)
assert entry.ctime == timefmt(estat.st_ctime)
def stat_dict(path):
# return the full stat result; the tuple form only provides integer timestamps
return stat(path)
def time_check(time_tuple, timefmt):
seconds, nanos = time_tuple
maths = float(seconds) + float(nanos) / 1000000000.0
return timefmt(maths)
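# A worked example: time_check((1482144741, 495628118), int) == 1482144741,
# while timefmt=float yields approximately 1482144741.495628.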
@pytest.mark.parametrize('archfmt,timefmt', [('zip', int), ('pax', float)])
def test_memory_atime_ctime(archfmt, timefmt):
# Collect information on what should be in the archive
tree = treestat('libarchive', stat_dict)
# Create an archive of our libarchive/ directory
buf = bytes(bytearray(1000000))
with memory_writer(buf, archfmt) as archive1:
archive1.add_files('libarchive/')
# Check the data
with memory_reader(buf) as archive2:
check_atime_ctime(archive2, tree, timefmt=timefmt)
@pytest.mark.parametrize('archfmt,timefmt', [('zip', int), ('pax', float)])
def test_file_atime_ctime(archfmt, timefmt, tmpdir):
archive_path = "{0}/test.{1}".format(tmpdir.strpath, archfmt)
# Collect information on what should be in the archive
tree = treestat('libarchive', stat_dict)
# Create an archive of our libarchive/ directory
with file_writer(archive_path, archfmt) as archive:
archive.add_files('libarchive/')
# Read the archive and check that the data is correct
with file_reader(archive_path) as archive:
check_atime_ctime(archive, tree, timefmt=timefmt)
@pytest.mark.parametrize('archfmt,timefmt', [('zip', int), ('pax', float)])
def test_memory_time_setters(archfmt, timefmt):
has_birthtime = archfmt != 'zip'
# Create an archive of our libarchive/ directory
buf = bytes(bytearray(1000000))
with memory_writer(buf, archfmt) as archive1:
archive1.add_files('libarchive/')
atimestamp = (1482144741, 495628118)
mtimestamp = (1482155417, 659017086)
ctimestamp = (1482145211, 536858081)
btimestamp = (1482144740, 495628118)
buf2 = bytes(bytearray(1000000))
with memory_reader(buf) as archive1:
with memory_writer(buf2, archfmt) as archive2:
for entry in archive1:
entry.set_atime(*atimestamp)
entry.set_mtime(*mtimestamp)
entry.set_ctime(*ctimestamp)
if has_birthtime:
entry.set_birthtime(*btimestamp)
archive2.add_entries([entry])
with memory_reader(buf2) as archive2:
for entry in archive2:
assert entry.atime == time_check(atimestamp, timefmt)
assert entry.mtime == time_check(mtimestamp, timefmt)
assert entry.ctime == time_check(ctimestamp, timefmt)
if has_birthtime:
assert entry.birthtime == time_check(btimestamp, timefmt)
@pytest.mark.parametrize('archfmt,timefmt', [('zip', int), ('pax', float)])
def test_file_time_setters(archfmt, timefmt, tmpdir):
has_birthtime = archfmt != 'zip'
# Create an archive of our libarchive/ directory
archive_path = tmpdir.join('/test.{0}'.format(archfmt)).strpath
archive2_path = tmpdir.join('/test2.{0}'.format(archfmt)).strpath
with file_writer(archive_path, archfmt) as archive1:
archive1.add_files('libarchive/')
atimestamp = (1482144741, 495628118)
mtimestamp = (1482155417, 659017086)
ctimestamp = (1482145211, 536858081)
btimestamp = (1482144740, 495628118)
with file_reader(archive_path) as archive1:
with file_writer(archive2_path, archfmt) as archive2:
for entry in archive1:
entry.set_atime(*atimestamp)
entry.set_mtime(*mtimestamp)
entry.set_ctime(*ctimestamp)
if has_birthtime:
entry.set_birthtime(*btimestamp)
archive2.add_entries([entry])
with file_reader(archive2_path) as archive2:
for entry in archive2:
assert entry.atime == time_check(atimestamp, timefmt)
assert entry.mtime == time_check(mtimestamp, timefmt)
assert entry.ctime == time_check(ctimestamp, timefmt)
if has_birthtime:
assert entry.birthtime == time_check(btimestamp, timefmt)

View File

@ -0,0 +1,24 @@
from libarchive import memory_reader, memory_writer
from . import check_archive, treestat
def test_convert():
# Collect information on what should be in the archive
tree = treestat('libarchive')
# Create an archive of our libarchive/ directory
buf = bytes(bytearray(1000000))
with memory_writer(buf, 'gnutar', 'xz') as archive1:
archive1.add_files('libarchive/')
# Convert the archive to another format
buf2 = bytes(bytearray(1000000))
with memory_reader(buf) as archive1:
with memory_writer(buf2, 'zip') as archive2:
archive2.add_entries(archive1)
# Check the data
with memory_reader(buf2) as archive2:
check_archive(archive2, tree)

View File

@ -0,0 +1,151 @@
# -*- coding: utf-8 -*-
from codecs import open
import json
import locale
from os import environ, stat
from os.path import join
import unicodedata
import pytest
from libarchive import memory_reader, memory_writer
from libarchive.entry import ArchiveEntry, ConsumedArchiveEntry, PassedArchiveEntry
from . import data_dir, get_entries, get_tarinfos
text_type = unicode if str is bytes else str # noqa: F821
locale.setlocale(locale.LC_ALL, '')
# needed for sane time stamp comparison
environ['TZ'] = 'UTC'
def test_entry_properties():
buf = bytes(bytearray(1000000))
with memory_writer(buf, 'gnutar') as archive:
archive.add_files('README.rst')
readme_stat = stat('README.rst')
with memory_reader(buf) as archive:
for entry in archive:
assert entry.uid == readme_stat.st_uid
assert entry.gid == readme_stat.st_gid
assert entry.mode == readme_stat.st_mode
assert not entry.isblk
assert not entry.ischr
assert not entry.isdir
assert not entry.isfifo
assert not entry.islnk
assert not entry.issym
assert not entry.linkpath
assert entry.linkpath == entry.linkname
assert entry.isreg
assert entry.isfile
assert not entry.issock
assert not entry.isdev
assert b'rw' in entry.strmode
assert entry.pathname == entry.path
assert entry.pathname == entry.name
def test_check_ArchiveEntry_against_TarInfo():
for name in ('special.tar', 'tar_relative.tar'):
path = join(data_dir, name)
tarinfos = list(get_tarinfos(path))
entries = list(get_entries(path))
for tarinfo, entry in zip(tarinfos, entries):
assert tarinfo == entry
assert len(tarinfos) == len(entries)
def test_check_archiveentry_using_python_testtar():
check_entries(join(data_dir, 'testtar.tar'))
def test_check_archiveentry_with_unicode_and_binary_entries_tar():
check_entries(join(data_dir, 'unicode.tar'))
def test_check_archiveentry_with_unicode_and_binary_entries_zip():
check_entries(join(data_dir, 'unicode.zip'))
def test_check_archiveentry_with_unicode_and_binary_entries_zip2():
check_entries(join(data_dir, 'unicode2.zip'), ignore='mode')
def test_check_archiveentry_with_unicode_entries_and_name_zip():
check_entries(join(data_dir, '\ud504\ub85c\uadf8\ub7a8.zip'))
def check_entries(test_file, regen=False, ignore=''):
ignore = ignore.split()
fixture_file = test_file + '.json'
if regen:
entries = list(get_entries(test_file))
with open(fixture_file, 'w', encoding='UTF-8') as ex:
json.dump(entries, ex, indent=2, sort_keys=True)
with open(fixture_file, encoding='UTF-8') as ex:
expected = json.load(ex)
actual = list(get_entries(test_file))
for e1, e2 in zip(actual, expected):
for key in ignore:
e1.pop(key)
e2.pop(key)
# Normalize all unicode (can vary depending on the system)
for d in (e1, e2):
for key in d:
if isinstance(d[key], text_type):
d[key] = unicodedata.normalize('NFC', d[key])
assert e1 == e2
def test_the_life_cycle_of_archive_entries():
"""Check that `get_blocks` only works on the current entry, and only once.
"""
# Create a test archive in memory
buf = bytes(bytearray(10_000_000))
with memory_writer(buf, 'gnutar') as archive:
archive.add_files(
'README.rst',
'libarchive/__init__.py',
'libarchive/entry.py',
)
# Read multiple entries of the test archive and check how they evolve
with memory_reader(buf) as archive:
archive_iter = iter(archive)
entry1 = next(archive_iter)
assert type(entry1) is ArchiveEntry
for block in entry1.get_blocks():
pass
assert type(entry1) is ConsumedArchiveEntry
with pytest.raises(TypeError):
entry1.get_blocks()
entry2 = next(archive_iter)
assert type(entry2) is ArchiveEntry
assert type(entry1) is PassedArchiveEntry
with pytest.raises(TypeError):
entry1.get_blocks()
entry3 = next(archive_iter)
assert type(entry3) is ArchiveEntry
assert type(entry2) is PassedArchiveEntry
assert type(entry1) is PassedArchiveEntry
def test_non_ASCII_encoding_of_file_metadata():
buf = bytes(bytearray(100_000))
file_name = 'README.rst'
encoded_file_name = 'README.rst'.encode('cp037')
with memory_writer(buf, 'ustar', header_codec='cp037') as archive:
archive.add_file(file_name)
with memory_reader(buf) as archive:
entry = next(iter(archive))
assert entry.pathname == encoded_file_name
with memory_reader(buf, header_codec='cp037') as archive:
entry = next(iter(archive))
assert entry.pathname == file_name

View File

@ -0,0 +1,40 @@
from errno import ENOENT
import pytest
from libarchive import ArchiveError, ffi, memory_writer
def test_add_files_nonexistent():
with memory_writer(bytes(bytearray(4096)), 'zip') as archive:
with pytest.raises(ArchiveError) as e:
archive.add_files('nonexistent')
assert e.value.msg
assert e.value.errno == ENOENT
assert e.value.retcode == -25
def test_check_int_logs_warnings(monkeypatch):
calls = []
monkeypatch.setattr(ffi.logger, 'warning', lambda *_: calls.append(1))
archive_p = ffi.write_new()
ffi.check_int(ffi.ARCHIVE_WARN, print, [archive_p])
assert calls == [1]
def test_check_null():
with pytest.raises(ArchiveError) as e:
ffi.check_null(None, print, [])
assert str(e)
def test_error_string_decoding(monkeypatch):
monkeypatch.setattr(ffi, 'error_string', lambda *_: None)
r = ffi._error_string(None)
assert r is None
monkeypatch.setattr(ffi, 'error_string', lambda *_: b'a')
r = ffi._error_string(None)
assert isinstance(r, type(''))
monkeypatch.setattr(ffi, 'error_string', lambda *_: '\xe9'.encode('utf8'))
r = ffi._error_string(None)
assert isinstance(r, bytes)

View File

@ -0,0 +1,183 @@
"""Test reading, writing and extracting archives."""
import io
import json
import libarchive
from libarchive.entry import format_time
from libarchive.extract import EXTRACT_OWNER, EXTRACT_PERM, EXTRACT_TIME
from libarchive.write import memory_writer
from unittest.mock import patch
import pytest
from . import check_archive, in_dir, treestat
def test_buffers(tmpdir):
# Collect information on what should be in the archive
tree = treestat('libarchive')
# Create an archive of our libarchive/ directory
buf = bytes(bytearray(1000000))
with libarchive.memory_writer(buf, 'gnutar', 'xz') as archive:
archive.add_files('libarchive/')
# Read the archive and check that the data is correct
with libarchive.memory_reader(buf) as archive:
check_archive(archive, tree)
assert archive.format_name == b'GNU tar format'
assert archive.filter_names == [b'xz']
# Extract the archive in tmpdir and check that the data is intact
with in_dir(tmpdir.strpath):
flags = EXTRACT_OWNER | EXTRACT_PERM | EXTRACT_TIME
libarchive.extract_memory(buf, flags)
tree2 = treestat('libarchive')
assert tree2 == tree
def test_fd(tmpdir):
archive_file = open(tmpdir.strpath+'/test.tar.bz2', 'w+b')
fd = archive_file.fileno()
# Collect information on what should be in the archive
tree = treestat('libarchive')
# Create an archive of our libarchive/ directory
with libarchive.fd_writer(fd, 'gnutar', 'bzip2') as archive:
archive.add_files('libarchive/')
# Read the archive and check that the data is correct
archive_file.seek(0)
with libarchive.fd_reader(fd) as archive:
check_archive(archive, tree)
assert archive.format_name == b'GNU tar format'
assert archive.filter_names == [b'bzip2']
# Extract the archive in tmpdir and check that the data is intact
archive_file.seek(0)
with in_dir(tmpdir.strpath):
flags = EXTRACT_OWNER | EXTRACT_PERM | EXTRACT_TIME
libarchive.extract_fd(fd, flags)
tree2 = treestat('libarchive')
assert tree2 == tree
def test_files(tmpdir):
archive_path = tmpdir.strpath+'/test.tar.gz'
# Collect information on what should be in the archive
tree = treestat('libarchive')
# Create an archive of our libarchive/ directory
with libarchive.file_writer(archive_path, 'ustar', 'gzip') as archive:
archive.add_files('libarchive/')
# Read the archive and check that the data is correct
with libarchive.file_reader(archive_path) as archive:
check_archive(archive, tree)
assert archive.format_name == b'POSIX ustar format'
assert archive.filter_names == [b'gzip']
# Extract the archive in tmpdir and check that the data is intact
with in_dir(tmpdir.strpath):
flags = EXTRACT_OWNER | EXTRACT_PERM | EXTRACT_TIME
libarchive.extract_file(archive_path, flags)
tree2 = treestat('libarchive')
assert tree2 == tree
def test_custom_writer_and_stream_reader():
# Collect information on what should be in the archive
tree = treestat('libarchive')
# Create an archive of our libarchive/ directory
stream = io.BytesIO()
with libarchive.custom_writer(stream.write, 'zip') as archive:
archive.add_files('libarchive/')
stream.seek(0)
# Read the archive and check that the data is correct
with libarchive.stream_reader(stream, 'zip') as archive:
check_archive(archive, tree)
assert archive.format_name == b'ZIP 2.0 (deflation)'
assert archive.filter_names == []
@patch('libarchive.ffi.write_fail')
def test_write_fail(write_fail_mock):
buf = bytes(bytearray(1000000))
try:
with memory_writer(buf, 'gnutar', 'xz') as archive:
archive.add_files('libarchive/')
raise TypeError
except TypeError:
pass
assert write_fail_mock.called
@patch('libarchive.ffi.write_fail')
def test_write_not_fail(write_fail_mock):
buf = bytes(bytearray(1000000))
with memory_writer(buf, 'gnutar', 'xz') as archive:
archive.add_files('libarchive/')
assert not write_fail_mock.called
def test_adding_nonexistent_file_to_archive():
stream = io.BytesIO()
with libarchive.custom_writer(stream.write, 'zip') as archive:
with pytest.raises(libarchive.ArchiveError):
archive.add_files('nonexistent')
archive.add_files('libarchive/')
@pytest.mark.parametrize(
'archfmt,data_bytes',
[('zip', b'content'),
('gnutar', b''),
('pax', json.dumps({'a': 1, 'b': 2, 'c': 3}).encode()),
('7zip', b'lorem\0ipsum')])
def test_adding_entry_from_memory(archfmt, data_bytes):
entry_path = 'testfile.data'
entry_data = data_bytes
entry_size = len(data_bytes)
blocks = []
archfmt = 'zip'  # NOTE: pins the format to zip, overriding the parametrized value
has_birthtime = archfmt != 'zip'
atime = (1482144741, 495628118)
mtime = (1482155417, 659017086)
ctime = (1482145211, 536858081)
btime = (1482144740, 495628118) if has_birthtime else None
def write_callback(data):
blocks.append(data[:])
return len(data)
with libarchive.custom_writer(write_callback, archfmt) as archive:
archive.add_file_from_memory(
entry_path, entry_size, entry_data,
atime=atime, mtime=mtime, ctime=ctime, birthtime=btime,
uid=1000, gid=1000,
)
buf = b''.join(blocks)
with libarchive.memory_reader(buf) as memory_archive:
for archive_entry in memory_archive:
expected = entry_data
actual = b''.join(archive_entry.get_blocks())
assert expected == actual
assert archive_entry.path == entry_path
assert archive_entry.atime in (atime[0], format_time(*atime))
assert archive_entry.mtime in (mtime[0], format_time(*mtime))
assert archive_entry.ctime in (ctime[0], format_time(*ctime))
if has_birthtime:
assert archive_entry.birthtime in (
btime[0], format_time(*btime)
)
assert archive_entry.uid == 1000
assert archive_entry.gid == 1000

View File

@ -0,0 +1,36 @@
"""Test security-related extraction flags."""
import pytest
import os
from libarchive import extract_file, file_reader
from libarchive.extract import (
EXTRACT_SECURE_NOABSOLUTEPATHS, EXTRACT_SECURE_NODOTDOT,
)
from libarchive.exception import ArchiveError
from . import data_dir
def run_test(flags):
archive_path = os.path.join(data_dir, 'flags.tar')
try:
extract_file(archive_path, 0)
with pytest.raises(ArchiveError):
extract_file(archive_path, flags)
finally:
with file_reader(archive_path) as archive:
for entry in archive:
if os.path.exists(entry.pathname):
os.remove(entry.pathname)
def test_extraction_is_secure_by_default():
run_test(None)
def test_explicit_no_dot_dot():
run_test(EXTRACT_SECURE_NODOTDOT)
def test_explicit_no_absolute_paths():
run_test(EXTRACT_SECURE_NOABSOLUTEPATHS)

View File

@ -0,0 +1,14 @@
[tox]
envlist=py38,py39,py310,py311
skipsdist=True
[testenv]
passenv = LIBARCHIVE
commands=
python -m pytest -Wd -vv --forked --cov libarchive --cov-report term-missing {toxinidir}/tests {posargs}
flake8 {toxinidir}
deps=
flake8
pytest
pytest-cov
pytest-forked

View File

@ -0,0 +1,45 @@
# Source: https://github.com/Changaco/version.py
from os.path import dirname, isdir, join
import re
from subprocess import CalledProcessError, check_output
PREFIX = ''
tag_re = re.compile(r'\btag: %s([0-9][^,]*)\b' % PREFIX)
version_re = re.compile('^Version: (.+)$', re.M)
def get_version():
# Return the version if it has been injected into the file by git-archive
version = tag_re.search('$Format:%D$')
if version:
return version.group(1)
d = dirname(__file__)
if isdir(join(d, '.git')):
# Get the version using "git describe".
cmd = 'git describe --tags --match %s[0-9]* --dirty' % PREFIX
try:
version = check_output(cmd.split()).decode().strip()[len(PREFIX):]
except CalledProcessError:
raise RuntimeError('Unable to get version number from git tags')
# PEP 440 compatibility
if '-' in version:
if version.endswith('-dirty'):
raise RuntimeError('The working tree is dirty')
version = '.post'.join(version.split('-')[:2])
else:
# Extract the version from the PKG-INFO file.
with open(join(d, 'PKG-INFO'), encoding='utf-8', errors='replace') as f:
version = version_re.search(f.read()).group(1)
return version
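# For example, with a hypothetical tag history, "git describe" output such as
# "5.1-3-g1a2b3c4" becomes the PEP 440 compatible version "5.1.post3".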
if __name__ == '__main__':
print(get_version())