From 18eeda9a9e1e893b77787bf00282e5773a750518 Mon Sep 17 00:00:00 2001
From: immerrr
Date: Fri, 12 Nov 2021 17:08:03 +0100
Subject: [PATCH 1/2] Enable brotli decompression if it is available

---
 tests/integration/test_filter.py | 17 +++++++++++++++++
 vcr/filters.py                   | 31 ++++++++++++++++++++++++++-----
 2 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/tests/integration/test_filter.py b/tests/integration/test_filter.py
index f036dd48a..c2a0a23f8 100644
--- a/tests/integration/test_filter.py
+++ b/tests/integration/test_filter.py
@@ -7,6 +7,7 @@
 import pytest

 import vcr
+from vcr.filters import brotli

 from ..assertions import assert_cassette_has_one_response, assert_is_json_bytes

@@ -138,6 +139,22 @@ def test_decompress_deflate(tmpdir, httpbin):
     assert_is_json_bytes(decoded_response)


+def test_decompress_brotli(tmpdir, httpbin):
+    if brotli is None:
+        # XXX: this is never true, because brotlipy is installed with "httpbin"
+        pytest.skip("Brotli is not installed")
+
+    url = httpbin.url + "/brotli"
+    request = Request(url, headers={"Accept-Encoding": ["gzip, deflate, br"]})
+    cass_file = str(tmpdir.join("brotli_response.yaml"))
+    with vcr.use_cassette(cass_file, decode_compressed_response=True):
+        urlopen(request)
+    with vcr.use_cassette(cass_file) as cass:
+        decoded_response = urlopen(url).read()
+    assert_cassette_has_one_response(cass)
+    assert_is_json_bytes(decoded_response)
+
+
 def test_decompress_regular(tmpdir, httpbin):
     """Test that it doesn't try to decompress content that isn't compressed"""
     url = httpbin.url + "/get"
diff --git a/vcr/filters.py b/vcr/filters.py
index 7f33155e8..efb9b2938 100644
--- a/vcr/filters.py
+++ b/vcr/filters.py
@@ -6,6 +6,20 @@

 from .util import CaseInsensitiveDict

+try:
+    # This supports both the brotli and brotlipy packages
+    import brotli
+except ImportError:
+    try:
+        import brotlicffi as brotli
+    except ImportError:
+        brotli = None
+
+
+AVAILABLE_DECOMPRESSORS = {"gzip", "deflate"}
+if brotli is not None:
+    AVAILABLE_DECOMPRESSORS.add("br")
+

 def replace_headers(request, replacements):
     """Replace headers in request according to replacements.
@@ -136,15 +150,16 @@ def remove_post_data_parameters(request, post_data_parameters_to_remove):

 def decode_response(response):
     """
-    If the response is compressed with gzip or deflate:
+    If the response is compressed with any supported compression (gzip,
+    deflate, or br if brotli is available):
       1. decompress the response body
       2. delete the content-encoding header
       3. update content-length header to decompressed length
     """

-    def is_compressed(headers):
+    def is_decompressable(headers):
         encoding = headers.get("content-encoding", [])
-        return encoding and encoding[0] in ("gzip", "deflate")
+        return encoding and encoding[0] in AVAILABLE_DECOMPRESSORS

     def decompress_body(body, encoding):
         """Returns decompressed body according to encoding using zlib.
@@ -157,17 +172,23 @@ def decompress_body(body, encoding):
             return zlib.decompress(body, zlib.MAX_WBITS | 16)
         except zlib.error:
             return body  # assumes that the data was already decompressed
-        else:  # encoding == 'deflate'
+        elif encoding == 'deflate':
             try:
                 return zlib.decompress(body)
             except zlib.error:
                 return body  # assumes that the data was already decompressed
+        else:  # encoding == 'br'
+            try:
+                return brotli.decompress(body)
+            except brotli.error:
+                return body  # assumes that the data was already decompressed
+

     # Deepcopy here in case `headers` contain objects that could
     # be mutated by a shallow copy and corrupt the real response.
     response = copy.deepcopy(response)
     headers = CaseInsensitiveDict(response["headers"])
-    if is_compressed(headers):
+    if is_decompressable(headers):
         encoding = headers["content-encoding"][0]
         headers["content-encoding"].remove(encoding)
         if not headers["content-encoding"]:
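A minimal sketch for reviewers (not part of the patch series) of what the first commit enables: a cassette response whose body is brotli-compressed is decoded by decode_response. It assumes a brotli implementation is importable and hand-builds a dict shaped like the serialized response that decode_response operates on (only the keys it touches); the JSON payload is illustrative.

    import brotli  # assumption: the brotli (or brotlipy/brotlicffi) package is installed

    from vcr.filters import decode_response

    raw_body = b'{"brotli": true}'
    recorded = {
        # Only the keys that decode_response reads and writes are included here.
        "body": {"string": brotli.compress(raw_body)},
        "headers": {
            "content-encoding": ["br"],
            "content-length": [str(len(brotli.compress(raw_body)))],
        },
    }

    decoded = decode_response(recorded)
    assert decoded["body"]["string"] == raw_body         # body was decompressed
    assert "content-encoding" not in decoded["headers"]  # encoding header removed
    assert decoded["headers"]["content-length"] == [str(len(raw_body))]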
From 2b93d2af107b95f27ade6f85ca3065933e671810 Mon Sep 17 00:00:00 2001
From: immerrr
Date: Wed, 30 Apr 2025 09:30:29 +0200
Subject: [PATCH 2/2] Apply PR feedback

---
 vcr/filters.py | 85 +++++++++++++++++++++++++++-----------------------
 1 file changed, 46 insertions(+), 39 deletions(-)

diff --git a/vcr/filters.py b/vcr/filters.py
index efb9b2938..2f97d0960 100644
--- a/vcr/filters.py
+++ b/vcr/filters.py
@@ -16,9 +16,38 @@
         brotli = None


-AVAILABLE_DECOMPRESSORS = {"gzip", "deflate"}
+def decompress_deflate(body):
+    try:
+        return zlib.decompress(body)
+    except zlib.error:
+        # Assume the response was already decompressed
+        return body
+
+
+def decompress_gzip(body):
+    # To (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16.
+    try:
+        return zlib.decompress(body, zlib.MAX_WBITS | 16)
+    except zlib.error:
+        # Assume the response was already decompressed
+        return body
+
+
+AVAILABLE_DECOMPRESSORS = {
+    "deflate": decompress_deflate,
+    "gzip": decompress_gzip,
+}
+
 if brotli is not None:
-    AVAILABLE_DECOMPRESSORS.add("br")
+
+    def decompress_brotli(body):
+        try:
+            return brotli.decompress(body)
+        except brotli.error:
+            # Assume the response was already decompressed
+            return body
+
+    AVAILABLE_DECOMPRESSORS["br"] = decompress_brotli


 def replace_headers(request, replacements):
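An editorial aside on the hunk above (not part of the patch): AVAILABLE_DECOMPRESSORS is now a mapping from a content-encoding value to a decompression callable, so decode_response can dispatch with a dictionary lookup instead of an if/elif chain. A small sketch of that lookup, assuming the refactored vcr.filters module; the deflate payload is illustrative.

    import zlib

    from vcr.filters import AVAILABLE_DECOMPRESSORS

    payload = b'{"deflate": true}'
    compressed = zlib.compress(payload)  # zlib-wrapped deflate, which decompress_deflate handles

    # Known encodings map to a callable; unknown ones (or "br" when no brotli
    # implementation is installed) are simply absent from the mapping.
    decompressor = AVAILABLE_DECOMPRESSORS.get("deflate")
    assert decompressor is not None
    assert decompressor(compressed) == payload
    assert AVAILABLE_DECOMPRESSORS.get("identity") is None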
@@ -157,45 +186,23 @@ def decode_response(response):
       3. update content-length header to decompressed length
     """

-    def is_decompressable(headers):
-        encoding = headers.get("content-encoding", [])
-        return encoding and encoding[0] in AVAILABLE_DECOMPRESSORS
-
-    def decompress_body(body, encoding):
-        """Returns decompressed body according to encoding using zlib.
-        to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16
-        """
-        if not body:
-            return ""
-        if encoding == "gzip":
-            try:
-                return zlib.decompress(body, zlib.MAX_WBITS | 16)
-            except zlib.error:
-                return body  # assumes that the data was already decompressed
-        elif encoding == 'deflate':
-            try:
-                return zlib.decompress(body)
-            except zlib.error:
-                return body  # assumes that the data was already decompressed
-        else:  # encoding == 'br'
-            try:
-                return brotli.decompress(body)
-            except brotli.error:
-                return body  # assumes that the data was already decompressed
-
-
     # Deepcopy here in case `headers` contain objects that could
     # be mutated by a shallow copy and corrupt the real response.
     response = copy.deepcopy(response)
     headers = CaseInsensitiveDict(response["headers"])
-    if is_decompressable(headers):
-        encoding = headers["content-encoding"][0]
-        headers["content-encoding"].remove(encoding)
-        if not headers["content-encoding"]:
-            del headers["content-encoding"]
-
-        new_body = decompress_body(response["body"]["string"], encoding)
-        response["body"]["string"] = new_body
-        headers["content-length"] = [str(len(new_body))]
-        response["headers"] = dict(headers)
+    content_encoding = headers.get("content-encoding")
+    if not content_encoding:
+        return response
+    decompressor = AVAILABLE_DECOMPRESSORS.get(content_encoding[0])
+    if not decompressor:
+        return response
+
+    headers["content-encoding"].remove(content_encoding[0])
+    if not headers["content-encoding"]:
+        del headers["content-encoding"]
+
+    new_body = decompressor(response["body"]["string"])
+    response["body"]["string"] = new_body
+    headers["content-length"] = [str(len(new_body))]
+    response["headers"] = dict(headers)
     return response
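A hypothetical end-to-end usage sketch once both commits are applied, mirroring the new integration test: record a brotli-encoded endpoint with decode_compressed_response=True, then replay from the cassette and read a plain JSON body. The URL and cassette path are illustrative.

    import json
    from urllib.request import Request, urlopen

    import vcr

    URL = "https://httpbin.org/brotli"   # any endpoint that honours "br" works
    CASSETTE = "brotli_response.yaml"    # illustrative path

    # Recording pass: the brotli-compressed body is decoded before it is
    # written out, so the cassette stores a readable JSON string.
    with vcr.use_cassette(CASSETTE, decode_compressed_response=True):
        urlopen(Request(URL, headers={"Accept-Encoding": "gzip, deflate, br"}))

    # Replay pass: the cassette already holds the decompressed body.
    with vcr.use_cassette(CASSETTE):
        body = urlopen(URL).read()

    json.loads(body)  # parses without any brotli decoding on the client side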