class RunLengthDecode:
    """
    The RunLengthDecode filter decodes data that has been encoded in a
    simple byte-oriented format based on run length.

    The encoded data is a sequence of runs, where each run consists of
    a length byte followed by 1 to 128 bytes of data.

    * If the length byte is in the range 0 to 127, the following
      length + 1 (1 to 128) bytes are copied literally during decompression.
    * If the length byte is in the range 129 to 255, the following single
      byte is copied 257 - length (2 to 128) times during decompression.
    * A length value of 128 denotes EOD.
    """

    @staticmethod
    def decode(
        data: bytes,
        decode_parms: Union[None, ArrayObject, DictionaryObject] = None,
        **kwargs: Any,
    ) -> bytes:
        """
        Decode a run-length encoded data stream.

        Args:
            data: a bytes sequence of length/data runs terminated by the
                EOD marker (a length byte of 128).
            decode_parms: ignored.
            **kwargs: only the deprecated ``decodeParms`` keyword is
                recognised; it triggers the deprecation helper and is
                otherwise ignored.

        Returns:
            A bytes decompressed sequence.

        Raises:
            PdfStreamError: if the data ends before the EOD marker (this
                includes a run whose payload is cut short), or if any
                data follows the EOD marker.
        """
        if "decodeParms" in kwargs:  # deprecated
            deprecate_with_replacement("decodeParms", "parameters", "4.0.0")
            decode_parms = kwargs["decodeParms"]  # noqa: F841

        data_length = len(data)
        chunks = []
        index = 0
        while True:
            if index >= data_length:
                # Ran off the end without ever seeing the EOD marker.
                raise PdfStreamError("Unexpected EOD in RunLengthDecode")
            length = data[index]
            index += 1
            if length == 128:
                # EOD marker: it must be the very last byte of the stream.
                if index < data_length:
                    raise PdfStreamError("early EOD in RunLengthDecode")
                break
            if length < 128:
                # Literal run: copy the next length + 1 bytes verbatim.
                run_end = index + length + 1
                if run_end > data_length:
                    # Fail explicitly instead of appending a short slice
                    # and only noticing the truncation one iteration later.
                    raise PdfStreamError("Unexpected EOD in RunLengthDecode")
                chunks.append(data[index:run_end])
                index = run_end
            else:  # length > 128
                # Repeat run: the next single byte is repeated 257 - length
                # times. Guard the lookup so that a missing payload byte is
                # reported as a stream error rather than an IndexError.
                if index >= data_length:
                    raise PdfStreamError("Unexpected EOD in RunLengthDecode")
                chunks.append(bytes((data[index],)) * (257 - length))
                index += 1
        return b"".join(chunks)
@pytest.mark.enable_socket()
def test_runlengthdecode():
    """
    From #1954: decode a RunLengthDecode image and reject malformed streams.

    Covers a 2-bit image compared against a reference PNG, followed by two
    PDFs whose RunLengthDecode streams must raise PdfStreamError.

    TODO: 4bits also
    """
    # Valid stream: the extracted image must match the reference rendering.
    url = "https://github.com/py-pdf/pypdf/files/12159941/out.pdf"
    name = "RunLengthDecode.pdf"
    reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name)))
    url_png = "https://user-images.githubusercontent.com/4083478/255940800-6d63972e-a3d6-4cf9-aa6f-0793af24cded.png"
    name_png = "RunLengthDecode.png"
    refimg = Image.open(
        BytesIO(get_pdf_from_url(url_png, name=name_png))
    )  # not a pdf but it works
    data = reader.pages[0].images[0]
    diff = ImageChops.difference(data.image, refimg)
    # Root of the summed squared differences, normalised by the pixel count.
    d = sqrt(sum([(a * a) for a in diff.getdata()])) / (diff.size[0] * diff.size[1])
    assert d < 0.001

    # First malformed stream.
    url = "https://github.com/py-pdf/pypdf/files/12162905/out.pdf"
    name = "FailedRLE1.pdf"
    reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name)))
    with pytest.raises(PdfStreamError) as exc:
        reader.pages[0].images[0]
    assert exc.value.args[0] == "Unexpected EOD in RunLengthDecode"

    # Second malformed stream. The reader has to be rebuilt from this file;
    # otherwise the block below would just re-check FailedRLE1.pdf.
    url = "https://github.com/py-pdf/pypdf/files/12162926/out.pdf"
    name = "FailedRLE2.pdf"
    reader = PdfReader(BytesIO(get_pdf_from_url(url, name=name)))
    with pytest.raises(PdfStreamError) as exc:
        reader.pages[0].images[0]
    # NOTE(review): this fixture was never actually loaded before, so which
    # of the decoder's two error messages it triggers is unverified; tighten
    # this to the exact message once confirmed against the file.
    assert exc.value.args[0] in (
        "Unexpected EOD in RunLengthDecode",
        "early EOD in RunLengthDecode",
    )