oggit/packages/libarchive-c/opengnsys-libarchive-c-5.1/libarchive/entry.py

451 lines
14 KiB
Python

from contextlib import contextmanager
from ctypes import create_string_buffer
from enum import IntEnum
import math
from . import ffi
class FileType(IntEnum):
NAMED_PIPE = AE_IFIFO = 0o010000 # noqa: E221
CHAR_DEVICE = AE_IFCHR = 0o020000 # noqa: E221
DIRECTORY = AE_IFDIR = 0o040000 # noqa: E221
BLOCK_DEVICE = AE_IFBLK = 0o060000 # noqa: E221
REGULAR_FILE = AE_IFREG = 0o100000 # noqa: E221
SYMBOLINK_LINK = AE_IFLNK = 0o120000 # noqa: E221
SOCKET = AE_IFSOCK = 0o140000 # noqa: E221
@contextmanager
def new_archive_entry():
entry_p = ffi.entry_new()
try:
yield entry_p
finally:
ffi.entry_free(entry_p)
def format_time(seconds, nanos):
""" return float of seconds.nanos when nanos set, or seconds when not """
if nanos:
return float(seconds) + float(nanos) / 1000000000.0
return int(seconds)
class ArchiveEntry:
__slots__ = ('_archive_p', '_entry_p', 'header_codec')
def __init__(self, archive_p=None, header_codec='utf-8', **attributes):
"""Allocate memory for an `archive_entry` struct.
The `header_codec` is used to decode and encode file paths and other
attributes.
The `**attributes` are passed to the `modify` method.
"""
self._archive_p = archive_p
self._entry_p = ffi.entry_new()
self.header_codec = header_codec
if attributes:
self.modify(**attributes)
def __del__(self):
"""Free the C struct"""
ffi.entry_free(self._entry_p)
def __str__(self):
"""Returns the file's path"""
return self.pathname
def modify(self, header_codec=None, **attributes):
"""Convenience method to modify the entry's attributes.
Args:
filetype (int): the file's type, see the `FileType` class for values
pathname (str): the file's path
linkpath (str): the other path of the file, if the file is a link
size (int | None): the file's size, in bytes
perm (int): the file's permissions in standard Unix format, e.g. 0o640
uid (int): the file owner's numerical identifier
gid (int): the file group's numerical identifier
uname (str | bytes): the file owner's name
gname (str | bytes): the file group's name
atime (int | Tuple[int, int] | float | None):
the file's most recent access time,
either in seconds or as a tuple (seconds, nanoseconds)
mtime (int | Tuple[int, int] | float | None):
the file's most recent modification time,
either in seconds or as a tuple (seconds, nanoseconds)
ctime (int | Tuple[int, int] | float | None):
the file's most recent metadata change time,
either in seconds or as a tuple (seconds, nanoseconds)
birthtime (int | Tuple[int, int] | float | None):
the file's creation time (for archive formats that support it),
either in seconds or as a tuple (seconds, nanoseconds)
rdev (int | Tuple[int, int]): device number, if the file is a device
rdevmajor (int): major part of the device number
rdevminor (int): minor part of the device number
"""
if header_codec:
self.header_codec = header_codec
for name, value in attributes.items():
setattr(self, name, value)
@property
def filetype(self):
return ffi.entry_filetype(self._entry_p)
@filetype.setter
def filetype(self, value):
ffi.entry_set_filetype(self._entry_p, value)
@property
def uid(self):
return ffi.entry_uid(self._entry_p)
@uid.setter
def uid(self, uid):
ffi.entry_set_uid(self._entry_p, uid)
@property
def gid(self):
return ffi.entry_gid(self._entry_p)
@gid.setter
def gid(self, gid):
ffi.entry_set_gid(self._entry_p, gid)
@property
def uname(self):
uname = ffi.entry_uname_w(self._entry_p)
if not uname:
uname = ffi.entry_uname(self._entry_p)
if uname is not None:
try:
uname = uname.decode(self.header_codec)
except UnicodeError:
pass
return uname
@uname.setter
def uname(self, value):
if not isinstance(value, bytes):
value = value.encode(self.header_codec)
if self.header_codec == 'utf-8':
ffi.entry_update_uname_utf8(self._entry_p, value)
else:
ffi.entry_copy_uname(self._entry_p, value)
@property
def gname(self):
gname = ffi.entry_gname_w(self._entry_p)
if not gname:
gname = ffi.entry_gname(self._entry_p)
if gname is not None:
try:
gname = gname.decode(self.header_codec)
except UnicodeError:
pass
return gname
@gname.setter
def gname(self, value):
if not isinstance(value, bytes):
value = value.encode(self.header_codec)
if self.header_codec == 'utf-8':
ffi.entry_update_gname_utf8(self._entry_p, value)
else:
ffi.entry_copy_gname(self._entry_p, value)
def get_blocks(self, block_size=ffi.page_size):
"""Read the file's content, keeping only one chunk in memory at a time.
Don't do anything like `list(entry.get_blocks())`, it would silently fail.
Args:
block_size (int): the buffer's size, in bytes
"""
archive_p = self._archive_p
if not archive_p:
raise TypeError("this entry isn't linked to any content")
buf = create_string_buffer(block_size)
read = ffi.read_data
while 1:
r = read(archive_p, buf, block_size)
if r == 0:
break
yield buf.raw[0:r]
self.__class__ = ConsumedArchiveEntry
@property
def isblk(self):
return self.filetype & 0o170000 == 0o060000
@property
def ischr(self):
return self.filetype & 0o170000 == 0o020000
@property
def isdir(self):
return self.filetype & 0o170000 == 0o040000
@property
def isfifo(self):
return self.filetype & 0o170000 == 0o010000
@property
def islnk(self):
return bool(ffi.entry_hardlink_w(self._entry_p) or
ffi.entry_hardlink(self._entry_p))
@property
def issym(self):
return self.filetype & 0o170000 == 0o120000
@property
def isreg(self):
return self.filetype & 0o170000 == 0o100000
@property
def isfile(self):
return self.isreg
@property
def issock(self):
return self.filetype & 0o170000 == 0o140000
@property
def isdev(self):
return self.ischr or self.isblk or self.isfifo or self.issock
@property
def atime(self):
if not ffi.entry_atime_is_set(self._entry_p):
return None
sec_val = ffi.entry_atime(self._entry_p)
nsec_val = ffi.entry_atime_nsec(self._entry_p)
return format_time(sec_val, nsec_val)
@atime.setter
def atime(self, value):
if value is None:
ffi.entry_unset_atime(self._entry_p)
elif isinstance(value, int):
self.set_atime(value)
elif isinstance(value, tuple):
self.set_atime(*value)
else:
seconds, fraction = math.modf(value)
self.set_atime(int(seconds), int(fraction * 1_000_000_000))
def set_atime(self, timestamp_sec, timestamp_nsec=0):
"Kept for backward compatibility. `entry.atime = ...` is supported now."
return ffi.entry_set_atime(self._entry_p, timestamp_sec, timestamp_nsec)
@property
def mtime(self):
if not ffi.entry_mtime_is_set(self._entry_p):
return None
sec_val = ffi.entry_mtime(self._entry_p)
nsec_val = ffi.entry_mtime_nsec(self._entry_p)
return format_time(sec_val, nsec_val)
@mtime.setter
def mtime(self, value):
if value is None:
ffi.entry_unset_mtime(self._entry_p)
elif isinstance(value, int):
self.set_mtime(value)
elif isinstance(value, tuple):
self.set_mtime(*value)
else:
seconds, fraction = math.modf(value)
self.set_mtime(int(seconds), int(fraction * 1_000_000_000))
def set_mtime(self, timestamp_sec, timestamp_nsec=0):
"Kept for backward compatibility. `entry.mtime = ...` is supported now."
return ffi.entry_set_mtime(self._entry_p, timestamp_sec, timestamp_nsec)
@property
def ctime(self):
if not ffi.entry_ctime_is_set(self._entry_p):
return None
sec_val = ffi.entry_ctime(self._entry_p)
nsec_val = ffi.entry_ctime_nsec(self._entry_p)
return format_time(sec_val, nsec_val)
@ctime.setter
def ctime(self, value):
if value is None:
ffi.entry_unset_ctime(self._entry_p)
elif isinstance(value, int):
self.set_ctime(value)
elif isinstance(value, tuple):
self.set_ctime(*value)
else:
seconds, fraction = math.modf(value)
self.set_ctime(int(seconds), int(fraction * 1_000_000_000))
def set_ctime(self, timestamp_sec, timestamp_nsec=0):
"Kept for backward compatibility. `entry.ctime = ...` is supported now."
return ffi.entry_set_ctime(self._entry_p, timestamp_sec, timestamp_nsec)
@property
def birthtime(self):
if not ffi.entry_birthtime_is_set(self._entry_p):
return None
sec_val = ffi.entry_birthtime(self._entry_p)
nsec_val = ffi.entry_birthtime_nsec(self._entry_p)
return format_time(sec_val, nsec_val)
@birthtime.setter
def birthtime(self, value):
if value is None:
ffi.entry_unset_birthtime(self._entry_p)
elif isinstance(value, int):
self.set_birthtime(value)
elif isinstance(value, tuple):
self.set_birthtime(*value)
else:
seconds, fraction = math.modf(value)
self.set_birthtime(int(seconds), int(fraction * 1_000_000_000))
def set_birthtime(self, timestamp_sec, timestamp_nsec=0):
"Kept for backward compatibility. `entry.birthtime = ...` is supported now."
return ffi.entry_set_birthtime(
self._entry_p, timestamp_sec, timestamp_nsec
)
@property
def pathname(self):
path = ffi.entry_pathname_w(self._entry_p)
if not path:
path = ffi.entry_pathname(self._entry_p)
if path is not None:
try:
path = path.decode(self.header_codec)
except UnicodeError:
pass
return path
@pathname.setter
def pathname(self, value):
if not isinstance(value, bytes):
value = value.encode(self.header_codec)
if self.header_codec == 'utf-8':
ffi.entry_update_pathname_utf8(self._entry_p, value)
else:
ffi.entry_copy_pathname(self._entry_p, value)
@property
def linkpath(self):
path = (
(
ffi.entry_symlink_w(self._entry_p) or
ffi.entry_symlink(self._entry_p)
) if self.issym else (
ffi.entry_hardlink_w(self._entry_p) or
ffi.entry_hardlink(self._entry_p)
)
)
if isinstance(path, bytes):
try:
path = path.decode(self.header_codec)
except UnicodeError:
pass
return path
@linkpath.setter
def linkpath(self, value):
if not isinstance(value, bytes):
value = value.encode(self.header_codec)
if self.header_codec == 'utf-8':
ffi.entry_update_link_utf8(self._entry_p, value)
else:
ffi.entry_copy_link(self._entry_p, value)
# aliases for compatibility with the standard `tarfile` module
path = property(pathname.fget, pathname.fset, doc="alias of pathname")
name = path
linkname = property(linkpath.fget, linkpath.fset, doc="alias of linkpath")
@property
def size(self):
if ffi.entry_size_is_set(self._entry_p):
return ffi.entry_size(self._entry_p)
@size.setter
def size(self, value):
if value is None:
ffi.entry_unset_size(self._entry_p)
else:
ffi.entry_set_size(self._entry_p, value)
@property
def mode(self):
return ffi.entry_mode(self._entry_p)
@mode.setter
def mode(self, value):
ffi.entry_set_mode(self._entry_p, value)
@property
def strmode(self):
"""The file's mode as a string, e.g. '?rwxrwx---'"""
# note we strip the mode because archive_entry_strmode
# returns a trailing space: strcpy(bp, "?rwxrwxrwx ");
return ffi.entry_strmode(self._entry_p).strip()
@property
def perm(self):
return ffi.entry_perm(self._entry_p)
@perm.setter
def perm(self, value):
ffi.entry_set_perm(self._entry_p, value)
@property
def rdev(self):
return ffi.entry_rdev(self._entry_p)
@rdev.setter
def rdev(self, value):
if isinstance(value, tuple):
ffi.entry_set_rdevmajor(self._entry_p, value[0])
ffi.entry_set_rdevminor(self._entry_p, value[1])
else:
ffi.entry_set_rdev(self._entry_p, value)
@property
def rdevmajor(self):
return ffi.entry_rdevmajor(self._entry_p)
@rdevmajor.setter
def rdevmajor(self, value):
ffi.entry_set_rdevmajor(self._entry_p, value)
@property
def rdevminor(self):
return ffi.entry_rdevminor(self._entry_p)
@rdevminor.setter
def rdevminor(self, value):
ffi.entry_set_rdevminor(self._entry_p, value)
class ConsumedArchiveEntry(ArchiveEntry):
__slots__ = ()
def get_blocks(self, **kw):
raise TypeError("the content of this entry has already been read")
class PassedArchiveEntry(ArchiveEntry):
__slots__ = ()
def get_blocks(self, **kw):
raise TypeError("this entry is passed, it's too late to read its content")