451 lines
14 KiB
Python
451 lines
14 KiB
Python
from contextlib import contextmanager
|
|
from ctypes import create_string_buffer
|
|
from enum import IntEnum
|
|
import math
|
|
|
|
from . import ffi
|
|
|
|
|
|
class FileType(IntEnum):
|
|
NAMED_PIPE = AE_IFIFO = 0o010000 # noqa: E221
|
|
CHAR_DEVICE = AE_IFCHR = 0o020000 # noqa: E221
|
|
DIRECTORY = AE_IFDIR = 0o040000 # noqa: E221
|
|
BLOCK_DEVICE = AE_IFBLK = 0o060000 # noqa: E221
|
|
REGULAR_FILE = AE_IFREG = 0o100000 # noqa: E221
|
|
SYMBOLINK_LINK = AE_IFLNK = 0o120000 # noqa: E221
|
|
SOCKET = AE_IFSOCK = 0o140000 # noqa: E221
|
|
|
|
|
|
@contextmanager
|
|
def new_archive_entry():
|
|
entry_p = ffi.entry_new()
|
|
try:
|
|
yield entry_p
|
|
finally:
|
|
ffi.entry_free(entry_p)
|
|
|
|
|
|
def format_time(seconds, nanos):
|
|
""" return float of seconds.nanos when nanos set, or seconds when not """
|
|
if nanos:
|
|
return float(seconds) + float(nanos) / 1000000000.0
|
|
return int(seconds)
|
|
|
|
|
|
class ArchiveEntry:
|
|
|
|
__slots__ = ('_archive_p', '_entry_p', 'header_codec')
|
|
|
|
def __init__(self, archive_p=None, header_codec='utf-8', **attributes):
|
|
"""Allocate memory for an `archive_entry` struct.
|
|
|
|
The `header_codec` is used to decode and encode file paths and other
|
|
attributes.
|
|
|
|
The `**attributes` are passed to the `modify` method.
|
|
"""
|
|
self._archive_p = archive_p
|
|
self._entry_p = ffi.entry_new()
|
|
self.header_codec = header_codec
|
|
if attributes:
|
|
self.modify(**attributes)
|
|
|
|
def __del__(self):
|
|
"""Free the C struct"""
|
|
ffi.entry_free(self._entry_p)
|
|
|
|
def __str__(self):
|
|
"""Returns the file's path"""
|
|
return self.pathname
|
|
|
|
def modify(self, header_codec=None, **attributes):
|
|
"""Convenience method to modify the entry's attributes.
|
|
|
|
Args:
|
|
filetype (int): the file's type, see the `FileType` class for values
|
|
pathname (str): the file's path
|
|
linkpath (str): the other path of the file, if the file is a link
|
|
size (int | None): the file's size, in bytes
|
|
perm (int): the file's permissions in standard Unix format, e.g. 0o640
|
|
uid (int): the file owner's numerical identifier
|
|
gid (int): the file group's numerical identifier
|
|
uname (str | bytes): the file owner's name
|
|
gname (str | bytes): the file group's name
|
|
atime (int | Tuple[int, int] | float | None):
|
|
the file's most recent access time,
|
|
either in seconds or as a tuple (seconds, nanoseconds)
|
|
mtime (int | Tuple[int, int] | float | None):
|
|
the file's most recent modification time,
|
|
either in seconds or as a tuple (seconds, nanoseconds)
|
|
ctime (int | Tuple[int, int] | float | None):
|
|
the file's most recent metadata change time,
|
|
either in seconds or as a tuple (seconds, nanoseconds)
|
|
birthtime (int | Tuple[int, int] | float | None):
|
|
the file's creation time (for archive formats that support it),
|
|
either in seconds or as a tuple (seconds, nanoseconds)
|
|
rdev (int | Tuple[int, int]): device number, if the file is a device
|
|
rdevmajor (int): major part of the device number
|
|
rdevminor (int): minor part of the device number
|
|
"""
|
|
if header_codec:
|
|
self.header_codec = header_codec
|
|
for name, value in attributes.items():
|
|
setattr(self, name, value)
|
|
|
|
@property
|
|
def filetype(self):
|
|
return ffi.entry_filetype(self._entry_p)
|
|
|
|
@filetype.setter
|
|
def filetype(self, value):
|
|
ffi.entry_set_filetype(self._entry_p, value)
|
|
|
|
@property
|
|
def uid(self):
|
|
return ffi.entry_uid(self._entry_p)
|
|
|
|
@uid.setter
|
|
def uid(self, uid):
|
|
ffi.entry_set_uid(self._entry_p, uid)
|
|
|
|
@property
|
|
def gid(self):
|
|
return ffi.entry_gid(self._entry_p)
|
|
|
|
@gid.setter
|
|
def gid(self, gid):
|
|
ffi.entry_set_gid(self._entry_p, gid)
|
|
|
|
@property
|
|
def uname(self):
|
|
uname = ffi.entry_uname_w(self._entry_p)
|
|
if not uname:
|
|
uname = ffi.entry_uname(self._entry_p)
|
|
if uname is not None:
|
|
try:
|
|
uname = uname.decode(self.header_codec)
|
|
except UnicodeError:
|
|
pass
|
|
return uname
|
|
|
|
@uname.setter
|
|
def uname(self, value):
|
|
if not isinstance(value, bytes):
|
|
value = value.encode(self.header_codec)
|
|
if self.header_codec == 'utf-8':
|
|
ffi.entry_update_uname_utf8(self._entry_p, value)
|
|
else:
|
|
ffi.entry_copy_uname(self._entry_p, value)
|
|
|
|
@property
|
|
def gname(self):
|
|
gname = ffi.entry_gname_w(self._entry_p)
|
|
if not gname:
|
|
gname = ffi.entry_gname(self._entry_p)
|
|
if gname is not None:
|
|
try:
|
|
gname = gname.decode(self.header_codec)
|
|
except UnicodeError:
|
|
pass
|
|
return gname
|
|
|
|
@gname.setter
|
|
def gname(self, value):
|
|
if not isinstance(value, bytes):
|
|
value = value.encode(self.header_codec)
|
|
if self.header_codec == 'utf-8':
|
|
ffi.entry_update_gname_utf8(self._entry_p, value)
|
|
else:
|
|
ffi.entry_copy_gname(self._entry_p, value)
|
|
|
|
def get_blocks(self, block_size=ffi.page_size):
|
|
"""Read the file's content, keeping only one chunk in memory at a time.
|
|
|
|
Don't do anything like `list(entry.get_blocks())`, it would silently fail.
|
|
|
|
Args:
|
|
block_size (int): the buffer's size, in bytes
|
|
"""
|
|
archive_p = self._archive_p
|
|
if not archive_p:
|
|
raise TypeError("this entry isn't linked to any content")
|
|
buf = create_string_buffer(block_size)
|
|
read = ffi.read_data
|
|
while 1:
|
|
r = read(archive_p, buf, block_size)
|
|
if r == 0:
|
|
break
|
|
yield buf.raw[0:r]
|
|
self.__class__ = ConsumedArchiveEntry
|
|
|
|
@property
|
|
def isblk(self):
|
|
return self.filetype & 0o170000 == 0o060000
|
|
|
|
@property
|
|
def ischr(self):
|
|
return self.filetype & 0o170000 == 0o020000
|
|
|
|
@property
|
|
def isdir(self):
|
|
return self.filetype & 0o170000 == 0o040000
|
|
|
|
@property
|
|
def isfifo(self):
|
|
return self.filetype & 0o170000 == 0o010000
|
|
|
|
@property
|
|
def islnk(self):
|
|
return bool(ffi.entry_hardlink_w(self._entry_p) or
|
|
ffi.entry_hardlink(self._entry_p))
|
|
|
|
@property
|
|
def issym(self):
|
|
return self.filetype & 0o170000 == 0o120000
|
|
|
|
@property
|
|
def isreg(self):
|
|
return self.filetype & 0o170000 == 0o100000
|
|
|
|
@property
|
|
def isfile(self):
|
|
return self.isreg
|
|
|
|
@property
|
|
def issock(self):
|
|
return self.filetype & 0o170000 == 0o140000
|
|
|
|
@property
|
|
def isdev(self):
|
|
return self.ischr or self.isblk or self.isfifo or self.issock
|
|
|
|
@property
|
|
def atime(self):
|
|
if not ffi.entry_atime_is_set(self._entry_p):
|
|
return None
|
|
sec_val = ffi.entry_atime(self._entry_p)
|
|
nsec_val = ffi.entry_atime_nsec(self._entry_p)
|
|
return format_time(sec_val, nsec_val)
|
|
|
|
@atime.setter
|
|
def atime(self, value):
|
|
if value is None:
|
|
ffi.entry_unset_atime(self._entry_p)
|
|
elif isinstance(value, int):
|
|
self.set_atime(value)
|
|
elif isinstance(value, tuple):
|
|
self.set_atime(*value)
|
|
else:
|
|
seconds, fraction = math.modf(value)
|
|
self.set_atime(int(seconds), int(fraction * 1_000_000_000))
|
|
|
|
def set_atime(self, timestamp_sec, timestamp_nsec=0):
|
|
"Kept for backward compatibility. `entry.atime = ...` is supported now."
|
|
return ffi.entry_set_atime(self._entry_p, timestamp_sec, timestamp_nsec)
|
|
|
|
@property
|
|
def mtime(self):
|
|
if not ffi.entry_mtime_is_set(self._entry_p):
|
|
return None
|
|
sec_val = ffi.entry_mtime(self._entry_p)
|
|
nsec_val = ffi.entry_mtime_nsec(self._entry_p)
|
|
return format_time(sec_val, nsec_val)
|
|
|
|
@mtime.setter
|
|
def mtime(self, value):
|
|
if value is None:
|
|
ffi.entry_unset_mtime(self._entry_p)
|
|
elif isinstance(value, int):
|
|
self.set_mtime(value)
|
|
elif isinstance(value, tuple):
|
|
self.set_mtime(*value)
|
|
else:
|
|
seconds, fraction = math.modf(value)
|
|
self.set_mtime(int(seconds), int(fraction * 1_000_000_000))
|
|
|
|
def set_mtime(self, timestamp_sec, timestamp_nsec=0):
|
|
"Kept for backward compatibility. `entry.mtime = ...` is supported now."
|
|
return ffi.entry_set_mtime(self._entry_p, timestamp_sec, timestamp_nsec)
|
|
|
|
@property
|
|
def ctime(self):
|
|
if not ffi.entry_ctime_is_set(self._entry_p):
|
|
return None
|
|
sec_val = ffi.entry_ctime(self._entry_p)
|
|
nsec_val = ffi.entry_ctime_nsec(self._entry_p)
|
|
return format_time(sec_val, nsec_val)
|
|
|
|
@ctime.setter
|
|
def ctime(self, value):
|
|
if value is None:
|
|
ffi.entry_unset_ctime(self._entry_p)
|
|
elif isinstance(value, int):
|
|
self.set_ctime(value)
|
|
elif isinstance(value, tuple):
|
|
self.set_ctime(*value)
|
|
else:
|
|
seconds, fraction = math.modf(value)
|
|
self.set_ctime(int(seconds), int(fraction * 1_000_000_000))
|
|
|
|
def set_ctime(self, timestamp_sec, timestamp_nsec=0):
|
|
"Kept for backward compatibility. `entry.ctime = ...` is supported now."
|
|
return ffi.entry_set_ctime(self._entry_p, timestamp_sec, timestamp_nsec)
|
|
|
|
@property
|
|
def birthtime(self):
|
|
if not ffi.entry_birthtime_is_set(self._entry_p):
|
|
return None
|
|
sec_val = ffi.entry_birthtime(self._entry_p)
|
|
nsec_val = ffi.entry_birthtime_nsec(self._entry_p)
|
|
return format_time(sec_val, nsec_val)
|
|
|
|
@birthtime.setter
|
|
def birthtime(self, value):
|
|
if value is None:
|
|
ffi.entry_unset_birthtime(self._entry_p)
|
|
elif isinstance(value, int):
|
|
self.set_birthtime(value)
|
|
elif isinstance(value, tuple):
|
|
self.set_birthtime(*value)
|
|
else:
|
|
seconds, fraction = math.modf(value)
|
|
self.set_birthtime(int(seconds), int(fraction * 1_000_000_000))
|
|
|
|
def set_birthtime(self, timestamp_sec, timestamp_nsec=0):
|
|
"Kept for backward compatibility. `entry.birthtime = ...` is supported now."
|
|
return ffi.entry_set_birthtime(
|
|
self._entry_p, timestamp_sec, timestamp_nsec
|
|
)
|
|
|
|
@property
|
|
def pathname(self):
|
|
path = ffi.entry_pathname_w(self._entry_p)
|
|
if not path:
|
|
path = ffi.entry_pathname(self._entry_p)
|
|
if path is not None:
|
|
try:
|
|
path = path.decode(self.header_codec)
|
|
except UnicodeError:
|
|
pass
|
|
return path
|
|
|
|
@pathname.setter
|
|
def pathname(self, value):
|
|
if not isinstance(value, bytes):
|
|
value = value.encode(self.header_codec)
|
|
if self.header_codec == 'utf-8':
|
|
ffi.entry_update_pathname_utf8(self._entry_p, value)
|
|
else:
|
|
ffi.entry_copy_pathname(self._entry_p, value)
|
|
|
|
@property
|
|
def linkpath(self):
|
|
path = (
|
|
(
|
|
ffi.entry_symlink_w(self._entry_p) or
|
|
ffi.entry_symlink(self._entry_p)
|
|
) if self.issym else (
|
|
ffi.entry_hardlink_w(self._entry_p) or
|
|
ffi.entry_hardlink(self._entry_p)
|
|
)
|
|
)
|
|
if isinstance(path, bytes):
|
|
try:
|
|
path = path.decode(self.header_codec)
|
|
except UnicodeError:
|
|
pass
|
|
return path
|
|
|
|
@linkpath.setter
|
|
def linkpath(self, value):
|
|
if not isinstance(value, bytes):
|
|
value = value.encode(self.header_codec)
|
|
if self.header_codec == 'utf-8':
|
|
ffi.entry_update_link_utf8(self._entry_p, value)
|
|
else:
|
|
ffi.entry_copy_link(self._entry_p, value)
|
|
|
|
# aliases for compatibility with the standard `tarfile` module
|
|
path = property(pathname.fget, pathname.fset, doc="alias of pathname")
|
|
name = path
|
|
linkname = property(linkpath.fget, linkpath.fset, doc="alias of linkpath")
|
|
|
|
@property
|
|
def size(self):
|
|
if ffi.entry_size_is_set(self._entry_p):
|
|
return ffi.entry_size(self._entry_p)
|
|
|
|
@size.setter
|
|
def size(self, value):
|
|
if value is None:
|
|
ffi.entry_unset_size(self._entry_p)
|
|
else:
|
|
ffi.entry_set_size(self._entry_p, value)
|
|
|
|
@property
|
|
def mode(self):
|
|
return ffi.entry_mode(self._entry_p)
|
|
|
|
@mode.setter
|
|
def mode(self, value):
|
|
ffi.entry_set_mode(self._entry_p, value)
|
|
|
|
@property
|
|
def strmode(self):
|
|
"""The file's mode as a string, e.g. '?rwxrwx---'"""
|
|
# note we strip the mode because archive_entry_strmode
|
|
# returns a trailing space: strcpy(bp, "?rwxrwxrwx ");
|
|
return ffi.entry_strmode(self._entry_p).strip()
|
|
|
|
@property
|
|
def perm(self):
|
|
return ffi.entry_perm(self._entry_p)
|
|
|
|
@perm.setter
|
|
def perm(self, value):
|
|
ffi.entry_set_perm(self._entry_p, value)
|
|
|
|
@property
|
|
def rdev(self):
|
|
return ffi.entry_rdev(self._entry_p)
|
|
|
|
@rdev.setter
|
|
def rdev(self, value):
|
|
if isinstance(value, tuple):
|
|
ffi.entry_set_rdevmajor(self._entry_p, value[0])
|
|
ffi.entry_set_rdevminor(self._entry_p, value[1])
|
|
else:
|
|
ffi.entry_set_rdev(self._entry_p, value)
|
|
|
|
@property
|
|
def rdevmajor(self):
|
|
return ffi.entry_rdevmajor(self._entry_p)
|
|
|
|
@rdevmajor.setter
|
|
def rdevmajor(self, value):
|
|
ffi.entry_set_rdevmajor(self._entry_p, value)
|
|
|
|
@property
|
|
def rdevminor(self):
|
|
return ffi.entry_rdevminor(self._entry_p)
|
|
|
|
@rdevminor.setter
|
|
def rdevminor(self, value):
|
|
ffi.entry_set_rdevminor(self._entry_p, value)
|
|
|
|
|
|
class ConsumedArchiveEntry(ArchiveEntry):
|
|
|
|
__slots__ = ()
|
|
|
|
def get_blocks(self, **kw):
|
|
raise TypeError("the content of this entry has already been read")
|
|
|
|
|
|
class PassedArchiveEntry(ArchiveEntry):
|
|
|
|
__slots__ = ()
|
|
|
|
def get_blocks(self, **kw):
|
|
raise TypeError("this entry is passed, it's too late to read its content")
|