diff --git a/py7zr/cli.py b/py7zr/cli.py index 71ed9eba..29db092b 100644 --- a/py7zr/cli.py +++ b/py7zr/cli.py @@ -31,7 +31,6 @@ from lzma import CHECK_CRC64, CHECK_SHA256, is_check_supported from typing import Any, Optional -import _lzma # type: ignore import multivolumefile import texttable # type: ignore @@ -347,8 +346,6 @@ def run_extract(self, args: argparse.Namespace) -> int: else: print("The archive is corrupted, or password is wrong. ABORT.") return 1 - except _lzma.LZMAError: - return 1 cb = None # Optional[ExtractCallback] if verbose: @@ -374,8 +371,6 @@ def run_extract(self, args: argparse.Namespace) -> int: else: print("The archive is corrupted, or password is wrong. ABORT.") return 1 - except _lzma.LZMAError: - return 1 else: return 0 diff --git a/py7zr/compressor.py b/py7zr/compressor.py index 852bb608..6ba4e956 100644 --- a/py7zr/compressor.py +++ b/py7zr/compressor.py @@ -2,7 +2,7 @@ # # p7zr library # -# Copyright (c) 2019-2023 Hiroshi Miura +# Copyright (c) 2019-2025 Hiroshi Miura # Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de # 7-Zip Copyright (C) 1999-2010 Igor Pavlov # LZMA SDK Copyright (C) 1999-2010 Igor Pavlov @@ -28,7 +28,7 @@ import zlib from abc import ABC, abstractmethod from enum import Enum -from typing import Any, Optional, Union +from typing import Any, BinaryIO, Optional, Tuple, Union import bcj import inflate64 @@ -890,10 +890,10 @@ def _set_alternate_compressors_coders(self, alt_filter, password=None): }, ) - def compress(self, fd, fp, crc=0): + def compress(self, fd: BinaryIO, fp: BinaryIO, crc: int = 0) -> Tuple[int, int, int]: data = fd.read(self._block_size) - insize = len(data) - foutsize = 0 + insize: int = len(data) + foutsize: int = 0 while data: crc = calculate_crc32(data, crc) for i, compressor in enumerate(self.chain): diff --git a/py7zr/py7zr.py b/py7zr/py7zr.py index fc021ad9..d7fefa76 100644 --- a/py7zr/py7zr.py +++ b/py7zr/py7zr.py @@ -2,7 +2,7 @@ # # p7zr library # -# Copyright (c) 2019-2024 Hiroshi Miura +# Copyright (c) 2019-2025 Hiroshi Miura # Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de # 7-Zip Copyright (C) 1999-2010 Igor Pavlov # LZMA SDK Copyright (C) 1999-2010 Igor Pavlov @@ -40,7 +40,7 @@ from multiprocessing import Process from shutil import ReadError from threading import Thread -from typing import IO, Any, BinaryIO, Optional, Union +from typing import IO, Any, BinaryIO, Optional, Tuple, Union import multivolumefile @@ -74,6 +74,7 @@ FILE_ATTRIBUTE_UNIX_EXTENSION = 0x8000 FILE_ATTRIBUTE_WINDOWS_MASK = 0x07FFF + class ArchiveFile: """Represent each files metadata inside archive file. It holds file properties; filename, permissions, and type whether @@ -331,7 +332,7 @@ def __init__( mode: str = "r", *, filters: Optional[list[dict[str, int]]] = None, - dereference=False, + dereference: bool = False, password: Optional[str] = None, header_encryption: bool = False, blocksize: Optional[int] = None, @@ -385,7 +386,7 @@ def __init__( elif isinstance(file, io.IOBase): self._filePassed = True self.fp = file - self.filename = getattr(file, "name", None) + self.filename = getattr(file, "name", None) # type: ignore self.mode = mode # noqa else: raise TypeError(f"invalid file: {type(file)}") @@ -531,6 +532,7 @@ def _extract( path: Optional[Any] = None, targets: Optional[Collection[str]] = None, callback: Optional[ExtractCallback] = None, + enable_symlink: bool = False, recursive: Optional[bool] = False, writer_factory: Optional[WriterFactory] = None, ) -> None: @@ -601,7 +603,12 @@ def _extract( elif f.is_socket: pass # TODO: implement me. elif f.is_symlink or f.is_junction: - self.worker.register_filelike(f.id, outfilename) + if enable_symlink: + self.worker.register_filelike(f.id, outfilename) + else: + # Archive has symlink or junction + # this has security consequences. + raise ValueError("Archive has symbolic link that is not explicitly enabled.") else: self.worker.register_filelike(f.id, outfilename) target_files.append((outfilename, f.file_properties())) @@ -705,9 +712,9 @@ def _write_header(self): self.sig_header.calccrc(header_len, header_crc) self.sig_header.write(self.fp) - def _writeall(self, path, arcname): + def _writeall(self, path: pathlib.Path, arcname: Optional[str], dereference: bool) -> None: try: - if path.is_symlink() and not self.dereference: + if path.is_symlink() and not dereference: self.write(path, arcname) elif path.is_file(): self.write(path, arcname) @@ -716,14 +723,14 @@ def _writeall(self, path, arcname): self.write(path, arcname) for nm in sorted(os.listdir(str(path))): arc = os.path.join(arcname, nm) if arcname is not None else None - self._writeall(path.joinpath(nm), arc) + self._writeall(path.joinpath(nm), arc, dereference) else: return # pathlib ignores ELOOP and return False for is_*(). except OSError as ose: - if self.dereference and ose.errno in [errno.ELOOP]: + if dereference and ose.errno in [errno.ELOOP]: return # ignore ELOOP here, this resulted to stop looped symlink reference. - elif self.dereference and sys.platform == "win32" and ose.errno in [errno.ENOENT]: - return # ignore ENOENT which is happened when a case of ELOOP on windows. + elif dereference and sys.platform == "win32" and ose.errno in [errno.ENOENT]: + return # ignore ENOENT which is happened when a case of ELOOP on Windows. else: raise @@ -814,7 +821,7 @@ def _var_release(self): del self.sig_header @staticmethod - def _make_file_info(target: pathlib.Path, arcname: Optional[str] = None, dereference=False) -> dict[str, Any]: + def _make_file_info(target: pathlib.Path, arcname: Optional[str] = None, dereference: bool = False) -> dict[str, Any]: f: dict[str, Any] = {} f["origin"] = target if arcname is not None: @@ -1014,13 +1021,14 @@ def extractall( *, callback: Optional[ExtractCallback] = None, factory: Optional[WriterFactory] = None, + enable_symlink: bool = True, ) -> None: """Extract all members from the archive to the current working directory and set owner, modification time and permissions on directories afterward. ``path`` specifies a different directory to extract to. """ - self._extract(path=path, callback=callback, writer_factory=factory) + self._extract(path=path, callback=callback, writer_factory=factory, enable_symlink=enable_symlink) def extract( self, @@ -1030,6 +1038,7 @@ def extract( *, callback: Optional[ExtractCallback] = None, factory: Optional[WriterFactory] = None, + enable_symlink: bool = True, ) -> None: if not self._is_none_or_collection(targets): raise TypeError("Wrong argument type given.") @@ -1037,7 +1046,9 @@ def extract( # This also matches the behavior of TarFile if targets is not None: targets = [remove_trailing_slash(target) for target in targets] - self._extract(path, targets, recursive=recursive, callback=callback, writer_factory=factory) + self._extract( + path, targets, recursive=recursive, callback=callback, writer_factory=factory, enable_symlink=enable_symlink + ) def reporter(self, callback: ExtractCallback): while True: @@ -1064,18 +1075,24 @@ def reporter(self, callback: ExtractCallback): pass self.q.task_done() - def writeall(self, path: Union[pathlib.Path, str], arcname: Optional[str] = None): + def writeall( + self, path: Union[pathlib.Path, str], arcname: Optional[str] = None, dereference: Optional[bool] = None + ) -> None: """Write files in target path into archive.""" if isinstance(path, str): path = pathlib.Path(path) if not path.exists(): raise ValueError("specified path does not exist.") + if dereference is None: + dereference = self.dereference if path.is_dir() or path.is_file(): - self._writeall(path, arcname) + self._writeall(path, arcname, dereference) else: raise ValueError("specified path is not a directory or a file") - def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None): + def write( + self, file: Union[pathlib.Path, str], arcname: Optional[str] = None, dereference: Optional[bool] = None + ) -> None: """Write single target file into archive.""" if not isinstance(file, str) and not isinstance(file, pathlib.Path): raise ValueError("Unsupported file type.") @@ -1088,11 +1105,13 @@ def write(self, file: Union[pathlib.Path, str], arcname: Optional[str] = None): else: path = file folder = self.header.initialize() - file_info = self._make_file_info(path, arcname, self.dereference) + if dereference is None: + dereference = self.dereference + file_info = self._make_file_info(path, arcname, dereference) self.header.files_info.files.append(file_info) self.header.files_info.emptyfiles.append(file_info["emptystream"]) self.files.append(file_info) - self.worker.archive(self.fp, self.files, folder, deref=self.dereference) + self.worker.archive(self.fp, self.files, folder, dereference) def writef(self, bio: IO[Any], arcname: str): if not check_archive_path(arcname): @@ -1123,7 +1142,7 @@ def _writef(self, bio: IO[Any], arcname: str): self.header.files_info.files.append(file_info) self.header.files_info.emptyfiles.append(file_info["emptystream"]) self.files.append(file_info) - self.worker.archive(self.fp, self.files, folder, deref=False) + self.worker.archive(self.fp, self.files, folder, False) else: file_info = self._make_file_info_from_name(bio, size, arcname) self.header.files_info.files.append(file_info) @@ -1530,7 +1549,7 @@ def _find_link_target(self, target): member = linkname return member - def _after_write(self, insize, foutsize, crc): + def _after_write(self, insize: int, foutsize: int, crc: int) -> Tuple[int, int]: self.header.main_streams.substreamsinfo.digestsdefined.append(True) self.header.main_streams.substreamsinfo.digests.append(crc) if self.header.main_streams.substreamsinfo.unpacksizes is None: @@ -1543,12 +1562,12 @@ def _after_write(self, insize, foutsize, crc): self.header.main_streams.substreamsinfo.num_unpackstreams_folders[-1] += 1 return foutsize, crc - def write(self, fp: BinaryIO, f, assym, folder): + def write(self, fp: BinaryIO, f: ArchiveFile, assym: bool, folder: Folder) -> Tuple[int, int]: compressor = folder.get_compressor() if assym: link_target: str = self._find_link_target(f.origin) tgt: bytes = link_target.encode("utf-8") - fd = io.BytesIO(tgt) + fd: BinaryIO = io.BytesIO(tgt) insize, foutsize, crc = compressor.compress(fd, fp) fd.close() else: @@ -1556,12 +1575,15 @@ def write(self, fp: BinaryIO, f, assym, folder): insize, foutsize, crc = compressor.compress(fd, fp) return self._after_write(insize, foutsize, crc) - def writestr(self, fp: BinaryIO, f, folder): + def writestr(self, fp: BinaryIO, f: ArchiveFile, folder: Folder) -> Tuple[int, int]: compressor = folder.get_compressor() - insize, foutsize, crc = compressor.compress(f.data(), fp) + fd: Optional[BinaryIO] = f.data() + if fd is None: + return 0, 0 + insize, foutsize, crc = compressor.compress(fd, fp) return self._after_write(insize, foutsize, crc) - def flush_archive(self, fp, folder): + def flush_archive(self, fp: BinaryIO, folder: Folder) -> None: compressor = folder.get_compressor() foutsize = compressor.flush(fp) if len(self.files) > 0: @@ -1577,7 +1599,7 @@ def flush_archive(self, fp, folder): self.header.main_streams.packinfo.packsizes.append(compressor.packsize) folder.unpacksizes = compressor.unpacksizes - def archive(self, fp: BinaryIO, files, folder, deref=False): + def archive(self, fp: BinaryIO, files: ArchiveFileList, folder: Folder, dereference: bool) -> None: """Run archive task for specified 7zip folder.""" f = files[self.current_file_index] if f.has_strdata(): @@ -1585,8 +1607,8 @@ def archive(self, fp: BinaryIO, files, folder, deref=False): self.header.files_info.files[self.current_file_index]["maxsize"] = foutsize self.header.files_info.files[self.current_file_index]["digest"] = crc self.last_file_index = self.current_file_index - elif (f.is_symlink and not deref) or not f.emptystream: - foutsize, crc = self.write(fp, f, (f.is_symlink and not deref), folder) + elif (f.is_symlink and not dereference) or not f.emptystream: + foutsize, crc = self.write(fp, f, (f.is_symlink and not dereference), folder) self.header.files_info.files[self.current_file_index]["maxsize"] = foutsize self.header.files_info.files[self.current_file_index]["digest"] = crc self.last_file_index = self.current_file_index