From bcf4d41d90191a308c425ad9c6a9ebba2b247f2e Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Tue, 31 Jan 2023 14:40:52 +0100 Subject: [PATCH 01/13] Implement tarfile filter PEP --- Lib/shutil.py | 13 +- Lib/tarfile.py | 321 +++++++++++++++-- Lib/test/test_tarfile.py | 758 +++++++++++++++++++++++++++++++++++++-- 3 files changed, 1042 insertions(+), 50 deletions(-) diff --git a/Lib/shutil.py b/Lib/shutil.py index 867925aa10cc04..24ee23fccf8742 100644 --- a/Lib/shutil.py +++ b/Lib/shutil.py @@ -1261,7 +1261,7 @@ def _find_unpack_format(filename): return name return None -def unpack_archive(filename, extract_dir=None, format=None): +def unpack_archive(filename, extract_dir=None, format=None, filter=None): """Unpack an archive. `filename` is the name of the archive. @@ -1275,6 +1275,9 @@ def unpack_archive(filename, extract_dir=None, format=None): was registered for that extension. In case none is found, a ValueError is raised. + + If `filter` isgiven, it is passed to he underlying + extraction function. """ sys.audit("shutil.unpack_archive", filename, extract_dir, format) @@ -1284,6 +1287,10 @@ def unpack_archive(filename, extract_dir=None, format=None): extract_dir = os.fspath(extract_dir) filename = os.fspath(filename) + if filter is None: + filter_kwargs = {} + else: + filter_kwargs = {'filter': filter} if format is not None: try: format_info = _UNPACK_FORMATS[format] @@ -1291,7 +1298,7 @@ def unpack_archive(filename, extract_dir=None, format=None): raise ValueError("Unknown unpack format '{0}'".format(format)) from None func = format_info[1] - func(filename, extract_dir, **dict(format_info[2])) + func(filename, extract_dir, **dict(format_info[2]), **filter_kwargs) else: # we need to look at the registered unpackers supported extensions format = _find_unpack_format(filename) @@ -1299,7 +1306,7 @@ def unpack_archive(filename, extract_dir=None, format=None): raise ReadError("Unknown archive format '{0}'".format(filename)) func = _UNPACK_FORMATS[format][1] - kwargs = dict(_UNPACK_FORMATS[format][2]) + kwargs = filter_kwargs | dict(_UNPACK_FORMATS[format][2]) func(filename, extract_dir, **kwargs) diff --git a/Lib/tarfile.py b/Lib/tarfile.py index d686435d90ad1b..16ad9b77d279cd 100755 --- a/Lib/tarfile.py +++ b/Lib/tarfile.py @@ -46,6 +46,7 @@ import struct import copy import re +import warnings try: import pwd @@ -65,7 +66,11 @@ __all__ = ["TarFile", "TarInfo", "is_tarfile", "TarError", "ReadError", "CompressionError", "StreamError", "ExtractError", "HeaderError", "ENCODING", "USTAR_FORMAT", "GNU_FORMAT", "PAX_FORMAT", - "DEFAULT_FORMAT", "open"] + "DEFAULT_FORMAT", "open","fully_trusted_filter", "data_filter", + "tar_filter", "FilterError", "AbsoluteLinkError", + "OutsideDestinationError", "SpecialFileError", "AbsolutePathError", + "LinkOutsideDestinationError"] + #--------------------------------------------------------- # tar constants @@ -154,6 +159,8 @@ def stn(s, length, encoding, errors): """Convert a string to a null-terminated bytes object. """ + if s is None: + raise ValueError("metadata cannot contain None") s = s.encode(encoding, errors) return s[:length] + (length - len(s)) * NUL @@ -707,9 +714,126 @@ def __init__(self, tarfile, tarinfo): super().__init__(fileobj) #class ExFileObject + +#----------------------------- +# extraction filters (PEP XXX) +#----------------------------- + +class FilterError(TarError): + pass + +class AbsolutePathError(FilterError): + # XXX Windows only + pass + +class OutsideDestinationError(FilterError): + def __init__(self, tarinfo, path): + self.tarinfo = tarinfo + self._path = path + super().__init__(f'{tarinfo.name!r} would be extracted to {path!r}, ' + + 'which is outside the destination') + +class SpecialFileError(FilterError): + def __init__(self, tarinfo): + self.tarinfo = tarinfo + super().__init__(f'{tarinfo.name!r} is a special file') + +class AbsoluteLinkError(FilterError): + def __init__(self, tarinfo): + self.tarinfo = tarinfo + super().__init__(f'{tarinfo.name!r} is a symlink to an absolute path') + +class LinkOutsideDestinationError(FilterError): + def __init__(self, tarinfo, path): + self.tarinfo = tarinfo + self._path = path + super().__init__(f'{tarinfo.name!r} would link to {path!r}, ' + + 'which is outside the destination') + +def _get_filtered_attrs(member, dest_path, for_data=True): + new_attrs = {} + name = member.name + dest_path = os.path.realpath(dest_path) + # Strip leading / (tar's directory separator) from filenames. + # Include os.sep (target OS directory separator) as well. + if name.startswith(('/', os.sep)): + name = new_attrs['name'] = member.path.lstrip('/' + os.sep) + if os.path.isabs(name): + # Path is absolute even after stripping. + # For example, 'C:/foo' on Windows. + raise AbsolutePathError(member) + # Ensure we stay in the destination + target_path = os.path.realpath(os.path.join(dest_path, name)) + if os.path.commonpath([target_path, dest_path]) != dest_path: + raise OutsideDestinationError(member, target_path) + # Limit permissions (no high bits, and go-w) + mode = member.mode + if mode is not None: + # Strip high bits & group/other write bits + mode = mode & 0o755 + if for_data: + # For data, handle permissions & file types + if member.isreg() or member.islnk(): + if not mode & 0o100: + # Clear executable bits if not executable by user + mode &= ~0o111 + # Ensure owner can read & write + mode |= 0o600 + elif member.isdir() or member.issym(): + # Ignore mode for directories & symlinks + mode = None + else: + # Reject special files + raise SpecialFileError(member) + if mode != member.mode: + new_attrs['mode'] = mode + if for_data: + # Ignore ownership for 'data' + if member.uid is not None: + new_attrs['uid'] = None + if member.gid is not None: + new_attrs['gid'] = None + if member.uname is not None: + new_attrs['uname'] = None + if member.gname is not None: + new_attrs['gname'] = None + # Check link destination for 'data' + if member.islnk() or member.issym(): + if os.path.isabs(member.linkname): + raise AbsoluteLinkError(member) + target_path = os.path.realpath(os.path.join(dest_path, member.linkname)) + if os.path.commonpath([target_path, dest_path]) != dest_path: + raise LinkOutsideDestinationError(member, target_path) + return new_attrs + +def fully_trusted_filter(member, dest_path): + return member + +def tar_filter(member, dest_path): + new_attrs = _get_filtered_attrs(member, dest_path, False) + if new_attrs: + return member.replace(**new_attrs, deep=False) + return member + +def data_filter(member, dest_path): + new_attrs = _get_filtered_attrs(member, dest_path, True) + if new_attrs: + return member.replace(**new_attrs, deep=False) + return member + +_NAMED_FILTERS = { + "fully_trusted": fully_trusted_filter, + "tar": tar_filter, + "data": data_filter, +} + #------------------ # Exported Classes #------------------ + +# Sentinel for replace() defaults, meaning "don't change the attribute" +_KEEP = object() + class TarInfo(object): """Informational class which holds the details about an archive member given by a tar header block. @@ -790,12 +914,44 @@ def linkpath(self, linkname): def __repr__(self): return "<%s %r at %#x>" % (self.__class__.__name__,self.name,id(self)) + def replace(self, *, + name=_KEEP, mtime=_KEEP, mode=_KEEP, linkname=_KEEP, + uid=_KEEP, gid=_KEEP, uname=_KEEP, gname=_KEEP, + deep=True, _KEEP=_KEEP): + """Return a deep copy of self with the given attributes replaced. + """ + if deep: + result = copy.deepcopy(self) + else: + result = copy.copy(self) + if name is not _KEEP: + result.name = name + if mtime is not _KEEP: + result.mtime = mtime + if mode is not _KEEP: + result.mode = mode + if linkname is not _KEEP: + result.linkname = linkname + if uid is not _KEEP: + result.uid = uid + if gid is not _KEEP: + result.gid = gid + if uname is not _KEEP: + result.uname = uname + if gname is not _KEEP: + result.gname = gname + return result + def get_info(self): """Return the TarInfo's attributes as a dictionary. """ + if self.mode == None: + mode = None + else: + mode = self.mode & 0o7777 info = { "name": self.name, - "mode": self.mode & 0o7777, + "mode": mode, "uid": self.uid, "gid": self.gid, "size": self.size, @@ -818,6 +974,9 @@ def tobuf(self, format=DEFAULT_FORMAT, encoding=ENCODING, errors="surrogateescap """Return a tar header as a string of 512 byte blocks. """ info = self.get_info() + for name, value in info.items(): + if value is None: + raise ValueError("%s may not be None" % name) if format == USTAR_FORMAT: return self.create_ustar_header(info, encoding, errors) @@ -948,6 +1107,12 @@ def _create_header(info, format, encoding, errors): devmajor = stn("", 8, encoding, errors) devminor = stn("", 8, encoding, errors) + # None values in metadata should cause ValueError. + # itn()/stn() do this for all fields except type. + filetype = info.get("type", REGTYPE) + if filetype is None: + raise ValueError("TarInfo.type must not be None") + parts = [ stn(info.get("name", ""), 100, encoding, errors), itn(info.get("mode", 0) & 0o7777, 8, format), @@ -956,7 +1121,7 @@ def _create_header(info, format, encoding, errors): itn(info.get("size", 0), 12, format), itn(info.get("mtime", 0), 12, format), b" ", # checksum field - info.get("type", REGTYPE), + filetype, stn(info.get("linkname", ""), 100, encoding, errors), info.get("magic", POSIX_MAGIC), stn(info.get("uname", ""), 32, encoding, errors), @@ -1462,6 +1627,8 @@ class TarFile(object): fileobject = ExFileObject # The file-object for extractfile(). + extraction_filter = None # The default filter for extraction. + def __init__(self, name=None, mode="r", fileobj=None, format=None, tarinfo=None, dereference=None, ignore_zeros=None, encoding=None, errors="surrogateescape", pax_headers=None, debug=None, @@ -1936,7 +2103,10 @@ def list(self, verbose=True, *, members=None): members = self for tarinfo in members: if verbose: - _safe_print(stat.filemode(tarinfo.mode)) + if tarinfo.mode is None: + _safe_print("??????????") + else: + _safe_print(stat.filemode(tarinfo.mode)) _safe_print("%s/%s" % (tarinfo.uname or tarinfo.uid, tarinfo.gname or tarinfo.gid)) if tarinfo.ischr() or tarinfo.isblk(): @@ -1944,8 +2114,11 @@ def list(self, verbose=True, *, members=None): ("%d,%d" % (tarinfo.devmajor, tarinfo.devminor))) else: _safe_print("%10d" % tarinfo.size) - _safe_print("%d-%02d-%02d %02d:%02d:%02d" \ - % time.localtime(tarinfo.mtime)[:6]) + if tarinfo.mtime is None: + _safe_print("????-??-?? ??:??:??") + else: + _safe_print("%d-%02d-%02d %02d:%02d:%02d" \ + % time.localtime(tarinfo.mtime)[:6]) _safe_print(tarinfo.name + ("/" if tarinfo.isdir() else "")) @@ -2032,32 +2205,58 @@ def addfile(self, tarinfo, fileobj=None): self.members.append(tarinfo) - def extractall(self, path=".", members=None, *, numeric_owner=False): + def _get_filter_function(self, filter): + if filter is None: + filter = self.extraction_filter + if filter is None: + warnings.warn( + 'Python 3.14 will, by default, filter extracted tar ' + + 'archives and reject files or modify their metadata. ' + + 'Use the filter argument to control this behavior.', + DeprecationWarning) + return fully_trusted_filter + return filter + if callable(filter): + return filter + try: + return _NAMED_FILTERS[filter] + except KeyError: + raise LookupError(f"filter {filter!r} not found") + + def extractall(self, path=".", members=None, *, numeric_owner=False, + filter=None): """Extract all members from the archive to the current working directory and set owner, modification time and permissions on directories afterwards. `path' specifies a different directory to extract to. `members' is optional and must be a subset of the list returned by getmembers(). If `numeric_owner` is True, only the numbers for user/group names are used and not the names. + + The `filter` function will be called on each member just + before extraction. + It can return a changed TarInfo or None to skip the member. + String names of common filters are accepted. """ directories = [] + filter_function = self._get_filter_function(filter) if members is None: members = self - for tarinfo in members: + for member in members: + tarinfo = self._get_extract_tarinfo(member, filter_function, path) + if tarinfo is None: + continue if tarinfo.isdir(): - # Extract directories with a safe mode. + # For directories, delay setting attributes until later, + # since permissions can interfere with extraction and + # extracting contents can reset mtime. directories.append(tarinfo) - tarinfo = copy.copy(tarinfo) - tarinfo.mode = 0o700 - # Do not set_attrs directories, as we will do that further down - self.extract(tarinfo, path, set_attrs=not tarinfo.isdir(), - numeric_owner=numeric_owner) + self._extract_one(tarinfo, path, set_attrs=not tarinfo.isdir(), + numeric_owner=numeric_owner) # Reverse sort directories. - directories.sort(key=lambda a: a.name) - directories.reverse() + directories.sort(key=lambda a: a.name, reverse=True) # Set correct owner, mtime and filemode on directories. for tarinfo in directories: @@ -2072,7 +2271,8 @@ def extractall(self, path=".", members=None, *, numeric_owner=False): else: self._dbg(1, "tarfile: %s" % e) - def extract(self, member, path="", set_attrs=True, *, numeric_owner=False): + def extract(self, member, path="", set_attrs=True, *, numeric_owner=False, + filter=None): """Extract a member from the archive to the current working directory, using its full name. Its file information is extracted as accurately as possible. `member' may be a filename or a TarInfo object. You can @@ -2080,17 +2280,43 @@ def extract(self, member, path="", set_attrs=True, *, numeric_owner=False): mtime, mode) are set unless `set_attrs' is False. If `numeric_owner` is True, only the numbers for user/group names are used and not the names. + + The `filter` function will be called before extraction. + It can return a changed TarInfo or None to skip the member. + String names of common filters are accepted. """ - self._check("r") + filter_function = self._get_filter_function(filter) + tarinfo = self._get_extract_tarinfo(member, filter_function, path) + if tarinfo is not None: + self._extract_one(tarinfo, path, set_attrs, numeric_owner) + def _get_extract_tarinfo(self, member, filter_function, path): + """Get filtered TarInfo (or None) from member, which might be a str""" if isinstance(member, str): tarinfo = self.getmember(member) else: tarinfo = member + unfiltered = tarinfo + try: + tarinfo = filter_function(tarinfo, path) + except TarError as e: + if self.errorlevel > 0: + raise + else: + self._dbg(1, "tarfile: %s" % e) + if tarinfo is None: + self._dbg(2, "tarfile: Excluded %r" % unfiltered.name) + return None # Prepare the link target for makelink(). if tarinfo.islnk(): + tarinfo = copy.copy(tarinfo) tarinfo._link_target = os.path.join(path, tarinfo.linkname) + return tarinfo + + def _extract_one(self, tarinfo, path, set_attrs, numeric_owner): + """Extract from filtered tarinfo to disk""" + self._check("r") try: self._extract_member(tarinfo, os.path.join(path, tarinfo.name), @@ -2195,9 +2421,13 @@ def makedir(self, tarinfo, targetpath): """Make a directory called targetpath. """ try: - # Use a safe mode for the directory, the real mode is set - # later in _extract_member(). - os.mkdir(targetpath, 0o700) + if tarinfo.mode is None: + # Use the system's default mode + os.mkdir(targetpath) + else: + # Use a safe mode for the directory, the real mode is set + # later in _extract_member(). + os.mkdir(targetpath, 0o700) except FileExistsError: pass @@ -2240,6 +2470,9 @@ def makedev(self, tarinfo, targetpath): raise ExtractError("special devices not supported by system") mode = tarinfo.mode + if mode is None: + # Use mknod's default + mode = 0o600 if tarinfo.isblk(): mode |= stat.S_IFBLK else: @@ -2261,7 +2494,6 @@ def makelink(self, tarinfo, targetpath): os.unlink(targetpath) os.symlink(tarinfo.linkname, targetpath) else: - # See extract(). if os.path.exists(tarinfo._link_target): os.link(tarinfo._link_target, targetpath) else: @@ -2286,15 +2518,19 @@ def chown(self, tarinfo, targetpath, numeric_owner): u = tarinfo.uid if not numeric_owner: try: - if grp: + if grp and tarinfo.gname: g = grp.getgrnam(tarinfo.gname)[2] except KeyError: pass try: - if pwd: + if pwd and tarinfo.uname: u = pwd.getpwnam(tarinfo.uname)[2] except KeyError: pass + if g is None: + g = -1 + if u is None: + u = -1 try: if tarinfo.issym() and hasattr(os, "lchown"): os.lchown(targetpath, u, g) @@ -2306,6 +2542,8 @@ def chown(self, tarinfo, targetpath, numeric_owner): def chmod(self, tarinfo, targetpath): """Set file permissions of targetpath according to tarinfo. """ + if tarinfo.mode is None: + return try: os.chmod(targetpath, tarinfo.mode) except OSError as e: @@ -2314,10 +2552,13 @@ def chmod(self, tarinfo, targetpath): def utime(self, tarinfo, targetpath): """Set modification time of targetpath according to tarinfo. """ + mtime = tarinfo.mtime + if mtime is None: + return if not hasattr(os, 'utime'): return try: - os.utime(targetpath, (tarinfo.mtime, tarinfo.mtime)) + os.utime(targetpath, (mtime, mtime)) except OSError as e: raise ExtractError("could not change modification time") from e @@ -2395,13 +2636,26 @@ def _getmember(self, name, tarinfo=None, normalize=False): members = self.getmembers() # Limit the member search list up to tarinfo. + skipping = False if tarinfo is not None: - members = members[:members.index(tarinfo)] + try: + index = members.index(tarinfo) + except ValueError: + # The given starting point might be a (modified) copy. + # We'll later skip members until we find an equivalent. + skipping = True + else: + # Happy fast path + members = members[:index] if normalize: name = os.path.normpath(name) for member in reversed(members): + if skipping: + if tarinfo.offset == member.offset: + skipping = False + continue if normalize: member_name = os.path.normpath(member.name) else: @@ -2410,6 +2664,10 @@ def _getmember(self, name, tarinfo=None, normalize=False): if name == member_name: return member + if skipping: + # Starting point was not found + raise ValueError(tarinfo) + def _load(self): """Read through the entire archive file and look for readable members. @@ -2500,6 +2758,7 @@ def __exit__(self, type, value, traceback): #-------------------- # exported functions #-------------------- + def is_tarfile(name): """Return True if name points to a tar archive that we are able to handle, else return False. @@ -2528,6 +2787,10 @@ def main(): parser = argparse.ArgumentParser(description=description) parser.add_argument('-v', '--verbose', action='store_true', default=False, help='Verbose output') + parser.add_argument('--filter', metavar='', + choices=_NAMED_FILTERS, + help='Filter for extraction') + group = parser.add_mutually_exclusive_group(required=True) group.add_argument('-l', '--list', metavar='', help='Show listing of a tarfile') @@ -2539,8 +2802,12 @@ def main(): help='Create tarfile from sources') group.add_argument('-t', '--test', metavar='', help='Test if a tarfile is valid') + args = parser.parse_args() + if args.filter and args.extract is None: + parser.exit(1, '--filter is only valid for extraction\n') + if args.test is not None: src = args.test if is_tarfile(src): @@ -2571,7 +2838,7 @@ def main(): if is_tarfile(src): with TarFile.open(src, 'r:*') as tf: - tf.extractall(path=curdir) + tf.extractall(path=curdir, filter=args.filter) if args.verbose: if curdir == '.': msg = '{!r} file is extracted.'.format(src) diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index f15a800976681c..1a22f2abfa43c3 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -2,9 +2,16 @@ import os import io from hashlib import sha256 -from contextlib import contextmanager +from contextlib import contextmanager, ExitStack from random import Random import pathlib +import shutil +import time +import re +import datetime +import warnings +import stat +import inspect import unittest import unittest.mock @@ -108,7 +115,7 @@ def test_fileobj_regular_file(self): "regular file extraction failed") def test_fileobj_readlines(self): - self.tar.extract("ustar/regtype", TEMPDIR) + self.tar.extract("ustar/regtype", TEMPDIR, filter='data') tarinfo = self.tar.getmember("ustar/regtype") with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: lines1 = fobj1.readlines() @@ -126,7 +133,7 @@ def test_fileobj_readlines(self): "fileobj.readlines() failed") def test_fileobj_iter(self): - self.tar.extract("ustar/regtype", TEMPDIR) + self.tar.extract("ustar/regtype", TEMPDIR, filter='data') tarinfo = self.tar.getmember("ustar/regtype") with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: lines1 = fobj1.readlines() @@ -136,7 +143,8 @@ def test_fileobj_iter(self): "fileobj.__iter__() failed") def test_fileobj_seek(self): - self.tar.extract("ustar/regtype", TEMPDIR) + self.tar.extract("ustar/regtype", TEMPDIR, + filter='data') with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: data = fobj.read() @@ -466,7 +474,7 @@ def test_premature_end_of_archive(self): t = tar.next() with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): - tar.extract(t, TEMPDIR) + tar.extract(t, TEMPDIR, filter='data') with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): tar.extractfile(t).read() @@ -621,16 +629,16 @@ def test_find_members(self): def test_extract_hardlink(self): # Test hardlink extraction (e.g. bug #857297). with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: - tar.extract("ustar/regtype", TEMPDIR) + tar.extract("ustar/regtype", TEMPDIR, filter='data') self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/regtype")) - tar.extract("ustar/lnktype", TEMPDIR) + tar.extract("ustar/lnktype", TEMPDIR, filter='data') self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: data = f.read() self.assertEqual(sha256sum(data), sha256_regtype) - tar.extract("ustar/symtype", TEMPDIR) + tar.extract("ustar/symtype", TEMPDIR, filter='data') self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/symtype")) with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: data = f.read() @@ -645,13 +653,14 @@ def test_extractall(self): os.mkdir(DIR) try: directories = [t for t in tar if t.isdir()] - tar.extractall(DIR, directories) + tar.extractall(DIR, directories, filter='fully_trusted') for tarinfo in directories: path = os.path.join(DIR, tarinfo.name) if sys.platform != "win32": # Win32 has no support for fine grained permissions. self.assertEqual(tarinfo.mode & 0o777, - os.stat(path).st_mode & 0o777) + os.stat(path).st_mode & 0o777, + tarinfo.name) def format_mtime(mtime): if isinstance(mtime, float): return "{} ({})".format(mtime, mtime.hex()) @@ -675,7 +684,7 @@ def test_extract_directory(self): try: with tarfile.open(tarname, encoding="iso8859-1") as tar: tarinfo = tar.getmember(dirtype) - tar.extract(tarinfo, path=DIR) + tar.extract(tarinfo, path=DIR, filter='fully_trusted') extracted = os.path.join(DIR, dirtype) self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) if sys.platform != "win32": @@ -688,7 +697,7 @@ def test_extractall_pathlike_name(self): with os_helper.temp_dir(DIR), \ tarfile.open(tarname, encoding="iso8859-1") as tar: directories = [t for t in tar if t.isdir()] - tar.extractall(DIR, directories) + tar.extractall(DIR, directories, filter='fully_trusted') for tarinfo in directories: path = DIR / tarinfo.name self.assertEqual(os.path.getmtime(path), tarinfo.mtime) @@ -699,7 +708,7 @@ def test_extract_pathlike_name(self): with os_helper.temp_dir(DIR), \ tarfile.open(tarname, encoding="iso8859-1") as tar: tarinfo = tar.getmember(dirtype) - tar.extract(tarinfo, path=DIR) + tar.extract(tarinfo, path=DIR, filter='fully_trusted') extracted = DIR / dirtype self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) @@ -1067,7 +1076,7 @@ class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): # an all platforms, and after that a test that will work only on # platforms/filesystems that prove to support sparse files. def _test_sparse_file(self, name): - self.tar.extract(name, TEMPDIR) + self.tar.extract(name, TEMPDIR, filter='data') filename = os.path.join(TEMPDIR, name) with open(filename, "rb") as fobj: data = fobj.read() @@ -1434,7 +1443,8 @@ def test_extractall_symlinks(self): with tarfile.open(temparchive, errorlevel=2) as tar: # this should not raise OSError: [Errno 17] File exists try: - tar.extractall(path=tempdir) + tar.extractall(path=tempdir, + filter='fully_trusted') except OSError: self.fail("extractall failed with symlinked files") finally: @@ -2539,6 +2549,17 @@ def make_simple_tarfile(self, tar_name): for tardata in files: tf.add(tardata, arcname=os.path.basename(tardata)) + def make_evil_tarfile(self, tar_name): + files = [support.findfile('tokenize_tests.txt')] + self.addCleanup(os_helper.unlink, tar_name) + with tarfile.open(tar_name, 'w') as tf: + benign = tarfile.TarInfo('empty') + tf.addfile(benign, fileobj=io.BytesIO(b'')) + evil = tarfile.TarInfo('exfiltration') + evil.type = tarfile.LNKTYPE + evil.linkname = '/etc/passwd' + tf.addfile(evil) + def test_bad_use(self): rc, out, err = self.tarfilecmd_failure() self.assertEqual(out, b'') @@ -2695,6 +2716,20 @@ def test_extract_command_verbose(self): finally: os_helper.rmtree(tarextdir) + def test_extract_command_filter(self): + self.make_evil_tarfile(tmpname) + try: + with os_helper.temp_cwd(tarextdir): + self.tarfilecmd_failure('-e', tmpname, + '--filter', 'data') + out = self.tarfilecmd('-e', tmpname, + '-v', + '--filter', 'fully_trusted', + PYTHONIOENCODING='utf-8') + self.assertIn(b' file is extracted.', out) + finally: + os_helper.rmtree(tarextdir) + def test_extract_command_different_directory(self): self.make_simple_tarfile(tmpname) try: @@ -2778,7 +2813,7 @@ class LinkEmulationTest(ReadTest, unittest.TestCase): # symbolic or hard links tarfile tries to extract these types of members # as the regular files they point to. def _test_link_extraction(self, name): - self.tar.extract(name, TEMPDIR) + self.tar.extract(name, TEMPDIR, filter='fully_trusted') with open(os.path.join(TEMPDIR, name), "rb") as f: data = f.read() self.assertEqual(sha256sum(data), sha256_regtype) @@ -2910,8 +2945,10 @@ def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod, mock_chown): with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, filename_2): - tarfl.extract(filename_1, TEMPDIR, numeric_owner=True) - tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True) + tarfl.extract(filename_1, TEMPDIR, numeric_owner=True, + filter='fully_trusted') + tarfl.extract(filename_2 , TEMPDIR, numeric_owner=True, + filter='fully_trusted') # convert to filesystem paths f_filename_1 = os.path.join(TEMPDIR, filename_1) @@ -2929,7 +2966,8 @@ def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod, mock_chown): with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1, filename_2): - tarfl.extractall(TEMPDIR, numeric_owner=True) + tarfl.extractall(TEMPDIR, numeric_owner=True, + filter='fully_trusted') # convert to filesystem paths f_filename_1 = os.path.join(TEMPDIR, filename_1) @@ -2954,7 +2992,8 @@ def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod, def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod, mock_chown): with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _): - tarfl.extract(filename_1, TEMPDIR, numeric_owner=False) + tarfl.extract(filename_1, TEMPDIR, numeric_owner=False, + filter='fully_trusted') # convert to filesystem paths f_filename_1 = os.path.join(TEMPDIR, filename_1) @@ -2968,6 +3007,685 @@ def test_keyword_only(self, mock_geteuid): tarfl.extract, filename_1, TEMPDIR, False, True) +class ReplaceTests(ReadTest, unittest.TestCase): + def test_replace_name(self): + member = self.tar.getmember('ustar/regtype') + replaced = member.replace(name='misc/other') + self.assertEqual(replaced.name, 'misc/other') + self.assertEqual(member.name, 'ustar/regtype') + self.assertEqual(self.tar.getmember('ustar/regtype').name, + 'ustar/regtype') + + def test_replace_deep(self): + member = self.tar.getmember('pax/regtype1') + replaced = member.replace() + replaced.pax_headers['gname'] = 'not-bar' + self.assertEqual(member.pax_headers['gname'], 'bar') + self.assertEqual( + self.tar.getmember('pax/regtype1').pax_headers['gname'], 'bar') + + def test_replace_shallow(self): + member = self.tar.getmember('pax/regtype1') + replaced = member.replace(deep=False) + replaced.pax_headers['gname'] = 'not-bar' + self.assertEqual(member.pax_headers['gname'], 'not-bar') + self.assertEqual( + self.tar.getmember('pax/regtype1').pax_headers['gname'], 'not-bar') + + def test_replace_all(self): + member = self.tar.getmember('ustar/regtype') + for attr_name in ('name', 'mtime', 'mode', 'linkname', + 'uid', 'gid', 'uname', 'gname'): + with self.subTest(attr_name=attr_name): + replaced = member.replace(**{attr_name: None}) + self.assertEqual(getattr(replaced, attr_name), None) + self.assertNotEqual(getattr(member, attr_name), None) + + def test_replace_internal(self): + member = self.tar.getmember('ustar/regtype') + with self.assertRaises(TypeError): + member.replace(offset=123456789) + + +class NoneInfoExtractTests(ReadTest): + # These mainly check that all kinds of members are extracted successfully + # if some metadata is None. + # Some of the methods do additional spot checks. + + # We also test that the default filters can deal with None. + + extraction_filter = None + + @classmethod + def setUpClass(cls): + tar = tarfile.open(tarname, mode='r', encoding="iso8859-1") + cls.control_dir = pathlib.Path(TEMPDIR) / "extractall_control" + tar.errorlevel = 0 + with ExitStack() as cm: + if cls.extraction_filter is None: + cm.enter_context(warnings.catch_warnings( + category=DeprecationWarning)) + tar.extractall(cls.control_dir, filter=cls.extraction_filter) + tar.close() + cls.control_paths = set( + p.relative_to(cls.control_dir) + for p in pathlib.Path(cls.control_dir).glob('**/*')) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.control_dir) + + def check_files_present(self, directory): + got_paths = set( + p.relative_to(directory) + for p in pathlib.Path(directory).glob('**/*')) + self.assertEqual(self.control_paths, got_paths) + + @contextmanager + def extract_with_none(self, *attr_names): + DIR = pathlib.Path(TEMPDIR) / ("extractall_none_" + + '_'.join(attr_names)) + self.tar.errorlevel = 0 + for member in self.tar.getmembers(): + for attr_name in attr_names: + setattr(member, attr_name, None) + with os_helper.temp_dir(DIR): + self.tar.extractall(DIR, filter='fully_trusted') + self.check_files_present(DIR) + yield DIR + + def test_extractall_none_mtime(self): + # mtimes of extracted files should be later than 'now' -- the mtime + # of a previously created directory. + now = pathlib.Path(TEMPDIR).stat().st_mtime + with self.extract_with_none('mtime') as DIR: + for path in pathlib.Path(DIR).glob('**/*'): + with self.subTest(path=path): + self.assertGreaterEqual(path.stat().st_mtime, now) + + def test_extractall_none_mode(self): + # modes of directories and regular files should match the mode + # of a "normally" created directory or regular file + dir_mode = pathlib.Path(TEMPDIR).stat().st_mode + regular_file = pathlib.Path(TEMPDIR) / 'regular_file' + regular_file.write_text('') + regular_file_mode = regular_file.stat().st_mode + with self.extract_with_none('mode') as DIR: + for path in pathlib.Path(DIR).glob('**/*'): + with self.subTest(path=path): + if path.is_dir(): + self.assertEqual(path.stat().st_mode, dir_mode) + elif path.is_file(): + self.assertEqual(path.stat().st_mode, + regular_file_mode) + + def test_extractall_none_uid(self): + with self.extract_with_none('uid') as DIR: + pass + + def test_extractall_none_gid(self): + with self.extract_with_none('gid') as DIR: + pass + + def test_extractall_none_uname(self): + with self.extract_with_none('uname') as DIR: + pass + + def test_extractall_none_gname(self): + with self.extract_with_none('gname') as DIR: + pass + + def test_extractall_none_ownership(self): + with self.extract_with_none('uid', 'gid', 'uname', 'gname') as DIR: + pass + +class NoneInfoExtractTests_Data(NoneInfoExtractTests, unittest.TestCase): + extraction_filter = 'data' + +class NoneInfoExtractTests_FullyTrusted(NoneInfoExtractTests, + unittest.TestCase): + extraction_filter = 'fully_trusted' + +class NoneInfoExtractTests_Tar(NoneInfoExtractTests, unittest.TestCase): + extraction_filter = 'tar' + +class NoneInfoExtractTests_LegacyWarning(NoneInfoExtractTests, + unittest.TestCase): + extraction_filter = None + +class NoneInfoTests_Misc(unittest.TestCase): + def test_add(self): + # When addfile() encounters None metadata, it raises a ValueError + bio = io.BytesIO() + for tarformat in (tarfile.USTAR_FORMAT, tarfile.GNU_FORMAT, + tarfile.PAX_FORMAT): + with self.subTest(tarformat=tarformat): + tar = tarfile.open(fileobj=bio, mode='w', format=tarformat) + tarinfo = tar.gettarinfo(tarname) + tar.addfile(tarinfo) + for attr_name in ('mtime', 'mode', 'uid', 'gid', + 'uname', 'gname'): + with self.subTest(attr_name=attr_name): + replaced = tarinfo.replace(**{attr_name: None}) + with self.assertRaisesRegex(ValueError, + f"{attr_name}"): + tar.addfile(replaced) + + def test_list(self): + # Change some metadata to None, then compare list() output + # word-for-word. We want is list() to not raise, and to only change + # printout for the affected piece of metadata. + for attr_names in ({'mtime'}, {'mode'}, {'uid'}, {'gid'}, + {'uname'}, {'gname'}, + {'uid', 'uname'}, {'gid', 'gname'}): + with self.subTest(attr_names=attr_names): + tar = tarfile.open(tarname, encoding="iso8859-1") + tio_prev = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') + with support.swap_attr(sys, 'stdout', tio_prev): + tar.list() + for member in tar.getmembers(): + for attr_name in attr_names: + setattr(member, attr_name, None) + tio_new = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') + with support.swap_attr(sys, 'stdout', tio_new): + tar.list() + for expected, got in zip(tio_prev.detach().getvalue().split(), + tio_new.detach().getvalue().split()): + if attr_names == {'mtime'} and expected == b'2003-01-06': + self.assertEqual(got, b'????-??-??') + elif attr_names == {'mtime'} and expected == b'00:19:43': + self.assertEqual(got, b'??:??:??') + elif attr_names == {'mode'} and re.match(b'.([r-][w-][x-]){3}', expected): + self.assertEqual(got, b'??????????') + elif attr_names == {'uname'} and expected.startswith((b'tarfile/', b'lars/', b'foo/')): + exp_user, exp_group = expected.split(b'/') + got_user, got_group = got.split(b'/') + self.assertEqual(got_group, exp_group) + self.assertRegex(got_user, b'[0-9]+') + elif attr_names == {'gname'} and expected.endswith((b'/tarfile', b'/users', b'/bar')): + exp_user, exp_group = expected.split(b'/') + got_user, got_group = got.split(b'/') + self.assertEqual(got_user, exp_user) + self.assertRegex(got_group, b'[0-9]+') + elif attr_names == {'uid'} and expected.startswith((b'1000/')): + exp_user, exp_group = expected.split(b'/') + got_user, got_group = got.split(b'/') + self.assertEqual(got_group, exp_group) + self.assertEqual(got_user, b'None') + elif attr_names == {'gid'} and expected.endswith((b'/100')): + exp_user, exp_group = expected.split(b'/') + got_user, got_group = got.split(b'/') + self.assertEqual(got_user, exp_user) + self.assertEqual(got_group, b'None') + elif attr_names == {'uid', 'uname'} and expected.startswith((b'tarfile/', b'lars/', b'foo/', b'1000/')): + exp_user, exp_group = expected.split(b'/') + got_user, got_group = got.split(b'/') + self.assertEqual(got_group, exp_group) + self.assertEqual(got_user, b'None') + elif attr_names == {'gname', 'gid'} and expected.endswith((b'/tarfile', b'/users', b'/bar', b'/100')): + exp_user, exp_group = expected.split(b'/') + got_user, got_group = got.split(b'/') + self.assertEqual(got_user, exp_user) + self.assertEqual(got_group, b'None') + else: + # In other cases the output should be the same + self.assertEqual(expected, got) + +def _filemode_to_int(mode): + str_mode = mode[1:] + result = ( + {'r': stat.S_IRUSR, '-': 0}[str_mode[0]] + | {'w': stat.S_IWUSR, '-': 0}[str_mode[1]] + | {'x': stat.S_IXUSR, '-': 0, + 's': stat.S_IXUSR | stat.S_ISUID, + 'S': stat.S_ISUID}[str_mode[2]] + | {'r': stat.S_IRGRP, '-': 0}[str_mode[3]] + | {'w': stat.S_IWGRP, '-': 0}[str_mode[4]] + | {'x': stat.S_IXGRP, '-': 0, + 's': stat.S_IXGRP | stat.S_ISGID, + 'S': stat.S_ISGID}[str_mode[5]] + | {'r': stat.S_IROTH, '-': 0}[str_mode[6]] + | {'w': stat.S_IWOTH, '-': 0}[str_mode[7]] + | {'x': stat.S_IXOTH, '-': 0, + 't': stat.S_IXOTH | stat.S_ISVTX, + 'T': stat.S_ISVTX}[str_mode[8]] + ) + # check we did this right + assert stat.filemode(result)[1:] == mode[1:] + + return result + +class TestArchiveMaker: + """Test helper to create a tar file + + Usage: + + with TestArchiveMaker() as t: + t.add('filename', ...) + + tar = t.open() + # `tar` is now a TarFile with 'filename' in it! + """ + def __init__(self): + self.bio = io.BytesIO() + + def __enter__(self): + self.tar_w = tarfile.TarFile(mode='w', fileobj=self.bio) + return self + + def __exit__(self, *exc): + self.tar_w.close() + self.contents = self.bio.getvalue() + self.bio = None + + def add(self, name, *, type=None, symlink_to=None, hardlink_to=None, + mode=None, **kwargs): + """Add a member to the test archive. Call within `with`.""" + name = str(name) + tarinfo = tarfile.TarInfo(name).replace(**kwargs) + if mode: + tarinfo.mode = _filemode_to_int(mode) + if symlink_to is not None: + type = tarfile.SYMTYPE + tarinfo.linkname = str(symlink_to) + if hardlink_to is not None: + type = tarfile.LNKTYPE + tarinfo.linkname = str(hardlink_to) + if name.endswith('/') and type is None: + type = tarfile.DIRTYPE + if type is not None: + tarinfo.type = type + if tarinfo.isreg(): + fileobj = io.BytesIO(bytes(tarinfo.size)) + else: + fileobj = None + self.tar_w.addfile(tarinfo, fileobj) + + def open(self, **kwargs): + """Open the resulting archive as TarFile. Call after `with`.""" + bio = io.BytesIO(self.contents) + return tarfile.open(fileobj=bio, **kwargs) + +class TestExtractionFilters(unittest.TestCase): + + # A temporary directory for the extraction results. + # All files that "escape" the destination path should still end + # up in this directory. + outerdir = pathlib.Path(TEMPDIR) / 'outerdir' + + # The destination for the extraction, within `outerdir` + destdir = outerdir / 'dest' + + @contextmanager + def check_context(self, tar, filter): + """Extracts `tar` to `self.destdir` and allows checking it + + Ensures that all resulting files must be checked using `expect_file`, + except the destination directory itself and parent directories of + other files. + When checking directories, do so before their contents. + """ + with os_helper.temp_dir(self.outerdir): + tar.extractall(self.destdir, filter=filter) + self.expected_paths = set(self.outerdir.glob('**/*')) + self.expected_paths.discard(self.destdir) + yield + self.assertEqual(self.expected_paths, set()) + + def extractall_fail(self, tar, filter): + """Attempt extraction. Use in an assertRaises context.""" + with os_helper.temp_dir(self.outerdir): + tar.extractall(self.destdir, filter=filter) + + def expect_file(self, name, type=None, symlink_to=None, mode=None): + """Check a single file. See check_context.""" + # use normpath() rather than resolve() so we don't follow symlinks + path = pathlib.Path(os.path.normpath(self.destdir / name)) + self.assertIn(path, self.expected_paths) + self.expected_paths.remove(path) + if mode is not None: + got = stat.filemode(stat.S_IMODE(path.stat().st_mode)) + self.assertEqual(got, mode) + if type is None and isinstance(name, str) and name.endswith('/'): + type = tarfile.DIRTYPE + if symlink_to is not None: + self.assertEqual((self.destdir / name).readlink(), + pathlib.Path(symlink_to)) + elif type == tarfile.REGTYPE or type is None: + self.assertTrue(path.is_file()) + elif type == tarfile.DIRTYPE: + self.assertTrue(path.is_dir()) + elif type == tarfile.FIFOTYPE: + self.assertTrue(path.is_fifo()) + else: + raise NotImplementedError(type) + for parent in path.parents: + self.expected_paths.discard(parent) + + def test_benign_file(self): + with TestArchiveMaker() as chk: + chk.add('benign.txt') + for filter in 'fully_trusted', 'tar', 'data': + with self.check_context(chk.open(), filter): + self.expect_file('benign.txt') + + def test_absolute(self): + # Test handling a member with an absolute path + # Inspired by 'absolute1' in https://github.com/jwilk/traversal-archives + with TestArchiveMaker() as chk: + chk.add(self.outerdir / 'escaped.evil') + outerdir_stripped = str(self.outerdir).lstrip('/') + + # XXX Windows + + with self.check_context(chk.open(), 'fully_trusted'): + self.expect_file('../escaped.evil') + + with self.check_context(chk.open(), 'tar'): + self.expect_file(f'{outerdir_stripped}/escaped.evil') + + with self.check_context(chk.open(), 'data'): + self.expect_file(f'{outerdir_stripped}/escaped.evil') + + + def test_absolute_with_extra_slash(self): + # Test handling a member with an absolute path + # Inspired by 'absolute2' in https://github.com/jwilk/traversal-archives + with TestArchiveMaker() as chk: + chk.add(f'/{self.outerdir}/escaped.evil') + chk.add(f'{os.sep}{self.outerdir}/escaped.eviler') + chk.add(f'/{os.sep}{self.outerdir}/escaped.evilest') + outerdir_stripped = str(self.outerdir).lstrip('/') + + with self.check_context(chk.open(), 'fully_trusted'): + self.expect_file('../escaped.evil') + self.expect_file('../escaped.eviler') + self.expect_file('../escaped.evilest') + + for filter in 'tar', 'data': + with self.check_context(chk.open(), filter): + self.expect_file(f'{outerdir_stripped}/escaped.evil') + self.expect_file(f'{outerdir_stripped}/escaped.eviler') + self.expect_file(f'{outerdir_stripped}/escaped.evilest') + + def test_parent_symlink(self): + # Test interplaying symlinks + # Inspired by 'dirsymlink2a' in jwilk/traversal-archives + with TestArchiveMaker() as chk: + chk.add('current', symlink_to='.') + chk.add('parent', symlink_to='current/..') + chk.add('parent/evil') + + with self.check_context(chk.open(), 'fully_trusted'): + self.expect_file('current', symlink_to='.') + self.expect_file('parent', symlink_to='current/..') + self.expect_file('../evil') + + with self.assertRaisesRegex( + tarfile.OutsideDestinationError, + f"'parent/evil' would be extracted to '{self.outerdir}/evil', " + + "which is outside the destination"): + self.extractall_fail(chk.open(), filter='tar') + + with self.assertRaisesRegex( + tarfile.LinkOutsideDestinationError, + f"'parent' would link to '{self.outerdir}', " + + "which is outside the destination"): + self.extractall_fail(chk.open(), filter='data') + + def test_parent_symlink2(self): + # Test interplaying symlinks + # Inspired by 'dirsymlink2b' in jwilk/traversal-archives + with TestArchiveMaker() as chk: + chk.add('current', symlink_to='.') + chk.add('current/parent', symlink_to='..') + chk.add('parent/evil') + + with self.check_context(chk.open(), 'fully_trusted'): + self.expect_file('current', symlink_to='.') + self.expect_file('parent', symlink_to='..') + self.expect_file('../evil') + + with self.assertRaisesRegex( + tarfile.OutsideDestinationError, + f"'parent/evil' would be extracted to " + + f"'{self.outerdir}/evil', which is outside " + + "the destination"): + self.extractall_fail(chk.open(), filter='tar') + + with self.assertRaisesRegex( + tarfile.LinkOutsideDestinationError, + f"'current/parent' would link to '{self.outerdir}', " + + "which is outside the destination"): + self.extractall_fail(chk.open(), filter='data') + + def test_absolute_symlink(self): + # Test symlink to an absolute path + # Inspired by 'dirsymlink' in jwilk/traversal-archives + with TestArchiveMaker() as chk: + chk.add('parent', symlink_to=self.outerdir) + chk.add('parent/evil') + + with self.check_context(chk.open(), 'fully_trusted'): + self.expect_file('parent', symlink_to=self.outerdir) + self.expect_file('../evil') + + with self.assertRaisesRegex( + tarfile.OutsideDestinationError, + f"'parent/evil' would be extracted to " + + f"'{self.outerdir}/evil', which is outside " + + "the destination"): + self.extractall_fail(chk.open(), filter='tar') + + with self.assertRaisesRegex( + tarfile.AbsoluteLinkError, + f"'parent' is a symlink to an absolute path"): + self.extractall_fail(chk.open(), filter='data') + + def test_sly_relative0(self): + # Inspired by 'relative0' in jwilk/traversal-archives + with TestArchiveMaker() as chk: + chk.add('../moo', symlink_to='..//tmp/moo') + + # XXX TarFile happens to fail creating a parent directory. + # This might be a bug, but fixing it hurts security. + # Note that GNU `tar` rejects '..' components, so you could + # argue this is an invalid archive and we just raise an + # incorrect type of exception. + # - @encukou, when adding extraction filters + with self.assertRaises(FileExistsError): + self.extractall_fail(chk.open(), filter='fully_trusted') + + for filter in 'tar', 'data': + with self.assertRaisesRegex( + tarfile.OutsideDestinationError, + f"'../moo' would be extracted to " + + f"'{self.outerdir}/moo', which is outside " + + "the destination"): + self.extractall_fail(chk.open(), filter=filter) + + def test_sly_relative2(self): + # Inspired by 'relative2' in jwilk/traversal-archives + with TestArchiveMaker() as chk: + chk.add('tmp/') + chk.add('tmp/../../moo', symlink_to='tmp/../..//tmp/moo') + + with self.check_context(chk.open(), 'fully_trusted'): + self.expect_file('tmp', type=tarfile.DIRTYPE) + self.expect_file('../moo', symlink_to='tmp/../../tmp/moo') + + for filter in 'tar', 'data': + with self.assertRaisesRegex( + tarfile.OutsideDestinationError, + f"'tmp/../../moo' would be extracted to " + + f"'{self.outerdir}/moo', which is outside the " + + "destination"): + self.extractall_fail(chk.open(), filter=filter) + + def test_modes(self): + # Test how file modes are extracted + with TestArchiveMaker() as chk: + chk.add('all_bits', mode='?rwsrwsrwt') + chk.add('perm_bits', mode='?rwxrwxrwx') + chk.add('exec_group_other', mode='?rw-rwxrwx') + chk.add('read_group_only', mode='?---r-----') + chk.add('no_bits', mode='?---------') + chk.add('dir/', mode='?---rwsrwt', type=tarfile.DIRTYPE) + + with self.check_context(chk.open(), 'fully_trusted'): + self.expect_file('all_bits', mode='?rwsrwsrwt') + self.expect_file('perm_bits', mode='?rwxrwxrwx') + self.expect_file('exec_group_other', mode='?rw-rwxrwx') + self.expect_file('read_group_only', mode='?---r-----') + self.expect_file('no_bits', mode='?---------') + self.expect_file('dir', type=tarfile.DIRTYPE, mode='?---rwsrwt') + + with self.check_context(chk.open(), 'tar'): + self.expect_file('all_bits', mode='?rwxr-xr-x') + self.expect_file('perm_bits', mode='?rwxr-xr-x') + self.expect_file('exec_group_other', mode='?rw-r-xr-x') + self.expect_file('read_group_only', mode='?---r-----') + self.expect_file('no_bits', mode='?---------') + self.expect_file('dir/', type=tarfile.DIRTYPE, mode='?---r-xr-x') + + with self.check_context(chk.open(), 'data'): + normal_dir_mode = stat.filemode(stat.S_IMODE( + self.outerdir.stat().st_mode)) + self.expect_file('all_bits', mode='?rwxr-xr-x') + self.expect_file('perm_bits', mode='?rwxr-xr-x') + self.expect_file('exec_group_other', mode='?rw-r--r--') + self.expect_file('read_group_only', mode='?rw-r-----') + self.expect_file('no_bits', mode='?rw-------') + self.expect_file('dir/', type=tarfile.DIRTYPE, mode=normal_dir_mode) + + def test_pipe(self): + with TestArchiveMaker() as chk: + chk.add('foo', type=tarfile.FIFOTYPE) + + with self.check_context(chk.open(), 'fully_trusted'): + self.expect_file('foo', type=tarfile.FIFOTYPE) + + with self.check_context(chk.open(), 'tar'): + self.expect_file('foo', type=tarfile.FIFOTYPE) + + with self.assertRaisesRegex( + tarfile.SpecialFileError, + "'foo' is a special file"): + self.extractall_fail(chk.open(), filter='data') + + def test_special_files(self): + for special_type in tarfile.FIFOTYPE, tarfile.CHRTYPE, tarfile.BLKTYPE: + tarinfo = tarfile.TarInfo('foo') + tarinfo.type = special_type + trusted = tarfile.fully_trusted_filter(tarinfo, '') + self.assertIs(trusted, tarinfo) + tar = tarfile.tar_filter(tarinfo, '') + self.assertEqual(tar.type, special_type) + with self.assertRaises(tarfile.SpecialFileError) as cm: + tarfile.data_filter(tarinfo, '') + self.assertIsInstance(cm.exception.tarinfo, tarfile.TarInfo) + self.assertEqual(cm.exception.tarinfo.name, 'foo') + + def test_fully_trusted_filter(self): + with tarfile.TarFile.open(tarname) as tar: + for tarinfo in tar.getmembers(): + filtered = tarfile.fully_trusted_filter(tarinfo, '') + self.assertIs(filtered, tarinfo) + + def test_tar_filter(self): + with tarfile.TarFile.open(tarname) as tar: + for tarinfo in tar.getmembers(): + filtered = tarfile.tar_filter(tarinfo, '') + self.assertIs(filtered.name, tarinfo.name) + self.assertIs(filtered.type, tarinfo.type) + + def test_data_filter(self): + with tarfile.TarFile.open(tarname) as tar: + for tarinfo in tar.getmembers(): + try: + filtered = tarfile.data_filter(tarinfo, '') + except tarfile.FilterError: + continue + self.assertIs(filtered.name, tarinfo.name) + self.assertIs(filtered.type, tarinfo.type) + + def test_default_filter_warns(self): + """Ensure the default filter warns""" + with TestArchiveMaker() as chk: + chk.add('foo') + with warnings.catch_warnings(record=True) as warninglist: + with self.check_context(chk.open(), None): + self.expect_file('foo') + self.assertEqual([w.category for w in warninglist], + [DeprecationWarning]) + + def test_change_default_filter_on_instance(self): + tar = tarfile.TarFile(tarname, 'r') + def strict_filter(tarinfo, path): + if tarinfo.name == 'ustar/regtype': + return tarinfo + tar.extraction_filter = strict_filter + with self.check_context(tar, None): + self.expect_file('ustar/regtype') + + def test_change_default_filter_on_class(self): + def strict_filter(tarinfo, path): + if tarinfo.name == 'ustar/regtype': + return tarinfo + tar = tarfile.TarFile(tarname, 'r') + old_default = tarfile.TarFile.__dict__['extraction_filter'] + try: + tarfile.TarFile.extraction_filter = staticmethod(strict_filter) + with self.check_context(tar, None): + self.expect_file('ustar/regtype') + finally: + tarfile.TarFile.extraction_filter = old_default + + def test_custom_filter(self): + def custom_filter(tarinfo, path): + self.assertIs(path, self.destdir) + if tarinfo.name == 'move_this': + return tarinfo.replace(name='moved') + if tarinfo.name == 'ignore_this': + return None + return tarinfo + + with TestArchiveMaker() as chk: + chk.add('move_this') + chk.add('ignore_this') + chk.add('keep') + with self.check_context(chk.open(), custom_filter): + self.expect_file('moved') + self.expect_file('keep') + + def test_stateful_filter(self): + class StatefulFilter: + def __enter__(self): + self.num_files = 0 + return self + + def __call__(self, tarinfo, path): + try: + tarinfo = tarfile.data_filter(tarinfo, path) + except tarfile.FilterError: + return None + self.num_files += 1 + return tarinfo + + def __exit__(self, *exc_info): + self.done = True + + with TestArchiveMaker() as chk: + chk.add('good') + chk.add('bad', symlink_to='/') + chk.add('good') + with StatefulFilter() as custom_filter: + with self.check_context(chk.open(), custom_filter): + self.expect_file('good') + self.assertEqual(custom_filter.num_files, 2) + self.assertEqual(custom_filter.done, True) + + def setUpModule(): os_helper.unlink(TEMPDIR) os.makedirs(TEMPDIR) From 3b00cdf5e7a2d13d30b66f874dea12af103b9634 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Thu, 16 Feb 2023 15:46:26 +0100 Subject: [PATCH 02/13] Adjust tests for Windows (w/o symlinks) --- Lib/tarfile.py | 5 +- Lib/test/test_tarfile.py | 193 +++++++++++++++++++++------------------ 2 files changed, 108 insertions(+), 90 deletions(-) diff --git a/Lib/tarfile.py b/Lib/tarfile.py index 16ad9b77d279cd..65638322cb6d43 100755 --- a/Lib/tarfile.py +++ b/Lib/tarfile.py @@ -723,8 +723,9 @@ class FilterError(TarError): pass class AbsolutePathError(FilterError): - # XXX Windows only - pass + def __init__(self, tarinfo): + self.tarinfo = tarinfo + super().__init__(f'member {tarinfo.name!r} has an absolute path') class OutsideDestinationError(FilterError): def __init__(self, tarinfo, path): diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index 1a22f2abfa43c3..c2fe8600d73257 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -20,6 +20,7 @@ from test import support from test.support import os_helper from test.support import script_helper +from test.support import warnings_helper # Check for our compression modules. try: @@ -3149,8 +3150,8 @@ class NoneInfoExtractTests_FullyTrusted(NoneInfoExtractTests, class NoneInfoExtractTests_Tar(NoneInfoExtractTests, unittest.TestCase): extraction_filter = 'tar' -class NoneInfoExtractTests_LegacyWarning(NoneInfoExtractTests, - unittest.TestCase): +class NoneInfoExtractTests_Default(NoneInfoExtractTests, + unittest.TestCase): extraction_filter = None class NoneInfoTests_Misc(unittest.TestCase): @@ -3343,7 +3344,7 @@ def expect_file(self, name, type=None, symlink_to=None, mode=None): path = pathlib.Path(os.path.normpath(self.destdir / name)) self.assertIn(path, self.expected_paths) self.expected_paths.remove(path) - if mode is not None: + if mode is not None and os_helper.can_chmod(): got = stat.filemode(stat.S_IMODE(path.stat().st_mode)) self.assertEqual(got, mode) if type is None and isinstance(name, str) and name.endswith('/'): @@ -3376,37 +3377,25 @@ def test_absolute(self): chk.add(self.outerdir / 'escaped.evil') outerdir_stripped = str(self.outerdir).lstrip('/') - # XXX Windows - with self.check_context(chk.open(), 'fully_trusted'): self.expect_file('../escaped.evil') - with self.check_context(chk.open(), 'tar'): - self.expect_file(f'{outerdir_stripped}/escaped.evil') - - with self.check_context(chk.open(), 'data'): - self.expect_file(f'{outerdir_stripped}/escaped.evil') - - - def test_absolute_with_extra_slash(self): - # Test handling a member with an absolute path - # Inspired by 'absolute2' in https://github.com/jwilk/traversal-archives - with TestArchiveMaker() as chk: - chk.add(f'/{self.outerdir}/escaped.evil') - chk.add(f'{os.sep}{self.outerdir}/escaped.eviler') - chk.add(f'/{os.sep}{self.outerdir}/escaped.evilest') - outerdir_stripped = str(self.outerdir).lstrip('/') + if str(self.outerdir).startswith('/'): + # We strip leading slashes, as GNU tar does (without --absolute-filenames). + # This archive was made on a systems where absolute paths have leading slashes. - with self.check_context(chk.open(), 'fully_trusted'): - self.expect_file('../escaped.evil') - self.expect_file('../escaped.eviler') - self.expect_file('../escaped.evilest') + for filter in 'tar', 'data': + with self.check_context(chk.open(), filter): + self.expect_file(f'{outerdir_stripped}/escaped.evil') + else: + # This archive was made on a systems where absolute paths don't have leading slashes. + # So, we don't strip, but fail to unpack. - for filter in 'tar', 'data': - with self.check_context(chk.open(), filter): - self.expect_file(f'{outerdir_stripped}/escaped.evil') - self.expect_file(f'{outerdir_stripped}/escaped.eviler') - self.expect_file(f'{outerdir_stripped}/escaped.evilest') + for filter in 'tar', 'data': + with self.assertRaisesRegex( + tarfile.AbsolutePathError, + """['"].*escaped.evil['"] has an absolute path"""): + self.extractall_fail(chk.open(), 'tar') def test_parent_symlink(self): # Test interplaying symlinks @@ -3416,22 +3405,31 @@ def test_parent_symlink(self): chk.add('parent', symlink_to='current/..') chk.add('parent/evil') - with self.check_context(chk.open(), 'fully_trusted'): - self.expect_file('current', symlink_to='.') - self.expect_file('parent', symlink_to='current/..') - self.expect_file('../evil') + if os_helper.can_symlink(): + with self.check_context(chk.open(), 'fully_trusted'): + self.expect_file('current', symlink_to='.') + self.expect_file('parent', symlink_to='current/..') + self.expect_file('../evil') - with self.assertRaisesRegex( - tarfile.OutsideDestinationError, - f"'parent/evil' would be extracted to '{self.outerdir}/evil', " - + "which is outside the destination"): - self.extractall_fail(chk.open(), filter='tar') + with self.assertRaisesRegex( + tarfile.OutsideDestinationError, + """'parent/evil' would be extracted to ['"].*evil['"], """ + + "which is outside the destination"): + self.extractall_fail(chk.open(), filter='tar') - with self.assertRaisesRegex( - tarfile.LinkOutsideDestinationError, - f"'parent' would link to '{self.outerdir}', " - + "which is outside the destination"): - self.extractall_fail(chk.open(), filter='data') + with self.assertRaisesRegex( + tarfile.LinkOutsideDestinationError, + """'parent' would link to ['"].*outerdir['"], """ + + "which is outside the destination"): + self.extractall_fail(chk.open(), filter='data') + + else: + with self.check_context(chk.open(), 'fully_trusted'): + self.expect_file('parent/evil') + with self.check_context(chk.open(), 'tar'): + self.expect_file('parent/evil') + with self.check_context(chk.open(), 'data'): + self.expect_file('parent/evil') def test_parent_symlink2(self): # Test interplaying symlinks @@ -3441,21 +3439,31 @@ def test_parent_symlink2(self): chk.add('current/parent', symlink_to='..') chk.add('parent/evil') - with self.check_context(chk.open(), 'fully_trusted'): - self.expect_file('current', symlink_to='.') - self.expect_file('parent', symlink_to='..') - self.expect_file('../evil') + if os_helper.can_symlink(): + with self.check_context(chk.open(), 'fully_trusted'): + self.expect_file('current', symlink_to='.') + self.expect_file('parent', symlink_to='..') + self.expect_file('../evil') + else: + with self.check_context(chk.open(), 'fully_trusted'): + self.expect_file('current/') + self.expect_file('parent/evil') - with self.assertRaisesRegex( - tarfile.OutsideDestinationError, - f"'parent/evil' would be extracted to " - + f"'{self.outerdir}/evil', which is outside " - + "the destination"): - self.extractall_fail(chk.open(), filter='tar') + if os_helper.can_symlink(): + with self.assertRaisesRegex( + tarfile.OutsideDestinationError, + "'parent/evil' would be extracted to " + + """['"].*evil['"], which is outside """ + + "the destination"): + self.extractall_fail(chk.open(), filter='tar') + else: + with self.check_context(chk.open(), 'tar'): + self.expect_file('current/') + self.expect_file('parent/evil') with self.assertRaisesRegex( tarfile.LinkOutsideDestinationError, - f"'current/parent' would link to '{self.outerdir}', " + """'current/parent' would link to ['"].*['"], """ + "which is outside the destination"): self.extractall_fail(chk.open(), filter='data') @@ -3467,15 +3475,22 @@ def test_absolute_symlink(self): chk.add('parent/evil') with self.check_context(chk.open(), 'fully_trusted'): - self.expect_file('parent', symlink_to=self.outerdir) - self.expect_file('../evil') + if os_helper.can_symlink(): + self.expect_file('parent', symlink_to=self.outerdir) + self.expect_file('../evil') + else: + self.expect_file('parent/evil') - with self.assertRaisesRegex( - tarfile.OutsideDestinationError, - f"'parent/evil' would be extracted to " - + f"'{self.outerdir}/evil', which is outside " - + "the destination"): - self.extractall_fail(chk.open(), filter='tar') + if os_helper.can_symlink(): + with self.assertRaisesRegex( + tarfile.OutsideDestinationError, + "'parent/evil' would be extracted to " + + """['"].*evil['"], which is outside """ + + "the destination"): + self.extractall_fail(chk.open(), filter='tar') + else: + with self.check_context(chk.open(), 'tar'): + self.expect_file('parent/evil') with self.assertRaisesRegex( tarfile.AbsoluteLinkError, @@ -3487,22 +3502,26 @@ def test_sly_relative0(self): with TestArchiveMaker() as chk: chk.add('../moo', symlink_to='..//tmp/moo') - # XXX TarFile happens to fail creating a parent directory. - # This might be a bug, but fixing it hurts security. - # Note that GNU `tar` rejects '..' components, so you could - # argue this is an invalid archive and we just raise an - # incorrect type of exception. - # - @encukou, when adding extraction filters - with self.assertRaises(FileExistsError): - self.extractall_fail(chk.open(), filter='fully_trusted') + try: + with self.check_context(chk.open(), filter='fully_trusted'): + if os_helper.can_symlink(): + self.expect_file('../moo', symlink_to='..//tmp/moo') + except FileExistsError: + # XXX TarFile happens to fail creating a parent directory. + # This might be a bug, but fixing it hurts security. + # Note that GNU `tar` rejects '..' components, so you could + # argue this is an invalid archive and we just raise an + # incorrect type of exception. + # - @encukou, when adding extraction filters + pass - for filter in 'tar', 'data': - with self.assertRaisesRegex( - tarfile.OutsideDestinationError, - f"'../moo' would be extracted to " - + f"'{self.outerdir}/moo', which is outside " - + "the destination"): - self.extractall_fail(chk.open(), filter=filter) + for filter in 'tar', 'data': + with self.assertRaisesRegex( + tarfile.OutsideDestinationError, + f"'../moo' would be extracted to " + + f"'.*moo', which is outside " + + "the destination"): + self.extractall_fail(chk.open(), filter=filter) def test_sly_relative2(self): # Inspired by 'relative2' in jwilk/traversal-archives @@ -3512,13 +3531,14 @@ def test_sly_relative2(self): with self.check_context(chk.open(), 'fully_trusted'): self.expect_file('tmp', type=tarfile.DIRTYPE) - self.expect_file('../moo', symlink_to='tmp/../../tmp/moo') + if os_helper.can_symlink(): + self.expect_file('../moo', symlink_to='tmp/../../tmp/moo') for filter in 'tar', 'data': with self.assertRaisesRegex( tarfile.OutsideDestinationError, - f"'tmp/../../moo' would be extracted to " - + f"'{self.outerdir}/moo', which is outside the " + "'tmp/../../moo' would be extracted to " + + """['"].*moo['"], which is outside the """ + "destination"): self.extractall_fail(chk.open(), filter=filter) @@ -3562,11 +3582,10 @@ def test_pipe(self): with TestArchiveMaker() as chk: chk.add('foo', type=tarfile.FIFOTYPE) - with self.check_context(chk.open(), 'fully_trusted'): - self.expect_file('foo', type=tarfile.FIFOTYPE) - - with self.check_context(chk.open(), 'tar'): - self.expect_file('foo', type=tarfile.FIFOTYPE) + for filter in 'fully_trusted', 'tar': + with self.check_context(chk.open(), filter): + if hasattr(os, 'mkfifo'): + self.expect_file('foo', type=tarfile.FIFOTYPE) with self.assertRaisesRegex( tarfile.SpecialFileError, @@ -3613,11 +3632,9 @@ def test_default_filter_warns(self): """Ensure the default filter warns""" with TestArchiveMaker() as chk: chk.add('foo') - with warnings.catch_warnings(record=True) as warninglist: + with warnings_helper.check_warnings(('Python 3.14', DeprecationWarning)): with self.check_context(chk.open(), None): self.expect_file('foo') - self.assertEqual([w.category for w in warninglist], - [DeprecationWarning]) def test_change_default_filter_on_instance(self): tar = tarfile.TarFile(tarname, 'r') From 75a1e2c076dc26491abb377441d71277822def91 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Mon, 20 Feb 2023 16:05:10 +0100 Subject: [PATCH 03/13] Tidy up tests --- Lib/test/test_tarfile.py | 426 +++++++++++++++++++++++---------------- 1 file changed, 253 insertions(+), 173 deletions(-) diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index c2fe8600d73257..d871259d06055e 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -2554,12 +2554,10 @@ def make_evil_tarfile(self, tar_name): files = [support.findfile('tokenize_tests.txt')] self.addCleanup(os_helper.unlink, tar_name) with tarfile.open(tar_name, 'w') as tf: - benign = tarfile.TarInfo('empty') + benign = tarfile.TarInfo('benign') tf.addfile(benign, fileobj=io.BytesIO(b'')) - evil = tarfile.TarInfo('exfiltration') - evil.type = tarfile.LNKTYPE - evil.linkname = '/etc/passwd' - tf.addfile(evil) + evil = tarfile.TarInfo('../evil') + tf.addfile(evil, fileobj=io.BytesIO(b'')) def test_bad_use(self): rc, out, err = self.tarfilecmd_failure() @@ -2719,9 +2717,14 @@ def test_extract_command_verbose(self): def test_extract_command_filter(self): self.make_evil_tarfile(tmpname) + # Make an inner directory, so the member named '../evil' + # is still extracted into `tarextdir` + destdir = os.path.join(tarextdir, 'dest') + os.mkdir(tarextdir) try: - with os_helper.temp_cwd(tarextdir): + with os_helper.temp_cwd(destdir): self.tarfilecmd_failure('-e', tmpname, + '-v', '--filter', 'data') out = self.tarfilecmd('-e', tmpname, '-v', @@ -3060,12 +3063,12 @@ class NoneInfoExtractTests(ReadTest): @classmethod def setUpClass(cls): tar = tarfile.open(tarname, mode='r', encoding="iso8859-1") - cls.control_dir = pathlib.Path(TEMPDIR) / "extractall_control" + cls.control_dir = pathlib.Path(TEMPDIR) / "extractall_ctrl" tar.errorlevel = 0 with ExitStack() as cm: if cls.extraction_filter is None: cm.enter_context(warnings.catch_warnings( - category=DeprecationWarning)) + action="ignore", category=DeprecationWarning)) tar.extractall(cls.control_dir, filter=cls.extraction_filter) tar.close() cls.control_paths = set( @@ -3084,8 +3087,7 @@ def check_files_present(self, directory): @contextmanager def extract_with_none(self, *attr_names): - DIR = pathlib.Path(TEMPDIR) / ("extractall_none_" - + '_'.join(attr_names)) + DIR = pathlib.Path(TEMPDIR) / "extractall_none" self.tar.errorlevel = 0 for member in self.tar.getmembers(): for attr_name in attr_names: @@ -3102,7 +3104,14 @@ def test_extractall_none_mtime(self): with self.extract_with_none('mtime') as DIR: for path in pathlib.Path(DIR).glob('**/*'): with self.subTest(path=path): - self.assertGreaterEqual(path.stat().st_mtime, now) + try: + mtime = path.stat().st_mtime + except OSError: + # Some systems can't stat symlinks, ignore those + if not path.is_symlink(): + raise + else: + self.assertGreaterEqual(path.stat().st_mtime, now) def test_extractall_none_mode(self): # modes of directories and regular files should match the mode @@ -3174,8 +3183,9 @@ def test_add(self): def test_list(self): # Change some metadata to None, then compare list() output - # word-for-word. We want is list() to not raise, and to only change + # word-for-word. We want list() to not raise, and to only change # printout for the affected piece of metadata. + # (n.b.: some contents of the test archive are hardcoded.) for attr_names in ({'mtime'}, {'mode'}, {'uid'}, {'gid'}, {'uname'}, {'gname'}, {'uid', 'uname'}, {'gid', 'gname'}): @@ -3192,23 +3202,27 @@ def test_list(self): tar.list() for expected, got in zip(tio_prev.detach().getvalue().split(), tio_new.detach().getvalue().split()): - if attr_names == {'mtime'} and expected == b'2003-01-06': + if attr_names == {'mtime'} and re.match(rb'2003-01-\d\d', expected): self.assertEqual(got, b'????-??-??') - elif attr_names == {'mtime'} and expected == b'00:19:43': + elif attr_names == {'mtime'} and re.match(rb'\d\d:\d\d:\d\d', expected): self.assertEqual(got, b'??:??:??') - elif attr_names == {'mode'} and re.match(b'.([r-][w-][x-]){3}', expected): + elif attr_names == {'mode'} and re.match( + rb'.([r-][w-][x-]){3}', expected): self.assertEqual(got, b'??????????') - elif attr_names == {'uname'} and expected.startswith((b'tarfile/', b'lars/', b'foo/')): + elif attr_names == {'uname'} and expected.startswith( + (b'tarfile/', b'lars/', b'foo/')): exp_user, exp_group = expected.split(b'/') got_user, got_group = got.split(b'/') self.assertEqual(got_group, exp_group) self.assertRegex(got_user, b'[0-9]+') - elif attr_names == {'gname'} and expected.endswith((b'/tarfile', b'/users', b'/bar')): + elif attr_names == {'gname'} and expected.endswith( + (b'/tarfile', b'/users', b'/bar')): exp_user, exp_group = expected.split(b'/') got_user, got_group = got.split(b'/') self.assertEqual(got_user, exp_user) self.assertRegex(got_group, b'[0-9]+') - elif attr_names == {'uid'} and expected.startswith((b'1000/')): + elif attr_names == {'uid'} and expected.startswith( + (b'1000/')): exp_user, exp_group = expected.split(b'/') got_user, got_group = got.split(b'/') self.assertEqual(got_group, exp_group) @@ -3218,12 +3232,14 @@ def test_list(self): got_user, got_group = got.split(b'/') self.assertEqual(got_user, exp_user) self.assertEqual(got_group, b'None') - elif attr_names == {'uid', 'uname'} and expected.startswith((b'tarfile/', b'lars/', b'foo/', b'1000/')): + elif attr_names == {'uid', 'uname'} and expected.startswith( + (b'tarfile/', b'lars/', b'foo/', b'1000/')): exp_user, exp_group = expected.split(b'/') got_user, got_group = got.split(b'/') self.assertEqual(got_group, exp_group) self.assertEqual(got_user, b'None') - elif attr_names == {'gname', 'gid'} and expected.endswith((b'/tarfile', b'/users', b'/bar', b'/100')): + elif attr_names == {'gname', 'gid'} and expected.endswith( + (b'/tarfile', b'/users', b'/bar', b'/100')): exp_user, exp_group = expected.split(b'/') got_user, got_group = got.split(b'/') self.assertEqual(got_user, exp_user) @@ -3233,39 +3249,43 @@ def test_list(self): self.assertEqual(expected, got) def _filemode_to_int(mode): + """Inverse of `stat.filemode` (for permission bits) + + Using mode strings rather than numbers makes the later tests more readable. + """ str_mode = mode[1:] result = ( - {'r': stat.S_IRUSR, '-': 0}[str_mode[0]] + {'r': stat.S_IRUSR, '-': 0}[str_mode[0]] | {'w': stat.S_IWUSR, '-': 0}[str_mode[1]] | {'x': stat.S_IXUSR, '-': 0, - 's': stat.S_IXUSR | stat.S_ISUID, - 'S': stat.S_ISUID}[str_mode[2]] + 's': stat.S_IXUSR | stat.S_ISUID, + 'S': stat.S_ISUID}[str_mode[2]] | {'r': stat.S_IRGRP, '-': 0}[str_mode[3]] | {'w': stat.S_IWGRP, '-': 0}[str_mode[4]] | {'x': stat.S_IXGRP, '-': 0, - 's': stat.S_IXGRP | stat.S_ISGID, - 'S': stat.S_ISGID}[str_mode[5]] + 's': stat.S_IXGRP | stat.S_ISGID, + 'S': stat.S_ISGID}[str_mode[5]] | {'r': stat.S_IROTH, '-': 0}[str_mode[6]] | {'w': stat.S_IWOTH, '-': 0}[str_mode[7]] | {'x': stat.S_IXOTH, '-': 0, - 't': stat.S_IXOTH | stat.S_ISVTX, - 'T': stat.S_ISVTX}[str_mode[8]] + 't': stat.S_IXOTH | stat.S_ISVTX, + 'T': stat.S_ISVTX}[str_mode[8]] ) # check we did this right assert stat.filemode(result)[1:] == mode[1:] return result -class TestArchiveMaker: - """Test helper to create a tar file +class ArchiveMaker: + """Helper to create a tar file with specific contents Usage: - with TestArchiveMaker() as t: + with ArchiveMaker() as t: t.add('filename', ...) - tar = t.open() - # `tar` is now a TarFile with 'filename' in it! + with t.open() as tar: + ... # `tar` is now a TarFile with 'filename' in it! """ def __init__(self): self.bio = io.BytesIO() @@ -3319,27 +3339,35 @@ class TestExtractionFilters(unittest.TestCase): @contextmanager def check_context(self, tar, filter): - """Extracts `tar` to `self.destdir` and allows checking it + """Extracts `tar` to `self.destdir` and allows checking the result + + If an error occurs, it must be checked using `expect_exception` - Ensures that all resulting files must be checked using `expect_file`, + Otherwise, all resulting files must be checked using `expect_file`, except the destination directory itself and parent directories of other files. When checking directories, do so before their contents. """ with os_helper.temp_dir(self.outerdir): - tar.extractall(self.destdir, filter=filter) - self.expected_paths = set(self.outerdir.glob('**/*')) - self.expected_paths.discard(self.destdir) + try: + tar.extractall(self.destdir, filter=filter) + except Exception as exc: + self.raised_exception = exc + self.expected_paths = set() + else: + self.raised_exception = None + self.expected_paths = set(self.outerdir.glob('**/*')) + self.expected_paths.discard(self.destdir) yield + tar.close() + if self.raised_exception: + raise self.raised_exception self.assertEqual(self.expected_paths, set()) - def extractall_fail(self, tar, filter): - """Attempt extraction. Use in an assertRaises context.""" - with os_helper.temp_dir(self.outerdir): - tar.extractall(self.destdir, filter=filter) - def expect_file(self, name, type=None, symlink_to=None, mode=None): """Check a single file. See check_context.""" + if self.raised_exception: + raise self.raised_exception # use normpath() rather than resolve() so we don't follow symlinks path = pathlib.Path(os.path.normpath(self.destdir / name)) self.assertIn(path, self.expected_paths) @@ -3350,8 +3378,13 @@ def expect_file(self, name, type=None, symlink_to=None, mode=None): if type is None and isinstance(name, str) and name.endswith('/'): type = tarfile.DIRTYPE if symlink_to is not None: - self.assertEqual((self.destdir / name).readlink(), - pathlib.Path(symlink_to)) + got = (self.destdir / name).readlink() + expected = pathlib.Path(symlink_to) + # The symlink might be the same (textually) as what we expect, + # but some systems change the link to an equivalent path, so + # we fall back to samefile(). + if expected != got: + self.assertTrue(got.samefile(expected)) elif type == tarfile.REGTYPE or type is None: self.assertTrue(path.is_file()) elif type == tarfile.DIRTYPE: @@ -3363,196 +3396,214 @@ def expect_file(self, name, type=None, symlink_to=None, mode=None): for parent in path.parents: self.expected_paths.discard(parent) + def expect_exception(self, exc_type, message_re='.'): + with self.assertRaisesRegex(exc_type, message_re): + if self.raised_exception is not None: + raise self.raised_exception + self.raised_exception = None + def test_benign_file(self): - with TestArchiveMaker() as chk: - chk.add('benign.txt') + with ArchiveMaker() as arc: + arc.add('benign.txt') for filter in 'fully_trusted', 'tar', 'data': - with self.check_context(chk.open(), filter): + with self.check_context(arc.open(), filter): self.expect_file('benign.txt') def test_absolute(self): # Test handling a member with an absolute path # Inspired by 'absolute1' in https://github.com/jwilk/traversal-archives - with TestArchiveMaker() as chk: - chk.add(self.outerdir / 'escaped.evil') - outerdir_stripped = str(self.outerdir).lstrip('/') + with ArchiveMaker() as arc: + arc.add(self.outerdir / 'escaped.evil') - with self.check_context(chk.open(), 'fully_trusted'): + with self.check_context(arc.open(), 'fully_trusted'): self.expect_file('../escaped.evil') - if str(self.outerdir).startswith('/'): - # We strip leading slashes, as GNU tar does (without --absolute-filenames). - # This archive was made on a systems where absolute paths have leading slashes. - - for filter in 'tar', 'data': - with self.check_context(chk.open(), filter): + for filter in 'tar', 'data': + with self.check_context(arc.open(), filter): + if str(self.outerdir).startswith('/'): + # We strip leading slashes, as e.g. GNU tar does + # (without --absolute-filenames). + outerdir_stripped = str(self.outerdir).lstrip('/') self.expect_file(f'{outerdir_stripped}/escaped.evil') - else: - # This archive was made on a systems where absolute paths don't have leading slashes. - # So, we don't strip, but fail to unpack. - - for filter in 'tar', 'data': - with self.assertRaisesRegex( + else: + # On this system, absolute paths don't have leading + # slashes. + # So, there's nothing to strip. We refuse to unpack + # to an absolute path, nonetheless. + self.expect_exception( tarfile.AbsolutePathError, - """['"].*escaped.evil['"] has an absolute path"""): - self.extractall_fail(chk.open(), 'tar') + """['"].*escaped.evil['"] has an absolute path""") def test_parent_symlink(self): # Test interplaying symlinks # Inspired by 'dirsymlink2a' in jwilk/traversal-archives - with TestArchiveMaker() as chk: - chk.add('current', symlink_to='.') - chk.add('parent', symlink_to='current/..') - chk.add('parent/evil') + with ArchiveMaker() as arc: + arc.add('current', symlink_to='.') + arc.add('parent', symlink_to='current/..') + arc.add('parent/evil') if os_helper.can_symlink(): - with self.check_context(chk.open(), 'fully_trusted'): + with self.check_context(arc.open(), 'fully_trusted'): + if self.raised_exception is not None: + # Windows will refuse to create a file that's a symlink to itself + # (and tarfile doesn't swallow that exception) + self.expect_exception(FileExistsError) + # The other cases will fail with this error too. + # Skip the rest of this test. + return + else: self.expect_file('current', symlink_to='.') self.expect_file('parent', symlink_to='current/..') self.expect_file('../evil') - with self.assertRaisesRegex( + with self.check_context(arc.open(), 'tar'): + self.expect_exception( tarfile.OutsideDestinationError, """'parent/evil' would be extracted to ['"].*evil['"], """ - + "which is outside the destination"): - self.extractall_fail(chk.open(), filter='tar') + + "which is outside the destination") - with self.assertRaisesRegex( + with self.check_context(arc.open(), 'data'): + self.expect_exception( tarfile.LinkOutsideDestinationError, """'parent' would link to ['"].*outerdir['"], """ - + "which is outside the destination"): - self.extractall_fail(chk.open(), filter='data') + + "which is outside the destination") else: - with self.check_context(chk.open(), 'fully_trusted'): + # No symlink support. The symlinks are ignored. + with self.check_context(arc.open(), 'fully_trusted'): self.expect_file('parent/evil') - with self.check_context(chk.open(), 'tar'): + with self.check_context(arc.open(), 'tar'): self.expect_file('parent/evil') - with self.check_context(chk.open(), 'data'): + with self.check_context(arc.open(), 'data'): self.expect_file('parent/evil') def test_parent_symlink2(self): # Test interplaying symlinks # Inspired by 'dirsymlink2b' in jwilk/traversal-archives - with TestArchiveMaker() as chk: - chk.add('current', symlink_to='.') - chk.add('current/parent', symlink_to='..') - chk.add('parent/evil') + with ArchiveMaker() as arc: + arc.add('current', symlink_to='.') + arc.add('current/parent', symlink_to='..') + arc.add('parent/evil') - if os_helper.can_symlink(): - with self.check_context(chk.open(), 'fully_trusted'): + with self.check_context(arc.open(), 'fully_trusted'): + if os_helper.can_symlink(): self.expect_file('current', symlink_to='.') self.expect_file('parent', symlink_to='..') self.expect_file('../evil') - else: - with self.check_context(chk.open(), 'fully_trusted'): + else: self.expect_file('current/') self.expect_file('parent/evil') - if os_helper.can_symlink(): - with self.assertRaisesRegex( + with self.check_context(arc.open(), 'tar'): + if os_helper.can_symlink(): + self.expect_exception( tarfile.OutsideDestinationError, "'parent/evil' would be extracted to " + """['"].*evil['"], which is outside """ - + "the destination"): - self.extractall_fail(chk.open(), filter='tar') - else: - with self.check_context(chk.open(), 'tar'): + + "the destination") + else: self.expect_file('current/') self.expect_file('parent/evil') - with self.assertRaisesRegex( + with self.check_context(arc.open(), 'data'): + self.expect_exception( tarfile.LinkOutsideDestinationError, """'current/parent' would link to ['"].*['"], """ - + "which is outside the destination"): - self.extractall_fail(chk.open(), filter='data') + + "which is outside the destination") def test_absolute_symlink(self): # Test symlink to an absolute path # Inspired by 'dirsymlink' in jwilk/traversal-archives - with TestArchiveMaker() as chk: - chk.add('parent', symlink_to=self.outerdir) - chk.add('parent/evil') + with ArchiveMaker() as arc: + arc.add('parent', symlink_to=self.outerdir) + arc.add('parent/evil') - with self.check_context(chk.open(), 'fully_trusted'): + with self.check_context(arc.open(), 'fully_trusted'): if os_helper.can_symlink(): self.expect_file('parent', symlink_to=self.outerdir) self.expect_file('../evil') else: self.expect_file('parent/evil') - if os_helper.can_symlink(): - with self.assertRaisesRegex( + with self.check_context(arc.open(), 'tar'): + if os_helper.can_symlink(): + self.expect_exception( tarfile.OutsideDestinationError, "'parent/evil' would be extracted to " + """['"].*evil['"], which is outside """ - + "the destination"): - self.extractall_fail(chk.open(), filter='tar') - else: - with self.check_context(chk.open(), 'tar'): + + "the destination") + else: self.expect_file('parent/evil') - with self.assertRaisesRegex( - tarfile.AbsoluteLinkError, - f"'parent' is a symlink to an absolute path"): - self.extractall_fail(chk.open(), filter='data') + with self.check_context(arc.open(), 'data'): + self.expect_exception( + tarfile.AbsoluteLinkError, + f"'parent' is a symlink to an absolute path") def test_sly_relative0(self): # Inspired by 'relative0' in jwilk/traversal-archives - with TestArchiveMaker() as chk: - chk.add('../moo', symlink_to='..//tmp/moo') + with ArchiveMaker() as arc: + arc.add('../moo', symlink_to='..//tmp/moo') try: - with self.check_context(chk.open(), filter='fully_trusted'): + with self.check_context(arc.open(), filter='fully_trusted'): if os_helper.can_symlink(): - self.expect_file('../moo', symlink_to='..//tmp/moo') + if isinstance(self.raised_exception, FileExistsError): + # XXX TarFile happens to fail creating a parent + # directory. + # This might be a bug, but fixing it would hurt + # security. + # Note that e.g. GNU `tar` rejects '..' components, + # so you could argue this is an invalid archive and we + # just raise an bad type of exception. + self.expect_exception(FileExistsError) + else: + self.expect_file('../moo', symlink_to='..//tmp/moo') + else: + # The symlink can't be extracted and is ignored + pass except FileExistsError: - # XXX TarFile happens to fail creating a parent directory. - # This might be a bug, but fixing it hurts security. - # Note that GNU `tar` rejects '..' components, so you could - # argue this is an invalid archive and we just raise an - # incorrect type of exception. - # - @encukou, when adding extraction filters pass - for filter in 'tar', 'data': - with self.assertRaisesRegex( - tarfile.OutsideDestinationError, - f"'../moo' would be extracted to " - + f"'.*moo', which is outside " - + "the destination"): - self.extractall_fail(chk.open(), filter=filter) + for filter in 'tar', 'data': + with self.check_context(arc.open(), filter): + self.expect_exception( + tarfile.OutsideDestinationError, + f"'../moo' would be extracted to " + + f"'.*moo', which is outside " + + "the destination") def test_sly_relative2(self): # Inspired by 'relative2' in jwilk/traversal-archives - with TestArchiveMaker() as chk: - chk.add('tmp/') - chk.add('tmp/../../moo', symlink_to='tmp/../..//tmp/moo') + with ArchiveMaker() as arc: + arc.add('tmp/') + arc.add('tmp/../../moo', symlink_to='tmp/../..//tmp/moo') - with self.check_context(chk.open(), 'fully_trusted'): + with self.check_context(arc.open(), 'fully_trusted'): self.expect_file('tmp', type=tarfile.DIRTYPE) if os_helper.can_symlink(): self.expect_file('../moo', symlink_to='tmp/../../tmp/moo') for filter in 'tar', 'data': - with self.assertRaisesRegex( + with self.check_context(arc.open(), filter): + self.expect_exception( tarfile.OutsideDestinationError, "'tmp/../../moo' would be extracted to " + """['"].*moo['"], which is outside the """ - + "destination"): - self.extractall_fail(chk.open(), filter=filter) + + "destination") def test_modes(self): # Test how file modes are extracted - with TestArchiveMaker() as chk: - chk.add('all_bits', mode='?rwsrwsrwt') - chk.add('perm_bits', mode='?rwxrwxrwx') - chk.add('exec_group_other', mode='?rw-rwxrwx') - chk.add('read_group_only', mode='?---r-----') - chk.add('no_bits', mode='?---------') - chk.add('dir/', mode='?---rwsrwt', type=tarfile.DIRTYPE) - - with self.check_context(chk.open(), 'fully_trusted'): + # (Note that the modes are ignored on platforms without working chmod) + with ArchiveMaker() as arc: + arc.add('all_bits', mode='?rwsrwsrwt') + arc.add('perm_bits', mode='?rwxrwxrwx') + arc.add('exec_group_other', mode='?rw-rwxrwx') + arc.add('read_group_only', mode='?---r-----') + arc.add('no_bits', mode='?---------') + arc.add('dir/', mode='?---rwsrwt', type=tarfile.DIRTYPE) + + with self.check_context(arc.open(), 'fully_trusted'): self.expect_file('all_bits', mode='?rwsrwsrwt') self.expect_file('perm_bits', mode='?rwxrwxrwx') self.expect_file('exec_group_other', mode='?rw-rwxrwx') @@ -3560,7 +3611,7 @@ def test_modes(self): self.expect_file('no_bits', mode='?---------') self.expect_file('dir', type=tarfile.DIRTYPE, mode='?---rwsrwt') - with self.check_context(chk.open(), 'tar'): + with self.check_context(arc.open(), 'tar'): self.expect_file('all_bits', mode='?rwxr-xr-x') self.expect_file('perm_bits', mode='?rwxr-xr-x') self.expect_file('exec_group_other', mode='?rw-r-xr-x') @@ -3568,7 +3619,7 @@ def test_modes(self): self.expect_file('no_bits', mode='?---------') self.expect_file('dir/', type=tarfile.DIRTYPE, mode='?---r-xr-x') - with self.check_context(chk.open(), 'data'): + with self.check_context(arc.open(), 'data'): normal_dir_mode = stat.filemode(stat.S_IMODE( self.outerdir.stat().st_mode)) self.expect_file('all_bits', mode='?rwxr-xr-x') @@ -3579,20 +3630,26 @@ def test_modes(self): self.expect_file('dir/', type=tarfile.DIRTYPE, mode=normal_dir_mode) def test_pipe(self): - with TestArchiveMaker() as chk: - chk.add('foo', type=tarfile.FIFOTYPE) + # Test handling of a special file + with ArchiveMaker() as arc: + arc.add('foo', type=tarfile.FIFOTYPE) for filter in 'fully_trusted', 'tar': - with self.check_context(chk.open(), filter): + with self.check_context(arc.open(), filter): if hasattr(os, 'mkfifo'): self.expect_file('foo', type=tarfile.FIFOTYPE) + else: + # The pipe can't be extracted and is skipped. + pass - with self.assertRaisesRegex( + with self.check_context(arc.open(), 'data'): + self.expect_exception( tarfile.SpecialFileError, - "'foo' is a special file"): - self.extractall_fail(chk.open(), filter='data') + "'foo' is a special file") def test_special_files(self): + # Creating device files is tricky. Instead of attempting that let's + # only check the filter result. for special_type in tarfile.FIFOTYPE, tarfile.CHRTYPE, tarfile.BLKTYPE: tarinfo = tarfile.TarInfo('foo') tarinfo.type = special_type @@ -3606,12 +3663,16 @@ def test_special_files(self): self.assertEqual(cm.exception.tarinfo.name, 'foo') def test_fully_trusted_filter(self): + # The 'fully_trusted' filter returns the original TarInfo objects. with tarfile.TarFile.open(tarname) as tar: for tarinfo in tar.getmembers(): filtered = tarfile.fully_trusted_filter(tarinfo, '') self.assertIs(filtered, tarinfo) def test_tar_filter(self): + # The 'tar' filter returns TarInfo objects with the same name/type. + # (It can also fail for particularly "evil" input, but we don't have + # that in the test archive.) with tarfile.TarFile.open(tarname) as tar: for tarinfo in tar.getmembers(): filtered = tarfile.tar_filter(tarinfo, '') @@ -3619,6 +3680,8 @@ def test_tar_filter(self): self.assertIs(filtered.type, tarinfo.type) def test_data_filter(self): + # The 'data' filter either raises, or returns TarInfo with the same + # name/type. with tarfile.TarFile.open(tarname) as tar: for tarinfo in tar.getmembers(): try: @@ -3630,10 +3693,11 @@ def test_data_filter(self): def test_default_filter_warns(self): """Ensure the default filter warns""" - with TestArchiveMaker() as chk: - chk.add('foo') - with warnings_helper.check_warnings(('Python 3.14', DeprecationWarning)): - with self.check_context(chk.open(), None): + with ArchiveMaker() as arc: + arc.add('foo') + with warnings_helper.check_warnings( + ('Python 3.14', DeprecationWarning)): + with self.check_context(arc.open(), None): self.expect_file('foo') def test_change_default_filter_on_instance(self): @@ -3641,6 +3705,8 @@ def test_change_default_filter_on_instance(self): def strict_filter(tarinfo, path): if tarinfo.name == 'ustar/regtype': return tarinfo + else: + return None tar.extraction_filter = strict_filter with self.check_context(tar, None): self.expect_file('ustar/regtype') @@ -3649,14 +3715,25 @@ def test_change_default_filter_on_class(self): def strict_filter(tarinfo, path): if tarinfo.name == 'ustar/regtype': return tarinfo + else: + return None tar = tarfile.TarFile(tarname, 'r') - old_default = tarfile.TarFile.__dict__['extraction_filter'] - try: - tarfile.TarFile.extraction_filter = staticmethod(strict_filter) + with support.swap_attr(tarfile.TarFile, 'extraction_filter', + staticmethod(strict_filter)): with self.check_context(tar, None): self.expect_file('ustar/regtype') - finally: - tarfile.TarFile.extraction_filter = old_default + + def test_change_default_filter_on_subclass(self): + class TarSubclass(tarfile.TarFile): + def extraction_filter(self, tarinfo, path): + if tarinfo.name == 'ustar/regtype': + return tarinfo + else: + return None + + tar = TarSubclass(tarname, 'r') + with self.check_context(tar, None): + self.expect_file('ustar/regtype') def test_custom_filter(self): def custom_filter(tarinfo, path): @@ -3667,18 +3744,21 @@ def custom_filter(tarinfo, path): return None return tarinfo - with TestArchiveMaker() as chk: - chk.add('move_this') - chk.add('ignore_this') - chk.add('keep') - with self.check_context(chk.open(), custom_filter): + with ArchiveMaker() as arc: + arc.add('move_this') + arc.add('ignore_this') + arc.add('keep') + with self.check_context(arc.open(), custom_filter): self.expect_file('moved') self.expect_file('keep') def test_stateful_filter(self): + # Stateful filters should be possible. + # (This doesn't really test tarfile. Rather, it demonstrates + # that third parties can implement a stateful filter.) class StatefulFilter: def __enter__(self): - self.num_files = 0 + self.num_files_processed = 0 return self def __call__(self, tarinfo, path): @@ -3686,20 +3766,20 @@ def __call__(self, tarinfo, path): tarinfo = tarfile.data_filter(tarinfo, path) except tarfile.FilterError: return None - self.num_files += 1 + self.num_files_processed += 1 return tarinfo def __exit__(self, *exc_info): self.done = True - with TestArchiveMaker() as chk: - chk.add('good') - chk.add('bad', symlink_to='/') - chk.add('good') + with ArchiveMaker() as arc: + arc.add('good') + arc.add('bad', symlink_to='/') + arc.add('good') with StatefulFilter() as custom_filter: - with self.check_context(chk.open(), custom_filter): + with self.check_context(arc.open(), custom_filter): self.expect_file('good') - self.assertEqual(custom_filter.num_files, 2) + self.assertEqual(custom_filter.num_files_processed, 2) self.assertEqual(custom_filter.done, True) From bb53ce38f0e032712fd7151f95e3c08a39b549f2 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Tue, 21 Feb 2023 17:06:48 +0100 Subject: [PATCH 04/13] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Lumír 'Frenzy' Balhar --- Lib/shutil.py | 2 +- Lib/tarfile.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/shutil.py b/Lib/shutil.py index 24ee23fccf8742..2a785e872089b8 100644 --- a/Lib/shutil.py +++ b/Lib/shutil.py @@ -1276,7 +1276,7 @@ def unpack_archive(filename, extract_dir=None, format=None, filter=None): In case none is found, a ValueError is raised. - If `filter` isgiven, it is passed to he underlying + If `filter` is given, it is passed to he underlying extraction function. """ sys.audit("shutil.unpack_archive", filename, extract_dir, format) diff --git a/Lib/tarfile.py b/Lib/tarfile.py index 65638322cb6d43..271e055e3cbc99 100755 --- a/Lib/tarfile.py +++ b/Lib/tarfile.py @@ -716,7 +716,7 @@ def __init__(self, tarfile, tarinfo): #----------------------------- -# extraction filters (PEP XXX) +# extraction filters (PEP 706) #----------------------------- class FilterError(TarError): From 9d1206b926ca412996dcb327f5965b3878e12293 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Tue, 21 Feb 2023 17:04:34 +0100 Subject: [PATCH 05/13] Add `tarfile` docs --- Doc/library/tarfile.rst | 424 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 412 insertions(+), 12 deletions(-) diff --git a/Doc/library/tarfile.rst b/Doc/library/tarfile.rst index 741d40da152101..8a22cc1c120e7a 100644 --- a/Doc/library/tarfile.rst +++ b/Doc/library/tarfile.rst @@ -36,6 +36,13 @@ Some facts and figures: .. versionchanged:: 3.3 Added support for :mod:`lzma` compression. +.. versionchanged:: 3.12 + Archives are extracted using a :ref:`filter `, + which makes easy to either limit surprising/dangerous features, + or to acknowledge that they are expected and the archive is fully trusted. + By default, archives are fully trusted, but this default is deprecated + and slated to change in Python 3.14. + .. function:: open(name=None, mode='r', fileobj=None, bufsize=10240, **kwargs) @@ -209,6 +216,38 @@ The :mod:`tarfile` module defines the following exceptions: Is raised by :meth:`TarInfo.frombuf` if the buffer it gets is invalid. +.. exception:: FilterError + + Base class for members :ref:`refused ` by + filters. + + .. attribute:: tarinfo + + Information about the member that the filter refused to extract, + as :ref:`TarInfo `. + +.. exception:: AbsolutePathError + + Raised to refuse extracting a member with an absolute path. + +.. exception:: OutsideDestinationError + + Raised to refuse extracting a member outside the destination directory. + +.. exception:: SpecialFileError + + Raised to refuse extracting a special file (e.g. a device or pipe). + +.. exception:: AbsoluteLinkError + + Raised to refuse extracting a symbolic link with an absolute path. + +.. exception:: LinkOutsideDestinationError + + Raised to refuse extracting a symbolic link pointing outside the destination + directory. + + The following constants are available at the module level: .. data:: ENCODING @@ -319,11 +358,8 @@ be finalized; only the internally used file object will be closed. See the *debug* can be set from ``0`` (no debug messages) up to ``3`` (all debug messages). The messages are written to ``sys.stderr``. - If *errorlevel* is ``0``, all errors are ignored when using :meth:`TarFile.extract`. - Nevertheless, they appear as error messages in the debug output, when debugging - is enabled. If ``1``, all *fatal* errors are raised as :exc:`OSError` - exceptions. If ``2``, all *non-fatal* errors are raised as :exc:`TarError` - exceptions as well. + If *errorlevel* controls how extraction errors are handled, + see :attr:`the corresponding attribute <~TarFile.errorlevel>`. The *encoding* and *errors* arguments define the character encoding to be used for reading or writing the archive and how conversion errors are going @@ -390,7 +426,7 @@ be finalized; only the internally used file object will be closed. See the available. -.. method:: TarFile.extractall(path=".", members=None, *, numeric_owner=False) +.. method:: TarFile.extractall(path=".", members=None, *, numeric_owner=False, filter=None) Extract all members from the archive to the current working directory or directory *path*. If optional *members* is given, it must be a subset of the @@ -404,6 +440,12 @@ be finalized; only the internally used file object will be closed. See the are used to set the owner/group for the extracted files. Otherwise, the named values from the tarfile are used. + The *filter* argument specifies how ``members`` are modified or rejected + before extraction. + See :ref:`tarfile-extraction-filter` for details. + It is recommended to set this explicitly depending on which *tar* features + you need to support. + .. warning:: Never extract archives from untrusted sources without prior inspection. @@ -411,14 +453,20 @@ be finalized; only the internally used file object will be closed. See the that have absolute filenames starting with ``"/"`` or filenames with two dots ``".."``. + Set ``filter='data'`` to prevent the most dangerous security issues, + and read the :ref:`tarfile-extraction-filter` section for details. + .. versionchanged:: 3.5 Added the *numeric_owner* parameter. .. versionchanged:: 3.6 The *path* parameter accepts a :term:`path-like object`. + .. versionchanged:: 3.12 + Added the *filter* parameter. + -.. method:: TarFile.extract(member, path="", set_attrs=True, *, numeric_owner=False) +.. method:: TarFile.extract(member, path="", set_attrs=True, *, numeric_owner=False, filter=None) Extract a member from the archive to the current working directory, using its full name. Its file information is extracted as accurately as possible. *member* @@ -426,9 +474,8 @@ be finalized; only the internally used file object will be closed. See the directory using *path*. *path* may be a :term:`path-like object`. File attributes (owner, mtime, mode) are set unless *set_attrs* is false. - If *numeric_owner* is :const:`True`, the uid and gid numbers from the tarfile - are used to set the owner/group for the extracted files. Otherwise, the named - values from the tarfile are used. + The *numeric_owner* and *filter* arguments are the same as + for :meth:`extractall`. .. note:: @@ -439,6 +486,9 @@ be finalized; only the internally used file object will be closed. See the See the warning for :meth:`extractall`. + Set ``filter='data'`` to prevent the most dangerous security issues, + and read the :ref:`tarfile-extraction-filter` section for details. + .. versionchanged:: 3.2 Added the *set_attrs* parameter. @@ -448,6 +498,9 @@ be finalized; only the internally used file object will be closed. See the .. versionchanged:: 3.6 The *path* parameter accepts a :term:`path-like object`. + .. versionchanged:: 3.12 + Added the *filter* parameter. + .. method:: TarFile.extractfile(member) @@ -460,6 +513,37 @@ be finalized; only the internally used file object will be closed. See the .. versionchanged:: 3.3 Return an :class:`io.BufferedReader` object. +.. attribute:: TarFile.errorlevel + + If *errorlevel* is ``0``, all errors are ignored when using :meth:`TarFile.extract` + and :meth:`TarFile.extractall`. + Nevertheless, they appear as error messages in the debug output, when debugging + is enabled. + If ``1`` (the default), all *fatal* errors are raised as :exc:`OSError` + exceptions. If ``2``, all *non-fatal* errors are raised as :exc:`TarError` + exceptions as well. + + Note that when an exception is raised, the archive may be partially + extracted. It is the user’s responsibility to clean up. + +.. attribute:: TarFile.extraction_filter + + .. versionadded:: 3.12 + + The default :ref:`extraction filter ` used + as a default for the *filter* argument of :meth:`~TarFile.extract` + and :meth:`~TarFile.extractall`. + + The attribute may be ``None`` (default) or a callable. + String names are not allowed. + + The attribute may be set on instances or overridden in subclasses. + It also is possible to set it on the ``TarFile`` class itself to set a + global default, although, since it affects all uses of *tarfile*, + it is best practice to only do so in top-level applications or + :mod:`site configuration `. + To set a global default this way, a filter function needs to be wrapped in + :func:`staticmethod()` to prevent injection of a ``self`` argument. .. method:: TarFile.add(name, arcname=None, recursive=True, *, filter=None) @@ -535,8 +619,23 @@ permissions, owner etc.), it provides some useful methods to determine its type. It does *not* contain the file's data itself. :class:`TarInfo` objects are returned by :class:`TarFile`'s methods -:meth:`getmember`, :meth:`getmembers` and :meth:`gettarinfo`. +:meth:`~TarFile.getmember`, :meth:`~TarFile.getmembers` and +:meth:`~TarFile.gettarinfo`. + +Modifying the objects returned by :meth:`~!TarFile.getmember` or +:meth:`~!TarFile.getmembers` will affect all subsequent +operations on the archive. +For cases where this is unwanted, you can use :mod:`copy.copy() ` or +call the :meth:`~TarInfo.replace` method to create a modified copy in one step. + +Several attributes can be set to ``None`` to indicate that a piece of metadata +is unused or unknown. +Different :class:`TarInfo` methods handle ``None`` differently: +- The :meth:`~TarFile.extract` or :meth:`~TarFile.extractall` methods will + ignore the corresponding metadata, leaving it set to a default. +- :meth:`~TarFile.addfile` will fail. +- :meth:`~TarFile.list` will print a placeholder string. .. class:: TarInfo(name="") @@ -569,24 +668,36 @@ A ``TarInfo`` object has the following public data attributes: .. attribute:: TarInfo.name + :type: str Name of the archive member. .. attribute:: TarInfo.size + :type: int Size in bytes. .. attribute:: TarInfo.mtime + :type: int Time of last modification. + .. versionchanged:: 3.12 + + Can be set to ``None`` for :meth:`~TarFile.extract` and + :meth:`~TarFile.extractall`. .. attribute:: TarInfo.mode + :type: int Permission bits. + .. versionchanged:: 3.12 + + Can be set to ``None`` for :meth:`~TarFile.extract` and + :meth:`~TarFile.extractall`. .. attribute:: TarInfo.type @@ -598,35 +709,72 @@ A ``TarInfo`` object has the following public data attributes: .. attribute:: TarInfo.linkname + :type: str Name of the target file name, which is only present in :class:`TarInfo` objects of type :const:`LNKTYPE` and :const:`SYMTYPE`. .. attribute:: TarInfo.uid + :type: int User ID of the user who originally stored this member. + .. versionchanged:: 3.12 + + Can be set to ``None`` for :meth:`~TarFile.extract` and + :meth:`~TarFile.extractall`. .. attribute:: TarInfo.gid + :type: int Group ID of the user who originally stored this member. + .. versionchanged:: 3.12 + + Can be set to ``None`` for :meth:`~TarFile.extract` and + :meth:`~TarFile.extractall`. .. attribute:: TarInfo.uname + :type: str User name. + .. versionchanged:: 3.12 + + Can be set to ``None`` for :meth:`~TarFile.extract` and + :meth:`~TarFile.extractall`. .. attribute:: TarInfo.gname + :type: str Group name. + .. versionchanged:: 3.12 + + Can be set to ``None`` for :meth:`~TarFile.extract` and + :meth:`~TarFile.extractall`. .. attribute:: TarInfo.pax_headers + :type: dict A dictionary containing key-value pairs of an associated pax extended header. +.. method:: TarInfo.replace(name=..., mtime=..., mode=..., linkname=..., \ + uid=..., gid=..., uname=..., gname=..., \ + deep=True) + + .. versionadded:: 3.12 + + Return a *new* copy of the :class:`!TarInfo` object with the given attributes + changed. For example, to return a ``TarInfo`` with the modification + set to the current time, use:: + + new_tarinfo = old_tarinfo.replace(mtime=time.time()) + + By default, a deep copy is made. + If *deep* is false, the copy is shallow, i.e. ``pax_headers`` + and any custom attributes are shared with the original ``TarInfo`` object. A :class:`TarInfo` object also provides some convenient query methods: @@ -676,9 +824,254 @@ A :class:`TarInfo` object also provides some convenient query methods: Return :const:`True` if it is one of character device, block device or FIFO. +.. _tarfile-extraction-filter: + +Extraction filters +------------------ + +.. versionadded:: 3.12 + +The *tar* format is designed to capture all details of a UNIX-like ecosystem, +which makes it very powerful. +Unfortunately, the features make it easy to create tar files that have +unintended -- and possibly malicious -- effects when extracted. +For example, extracting a tar file can overwrite arbitrary files in various +ways (e.g. by using absolute paths, ``..`` path components, or symlinks that +affect later members). + +In most cases, the full functionality is not needed. +Therefore, *tarfile* supports extraction filters: a mechanism to limit +functionality, and thus mitigate some of the security issues. + +.. seealso:: + + :pep:`706` + Contains further motivation and rationale behind the design. + +The *filter* argument to :meth:`TarFile.extract` or :meth:`~TarFile.extractall` +can be: + +* the string ``'fully_trusted'``: Honor all metadata as specified in the + archive. + Should be used if the user trusts the archive completely, or implements + their own complex verification. + +* the string ``'tar'``: Honor most *tar*-specific features (i.e. features of + UNIX-like filesystems), but block features that are very likely to be + surprising or malicious. See :func:`tar_filter` for details. + +* the string ``'data'``: Ignore or block most features specific to UNIX-like + filesystems. Intended for extracting cross-platform data archives. + See :func:`data_filter` for details. + +* ``None`` (default): Use :attr:`TarFile.extraction_filter`. + + If that is also ``None`` (the default), raise a ``DeprecationWarning``, + and fall back to the ``'fully_trusted'`` filter, whose dangerous behavior + matches previous versions of Python. + + In Python 3.14, the ``'data'`` filter will become the default instead. + It's possible to switch earlier; see :attr:`TarFile.extraction_filter`. + +* A callable which will be called for each extracted member with a + :ref:`TarInfo ` describing the member and the destination + path to where the archive is extracted (i.e. the same path is used for all + members):: + + filter(/, member: TarInfo, path: str) -> TarInfo | None + + The callable is called just before each member is extracted, so it can + take the current state of the disk into account. + It can: + + - return a :class:`TarInfo` object which will be used instead of the metadata + in the archive, or + - return ``None``, in which case the member will be skipped, or + - raise an exception to abort the operation or skip the member, + depending on :attr:`~TarFile.errorlevel`. + +Default named filters +~~~~~~~~~~~~~~~~~~~~~ + +The pre-defined, named filters are available as functions, so they can be +reused in custom filters: + +.. function:: fully_trusted_filter(/, member, path) + + Return *member* unchanged. + +.. function:: tar_filter(/, member, path) + + Implements the ``'tar'`` filter. + + - Strip leading slashes (`/` and :attr:`os.sep`) from filenames. + - :ref:`Refuse ` to extract files with absolute + paths (in case the name is absolute + even after stripping slashes, e.g. ``C:/foo`` on Windows). + This raises :class:`~tarfile.AbsolutePathError`. + - :ref:`Refuse ` to extract files whose absolute + path (after following symlinks) would end up outside the destination. + This raises :class:`~tarfile.OutsideDestinationError`. + - Clear high mode bits (setuid, setgid, sticky) and group/other write bits + (:attr:`~stat.S_IWGRP`|:attr:`~stat.S_IWOTH`). + + Return the modified ``TarInfo`` member. + +.. function:: data_filter(/, member, path) + + Implements the ``'data'`` filter. + In addition to what ``tar_filter`` does: + + - :ref:`Refuse ` to extract links (hard or soft) + that link to absolute paths, or ones that link outside the destination. + + This raises :class:`~tarfile.AbsoluteLinkError` or + :class:`~tarfile.LinkOutsideDestinationError`. + + Note that such files are refused even on platforms that do not support + symbolic links. + + - :ref:`Refuse ` to extract device files + (including pipes). + This raises :class:`~tarfile.SpecialFileError`. + + - For regular files, including hard links: + + - Set the owner read and write permissions + (:attr:`~stat.S_IRUSR`|:attr:`~stat.S_IWUSR`). + - Remove the group & other executable permission + (:attr:`~stat.S_IXGRP`|:attr:`~stat.S_IXOTH`) + if the owner doesn’t have it (:attr:`~stat.S_IXUSR`). + - Remove the group & other read permission + (:attr:`~stat.S_IRGRP`|:attr:`~stat.S_IROTH`) if the owner + doesn’t have it (:attr:`~stat.S_IRUSR`). + + - For other files (directories), ignore permission bits entirely. + - Ignore user and group info (``uid``, ``gid``, ``uname``, ``gname``). + + Return the modified ``TarInfo`` member. + + +.. _tarfile-extraction-refuse: + +Filter errors +~~~~~~~~~~~~~ + +When a filter refuses to extract a file, it will raise an appropriate exception, +a subclass of :class:`~tarfile.FilterError`. +This will abort the extraction if :attr:`TarFile.errorlevel` is 1 or more. +With ``errorlevel=0`` the error will be logged and the member will be ignored, +but extraction will continue. + + +Hints for further verification +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Even with ``filter='data'``, *tarfile* is not suited for extracting untrusted +files without prior inspection. +Among other issues, the pre-defined filters do not prevent denial-of-service +attacks. Users should do additional checks. + +Here is an incomplete list of things to consider: + +* Extract to a new empty directory, to prevent e.g. exploiting pre-existing + links, and to make it easier to clean up after a failed extraction. +* When working with untrusted data, use external (e.g. OS-level) limits on + ·disk, memory and CPU usage. +* Check filenames against an allow-list of characters + (to filter out control characters, confusables, foreign path separators, + etc.). +* Check that filenames have expected extensions (discouraging files that + execute when you “click on them”, or extension-less files like Windows special device names), +* Limiting the number of extracted files, total size of extracted data, + filename length (including symlink length), size of individual files. +* Check for files that would be shadowed on case-insensitive filesystems. + +Also note that: + +* Tar files often contain multiple versions of the same file. + Later ones are expected to overwrite any earlier ones. + This feature is crucial to allow updating tape archives, but can be abused + maliciously. +* *tarfile* does not protect against issues with “live” data, + e.g. an attacker tinkering with the destination (or source) directory while + extraction (or archiving) is in progress. + + +Supporting older Python versions +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Extraction filters were added to Python 3.12, but may be backported to older +versions as security updates. +To check whether the feature is available, use e.g. +``hasattr(tarfile, 'data_filter')`` rather than checking the Python version. + +The following examples show how to support Python versions with and without +the feature. +Note that setting ``extraction_filter`` will affect any subsequent operations. + +* Fully trusted archive:: + + my_tarfile.extraction_filter = (lambda member, path: member) + my_tarfile.extractall() + +* Use the ``'data'`` filter if available, but revert to Python 3.11 behavior + (``'fully_trusted'``) if this feature is not available:: + + my_tarfile.extraction_filter = getattr(tarfile, 'data_filter', + (lambda member, path: member)) + my_tarfile.extractall() + +* Use the ``'data'`` filter; *fail* if it is not available:: + + my_tarfile.extractall(filter=tarfile.data_filter) + + or:: + + my_tarfile.extraction_filter = tarfile.data_filter + my_tarfile.extractall() + +* Use the ``'data'`` filter; *warn* if it is not available:: + + if hasattr(tarfile, 'data_filter'): + my_tarfile.extractall(filter='data') + else: + # remove this when no longer needed + warn_the_user('Extracting may be unsafe; consider updating Python') + my_tarfile.extractall() + + +Stateful extraction filter example +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +While *tarfile*'s extraction methods take a simple *filter* callable, +custom filters may be more complex objects with internal state. +It may be useful to write these as context managers, to be used like this:: + + with StatefulFilter() as filter_func: + tar.extractall(path, filter=filter_func) + +Such a filter can be written as, for example:: + + class StatefulFilter: + def __init__(self): + self.file_count = 0 + + def __enter__(self): + return self + + def __call__(self, member, path): + self.file_count += 1 + return member + + def __exit__(self, *exc_info): + print(f'{self.file_count} files extracted') + + .. _tarfile-commandline: .. program:: tarfile + Command-Line Interface ---------------------- @@ -748,6 +1141,13 @@ Command-line options Verbose output. +.. cmdoption:: --filter + + Specifies the *filter* for ``--extract``. + See :ref:`tarfile-extraction-filter` for details. + Only string names are accepted (that is, ``fully_trusted``, ``tar``, + and ``data``). + .. _tar-examples: Examples @@ -757,7 +1157,7 @@ How to extract an entire tar archive to the current working directory:: import tarfile tar = tarfile.open("sample.tar.gz") - tar.extractall() + tar.extractall(filter='data') tar.close() How to extract a subset of a tar archive with :meth:`TarFile.extractall` using From bc600c80146c738c2236b7b5ba516afa3b8916fb Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Wed, 22 Feb 2023 15:54:56 +0100 Subject: [PATCH 06/13] Raise a descriptive error when extraction_filter is set to a string (This is disallowed for forward-compatibility reasons, see the PEP.) --- Lib/tarfile.py | 5 +++++ Lib/test/test_tarfile.py | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/Lib/tarfile.py b/Lib/tarfile.py index 271e055e3cbc99..f0b8672daab03f 100755 --- a/Lib/tarfile.py +++ b/Lib/tarfile.py @@ -2216,6 +2216,11 @@ def _get_filter_function(self, filter): + 'Use the filter argument to control this behavior.', DeprecationWarning) return fully_trusted_filter + if isinstance(filter, str): + raise TypeError( + 'String names are not supported for ' + + 'TarFile.extraction_filter. Use a function such as ' + + 'tarfile.data_filter directly.') return filter if callable(filter): return filter diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index d871259d06055e..d9b56e779ce246 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -3735,6 +3735,12 @@ def extraction_filter(self, tarinfo, path): with self.check_context(tar, None): self.expect_file('ustar/regtype') + def test_change_default_filter_to_string(self): + tar = tarfile.TarFile(tarname, 'r') + tar.extraction_filter = 'data' + with self.check_context(tar, None): + self.expect_exception(TypeError) + def test_custom_filter(self): def custom_filter(tarinfo, path): self.assertIs(path, self.destdir) From 09256f17c0752a497a1b731b6a63ec5c71b0b925 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Thu, 23 Feb 2023 14:53:47 +0100 Subject: [PATCH 07/13] Fix, docs, tests for shutil.unpack_archive --- Doc/library/shutil.rst | 16 +++++++++++++++- Lib/shutil.py | 6 +++--- Lib/test/test_shutil.py | 41 ++++++++++++++++++++++++++++------------- 3 files changed, 46 insertions(+), 17 deletions(-) diff --git a/Doc/library/shutil.rst b/Doc/library/shutil.rst index b33dbe21b1fa19..47e4f9539d60b7 100644 --- a/Doc/library/shutil.rst +++ b/Doc/library/shutil.rst @@ -636,7 +636,7 @@ provided. They rely on the :mod:`zipfile` and :mod:`tarfile` modules. Remove the archive format *name* from the list of supported formats. -.. function:: unpack_archive(filename[, extract_dir[, format]]) +.. function:: unpack_archive(filename[, extract_dir[, format[, filter]]]) Unpack an archive. *filename* is the full path of the archive. @@ -650,6 +650,14 @@ provided. They rely on the :mod:`zipfile` and :mod:`tarfile` modules. registered for that extension. In case none is found, a :exc:`ValueError` is raised. + *filter* is passed to the underlying unpacking function. + For zip files, *filter* is not accepted. + For tar files, it is recommended to set it to ``'data'``, + unless using features specific to tar and UNIX filesystems. + (See :ref:`tarfile-extraction-filter` for details.) + The ``'data'`` filter ·will become the default for tar files + in Python 3.14. + .. audit-event:: shutil.unpack_archive filename,extract_dir,format shutil.unpack_archive .. warning:: @@ -662,6 +670,9 @@ provided. They rely on the :mod:`zipfile` and :mod:`tarfile` modules. .. versionchanged:: 3.7 Accepts a :term:`path-like object` for *filename* and *extract_dir*. + .. versionchanged:: 3.12 + Added the *filter* argument. + .. function:: register_unpack_format(name, extensions, function[, extra_args[, description]]) Registers an unpack format. *name* is the name of the format and @@ -672,6 +683,9 @@ provided. They rely on the :mod:`zipfile` and :mod:`tarfile` modules. callable will receive the path of the archive, followed by the directory the archive must be extracted to. + If *filter* is used with ``unpack_archive``, it will be passed to *function* + as an additional named argument. + When provided, *extra_args* is a sequence of ``(name, value)`` tuples that will be passed as keywords arguments to the callable. diff --git a/Lib/shutil.py b/Lib/shutil.py index 2a785e872089b8..0c08249fd723aa 100644 --- a/Lib/shutil.py +++ b/Lib/shutil.py @@ -1218,7 +1218,7 @@ def _unpack_zipfile(filename, extract_dir): finally: zip.close() -def _unpack_tarfile(filename, extract_dir): +def _unpack_tarfile(filename, extract_dir, filter=None): """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir` """ import tarfile # late import for breaking circular dependency @@ -1228,7 +1228,7 @@ def _unpack_tarfile(filename, extract_dir): raise ReadError( "%s is not a compressed or uncompressed tar file" % filename) try: - tarobj.extractall(extract_dir) + tarobj.extractall(extract_dir, filter=filter) finally: tarobj.close() @@ -1306,7 +1306,7 @@ def unpack_archive(filename, extract_dir=None, format=None, filter=None): raise ReadError("Unknown archive format '{0}'".format(filename)) func = _UNPACK_FORMATS[format][1] - kwargs = filter_kwargs | dict(_UNPACK_FORMATS[format][2]) + kwargs = dict(_UNPACK_FORMATS[format][2]) | filter_kwargs func(filename, extract_dir, **kwargs) diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py index 8fe62216ecdca0..563b054ad27d8b 100644 --- a/Lib/test/test_shutil.py +++ b/Lib/test/test_shutil.py @@ -32,6 +32,7 @@ from test import support from test.support import os_helper from test.support.os_helper import TESTFN, FakePath +from test.support import warnings_helper TESTFN2 = TESTFN + "2" TESTFN_SRC = TESTFN + "_SRC" @@ -1667,12 +1668,14 @@ def test_register_archive_format(self): ### shutil.unpack_archive - def check_unpack_archive(self, format): - self.check_unpack_archive_with_converter(format, lambda path: path) - self.check_unpack_archive_with_converter(format, pathlib.Path) - self.check_unpack_archive_with_converter(format, FakePath) + def check_unpack_archive(self, format, **kwargs): + self.check_unpack_archive_with_converter( + format, lambda path: path, **kwargs) + self.check_unpack_archive_with_converter( + format, pathlib.Path, **kwargs) + self.check_unpack_archive_with_converter(format, FakePath, **kwargs) - def check_unpack_archive_with_converter(self, format, converter): + def check_unpack_archive_with_converter(self, format, converter, **kwargs): root_dir, base_dir = self._create_files() expected = rlistdir(root_dir) expected.remove('outer') @@ -1682,36 +1685,48 @@ def check_unpack_archive_with_converter(self, format, converter): # let's try to unpack it now tmpdir2 = self.mkdtemp() - unpack_archive(converter(filename), converter(tmpdir2)) + unpack_archive(converter(filename), converter(tmpdir2), **kwargs) self.assertEqual(rlistdir(tmpdir2), expected) # and again, this time with the format specified tmpdir3 = self.mkdtemp() - unpack_archive(converter(filename), converter(tmpdir3), format=format) + unpack_archive(converter(filename), converter(tmpdir3), format=format, + **kwargs) self.assertEqual(rlistdir(tmpdir3), expected) - self.assertRaises(shutil.ReadError, unpack_archive, converter(TESTFN)) - self.assertRaises(ValueError, unpack_archive, converter(TESTFN), format='xxx') + with self.assertRaises(shutil.ReadError): + unpack_archive(converter(TESTFN), **kwargs) + with self.assertRaises(ValueError): + unpack_archive(converter(TESTFN), format='xxx', **kwargs) + + def check_unpack_tarball(self, format): + self.check_unpack_archive(format, filter='fully_trusted') + self.check_unpack_archive(format, filter='data') + with warnings_helper.check_warnings( + ('Python 3.14', DeprecationWarning)): + self.check_unpack_archive(format) def test_unpack_archive_tar(self): - self.check_unpack_archive('tar') + self.check_unpack_tarball('tar') @support.requires_zlib() def test_unpack_archive_gztar(self): - self.check_unpack_archive('gztar') + self.check_unpack_tarball('gztar') @support.requires_bz2() def test_unpack_archive_bztar(self): - self.check_unpack_archive('bztar') + self.check_unpack_tarball('bztar') @support.requires_lzma() @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger") def test_unpack_archive_xztar(self): - self.check_unpack_archive('xztar') + self.check_unpack_tarball('xztar') @support.requires_zlib() def test_unpack_archive_zip(self): self.check_unpack_archive('zip') + with self.assertRaises(TypeError): + self.check_unpack_archive('zip', filter='data') def test_unpack_registry(self): From 33bde67c9bdc6e0e5edb64985427e675ccce203b Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Thu, 23 Feb 2023 16:01:17 +0100 Subject: [PATCH 08/13] Add What's New entries --- Doc/whatsnew/3.12.rst | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/Doc/whatsnew/3.12.rst b/Doc/whatsnew/3.12.rst index 45a5e5062d55b6..526421dbdf4191 100644 --- a/Doc/whatsnew/3.12.rst +++ b/Doc/whatsnew/3.12.rst @@ -137,6 +137,13 @@ New Features (Design by Pablo Galindo. Contributed by Pablo Galindo and Christian Heimes with contributions from Gregory P. Smith [Google] and Mark Shannon in :gh:`96123`.) +* The extraction methods in :mod:`tarfile`, and :func:`shutil.unpack_archive`, + have a new a *filter* argument that allows limiting tar features than may be + surprising or dangerous, such as creating files outside the destination + directory. + See :ref:`tarfile-extraction-filter` for details. + In Python 3.14, the default will switch to ``'data'``. + (Contributed by Petr Viktorin in :pep:`706`.) Other Language Changes @@ -440,6 +447,11 @@ Deprecated warning at compile time. This field will be removed in Python 3.14. (Contributed by Ramvikrams and Kumar Aditya in :gh:`101193`. PEP by Ken Jin.) +* Extracting tar archives without specifying *filter* is deprecated until + Python 3.14, when ``'data'`` filter will become the default. + See :ref:`tarfile-extraction-filter` for details. + + Pending Removal in Python 3.13 ------------------------------ @@ -516,6 +528,7 @@ Pending Removal in Python 3.14 functions that have been deprecated since Python 2 but only gained a proper :exc:`DeprecationWarning` in 3.12. Remove them in 3.14. + Pending Removal in Future Versions ---------------------------------- @@ -754,6 +767,11 @@ Changes in the Python API around process-global resources, which are best managed from the main interpreter. (Contributed by Dong-hee Na in :gh:`99127`.) +* When extracting tar files using :mod:`tarfile` or + :func:`shutil.unpack_archive`, pass the *filter* argument to limit features + that may be surprising or dangerous. + See :ref:`tarfile-extraction-filter` for details. + Build Changes ============= From 615f060db6dff6083b4d10065a3a932997cb037c Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Mon, 6 Mar 2023 14:49:56 +0100 Subject: [PATCH 09/13] Docs note about leaving the archive partially extracted --- Doc/library/tarfile.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Doc/library/tarfile.rst b/Doc/library/tarfile.rst index 8a22cc1c120e7a..1c8f7a736c2b4c 100644 --- a/Doc/library/tarfile.rst +++ b/Doc/library/tarfile.rst @@ -889,6 +889,8 @@ can be: - return ``None``, in which case the member will be skipped, or - raise an exception to abort the operation or skip the member, depending on :attr:`~TarFile.errorlevel`. + Note that when extraction is aborted, :meth:`~TarFile.extractall` may leave + the archive partially extracted. It does not attempt to clean up. Default named filters ~~~~~~~~~~~~~~~~~~~~~ From c270b063161df7ef917617681548c48391720da9 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Mon, 6 Mar 2023 14:50:22 +0100 Subject: [PATCH 10/13] Raise ValueError on unknown filter names --- Lib/tarfile.py | 2 +- Lib/test/test_tarfile.py | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/Lib/tarfile.py b/Lib/tarfile.py index f0b8672daab03f..ce727aec2c05b3 100755 --- a/Lib/tarfile.py +++ b/Lib/tarfile.py @@ -2227,7 +2227,7 @@ def _get_filter_function(self, filter): try: return _NAMED_FILTERS[filter] except KeyError: - raise LookupError(f"filter {filter!r} not found") + raise ValueError(f"filter {filter!r} not found") from None def extractall(self, path=".", members=None, *, numeric_owner=False, filter=None): diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index d9b56e779ce246..1c17480aff96ff 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -3758,6 +3758,12 @@ def custom_filter(tarinfo, path): self.expect_file('moved') self.expect_file('keep') + def test_bad_filter_name(self): + with ArchiveMaker() as arc: + arc.add('foo') + with self.check_context(arc.open(), 'bad filter name'): + self.expect_exception(ValueError) + def test_stateful_filter(self): # Stateful filters should be possible. # (This doesn't really test tarfile. Rather, it demonstrates From 0ab3bedbeb3613062547e79a3a5425d453746af7 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Mon, 6 Mar 2023 15:26:20 +0100 Subject: [PATCH 11/13] Fix resource warnings in tests --- Lib/test/test_tarfile.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/Lib/test/test_tarfile.py b/Lib/test/test_tarfile.py index 1c17480aff96ff..e34c8725b09167 100644 --- a/Lib/test/test_tarfile.py +++ b/Lib/test/test_tarfile.py @@ -3189,8 +3189,8 @@ def test_list(self): for attr_names in ({'mtime'}, {'mode'}, {'uid'}, {'gid'}, {'uname'}, {'gname'}, {'uid', 'uname'}, {'gid', 'gname'}): - with self.subTest(attr_names=attr_names): - tar = tarfile.open(tarname, encoding="iso8859-1") + with (self.subTest(attr_names=attr_names), + tarfile.open(tarname, encoding="iso8859-1") as tar): tio_prev = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') with support.swap_attr(sys, 'stdout', tio_prev): tar.list() @@ -3358,8 +3358,10 @@ def check_context(self, tar, filter): self.raised_exception = None self.expected_paths = set(self.outerdir.glob('**/*')) self.expected_paths.discard(self.destdir) - yield - tar.close() + try: + yield + finally: + tar.close() if self.raised_exception: raise self.raised_exception self.assertEqual(self.expected_paths, set()) From dcc4f0653b4f4c0d2c981e7f832585c6c3e62470 Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Tue, 7 Mar 2023 11:24:06 +0100 Subject: [PATCH 12/13] Fix docs symntax --- Doc/library/shutil.rst | 2 +- Doc/library/tarfile.rst | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Doc/library/shutil.rst b/Doc/library/shutil.rst index 47e4f9539d60b7..1ba9cd00c9982c 100644 --- a/Doc/library/shutil.rst +++ b/Doc/library/shutil.rst @@ -655,7 +655,7 @@ provided. They rely on the :mod:`zipfile` and :mod:`tarfile` modules. For tar files, it is recommended to set it to ``'data'``, unless using features specific to tar and UNIX filesystems. (See :ref:`tarfile-extraction-filter` for details.) - The ``'data'`` filter ·will become the default for tar files + The ``'data'`` filter will become the default for tar files in Python 3.14. .. audit-event:: shutil.unpack_archive filename,extract_dir,format shutil.unpack_archive diff --git a/Doc/library/tarfile.rst b/Doc/library/tarfile.rst index 1c8f7a736c2b4c..6954b5ca6417bc 100644 --- a/Doc/library/tarfile.rst +++ b/Doc/library/tarfile.rst @@ -906,7 +906,7 @@ reused in custom filters: Implements the ``'tar'`` filter. - - Strip leading slashes (`/` and :attr:`os.sep`) from filenames. + - Strip leading slashes (``/`` and :attr:`os.sep`) from filenames. - :ref:`Refuse ` to extract files with absolute paths (in case the name is absolute even after stripping slashes, e.g. ``C:/foo`` on Windows). From ce42e637e28ae122b126b9f47566bc2e2338807d Mon Sep 17 00:00:00 2001 From: Petr Viktorin Date: Thu, 23 Mar 2023 14:57:02 +0100 Subject: [PATCH 13/13] Update Doc/library/tarfile.rst MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Lumír 'Frenzy' Balhar --- Doc/library/tarfile.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/tarfile.rst b/Doc/library/tarfile.rst index 6954b5ca6417bc..1f53d984aa3a01 100644 --- a/Doc/library/tarfile.rst +++ b/Doc/library/tarfile.rst @@ -358,7 +358,7 @@ be finalized; only the internally used file object will be closed. See the *debug* can be set from ``0`` (no debug messages) up to ``3`` (all debug messages). The messages are written to ``sys.stderr``. - If *errorlevel* controls how extraction errors are handled, + *errorlevel* controls how extraction errors are handled, see :attr:`the corresponding attribute <~TarFile.errorlevel>`. The *encoding* and *errors* arguments define the character encoding to be