Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add symbolic link options #482

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
5 changes: 0 additions & 5 deletions py7zr/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from lzma import CHECK_CRC64, CHECK_SHA256, is_check_supported
from typing import Any, Optional

import _lzma # type: ignore
import multivolumefile
import texttable # type: ignore

Expand Down Expand Up @@ -347,8 +346,6 @@ def run_extract(self, args: argparse.Namespace) -> int:
else:
print("The archive is corrupted, or password is wrong. ABORT.")
return 1
except _lzma.LZMAError:
return 1

cb = None # Optional[ExtractCallback]
if verbose:
Expand All @@ -374,8 +371,6 @@ def run_extract(self, args: argparse.Namespace) -> int:
else:
print("The archive is corrupted, or password is wrong. ABORT.")
return 1
except _lzma.LZMAError:
return 1
else:
return 0

Expand Down
10 changes: 5 additions & 5 deletions py7zr/compressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# py7zr library
#
# Copyright (c) 2019-2023 Hiroshi Miura <[email protected]>
# Copyright (c) 2019-2025 Hiroshi Miura <[email protected]>
# Copyright (c) 2004-2015 by Joachim Bauch, [email protected]
# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
Expand All @@ -28,7 +28,7 @@
import zlib
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Optional, Union
from typing import Any, BinaryIO, Optional, Tuple, Union

import bcj
import inflate64
Expand Down Expand Up @@ -890,10 +890,10 @@ def _set_alternate_compressors_coders(self, alt_filter, password=None):
},
)

def compress(self, fd, fp, crc=0):
def compress(self, fd: BinaryIO, fp: BinaryIO, crc: int = 0) -> Tuple[int, int, int]:
data = fd.read(self._block_size)
insize = len(data)
foutsize = 0
insize: int = len(data)
foutsize: int = 0
while data:
crc = calculate_crc32(data, crc)
for i, compressor in enumerate(self.chain):
Expand Down
80 changes: 51 additions & 29 deletions py7zr/py7zr.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
# py7zr library
#
# Copyright (c) 2019-2024 Hiroshi Miura <[email protected]>
# Copyright (c) 2019-2025 Hiroshi Miura <[email protected]>
# Copyright (c) 2004-2015 by Joachim Bauch, [email protected]
# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
Expand Down Expand Up @@ -40,7 +40,7 @@
from multiprocessing import Process
from shutil import ReadError
from threading import Thread
from typing import IO, Any, BinaryIO, Optional, Union
from typing import IO, Any, BinaryIO, Optional, Tuple, Union

import multivolumefile

Expand Down Expand Up @@ -74,6 +74,7 @@
FILE_ATTRIBUTE_UNIX_EXTENSION = 0x8000
FILE_ATTRIBUTE_WINDOWS_MASK = 0x07FFF


