diff --git a/docs/pages/api-reference.md b/docs/pages/api-reference.md index 75d2935..95a0f58 100644 --- a/docs/pages/api-reference.md +++ b/docs/pages/api-reference.md @@ -263,6 +263,35 @@ return `/home/somedir/config/config.json`. Credits to https://stackoverflow.com/a/59004672/8255842 +##### `read_last_n_lines` + +```python +def read_last_n_lines(file_path: str, + n: int, + ignore_trailing_whitespace: bool = False) -> List[str] +``` + +Read the last `n` lines of a file. + + The function returns fewer than `n` lines if the file has fewer than `n` lines. + The last element in the list is the last line of the file. + + This function uses seeking in order not to read the full file. The simple + approach of reading the last n lines would be: + + ```python + # read the last 10 lines + with open(path, "r") as f: + return f.read().split("\n")[-10:] + ``` + + However, this would read the full file and if we only need to read 10 lines + out of a 2GB file, this would be a big waste of resources. + + The `ignore_trailing_whitespace` option crops off trailing whitespace first, i.e. + trailing empty or whitespace-only lines are skipped before the last `n` lines are taken. + + ## `tum_esm_utils.github` Functions for interacting with GitHub.
def test_read_last_n_lines() -> None:
    """Exercise `read_last_n_lines` on a 10-line and on a 3-line file.

    Every line `i` of the fixture file reads `"<i> "` followed by `i + 1`
    times the letter `c` and is terminated by a newline, so the file always
    ends with a trailing newline.
    """

    def write_fixture(path: str, line_count: int) -> None:
        # (re)create the fixture file with `line_count` numbered lines
        with open(path, "w") as f:
            for i in range(line_count):
                f.write(f"{i} {'c'*(i+1)}\n")

    def read_tail(path: str, n: int, ignore: bool) -> list:
        # thin wrapper so that each case below fits on one line
        return tum_esm_utils.files.read_last_n_lines(
            path, n, ignore_trailing_whitespace=ignore
        )

    with tempfile.TemporaryDirectory() as d:
        filepath = os.path.join(d, "file.txt")

        # --- 10-line file ---------------------------------------------
        write_fixture(filepath, 10)

        # the trailing newline is cropped, hence three "real" lines
        r1 = read_tail(filepath, 3, True)
        print(f"r1 = {r1}")
        assert r1 == ["7 cccccccc", "8 ccccccccc", "9 cccccccccc"]

        # the trailing newline counts, hence the empty last element
        r2 = read_tail(filepath, 3, False)
        print(f"r2 = {r2}")
        assert r2 == ["8 ccccccccc", "9 cccccccccc", ""]

        # --- 3-line file ----------------------------------------------
        write_fixture(filepath, 3)

        # fewer lines requested than available
        r3 = read_tail(filepath, 2, True)
        print(f"r3 = {r3}")
        assert r3 == ["1 cc", "2 ccc"]

        # exactly as many lines requested as available
        r4 = read_tail(filepath, 3, True)
        print(f"r4 = {r4}")
        assert r4 == ["0 c", "1 cc", "2 ccc"]

        # more lines requested than available -> the whole file
        r5 = read_tail(filepath, 4, True)
        print(f"r5 = {r5}")
        assert r5 == ["0 c", "1 cc", "2 ccc"]
def read_last_n_lines(
    file_path: str,
    n: int,
    ignore_trailing_whitespace: bool = False,
) -> List[str]:
    r"""Read the last `n` lines of a file.

    The function returns fewer than `n` lines if the file has fewer than `n`
    lines. The last element in the list is the last line of the file. An
    empty file yields `[""]` and `n <= 0` yields `[]`.

    This function uses seeking in order not to read the full file. The simple
    approach of reading the last n lines would be:

    ```python
    # read the last 10 lines
    with open(path, "r") as f:
        return f.read().split("\n")[-10:]
    ```

    However, this would read the full file and if we only need to read 10 lines
    out of a 2GB file, this would be a big waste of resources.

    With `ignore_trailing_whitespace` set, newlines, spaces and tabs at the
    very end of the file are cropped off before the last `n` lines are taken,
    i.e. trailing empty or whitespace-only lines are not returned.

    The file content is decoded as UTF-8; a `UnicodeDecodeError` is raised
    if one of the returned lines is not valid UTF-8."""

    if n <= 0:
        return []

    chunk_size = 4096
    trailing_whitespace = b"\n \t"

    # chunks are collected back to front: chunks[0] is the end of the file
    chunks: List[bytes] = []
    newlines_seen = 0
    still_trimming = ignore_trailing_whitespace

    with open(file_path, "rb") as f:
        position = f.seek(0, os.SEEK_END)

        # walk backwards block by block until enough line breaks were seen
        # or the beginning of the file is reached
        while position > 0 and newlines_seen < n:
            read_size = min(chunk_size, position)
            position -= read_size
            f.seek(position)
            chunk = f.read(read_size)

            if still_trimming:
                chunk = chunk.rstrip(trailing_whitespace)
                if not chunk:
                    # this block was whitespace only, keep cropping
                    continue
                still_trimming = False

            newlines_seen += chunk.count(b"\n")
            chunks.append(chunk)

    tail = b"".join(reversed(chunks))

    # Split on the raw bytes first and decode afterwards: the first block may
    # start in the middle of a multi-byte character, but each of the last `n`
    # lines starts right after a b"\n" (or at the start of the file).
    return [line.decode() for line in tail.split(b"\n")[-n:]]