184 lines
6.1 KiB
Python
184 lines
6.1 KiB
Python
"""Test reading, writing and extracting archives."""
|
|
|
|
import io
|
|
import json
|
|
|
|
import libarchive
|
|
from libarchive.entry import format_time
|
|
from libarchive.extract import EXTRACT_OWNER, EXTRACT_PERM, EXTRACT_TIME
|
|
from libarchive.write import memory_writer
|
|
from unittest.mock import patch
|
|
import pytest
|
|
|
|
from . import check_archive, in_dir, treestat
|
|
|
|
|
|
def test_buffers(tmpdir):
    """Round-trip libarchive/ through an in-memory xz-compressed GNU tar.

    The archive is written into a preallocated buffer, read back and
    checked with ``check_archive``, then extracted under ``tmpdir`` and
    compared against the original ``treestat`` snapshot.
    """
    # Snapshot of what the archive is expected to contain.
    expected_tree = treestat('libarchive')

    # Write the archive of libarchive/ into a 1,000,000-byte buffer.
    archive_buf = bytes(bytearray(1000000))
    with libarchive.memory_writer(archive_buf, 'gnutar', 'xz') as writer:
        writer.add_files('libarchive/')

    # Read the buffer back and verify contents, format and filter.
    with libarchive.memory_reader(archive_buf) as reader:
        check_archive(reader, expected_tree)
        assert reader.format_name == b'GNU tar format'
        assert reader.filter_names == [b'xz']

    # Extract inside tmpdir; the resulting tree must equal the snapshot.
    extract_flags = EXTRACT_OWNER | EXTRACT_PERM | EXTRACT_TIME
    with in_dir(tmpdir.strpath):
        libarchive.extract_memory(archive_buf, extract_flags)
        assert treestat('libarchive') == expected_tree
|
|
|
|
|
|
def test_fd(tmpdir):
    """Round-trip libarchive/ through a bzip2 GNU tar via a file descriptor.

    The same descriptor is used to write the archive, read it back and
    extract it, rewinding the underlying file between phases. The file is
    opened in a ``with`` block so it is closed even when an assertion fails.
    """
    with open(tmpdir.strpath+'/test.tar.bz2', 'w+b') as archive_file:
        fd = archive_file.fileno()

        # Collect information on what should be in the archive
        tree = treestat('libarchive')

        # Create an archive of our libarchive/ directory
        with libarchive.fd_writer(fd, 'gnutar', 'bzip2') as archive:
            archive.add_files('libarchive/')

        # Read the archive and check that the data is correct.
        # Rewind first: the descriptor's offset is left at the end by the
        # writer.
        archive_file.seek(0)
        with libarchive.fd_reader(fd) as archive:
            check_archive(archive, tree)
            assert archive.format_name == b'GNU tar format'
            assert archive.filter_names == [b'bzip2']

        # Extract the archive in tmpdir and check that the data is intact
        archive_file.seek(0)
        with in_dir(tmpdir.strpath):
            flags = EXTRACT_OWNER | EXTRACT_PERM | EXTRACT_TIME
            libarchive.extract_fd(fd, flags)
            tree2 = treestat('libarchive')
            assert tree2 == tree
|
|
|
|
|
|
def test_files(tmpdir):
    """Round-trip libarchive/ through a gzip-compressed ustar file on disk.

    The archive is written to ``tmpdir``, read back and checked with
    ``check_archive``, then extracted under ``tmpdir`` and compared against
    the original ``treestat`` snapshot.
    """
    target_path = tmpdir.strpath+'/test.tar.gz'

    # Snapshot of what the archive is expected to contain.
    expected_tree = treestat('libarchive')

    # Write the archive of libarchive/ to disk.
    with libarchive.file_writer(target_path, 'ustar', 'gzip') as writer:
        writer.add_files('libarchive/')

    # Read it back and verify contents, format and filter.
    with libarchive.file_reader(target_path) as reader:
        check_archive(reader, expected_tree)
        assert reader.format_name == b'POSIX ustar format'
        assert reader.filter_names == [b'gzip']

    # Extract inside tmpdir; the resulting tree must equal the snapshot.
    extract_flags = EXTRACT_OWNER | EXTRACT_PERM | EXTRACT_TIME
    with in_dir(tmpdir.strpath):
        libarchive.extract_file(target_path, extract_flags)
        assert treestat('libarchive') == expected_tree
|
|
|
|
|
|
def test_custom_writer_and_stream_reader():
    """Write a zip through a write callback and read it back as a stream.

    The archive bytes are collected in a ``BytesIO`` via its ``write``
    method, then the same object is rewound and handed to
    ``stream_reader``.
    """
    # Snapshot of what the archive is expected to contain.
    expected_tree = treestat('libarchive')

    # Write the archive of libarchive/ into an in-memory stream.
    zip_stream = io.BytesIO()
    with libarchive.custom_writer(zip_stream.write, 'zip') as writer:
        writer.add_files('libarchive/')
    # Rewind only after the writer has been closed.
    zip_stream.seek(0)

    # Read the stream back and verify contents, format and filters.
    with libarchive.stream_reader(zip_stream, 'zip') as reader:
        check_archive(reader, expected_tree)
        assert reader.format_name == b'ZIP 2.0 (deflation)'
        assert reader.filter_names == []