class ArchiveFile:
"""Represent each file's metadata inside the archive file.
It holds file properties; filename, permissions, and type whether
Expand Down Expand Up @@ -331,7 +332,7 @@ def __init__(
mode: str = "r",
*,
filters: Optional[list[dict[str, int]]] = None,
dereference=False,
dereference: bool = False,
password: Optional[str] = None,
header_encryption: bool = False,
blocksize: Optional[int] = None,
Expand Down Expand Up @@ -385,7 +386,7 @@ def __init__(
elif isinstance(file, io.IOBase):
self._filePassed = True
self.fp = file
self.filename = getattr(file, "name", None)
self.filename = getattr(file, "name", None) # type: ignore
self.mode = mode # noqa
else:
raise TypeError(f"invalid file: {type(file)}")
Expand Down Expand Up @@ -531,6 +532,7 @@ def _extract(
path: Optional[Any] = None,
targets: Optional[Collection[str]] = None,
callback: Optional[ExtractCallback] = None,
enable_symlink: bool = False,
recursive: Optional[bool] = False,
writer_factory: Optional[WriterFactory] = None,
) -> None:
Expand Down Expand Up @@ -601,7 +603,12 @@ def _extract(
elif f.is_socket:
pass # TODO: implement me.
elif f.is_symlink or f.is_junction:
self.worker.register_filelike(f.id, outfilename)
if enable_symlink:
self.worker.register_filelike(f.id, outfilename)
else:
# The archive contains a symlink or junction. Creating it on
# extraction has security consequences, so it must be explicitly enabled.
raise ValueError("Archive has symbolic link that is not explicitly enabled.")
else:
self.worker.register_filelike(f.id, outfilename)
target_files.append((outfilename, f.file_properties()))
Expand Down Expand Up @@ -705,9 +712,9 @@ def _write_header(self):
self.sig_header.calccrc(header_len, header_crc)
self.sig_header.write(self.fp)

def _writeall(self, path, arcname):
def _writeall(self, path: pathlib.Path, arcname: Optional[str], dereference: bool) -> None:
try:
if path.is_symlink() and not self.dereference:
if path.is_symlink() and not dereference:
self.write(path, arcname)
elif path.is_file():
self.write(path, arcname)
Expand All @@ -716,14 +723,14 @@ def _writeall(self, path, arcname):
self.write(path, arcname)
for nm in sorted(os.listdir(str(path))):
arc = os.path.join(arcname, nm) if arcname is not None else None
self._writeall(path.joinpath(nm), arc)
self._writeall(path.joinpath(nm), arc, dereference)
else:
return # pathlib ignores ELOOP and returns False for is_*().
except OSError as ose:
if self.dereference and ose.errno in [errno.ELOOP]:
if dereference and ose.errno in [errno.ELOOP]:
return # ignore ELOOP here; returning stops traversal of a looped symlink reference.
elif self.dereference and sys.platform == "win32" and ose.errno in [errno.ENOENT]:
return # ignore ENOENT which is happened when a case of ELOOP on windows.
elif dereference and sys.platform == "win32" and ose.errno in [errno.ENOENT]:
return # ignore ENOENT, which occurs in the ELOOP case on Windows.
else:
raise

Expand Down Expand Up @@ -814,7 +821,7 @@ def _var_release(self):
del self.sig_header

@staticmethod
def _make_file_info(target: pathlib.Path, arcname: Optional[str] = None, dereference=False) -> dict[str, Any]:
def _make_file_info(target: pathlib.Path, arcname: Optional[str] = None, dereference: bool = False) -> dict[str, Any]:
f: dict[str, Any] = {}
f["origin"] = target
if arcname is not None:
Expand Down Expand Up @@ -1014,13 +1021,14 @@ def extractall(
*,
callback: Optional[ExtractCallback] = None,
factory: Optional[WriterFactory] = None,
enable_symlink: bool = True,
) -> None:
"""Extract all members from the archive to the current working
directory and set owner, modification time and permissions on
directories afterward. ``path`` specifies a different directory
to extract to.
"""
self._extract(path=path, callback=callback, writer_factory=factory)
self._extract(path=path, callback=callback, writer_factory=factory, enable_symlink=enable_symlink)

def extract(
self,
Expand All @@ -1030,14 +1038,17 @@ def extract(
*,
callback: Optional[ExtractCallback] = None,
factory: Optional[WriterFactory] = None,
enable_symlink: bool = True,
) -> None:
if not self._is_none_or_collection(targets):
raise TypeError("Wrong argument type given.")
# For interoperability with ZipFile, we strip any trailing slashes
# This also matches the behavior of TarFile
if targets is not None:
targets = [remove_trailing_slash(target) for target in targets]
self._extract(path, targets, recursive=recursive, callback=callback, writer_factory=factory)
self._extract(
path, targets, recursive=recursive, callback=callback, writer_factory=factory, enable_symlink=enable_symlink
)

def reporter(self, callback: ExtractCallback):
while True:
Expand All @@ -1064,18 +1075,24 @@ def reporter(self, callback: ExtractCallback):
pass
self.q.task_done()

def writeall(self, path: Union[pathlib.Path, str], arcname: Optional[str] = None):
def writeall(
self, path: Union[pathlib.Path, str], arcname: Optional[str] = None, dereference: Optional[bool] = None
) -> None:
"""Write files in target path into archive."""
if isinstance(path, str):
path = pathlib.Path(path)
if not path.exists():
raise ValueError("specified path does not exist.")
if dereference is None:
dereference = self.dereference
if path.is_dir() or path.is_file():
self._writeall(path, arcname)
self._writeall(path, arcname, dereference)
else:
raise ValueError("specified path is not a directory or a file")

def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None):
def write(
self, file: Union[pathlib.Path, str], arcname: Optional[str] = None, dereference: Optional[bool] = None
) -> None:
"""Write single target file into archive."""
if not isinstance(file, str) and not isinstance(file, pathlib.Path):
raise ValueError("Unsupported file type.")
Expand All @@ -1088,11 +1105,13 @@ def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None):
else:
path = file
folder = self.header.initialize()
file_info = self._make_file_info(path, arcname, self.dereference)
if dereference is None:
dereference = self.dereference
file_info = self._make_file_info(path, arcname, dereference)
self.header.files_info.files.append(file_info)
self.header.files_info.emptyfiles.append(file_info["emptystream"])
self.files.append(file_info)
self.worker.archive(self.fp, self.files, folder, deref=self.dereference)
self.worker.archive(self.fp, self.files, folder, dereference)

def writef(self, bio: IO[Any], arcname: str):
if not check_archive_path(arcname):
Expand Down Expand Up @@ -1123,7 +1142,7 @@ def _writef(self, bio: IO[Any], arcname: str):
self.header.files_info.files.append(file_info)
self.header.files_info.emptyfiles.append(file_info["emptystream"])
self.files.append(file_info)
self.worker.archive(self.fp, self.files, folder, deref=False)
self.worker.archive(self.fp, self.files, folder, False)
else:
file_info = self._make_file_info_from_name(bio, size, arcname)
self.header.files_info.files.append(file_info)
Expand Down Expand Up @@ -1530,7 +1549,7 @@ def _find_link_target(self, target):
member = linkname
return member

def _after_write(self, insize, foutsize, crc):
def _after_write(self, insize: int, foutsize: int, crc: int) -> Tuple[int, int]:
self.header.main_streams.substreamsinfo.digestsdefined.append(True)
self.header.main_streams.substreamsinfo.digests.append(crc)
if self.header.main_streams.substreamsinfo.unpacksizes is None:
Expand All @@ -1543,25 +1562,28 @@ def _after_write(self, insize, foutsize, crc):
self.header.main_streams.substreamsinfo.num_unpackstreams_folders[-1] += 1
return foutsize, crc

def write(self, fp: BinaryIO, f, assym, folder):
def write(self, fp: BinaryIO, f: ArchiveFile, assym: bool, folder: Folder) -> Tuple[int, int]:
compressor = folder.get_compressor()
if assym:
link_target: str = self._find_link_target(f.origin)
tgt: bytes = link_target.encode("utf-8")
fd = io.BytesIO(tgt)
fd: BinaryIO = io.BytesIO(tgt)
insize, foutsize, crc = compressor.compress(fd, fp)
fd.close()
else:
with f.origin.open(mode="rb") as fd:
insize, foutsize, crc = compressor.compress(fd, fp)
return self._after_write(insize, foutsize, crc)

def writestr(self, fp: BinaryIO, f, folder):
def writestr(self, fp: BinaryIO, f: ArchiveFile, folder: Folder) -> Tuple[int, int]:
compressor = folder.get_compressor()
insize, foutsize, crc = compressor.compress(f.data(), fp)
fd: Optional[BinaryIO] = f.data()
if fd is None:
return 0, 0
insize, foutsize, crc = compressor.compress(fd, fp)
return self._after_write(insize, foutsize, crc)

def flush_archive(self, fp, folder):
def flush_archive(self, fp: BinaryIO, folder: Folder) -> None:
compressor = folder.get_compressor()
foutsize = compressor.flush(fp)
if len(self.files) > 0:
Expand All @@ -1577,16 +1599,16 @@ def flush_archive(self, fp, folder):
self.header.main_streams.packinfo.packsizes.append(compressor.packsize)
folder.unpacksizes = compressor.unpacksizes

def archive(self, fp: BinaryIO, files, folder, deref=False):
def archive(self, fp: BinaryIO, files: ArchiveFileList, folder: Folder, dereference: bool) -> None:
"""Run archive task for specified 7zip folder."""
f = files[self.current_file_index]
if f.has_strdata():
foutsize, crc = self.writestr(fp, f, folder)
self.header.files_info.files[self.current_file_index]["maxsize"] = foutsize
self.header.files_info.files[self.current_file_index]["digest"] = crc
self.last_file_index = self.current_file_index
elif (f.is_symlink and not deref) or not f.emptystream:
foutsize, crc = self.write(fp, f, (f.is_symlink and not deref), folder)
elif (f.is_symlink and not dereference) or not f.emptystream:
foutsize, crc = self.write(fp, f, (f.is_symlink and not dereference), folder)
self.header.files_info.files[self.current_file_index]["maxsize"] = foutsize
self.header.files_info.files[self.current_file_index]["digest"] = crc
self.last_file_index = self.current_file_index
Expand Down