
Enable brotli decompression if it is available #620

Open
wants to merge 2 commits into master
17 changes: 17 additions & 0 deletions tests/integration/test_filter.py
@@ -7,6 +7,7 @@
import pytest

import vcr
from vcr.filters import brotli

from ..assertions import assert_cassette_has_one_response, assert_is_json_bytes

@@ -138,6 +139,22 @@ def test_decompress_deflate(tmpdir, httpbin):
        assert_is_json_bytes(decoded_response)


def test_decompress_brotli(tmpdir, httpbin):
    if brotli is None:
        # XXX: this is never true, because brotlipy is installed with "httpbin"
        pytest.skip("Brotli is not installed")

    url = httpbin.url + "/brotli"
    request = Request(url, headers={"Accept-Encoding": ["gzip, deflate, br"]})
    cass_file = str(tmpdir.join("brotli_response.yaml"))
    with vcr.use_cassette(cass_file, decode_compressed_response=True):
        urlopen(request)
    with vcr.use_cassette(cass_file) as cass:
        decoded_response = urlopen(url).read()
        assert_cassette_has_one_response(cass)
        assert_is_json_bytes(decoded_response)


def test_decompress_regular(tmpdir, httpbin):
"""Test that it doesn't try to decompress content that isn't compressed"""
url = httpbin.url + "/get"
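
The new test drives decode_compressed_response=True end to end against httpbin's /brotli endpoint. As a rough sketch (not part of this diff; the cassette file name and the top-level "interactions" key follow vcrpy's default YAML layout, so treat them as assumptions), the recorded cassette could be inspected directly to confirm the body was stored decompressed:

    # Hypothetical check, not part of the PR: load the cassette the test records
    # and confirm the stored body is plain JSON with no content-encoding header.
    import yaml

    with open("brotli_response.yaml") as f:
        cassette = yaml.safe_load(f)

    response = cassette["interactions"][0]["response"]
    header_names = {name.lower() for name in response["headers"]}
    assert "content-encoding" not in header_names
    print(response["body"]["string"][:80])  # should start with readable JSON
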
92 changes: 60 additions & 32 deletions vcr/filters.py
@@ -6,6 +6,49 @@

from .util import CaseInsensitiveDict

try:
    # This supports both brotli & brotlipy packages
    import brotli
except ImportError:
    try:
        import brotlicffi as brotli
    except ImportError:
        brotli = None


def decompress_deflate(body):
    try:
        return zlib.decompress(body)
    except zlib.error:
        # Assume the response was already decompressed
        return body


def decompress_gzip(body):
    # To (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16.
    try:
        return zlib.decompress(body, zlib.MAX_WBITS | 16)
    except zlib.error:
        # Assume the response was already decompressed
        return body


AVAILABLE_DECOMPRESSORS = {
    "deflate": decompress_deflate,
    "gzip": decompress_gzip,
}

if brotli is not None:

    def decompress_brotli(body):
        try:
            return brotli.decompress(body)
        except brotli.error:
            # Assume the response was already decompressed
            return body

    AVAILABLE_DECOMPRESSORS["br"] = decompress_brotli

@CharString (Contributor) commented on Oct 4, 2022


It is easier to expand with future or custom decompressors if AVAILABLE_DECOMPRESSORS is a Dict[str, Callable[[bytes], bytes]]:

AVAILABLE_DECOMPRESSORS = {
    "br": brotli.decompress,
    "deflate": zlib.decompress,
    "gzip": lambda body: zlib.decompress(body, zlib.MAX_WBITS | 16),
}

Then decompress_body can just be

def decompress_body(body, encoding):
    return AVAILABLE_DECOMPRESSORS[encoding](body)

Adding a new scheme will be as easy as adding a function to the dict.
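
For illustration, registering an additional scheme under that design could look like the following sketch; the zstandard package and its ZstdDecompressor API are assumptions here, not something this change depends on:

    # Hypothetical extension, not part of this change: register a zstd decompressor.
    import zstandard  # third-party package, assumed to be installed

    def decompress_zstd(body):
        # ZstdDecompressor.decompress returns the raw decompressed bytes
        return zstandard.ZstdDecompressor().decompress(body)

    AVAILABLE_DECOMPRESSORS["zstd"] = decompress_zstd  # "zstd" is the content-coding token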


def replace_headers(request, replacements):
"""Replace headers in request according to replacements.
@@ -136,45 +179,30 @@ def remove_post_data_parameters(request, post_data_parameters_to_remove):

 def decode_response(response):
     """
-    If the response is compressed with gzip or deflate:
+    If the response is compressed with any supported compression (gzip,
+    deflate, br if available):
       1. decompress the response body
       2. delete the content-encoding header
       3. update content-length header to decompressed length
     """

-    def is_compressed(headers):
-        encoding = headers.get("content-encoding", [])
-        return encoding and encoding[0] in ("gzip", "deflate")
-
-    def decompress_body(body, encoding):
-        """Returns decompressed body according to encoding using zlib.
-        to (de-)compress gzip format, use wbits = zlib.MAX_WBITS | 16
-        """
-        if not body:
-            return ""
-        if encoding == "gzip":
-            try:
-                return zlib.decompress(body, zlib.MAX_WBITS | 16)
-            except zlib.error:
-                return body  # assumes that the data was already decompressed
-        else:  # encoding == 'deflate'
-            try:
-                return zlib.decompress(body)
-            except zlib.error:
-                return body  # assumes that the data was already decompressed
-
     # Deepcopy here in case `headers` contain objects that could
     # be mutated by a shallow copy and corrupt the real response.
     response = copy.deepcopy(response)
     headers = CaseInsensitiveDict(response["headers"])
-    if is_compressed(headers):
-        encoding = headers["content-encoding"][0]
-        headers["content-encoding"].remove(encoding)
-        if not headers["content-encoding"]:
-            del headers["content-encoding"]
-
-        new_body = decompress_body(response["body"]["string"], encoding)
-        response["body"]["string"] = new_body
-        headers["content-length"] = [str(len(new_body))]
-        response["headers"] = dict(headers)
+    content_encoding = headers.get("content-encoding")
+    if not content_encoding:
+        return response
+    decompressor = AVAILABLE_DECOMPRESSORS.get(content_encoding[0])
+    if not decompressor:
+        return response
+
+    headers["content-encoding"].remove(content_encoding[0])
+    if not headers["content-encoding"]:
+        del headers["content-encoding"]
+
+    new_body = decompressor(response["body"]["string"])
+    response["body"]["string"] = new_body
+    headers["content-length"] = [str(len(new_body))]
+    response["headers"] = dict(headers)
     return response
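
For a concrete picture of the new code path, here is a minimal round-trip sketch (not part of this diff; it assumes the brotli or brotlicffi package is installed and hand-builds a response dict in the shape decode_response operates on):

    # Hypothetical round trip: compress a body with brotli, then let
    # decode_response strip the encoding and fix up content-length.
    import brotli

    from vcr.filters import decode_response

    compressed = brotli.compress(b'{"brotli": true}')
    recorded = {
        "body": {"string": compressed},
        "headers": {
            "content-encoding": ["br"],
            "content-length": [str(len(compressed))],
        },
    }

    decoded = decode_response(recorded)  # works on a deep copy; `recorded` is untouched
    assert decoded["body"]["string"] == b'{"brotli": true}'
    assert "content-encoding" not in decoded["headers"]
    assert decoded["headers"]["content-length"] == ["16"]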