diff --git a/src/monty/io.py b/src/monty/io.py index 135bec899..81110534e 100644 --- a/src/monty/io.py +++ b/src/monty/io.py @@ -16,7 +16,7 @@ import time import warnings from pathlib import Path -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Literal, cast if TYPE_CHECKING: from typing import IO, Any, Generator, Union @@ -90,6 +90,7 @@ def zopen( kwargs["encoding"] = "utf-8" _name, ext = os.path.splitext(filename) + ext = ext.lower() if ext == ".bz2": @@ -112,7 +113,64 @@ def zopen( return open(filename, mode, **kwargs) -def reverse_readfile(filename: Union[str, Path]) -> Generator[str, str, None]: +def _get_line_ending( + file: str + | Path + | io.TextIOWrapper + | io.BufferedReader + | gzip.GzipFile + | bz2.BZ2File, +) -> Literal["\r\n", "\n"]: + """Helper function to get line ending of a file. + + This function assumes the file has a single consistent line ending. + + WARNING: as per the POSIX standard, a line is: "A sequence of zero or + more non- characters plus a terminating char.", as such + this func might fail if the only line misses a terminating newline character. + https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/V1_chap03.html + + Returns: + "\n": Unix line ending. + "\r\n": Windows line ending. + + Raises: + ValueError: If line ending is unknown. + + Warnings: + If file is empty, "\n" would be used as default. + """ + if isinstance(file, (str, Path)): + with zopen(file, "rb") as f: + first_line = f.readline() + elif isinstance(file, io.TextIOWrapper): + first_line = file.buffer.readline() # type: ignore[attr-defined] + elif isinstance(file, (io.BufferedReader, gzip.GzipFile, bz2.BZ2File)): + first_line = file.readline() + else: + raise TypeError(f"Unknown file type {type(file).__name__}") + + # Reset pointer to start of file if possible + if hasattr(file, "seek"): + file.seek(0) + + # Return Unix "\n" line ending as default if file is empty + if not first_line: + warnings.warn("File is empty, return Unix line ending \n.", stacklevel=2) + return "\n" + + if first_line.endswith(b"\r\n"): + return "\r\n" + if first_line.endswith(b"\n"): + return "\n" + + # It's likely the line is missing a line ending for the first line + raise ValueError(f"Unknown line ending in line {repr(first_line)}.") + + +def reverse_readfile( + filename: Union[str, Path], +) -> Iterator[str]: """ A much faster reverse read of file by using Python's mmap to generate a memory-mapped file. It is slower for very small files than @@ -125,108 +183,154 @@ def reverse_readfile(filename: Union[str, Path]) -> Generator[str, str, None]: Yields: Lines from the file in reverse order. """ - try: - with zopen(filename, "rb") as file: - if isinstance(file, (gzip.GzipFile, bz2.BZ2File)): - for line in reversed(file.readlines()): - yield line.decode("utf-8").rstrip(os.linesep) - else: + # Get line ending + l_end = _get_line_ending(filename) + len_l_end = len(l_end) + + with zopen(filename, "rb") as file: + if isinstance(file, (gzip.GzipFile, bz2.BZ2File)): + for line in reversed(file.readlines()): + # "readlines" would keep the line end character + yield line.decode("utf-8") + + else: + try: filemap = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) - n = len(filemap) - while n > 0: - i = filemap.rfind(os.linesep.encode(), 0, n) - yield filemap[i + 1 : n].decode("utf-8").rstrip(os.linesep) - n = i + except ValueError: + warnings.warn("trying to mmap an empty file.", stacklevel=2) + return - except ValueError: - return + file_size = len(filemap) + while file_size > 0: + # Find line segment start and end positions + seg_start_pos = filemap.rfind(l_end.encode(), 0, file_size) + seg_end_pos = file_size + len_l_end + + # The first (originally) line doesn't have an ending character at its head + if seg_start_pos == -1: + yield (filemap[:seg_end_pos].decode("utf-8")) + + # Skip the first match (the last line ending character) + elif file_size != len(filemap): + yield ( + filemap[seg_start_pos + len_l_end : seg_end_pos].decode("utf-8") + ) + file_size = seg_start_pos def reverse_readline( - m_file, blk_size: int = 4096, max_mem: int = 4000000 -) -> Generator[str, str, None]: + m_file: io.BufferedReader | io.TextIOWrapper | gzip.GzipFile | bz2.BZ2File, + blk_size: int = 4096, + max_mem: int = 4_000_000, +) -> Iterator[str]: """ - Generator function to read a file line-by-line, but backwards. - This allows one to efficiently get data at the end of a file. + Read a file backwards line-by-line, and behave similarly to + the file.readline function. This allows one to efficiently + get data from the end of a file. - Read file forwards and reverse in memory for files smaller than the - max_mem parameter, or for gzip files where reverse seeks are not supported. + Supported file stream formats: + - TextIOWrapper (text mode) | BufferedReader (binary mode) + - gzip/bzip2 file stream - Files larger than max_mem are dynamically read backwards. + Cases where file would be read forwards and reversed in RAM: + - If file size is smaller than RAM usage limit (max_mem). + - Gzip files, as reverse seeks are not supported. Reference: - Based on code by Peter Astrand , using modifications - by Raymond Hettinger and Kevin German. - http://code.activestate.com/recipes/439045-read-a-text-file-backwards - -yet-another-implementat/ + Based on code by Peter Astrand , using + modifications by Raymond Hettinger and Kevin German. + http://code.activestate.com/recipes/439045-read-a-text- + file-backwards-yet-another-implementat/ Args: - m_file (File): File stream to read (backwards) - blk_size (int): The buffer size. Defaults to 4096. - max_mem (int): The maximum amount of memory to involve in this - operation. This is used to determine when to reverse a file - in-memory versus seeking portions of a file. For bz2 files, - this sets the maximum block size. + m_file: File stream to read (backwards). + blk_size (int): The block size to read each time in bytes. + Defaults to 4096. + max_mem (int): Threshold to determine when to reverse a file + in-memory versus reading blocks of a file each time. + For bz2 files, this sets the block size. - Returns: - Generator that yields lines from the file. Behave similarly to the - file.readline() function, except the lines are returned from the back - of the file. + Yields: + Lines from the back of the file. + + Raises: + TypeError: If m_file is the name of the file (expect file stream). + + Warnings: + If max_mem is smaller than blk_size. """ - # Check if the file stream is a bit stream or not - is_text = isinstance(m_file, io.TextIOWrapper) - - try: - file_size = os.path.getsize(m_file.name) - except AttributeError: - # Bz2 files do not have name attribute. Just set file_size to above - # max_mem for now. - file_size = max_mem + 1 - - # If the file size is within our desired RAM use, just reverse it in memory - # GZip files must use this method because there is no way to negative seek - # For windows, we also read the whole file. - if file_size < max_mem or isinstance(m_file, gzip.GzipFile) or os.name == "nt": + # Check for illegal usage + if isinstance(m_file, (str, Path)): + raise TypeError("expect a file stream, not file name") + + # Generate line ending + l_end: Literal["\r\n", "\n"] = _get_line_ending(m_file) + len_l_end: Literal[1, 2] = cast(Literal[1, 2], len(l_end)) + + # Bz2 files do not have "name" attribute, just set to max_mem for now + if hasattr(m_file, "name"): + file_size: int = os.path.getsize(m_file.name) + else: + file_size = max_mem + + # If the file size is within desired RAM limit, just reverse it in memory. + # Gzip files must use this method because there is no way to negative seek. + if file_size < max_mem or isinstance(m_file, gzip.GzipFile): for line in reversed(m_file.readlines()): - yield line.rstrip() + yield line if isinstance(line, str) else cast(bytes, line).decode("utf-8") + else: + # RAM limit should be greater than block size, + # as file is read into RAM one block each time. + if max_mem < blk_size: + warnings.warn(f"{max_mem=} smaller than {blk_size=}", stacklevel=2) + + # For bz2 files, seek is expensive. It is therefore in our best + # interest to maximize the block size within RAM usage limit. if isinstance(m_file, bz2.BZ2File): - # for bz2 files, seeks are expensive. It is therefore in our best - # interest to maximize the blk_size within limits of desired RAM - # use. blk_size = min(max_mem, file_size) - buf = "" - m_file.seek(0, 2) - lastchar = m_file.read(1) if is_text else m_file.read(1).decode("utf-8") + # Check if the file stream is text (instead of binary) + is_text: bool = isinstance(m_file, io.TextIOWrapper) - trailing_newline = lastchar == os.linesep + buffer: str = "" + m_file.seek(0, 2) + skipped_1st_l_end: bool = False while True: - newline_pos = buf.rfind(os.linesep) - pos = m_file.tell() - if newline_pos != -1: - # Found a newline - line = buf[newline_pos + 1 :] - buf = buf[:newline_pos] - if pos or newline_pos or trailing_newline: - line += os.linesep - yield line - - elif pos: - # Need to fill buffer - toread = min(blk_size, pos) - m_file.seek(pos - toread, 0) + l_end_pos: int = buffer.rfind(l_end) + # Pointer position (also size of remaining file) + pt_pos: int = m_file.tell() + + # Line ending found within buffer + if l_end_pos != -1: + line = buffer[l_end_pos + len_l_end :] + buffer = buffer[:l_end_pos] # buffer doesn't include l_end + + # Skip first match (the last line ending) + if skipped_1st_l_end: + yield line + l_end + else: + skipped_1st_l_end = True + + # Line ending not in current buffer, load next block into the buffer + elif pt_pos > 0: + to_read: int = min(blk_size, pt_pos) + m_file.seek(pt_pos - to_read) if is_text: - buf = m_file.read(toread) + buf + buffer = cast(str, m_file.read(to_read)) + buffer else: - buf = m_file.read(toread).decode("utf-8") + buf - m_file.seek(pos - toread, 0) - if pos == toread: - buf = os.linesep + buf + buffer = cast(bytes, m_file.read(to_read)).decode("utf-8") + buffer + + # Move pointer forward + m_file.seek(pt_pos - to_read) - else: - # Start-of-file + # Add a l_end to the start of file + if pt_pos == to_read: + buffer = l_end + buffer + + # Start of file + else: # l_end_pos == -1 and pt_post == 0 return @@ -328,8 +432,7 @@ def get_open_fds() -> int: """ Get the number of open file descriptors for current process. - Warnings: - Will only work on UNIX-like OS-es. + Warning, this will only work on UNIX-like OS. Returns: int: The number of open file descriptors for current process. diff --git a/src/monty/json.py b/src/monty/json.py index ff6874c62..7a2c00fb4 100644 --- a/src/monty/json.py +++ b/src/monty/json.py @@ -37,7 +37,7 @@ try: import orjson except ImportError: - orjson = None + orjson = None # type: ignore[assignment] __version__ = "3.0.0" diff --git a/src/monty/re.py b/src/monty/re.py index 7fddbd72f..76152ff13 100644 --- a/src/monty/re.py +++ b/src/monty/re.py @@ -62,5 +62,5 @@ def regrep( with contextlib.suppress(Exception): # Try to close open file handle. Pass if it is a generator. - gen.close() # type: ignore[attr-defined] + gen.close() # type: ignore[attr-defined, union-attr] return matches diff --git a/tests/test_files/3000_lines.txt b/tests/test_files/3000_lines.txt index 2f8c055bc..1127304b4 100644 --- a/tests/test_files/3000_lines.txt +++ b/tests/test_files/3000_lines.txt @@ -2997,4 +2997,4 @@ 2997 2998 2999 -3000 \ No newline at end of file +3000 diff --git a/tests/test_files/3000_lines.txt.bz2 b/tests/test_files/3000_lines.txt.bz2 index 3a028199a..177141f0a 100644 Binary files a/tests/test_files/3000_lines.txt.bz2 and b/tests/test_files/3000_lines.txt.bz2 differ diff --git a/tests/test_files/3000_lines.txt.gz b/tests/test_files/3000_lines.txt.gz index fd8a1bf51..cc9ecf78b 100644 Binary files a/tests/test_files/3000_lines.txt.gz and b/tests/test_files/3000_lines.txt.gz differ diff --git a/tests/test_files/3000lines.txt.gz b/tests/test_files/3000lines.txt.gz deleted file mode 100644 index 55e186f55..000000000 Binary files a/tests/test_files/3000lines.txt.gz and /dev/null differ diff --git a/tests/test_io.py b/tests/test_io.py index 10ad865ba..f4422bf2a 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -1,10 +1,10 @@ from __future__ import annotations +import bz2 import gzip import os import warnings from pathlib import Path -from unittest.mock import patch import pytest @@ -12,6 +12,7 @@ EncodingWarning, FileLock, FileLockException, + _get_line_ending, reverse_readfile, reverse_readline, zopen, @@ -21,7 +22,112 @@ TEST_DIR = os.path.join(os.path.dirname(__file__), "test_files") +class TestGetLineEnding: + @pytest.mark.parametrize("l_end", ["\n", "\r\n"]) + def test_get_line_ending(self, l_end): + """Test files with: + Unix line ending (\n). + Windows line ending (\r\n). + + For: + - Text file: both text mode and binary mode + - gzip file and bzip2 file + """ + test_file = "test_l_end.txt" + test_line = f"This is a test{l_end}Second line{l_end}".encode() + + with ScratchDir("."): + with open(test_file, "wb") as f: + f.write(test_line) + + assert _get_line_ending(test_file) == l_end + assert _get_line_ending(Path(test_file)) == l_end + + # Test text mode + with open(test_file, "r", encoding="utf-8") as f: + start_pos = f.tell() + assert _get_line_ending(f) == l_end + assert f.tell() == start_pos + + # Test binary mode + with open(test_file, "rb") as f: + start_pos = f.tell() + assert _get_line_ending(f) == l_end + assert f.tell() == start_pos + + # Test gzip file + gzip_filename = f"{test_file}.gz" + with gzip.open(gzip_filename, "wb") as f: + f.write(test_line) + + # Opened file stream + with gzip.open(gzip_filename, "rb") as f: + start_pos = f.tell() + assert _get_line_ending(f) == l_end + assert f.tell() == start_pos + + # Filename directly + assert _get_line_ending(gzip_filename) == l_end + + # Test bzip2 file stream + bz2_filename = f"{test_file}.bz2" + with bz2.open(bz2_filename, "wb") as f: + f.write(test_line) + + # Opened file stream + with bz2.open(bz2_filename, "rb") as f: + start_pos = f.tell() + assert _get_line_ending(f) == l_end + assert f.tell() == start_pos + + # Filename directly + assert _get_line_ending(bz2_filename) == l_end + + @pytest.mark.parametrize("l_end", ["\n", "\r\n"]) + def test_miss_last_l_end(self, l_end): + """Make sure this still works if the last l_end is missing.""" + test_line = f"This is a test{l_end}Second line".encode() + test_file = "test_l_end.txt" + + with ScratchDir("."): + with open(test_file, "wb") as f: + f.write(test_line) + + assert _get_line_ending(test_file) == l_end + + def test_unknown_file_type(self): + unknown_file = 123 + + with pytest.raises(TypeError, match="Unknown file type int"): + _get_line_ending(unknown_file) + + def test_empty_file(self): + with ScratchDir("."): + test_file = "empty_file.txt" + open(test_file, "w").close() + + with pytest.warns(match="File is empty, return Unix line ending \n"): + assert _get_line_ending(test_file) == "\n" + + def test_unknown_line_ending(self): + with ScratchDir("."): + test_file = "test_unknown.txt" + with open(test_file, "wb") as f: + f.write(b"This is a test\036") + + with pytest.raises(ValueError, match="Unknown line ending"): + _get_line_ending(test_file) + + class TestReverseReadline: + """WARNING for future coder: + "reverse_readline" has two branches, one is the in-RAM + reverse reading for un-supported file types or small files. + As the default RAM threshold is "big" at around 4 MB (usually + people just write a few lines to test), you could easily be + testing/debugging the in-RAM branch all the time (me for example). + """ + NUMLINES = 3000 def test_reverse_readline(self): @@ -30,23 +136,49 @@ def test_reverse_readline(self): order, i.e. the first line that is read corresponds to the last line. number """ - with open(os.path.join(TEST_DIR, "3000_lines.txt"), encoding="utf-8") as f: + # Test text mode + with open( + os.path.join(TEST_DIR, "3000_lines.txt"), encoding="utf-8", newline="" + ) as f: for idx, line in enumerate(reverse_readline(f)): - assert ( - int(line) == self.NUMLINES - idx - ), f"read_backwards read {line} whereas it should have read {self.NUMLINES - idx}" + assert isinstance(line, str) + assert line == f"{str(self.NUMLINES - idx)}{os.linesep}" - def test_reverse_readline_fake_big(self): + # Test binary mode + with open(os.path.join(TEST_DIR, "3000_lines.txt"), mode="rb") as f: + for idx, line in enumerate(reverse_readline(f)): + assert line == f"{str(self.NUMLINES - idx)}{os.linesep}" + + @pytest.mark.parametrize("l_end", ["\n", "\r\n"]) + def test_big_file(self, l_end): """ - Make sure that large text files are read properly. + Test read big file. + + A file of 300,000 lines is about 2 MB, but the default max_mem + is still around 4 MB, so we have to reduce it. """ - with open(os.path.join(TEST_DIR, "3000_lines.txt"), encoding="utf-8") as f: - for idx, line in enumerate(reverse_readline(f, max_mem=0)): - assert ( - int(line) == self.NUMLINES - idx - ), f"read_backwards read {line} whereas it should have read {self.NUMLINES - idx}" + file_name = "big_file.txt" + num_lines = 300_000 + + with ScratchDir("."): + # Write test file (~ 2 MB) + with open(file_name, "wb") as file: + for num in range(1, num_lines + 1): + file.write(f"{num}{l_end}".encode()) + + assert os.path.getsize(file_name) > 1_000_000 # 1 MB - def test_reverse_readline_bz2(self): + # Test text mode + with open(file_name, mode="r", encoding="utf-8", newline="") as file: + for idx, line in enumerate(reverse_readline(file, max_mem=4096)): + assert line == f"{str(num_lines - idx)}{l_end}" + + # Test binary mode + with open(file_name, mode="rb") as file: + for idx, line in enumerate(reverse_readline(file, max_mem=4096)): + assert line == f"{str(num_lines - idx)}{l_end}" + + def test_read_bz2(self): """ Make sure a file containing line numbers is read in reverse order, i.e. the first line that is read corresponds to the last line number. @@ -54,58 +186,100 @@ def test_reverse_readline_bz2(self): lines = [] with zopen(os.path.join(TEST_DIR, "myfile_bz2.bz2"), "rb") as f: for line in reverse_readline(f): - lines.append(line.strip()) - assert lines[-1].strip(), ["HelloWorld." in b"HelloWorld."] + lines.append(line) + assert lines == ["\n", "HelloWorld.\n"] # test file has one empty line - def test_empty_file(self): + def test_read_empty_file(self): """ Make sure an empty file does not throw an error when reverse_readline is called, which was a problem with an earlier implementation. """ - with open(os.path.join(TEST_DIR, "empty_file.txt"), encoding="utf-8") as f: - for _line in reverse_readline(f): - raise ValueError("an empty file is being read!") - - @pytest.fixture() - def test_line_ending(self): - contents = ("Line1", "Line2", "Line3") - - # Mock Linux/MacOS - with patch("os.name", "posix"): - linux_line_end = os.linesep - assert linux_line_end == "\n" - - with ScratchDir("./test_files"): - with open( - "sample_unix_mac.txt", "w", newline=linux_line_end, encoding="utf-8" - ) as file: - file.write(linux_line_end.join(contents)) - - with open("sample_unix_mac.txt", encoding="utf-8") as file: - for idx, line in enumerate(reverse_readfile(file)): - assert line == contents[len(contents) - idx - 1] - - # Mock Windows - with patch("os.name", "nt"): - windows_line_end = os.linesep - assert windows_line_end == "\r\n" - - with ScratchDir("./test_files"): - with open( - "sample_windows.txt", - "w", - newline=windows_line_end, - encoding="utf-8", - ) as file: - file.write(windows_line_end.join(contents)) - - with open("sample_windows.txt", encoding="utf-8") as file: - for idx, line in enumerate(reverse_readfile(file)): - assert line == contents[len(contents) - idx - 1] + with pytest.warns(match="File is empty, return Unix line ending \n."): + with open(os.path.join(TEST_DIR, "empty_file.txt"), encoding="utf-8") as f: + for _line in reverse_readline(f): + pytest.fail("No error should be thrown.") + + @pytest.mark.parametrize("ram", [4, 4096, 4_0000_000]) + @pytest.mark.parametrize("l_end", ["\n", "\r\n"]) + def test_read_file_with_empty_lines(self, l_end, ram): + """Empty lines should not be skipped. + Using a very small RAM size to force non in-RAM mode. + """ + contents = (f"line1{l_end}", f"{l_end}", f"line3{l_end}") + filename = "test_empty_line.txt" + + with ScratchDir("."), warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", message="max_mem=4 smaller than blk_size=4096" + ) + # Test text file + with open(filename, "wb") as file: + for line in contents: + file.write(line.encode()) + + with open(filename, mode="r", newline="") as file: + revert_contents = tuple(reverse_readline(file, max_mem=ram)) + assert revert_contents[::-1] == contents + + # Test gzip file + gzip_filename = f"{filename}.gz" + with gzip.open(gzip_filename, "w") as file_out: + for line in contents: + file_out.write(line.encode()) + + with gzip.open(gzip_filename) as g_file: + revert_contents_gzip = tuple(reverse_readline(g_file)) + assert revert_contents_gzip[::-1] == contents + + # Test bzip2 file + bz2_filename = f"{filename}.bz2" + with bz2.open(bz2_filename, "w") as file_out: + for line in contents: + file_out.write(line.encode()) + + with bz2.open(bz2_filename) as b_file: + revert_contents_bz2 = tuple(reverse_readline(b_file)) + assert revert_contents_bz2[::-1] == contents + + @pytest.mark.parametrize("ram", [4, 4096, 4_0000_000]) + @pytest.mark.parametrize("l_end", ["\n", "\r\n"]) + def test_different_line_endings(self, l_end, ram): + """Using a very small RAM size to force non in-RAM mode.""" + contents = (f"Line1{l_end}", f"Line2{l_end}", f"Line3{l_end}") + file_name = "test_file.txt" + + with ScratchDir("."), warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", message="max_mem=4 smaller than blk_size=4096" + ) + + with open(file_name, "wb") as file: + for line in contents: + file.write(line.encode()) + + # Test text mode + with open(file_name, "r", encoding="utf-8") as file: + for idx, line in enumerate(reverse_readline(file, max_mem=ram)): + # OS would automatically change line ending in text mode + assert ( + line.rstrip(os.linesep) + l_end + == contents[len(contents) - idx - 1] + ) + assert isinstance(line, str) + + # Test binary mode + with open(file_name, "rb") as file: + for idx, line in enumerate(reverse_readline(file)): + assert line == contents[len(contents) - idx - 1] + + @pytest.mark.parametrize("file", ["./file", Path("./file")]) + def test_illegal_file_type(self, file): + with pytest.raises(TypeError, match="expect a file stream, not file name"): + next(reverse_readline(file)) class TestReverseReadfile: - NUMLINES = 3000 + NUM_LINES = 3000 def test_reverse_readfile(self): """ @@ -114,68 +288,87 @@ def test_reverse_readfile(self): """ fname = os.path.join(TEST_DIR, "3000_lines.txt") for idx, line in enumerate(reverse_readfile(fname)): - assert int(line) == self.NUMLINES - idx + assert isinstance(line, str) + # OS would automatically convert line ending in text mode + assert line == f"{str(self.NUM_LINES - idx)}{os.linesep}" - def test_reverse_readfile_gz(self): + def test_read_gz(self): """ Make sure a file containing line numbers is read in reverse order, i.e. the first line that is read corresponds to the last line number. """ fname = os.path.join(TEST_DIR, "3000_lines.txt.gz") for idx, line in enumerate(reverse_readfile(fname)): - assert int(line) == self.NUMLINES - idx + assert isinstance(line, str) + assert line == f"{str(self.NUM_LINES - idx)}\n" - def test_reverse_readfile_bz2(self): + def test_read_bz2(self): """ Make sure a file containing line numbers is read in reverse order, i.e. the first line that is read corresponds to the last line number. """ fname = os.path.join(TEST_DIR, "3000_lines.txt.bz2") for idx, line in enumerate(reverse_readfile(fname)): - assert int(line) == self.NUMLINES - idx + assert isinstance(line, str) + assert line == f"{str(self.NUM_LINES - idx)}\n" - def test_empty_file(self): + def test_read_empty_file(self): """ Make sure an empty file does not throw an error when reverse_readline is called, which was a problem with an earlier implementation. """ - for _line in reverse_readfile(os.path.join(TEST_DIR, "empty_file.txt")): - raise ValueError("an empty file is being read!") + with ( + pytest.warns(match="File is empty, return Unix line ending \n."), + pytest.warns(match="trying to mmap an empty file"), + ): + for _line in reverse_readfile(os.path.join(TEST_DIR, "empty_file.txt")): + pytest.fail("No error should be thrown.") - @pytest.fixture - def test_line_ending(self): - contents = ("Line1", "Line2", "Line3") + @pytest.mark.parametrize("l_end", ["\n", "\r\n"]) + def test_read_file_with_empty_lines(self, l_end): + """Empty lines should not be skipped.""" + contents = (f"line1{l_end}", f"{l_end}", f"line3{l_end}") + filename = "test_empty_line.txt" - # Mock Linux/MacOS - with patch("os.name", "posix"): - linux_line_end = os.linesep - assert linux_line_end == "\n" + with ScratchDir("."): + # Test text file + with open(filename, "w", newline="", encoding="utf-8") as file: + for line in contents: + file.write(line) - with ScratchDir("./test_files"): - with open( - "sample_unix_mac.txt", "w", newline=linux_line_end, encoding="utf-8" - ) as file: - file.write(linux_line_end.join(contents)) + revert_contents = tuple(reverse_readfile(filename)) + assert revert_contents[::-1] == contents - for idx, line in enumerate(reverse_readfile("sample_unix_mac.txt")): - assert line == contents[len(contents) - idx - 1] + # Test gzip file + gzip_filename = f"{filename}.gz" + with gzip.open(gzip_filename, "w") as file_out: + for line in contents: + file_out.write(line.encode()) - # Mock Windows - with patch("os.name", "nt"): - windows_line_end = os.linesep - assert windows_line_end == "\r\n" - - with ScratchDir("./test_files"): - with open( - "sample_windows.txt", - "w", - newline=windows_line_end, - encoding="utf-8", - ) as file: - file.write(windows_line_end.join(contents)) - - for idx, line in enumerate(reverse_readfile("sample_windows.txt")): - assert line == contents[len(contents) - idx - 1] + revert_contents_gzip = tuple(reverse_readfile(gzip_filename)) + assert revert_contents_gzip[::-1] == contents + + # Test bzip2 file + bz2_filename = f"{filename}.bz2" + with bz2.open(bz2_filename, "w") as file_out: + for line in contents: + file_out.write(line.encode()) + + revert_contents_bz2 = tuple(reverse_readfile(bz2_filename)) + assert revert_contents_bz2[::-1] == contents + + @pytest.mark.parametrize("l_end", ["\n", "\r\n"]) + def test_different_line_endings(self, l_end): + contents = (f"Line1{l_end}", f"Line2{l_end}", f"Line3{l_end}") + filename = "test_file.txt" + + with ScratchDir("."): + with open(filename, "w", newline="", encoding="utf-8") as file: + for line in contents: + file.write(line) + + revert_contents = tuple(reverse_readfile(filename)) + assert revert_contents[::-1] == contents class TestZopen: diff --git a/tests/test_multiprocessing.py b/tests/test_multiprocessing.py index e3fe1b9fe..e507c321b 100644 --- a/tests/test_multiprocessing.py +++ b/tests/test_multiprocessing.py @@ -6,12 +6,12 @@ def test_imap_tqdm(): - results = imap_tqdm(4, sqrt, range(10000)) - assert len(results) == 10000 + results = imap_tqdm(4, sqrt, range(10_000)) + assert len(results) == 10_000 assert results[0] == 0 assert results[400] == 20 assert results[9999] == 99.99499987499375 - results = imap_tqdm(4, sqrt, (i**2 for i in range(10000))) - assert len(results) == 10000 + results = imap_tqdm(4, sqrt, (i**2 for i in range(10_000))) + assert len(results) == 10_000 assert results[0] == 0 assert results[400] == 400 diff --git a/tests/test_shutil.py b/tests/test_shutil.py index 1ece739e0..81e0ea859 100644 --- a/tests/test_shutil.py +++ b/tests/test_shutil.py @@ -35,7 +35,7 @@ def setup_method(self): os.path.join(test_dir, "cpr_src", "sub", "testr"), "w", encoding="utf-8" ) as f: f.write("what2") - if os.name != "nt": + if platform.system() != "Windows": os.symlink( os.path.join(test_dir, "cpr_src", "test"), os.path.join(test_dir, "cpr_src", "mysymlink"), diff --git a/tests/test_tempfile.py b/tests/test_tempfile.py index eae851891..70ed8d4b1 100644 --- a/tests/test_tempfile.py +++ b/tests/test_tempfile.py @@ -1,6 +1,7 @@ from __future__ import annotations import os +import platform import shutil import pytest @@ -121,7 +122,7 @@ def test_no_copy(self): assert "scratch_text" not in files def test_symlink(self): - if os.name != "nt": + if platform.system() != "Windows": with ScratchDir( self.scratch_root, copy_from_current_on_enter=False,