|
|
|
|
|
|
@patch('libarchive.ffi.write_fail')
def test_write_fail(write_fail_mock):
    """Check that ``ffi.write_fail`` is called when the writer body raises.

    ``libarchive.ffi.write_fail`` is replaced by a mock; a ``TypeError`` is
    raised inside the ``memory_writer`` context and the mock must have been
    called afterwards. ``pytest.raises`` also verifies that the exception
    propagates out of the context manager instead of being swallowed.
    """
    buf = bytes(bytearray(1000000))
    with pytest.raises(TypeError):
        with memory_writer(buf, 'gnutar', 'xz') as archive:
            archive.add_files('libarchive/')
            # Simulate an unrelated failure while the archive is still open.
            raise TypeError
    assert write_fail_mock.called
|
|
|
|
|
|
@patch('libarchive.ffi.write_fail')
def test_write_not_fail(write_fail_mock):
    """Check that ``ffi.write_fail`` is not called on a clean write.

    ``libarchive.ffi.write_fail`` is replaced by a mock; after the
    ``memory_writer`` context exits without an exception, the mock must not
    have been called.
    """
    archive_buf = bytes(bytearray(1000000))
    with memory_writer(archive_buf, 'gnutar', 'xz') as writer:
        writer.add_files('libarchive/')
    was_called = write_fail_mock.called
    assert not was_called
|
|
|
|
|
|
def test_adding_nonexistent_file_to_archive():
    """Adding a missing path raises ``ArchiveError``; the writer stays usable.

    After the failed ``add_files('nonexistent')`` call, the same writer is
    used to add the libarchive/ directory.
    """
    sink = io.BytesIO()
    with libarchive.custom_writer(sink.write, 'zip') as writer:
        with pytest.raises(libarchive.ArchiveError):
            writer.add_files('nonexistent')
        # The earlier failure must not prevent further additions.
        writer.add_files('libarchive/')
|
|
|
|
|
|
@pytest.mark.parametrize(
    'archfmt,data_bytes',
    [('zip', b'content'),
     ('gnutar', b''),
     ('pax', json.dumps({'a': 1, 'b': 2, 'c': 3}).encode()),
     ('7zip', b'lorem\0ipsum')])
def test_adding_entry_from_memory(archfmt, data_bytes):
    """Add one entry from memory and verify it when the archive is read back.

    The archive is produced through a write callback that collects the
    emitted blocks; the joined bytes are then opened with ``memory_reader``
    and the single entry's data, path, timestamps and ownership are compared
    with the values that were written.
    """
    entry_path = 'testfile.data'
    entry_data = data_bytes
    entry_size = len(data_bytes)

    blocks = []

    # NOTE(review): this rebinding discards the parametrized ``archfmt``, so
    # every case is written as zip and only ``data_bytes`` varies. It is kept
    # unchanged here because the other formats may not preserve all of the
    # timestamps asserted below -- TODO confirm and drop the override.
    archfmt = 'zip'
    has_birthtime = archfmt != 'zip'

    # (seconds, nanoseconds) pairs.
    atime = (1482144741, 495628118)
    mtime = (1482155417, 659017086)
    ctime = (1482145211, 536858081)
    btime = (1482144740, 495628118) if has_birthtime else None

    def write_callback(data):
        # Store a copy of each block (presumably the buffer passed in may be
        # reused by the caller -- verify) and report it as fully consumed.
        blocks.append(data[:])
        return len(data)

    with libarchive.custom_writer(write_callback, archfmt) as archive:
        archive.add_file_from_memory(
            entry_path, entry_size, entry_data,
            atime=atime, mtime=mtime, ctime=ctime, birthtime=btime,
            uid=1000, gid=1000,
        )

    buf = b''.join(blocks)
    entries_seen = 0
    with libarchive.memory_reader(buf) as memory_archive:
        for archive_entry in memory_archive:
            entries_seen += 1
            expected = entry_data
            actual = b''.join(archive_entry.get_blocks())
            assert expected == actual
            assert archive_entry.path == entry_path
            # Each time may come back as whole seconds or as the value
            # produced by format_time for the (seconds, nanoseconds) pair.
            assert archive_entry.atime in (atime[0], format_time(*atime))
            assert archive_entry.mtime in (mtime[0], format_time(*mtime))
            assert archive_entry.ctime in (ctime[0], format_time(*ctime))
            if has_birthtime:
                assert archive_entry.birthtime in (
                    btime[0], format_time(*btime)
                )
            assert archive_entry.uid == 1000
            assert archive_entry.gid == 1000

    # Without this check the test would pass vacuously if the reader yielded
    # no entries, since every assertion above lives inside the loop.
    assert entries_seen == 1
|