diff --git a/docs/source/usage.rst b/docs/source/usage.rst
index e4eb9bf..900f0b5 100644
--- a/docs/source/usage.rst
+++ b/docs/source/usage.rst
@@ -118,9 +118,10 @@ Memento API. We implement a Python client that can speak both.
 
    .. automethod:: close
 
    .. automethod:: parse_memento_headers
 
-.. autoclass:: wayback.WaybackSession
+.. autoclass:: wayback.WaybackHttpAdapter
 
-   .. automethod:: reset
+   .. automethod:: request
+   .. automethod:: close
 
 Utilities
 ---------
@@ -150,4 +151,4 @@ Exception Classes
 
 .. autoclass:: wayback.exceptions.WaybackRetryError
 
-.. autoclass:: wayback.exceptions.SessionClosedError
+.. autoclass:: wayback.exceptions.AlreadyClosedError
diff --git a/src/wayback/__init__.py b/src/wayback/__init__.py
index 1a8b5f2..125d50d 100644
--- a/src/wayback/__init__.py
+++ b/src/wayback/__init__.py
@@ -1,12 +1,13 @@
 from ._version import __version__, __version_tuple__  # noqa: F401
 
-from ._utils import memento_url_data, RateLimit  # noqa
+from ._utils import memento_url_data, RateLimit  # noqa: F401
 
-from ._models import (  # noqa
+from ._models import (  # noqa: F401
     CdxRecord,
     Memento)
 
-from ._client import (  # noqa
+from ._client import (  # noqa: F401
     Mode,
-    WaybackClient,
-    WaybackSession)
+    WaybackClient)
+
+from ._http import WaybackHttpAdapter  # noqa: F401
diff --git a/src/wayback/_client.py b/src/wayback/_client.py
index 0593e8f..e971ad6 100644
--- a/src/wayback/_client.py
+++ b/src/wayback/_client.py
@@ -19,30 +19,18 @@
 import hashlib
 import logging
 import re
-import requests
-from requests.exceptions import (ChunkedEncodingError,
-                                 ConnectionError,
-                                 ContentDecodingError,
-                                 ProxyError,
-                                 RetryError,
-                                 Timeout)
-import time
-from urllib.parse import urljoin, urlparse
-from urllib3.connectionpool import HTTPConnectionPool
-from urllib3.exceptions import (ConnectTimeoutError,
-                                MaxRetryError,
-                                ReadTimeoutError)
+from urllib.parse import urljoin
 from warnings import warn
-from . import _utils, __version__
+from . import _utils
 from ._models import CdxRecord, Memento
+from ._http import (WaybackRequestsAdapter,
+                    WaybackHttpAdapter)
 from .exceptions import (WaybackException,
                          UnexpectedResponseFormat,
                          BlockedByRobotsError,
                          BlockedSiteError,
                          MementoPlaybackError,
-                         NoMementoError,
-                         WaybackRetryError,
-                         RateLimitError)
+                         NoMementoError)
 
 logger = logging.getLogger(__name__)
@@ -70,16 +58,6 @@
 # Make sure it roughly starts with a valid protocol + domain + port?
 URL_ISH = re.compile(r'^[\w+\-]+://[^/?=&]+\.\w\w+(:\d+)?(/|$)')
 
-# Global default rate limits for various endpoints. Internet Archive folks have
-# asked us to set the defaults at 80% of the hard limits.
-DEFAULT_CDX_RATE_LIMIT = _utils.RateLimit(0.8 * 60 / 60)
-DEFAULT_TIMEMAP_RATE_LIMIT = _utils.RateLimit(0.8 * 100 / 60)
-DEFAULT_MEMENTO_RATE_LIMIT = _utils.RateLimit(0.8 * 600 / 60)
-
-# If a rate limit response (i.e. a response with status == 429) does not
-# include a `Retry-After` header, recommend pausing for this long.
-DEFAULT_RATE_LIMIT_DELAY = 60
-
 
 class Mode(Enum):
     """
@@ -148,28 +126,12 @@ def is_malformed_url(url):
     return False
 
 
-def is_memento_response(response):
-    return 'Memento-Datetime' in response.headers
-
-
 def cdx_hash(content):
     if isinstance(content, str):
         content = content.encode()
     return b32encode(hashlib.sha1(content).digest()).decode()
 
 
-def read_and_close(response):
-    # Read content so it gets cached and close the response so
-    # we can release the connection for reuse. See:
-    # https://github.com/psf/requests/blob/eedd67462819f8dbf8c1c32e77f9070606605231/requests/sessions.py#L160-L163
-    try:
-        response.content
-    except (ChunkedEncodingError, ContentDecodingError, RuntimeError):
-        response.raw.read(decode_content=False)
-    finally:
-        response.close()
-
-
 REDIRECT_PAGE_PATTERN = re.compile(r'Got an? HTTP 3\d\d response at crawl time', re.IGNORECASE)
@@ -271,308 +233,30 @@ def clean_memento_links(links, mode):
     return result
 
 
-#####################################################################
-# HACK: handle malformed Content-Encoding headers from Wayback.
-# When you send `Accept-Encoding: gzip` on a request for a memento, Wayback
-# will faithfully gzip the response body. However, if the original response
-# from the web server that was snapshotted was gzipped, Wayback screws up the
-# `Content-Encoding` header on the memento response, leading any HTTP client to
-# *not* decompress the gzipped body. Wayback folks have no clear timeline for
-# a fix, hence the workaround here.
-#
-# More info in this issue:
-# https://github.com/edgi-govdata-archiving/web-monitoring-processing/issues/309
-#
-# Example broken Wayback URL:
-# http://web.archive.org/web/20181023233237id_/http://cwcgom.aoml.noaa.gov/erddap/griddap/miamiacidification.graph
-#
-if hasattr(HTTPConnectionPool, 'ResponseCls'):
-    # urllib3 v1.x:
-    #
-    # This subclass of urllib3's response class identifies the malformed headers
-    # and repairs them before instantiating the actual response object, so when
-    # it reads the body, it knows to decode it correctly.
-    #
-    # See what we're overriding from urllib3:
-    # https://github.com/urllib3/urllib3/blob/a6ec68a5c5c5743c59fe5c62c635c929586c429b/src/urllib3/response.py#L499-L526
-    class WaybackResponse(HTTPConnectionPool.ResponseCls):
-        @classmethod
-        def from_httplib(cls, httplib_response, **response_kwargs):
-            headers = httplib_response.msg
-            pairs = headers.items()
-            if ('content-encoding', '') in pairs and ('Content-Encoding', 'gzip') in pairs:
-                del headers['content-encoding']
-                headers['Content-Encoding'] = 'gzip'
-            return super().from_httplib(httplib_response, **response_kwargs)
-
-    HTTPConnectionPool.ResponseCls = WaybackResponse
-else:
-    # urllib3 v2.x:
-    #
-    # Unfortunately, we can't wrap the `HTTPConnection.getresponse` method in
-    # urllib3 v2, since it may have read the response body before returning. So
-    # we patch the HTTPHeaderDict class instead.
-    from urllib3._collections import HTTPHeaderDict as Urllib3HTTPHeaderDict
-    _urllib3_header_init = Urllib3HTTPHeaderDict.__init__
-
-    def _new_header_init(self, headers=None, **kwargs):
-        if headers:
-            if isinstance(headers, (Urllib3HTTPHeaderDict, dict)):
-                pairs = list(headers.items())
-            else:
-                pairs = list(headers)
-            if (
-                ('content-encoding', '') in pairs and
-                ('Content-Encoding', 'gzip') in pairs
-            ):
-                headers = [pair for pair in pairs
-                           if pair[0].lower() != 'content-encoding']
-                headers.append(('Content-Encoding', 'gzip'))
-
-        return _urllib3_header_init(self, headers, **kwargs)
-
-    Urllib3HTTPHeaderDict.__init__ = _new_header_init
-# END HACK
-#####################################################################
-
-
-class WaybackSession(_utils.DisableAfterCloseSession, requests.Session):
-    """
-    A custom session object that pools network connections and resources for
-    requests to the Wayback Machine.
-
-    Parameters
-    ----------
-    retries : int, default: 6
-        The maximum number of retries for requests.
-    backoff : int or float, default: 2
-        Number of seconds from which to calculate how long to back off and wait
-        when retrying requests. The first retry is always immediate, but
-        subsequent retries increase by powers of 2:
-
-            seconds = backoff * 2 ^ (retry number - 1)
-
-        So if this was `4`, retries would happen after the following delays:
-        0 seconds, 4 seconds, 8 seconds, 16 seconds, ...
-    timeout : int or float or tuple of (int or float, int or float), default: 60
-        A timeout to use for all requests.
-        See the Requests docs for more:
-        https://docs.python-requests.org/en/master/user/advanced/#timeouts
-    user_agent : str, optional
-        A custom user-agent string to use in all requests. Defaults to:
-        `wayback/{version} (+https://github.com/edgi-govdata-archiving/wayback)`
-    search_calls_per_second : wayback.RateLimit or int or float, default: 0.8
-        The maximum number of calls per second made to the CDX search API.
-        To disable the rate limit, set this to 0.
-
-        To have multiple sessions share a rate limit (so requests made by one
-        session count towards the limit of the other session), use a
-        single :class:`wayback.RateLimit` instance and pass it to each
-        ``WaybackSession`` instance. If you do not set a limit, the default
-        limit is shared globally across all sessions.
-    memento_calls_per_second : wayback.RateLimit or int or float, default: 8
-        The maximum number of calls per second made to the memento API.
-        To disable the rate limit, set this to 0.
-
-        To have multiple sessions share a rate limit (so requests made by one
-        session count towards the limit of the other session), use a
-        single :class:`wayback.RateLimit` instance and pass it to each
-        ``WaybackSession`` instance. If you do not set a limit, the default
-        limit is shared globally across all sessions.
-    timemap_calls_per_second : wayback.RateLimit or int or float, default: 1.33
-        The maximum number of calls per second made to the timemap API (the
-        Wayback Machine's new, beta CDX search is part of the timemap API).
-        To disable the rate limit, set this to 0.
-
-        To have multiple sessions share a rate limit (so requests made by one
-        session count towards the limit of the other session), use a
-        single :class:`wayback.RateLimit` instance and pass it to each
-        ``WaybackSession`` instance. If you do not set a limit, the default
-        limit is shared globally across all sessions.
-    """
-
-    # It seems Wayback sometimes produces 500 errors for transient issues, so
-    # they make sense to retry here. Usually not in other contexts, though.
-    retryable_statuses = frozenset((413, 421, 500, 502, 503, 504, 599))
-
-    retryable_errors = (ConnectTimeoutError, MaxRetryError, ReadTimeoutError,
-                        ProxyError, RetryError, Timeout)
-    # Handleable errors *may* be retryable, but need additional logic beyond
-    # just the error type. See `should_retry_error()`.
-    handleable_errors = (ConnectionError,) + retryable_errors
-
-    def __init__(self, retries=6, backoff=2, timeout=60, user_agent=None,
-                 search_calls_per_second=DEFAULT_CDX_RATE_LIMIT,
-                 memento_calls_per_second=DEFAULT_MEMENTO_RATE_LIMIT,
-                 timemap_calls_per_second=DEFAULT_TIMEMAP_RATE_LIMIT):
-        super().__init__()
-        self.retries = retries
-        self.backoff = backoff
-        self.timeout = timeout
-        self.headers = {
-            'User-Agent': (user_agent or
-                           f'wayback/{__version__} (+https://github.com/edgi-govdata-archiving/wayback)'),
-            'Accept-Encoding': 'gzip, deflate'
-        }
-        self.rate_limts = {
-            '/web/timemap': _utils.RateLimit.make_limit(timemap_calls_per_second),
-            '/cdx': _utils.RateLimit.make_limit(search_calls_per_second),
-            # The memento limit is actually a generic Wayback limit.
-            '/': _utils.RateLimit.make_limit(memento_calls_per_second),
-        }
-        # NOTE: the nice way to accomplish retry/backoff is with a urllib3:
-        #     adapter = requests.adapters.HTTPAdapter(
-        #         max_retries=Retry(total=5, backoff_factor=2,
-        #                           status_forcelist=(503, 504)))
-        #     self.mount('http://', adapter)
-        # But Wayback mementos can have errors, which complicates things. See:
-        # https://github.com/urllib3/urllib3/issues/1445#issuecomment-422950868
-        #
-        # Also note that, if we are ever able to switch to that, we may need to
-        # get more fancy with log filtering, since we *expect* lots of retries
-        # with Wayback's APIs, but urllib3 logs a warning on every retry:
-        # https://github.com/urllib3/urllib3/blob/5b047b645f5f93900d5e2fc31230848c25eb1f5f/src/urllib3/connectionpool.py#L730-L737
-
-    # Customize the built-in `send()` with retryability and rate limiting.
-    def send(self, request: requests.PreparedRequest, **kwargs):
-        start_time = time.time()
-        maximum = self.retries
-        retries = 0
-
-        url = urlparse(request.url)
-        for path, limit in self.rate_limts.items():
-            if url.path.startswith(path):
-                rate_limit = limit
-                break
-        else:
-            rate_limit = DEFAULT_MEMENTO_RATE_LIMIT
-
-        while True:
-            retry_delay = 0
-            try:
-                logger.debug('sending HTTP request %s "%s", %s', request.method, request.url, kwargs)
-                rate_limit.wait()
-                response = super().send(request, **kwargs)
-                retry_delay = self.get_retry_delay(retries, response)
-
-                if retries >= maximum or not self.should_retry(response):
-                    if response.status_code == 429:
-                        read_and_close(response)
-                        raise RateLimitError(response, retry_delay)
-                    return response
-                else:
-                    logger.debug('Received error response (status: %s), will retry', response.status_code)
-                    read_and_close(response)
-            except WaybackSession.handleable_errors as error:
-                response = getattr(error, 'response', None)
-                if response is not None:
-                    read_and_close(response)
-
-                if retries >= maximum:
-                    raise WaybackRetryError(retries, time.time() - start_time, error) from error
-                elif self.should_retry_error(error):
-                    retry_delay = self.get_retry_delay(retries, response)
-                    logger.info('Caught exception during request, will retry: %s', error)
-                else:
-                    raise
-
-            logger.debug('Will retry after sleeping for %s seconds...', retry_delay)
-            time.sleep(retry_delay)
-            retries += 1
-
-    # Customize `request` in order to set a default timeout from the session.
-    # We can't do this in `send` because `request` always passes a `timeout`
-    # keyword to `send`. Inside `send`, we can't tell the difference between a
-    # user explicitly requesting no timeout and not setting one at all.
-    def request(self, method, url, **kwargs):
-        """
-        Perform an HTTP request using this session. For arguments and return
-        values, see:
-        https://requests.readthedocs.io/en/latest/api/#requests.Session.request
-
-        If the ``timeout`` keyword argument is not set, it will default to the
-        session's ``timeout`` attribute.
-        """
-        if 'timeout' not in kwargs:
-            kwargs['timeout'] = self.timeout
-        return super().request(method, url, **kwargs)
-
-    def should_retry(self, response):
-        # A memento may actually be a capture of an error, so don't retry it :P
-        if is_memento_response(response):
-            return False
-
-        return response.status_code in self.retryable_statuses
-
-    def should_retry_error(self, error):
-        if isinstance(error, WaybackSession.retryable_errors):
-            return True
-        elif isinstance(error, ConnectionError):
-            # ConnectionErrors from requests actually wrap a whole family of
-            # more detailed errors from urllib3, so we need to do some string
-            # checking to determine whether the error is retryable.
-            text = str(error)
-            # NOTE: we have also seen this, which may warrant retrying:
-            # `requests.exceptions.ConnectionError: ('Connection aborted.',
-            # RemoteDisconnected('Remote end closed connection without
-            # response'))`
-            if 'NewConnectionError' in text or 'Max retries' in text:
-                return True
-
-        return False
-
-    def get_retry_delay(self, retries, response=None):
-        delay = 0
-
-        # As of 2023-11-27, the Wayback Machine does not set a `Retry-After`
-        # header, so this parsing is just future-proofing.
-        if response is not None:
-            delay = _utils.parse_retry_after(response.headers.get('Retry-After')) or delay
-            if response.status_code == 429 and delay == 0:
-                delay = DEFAULT_RATE_LIMIT_DELAY
-
-        # No default backoff on the first retry.
-        if retries > 0:
-            delay = max(self.backoff * 2 ** (retries - 1), delay)
-
-        return delay
-
-    def reset(self):
-        "Reset any network connections the session is using."
-        # Close really just closes all the adapters in `self.adapters`. We
-        # could do that directly, but `self.adapters` is not documented/public,
-        # so might be somewhat risky.
-        self.close(disable=False)
-        # Re-build the standard adapters. See:
-        # https://github.com/kennethreitz/requests/blob/v2.22.0/requests/sessions.py#L415-L418
-        self.mount('https://', requests.adapters.HTTPAdapter())
-        self.mount('http://', requests.adapters.HTTPAdapter())
-
-
-# TODO: add retry, backoff, cross_thread_backoff, and rate_limit options that
-# create a custom instance of urllib3.utils.Retry
 class WaybackClient(_utils.DepthCountedContext):
     """
     A client for retrieving data from the Internet Archive's Wayback Machine.
 
     You can use a WaybackClient as a context manager. When exiting, it will
-    close the session it's using (if you've passed in a custom session, make
+    close the adapter it's using (if you've passed in a custom adapter, make
     sure not to use the context manager functionality unless you want to live
     dangerously).
 
     Parameters
     ----------
-    session : WaybackSession, optional
+    adapter : WaybackHttpAdapter, optional
     """
-    def __init__(self, session=None):
-        self.session = session or WaybackSession()
+    adapter: WaybackHttpAdapter
+
+    def __init__(self, adapter=None):
+        self.adapter = adapter or WaybackRequestsAdapter()
 
     def __exit_all__(self, type, value, traceback):
         self.close()
 
     def close(self):
-        "Close the client's session."
-        self.session.close()
+        "Close the client's adapter."
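+        # Note that this closes the adapter even if it was passed in by the
+        # caller (hence the warning in the class docstring about using the
+        # context manager with a custom adapter).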
+        self.adapter.close()
 
     def search(self, url, *, match_type=None, limit=1000, offset=None,
                fast_latest=None, from_date=None, to_date=None,
@@ -783,24 +467,23 @@ def search(self, url, *, match_type=None, limit=1000, offset=None,
         previous_result = None
         while next_query:
             sent_query, next_query = next_query, None
-            response = self.session.request('GET', CDX_SEARCH_URL,
+            response = self.adapter.request('GET', CDX_SEARCH_URL,
                                             params=sent_query)
-            try:
-                # Read/cache the response and close straightaway. If we need
-                # to raise for status, we want to pre-emptively close the
-                # response so a user handling the error doesn't need to
-                # worry about it. If we don't raise here, we still want to
-                # close the connection so it doesn't leak when we move onto
-                # the next of results or when this iterator ends.
-                read_and_close(response)
-                response.raise_for_status()
-            except requests.exceptions.HTTPError as error:
+
+            # Read/cache the response and close straightaway. If we need
+            # to raise for status, we want to pre-emptively close the
+            # response so a user handling the error doesn't need to
+            # worry about it. If we don't raise here, we still want to
+            # close the connection so it doesn't leak when we move onto
+            # the next page of results or when this iterator ends.
+            response.close()
+            if response.status_code >= 400:
                 if 'AdministrativeAccessControlException' in response.text:
                     raise BlockedSiteError(query['url'])
                 elif 'RobotAccessControlException' in response.text:
                     raise BlockedByRobotsError(query['url'])
                 else:
-                    raise WaybackException(str(error))
+                    raise WaybackException(f'HTTP {response.status_code} error for CDX search: "{query}"')
 
             lines = iter(response.content.splitlines())
 
@@ -1011,25 +694,22 @@ def get_memento(self, url, timestamp=None, mode=Mode.original, *,
         urls = set()
         previous_was_memento = False
 
-        response = self.session.request('GET', url, allow_redirects=False)
+        response = self.adapter.request('GET', url, follow_redirects=False)
         protocol_and_www = re.compile(r'^https?://(www\d?\.)?')
         memento = None
         while True:
             current_url, current_date, current_mode = _utils.memento_url_data(response.url)
 
             # In view mode, redirects need special handling.
-            if current_mode == Mode.view.value:
+            redirect_url = response.redirect_url
+            if current_mode == Mode.view.value and not redirect_url:
                 redirect_url = detect_view_mode_redirect(response, current_date)
                 if redirect_url:
                     # Fix up response properties to be like other modes.
-                    redirect = requests.Request('GET', redirect_url)
-                    response._next = self.session.prepare_request(redirect)
                     response.headers['Memento-Datetime'] = current_date.strftime(
                         '%a, %d %b %Y %H:%M:%S %Z'
                     )
 
-            is_memento = is_memento_response(response)
-
             # A memento URL will match possible captures based on its SURT
             # form, which means we might be getting back a memento captured
             # from a different URL than the one specified in the request.
@@ -1037,7 +717,7 @@ def get_memento(self, url, timestamp=None, mode=Mode.original, *,
             if response.links and ('original' in response.links):
                 current_url = response.links['original']['url']
 
-            if is_memento:
+            if response.is_memento:
                 links = clean_memento_links(response.links, mode)
                 memento = Memento(url=current_url,
                                   timestamp=current_date,
@@ -1062,11 +742,11 @@ def get_memento(self, url, timestamp=None, mode=Mode.original, *,
                     # rarely have been captured at the same time as the
                    # redirect itself. (See 2b)
                    playable = False
-                    if response.next and (
+                    if redirect_url and (
                            (len(history) == 0 and not exact) or
                            (len(history) > 0 and (previous_was_memento or not exact_redirects))
                    ):
-                        target_url, target_date, _ = _utils.memento_url_data(response.next.url)
+                        target_url, target_date, _ = _utils.memento_url_data(redirect_url)
                        # A non-memento redirect is generally taking us to the
                        # closest-in-time capture of the same URL. Note that is
                        # NOT the next capture -- i.e. the one that would have
@@ -1095,7 +775,7 @@ def get_memento(self, url, timestamp=None, mode=Mode.original, *,
                        playable = True
 
                    if not playable:
-                        read_and_close(response)
+                        response.close()
                        message = response.headers.get('X-Archive-Wayback-Runtime-Error', '')
                        if (
                            ('AdministrativeAccessControlException' in message) or
@@ -1109,7 +789,7 @@ def get_memento(self, url, timestamp=None, mode=Mode.original, *,
                            raise BlockedByRobotsError(f'{url} is blocked by robots.txt')
                        elif message:
                            raise MementoPlaybackError(f'Memento at {url} could not be played: {message}')
-                        elif response.ok:
+                        elif response.is_success:
                            # TODO: Raise more specific errors for the possible
                            # cases here. We *should* only arrive here when
                            # there's a redirect and:
@@ -1124,21 +804,21 @@ def get_memento(self, url, timestamp=None, mode=Mode.original, *,
                            raise MementoPlaybackError(f'{response.status_code} error while loading '
                                                       f'memento at {url}')
 
-            if response.next:
-                previous_was_memento = is_memento
-                read_and_close(response)
+            if redirect_url:
+                previous_was_memento = response.is_memento
+                response.close()
                # Wayback sometimes has circular memento redirects ¯\_(ツ)_/¯
                urls.add(response.url)
-                if response.next.url in urls:
+                if redirect_url in urls:
                    raise MementoPlaybackError(f'Memento at {url} is circular')
 
                # All requests are included in `debug_history`, but
                # `history` only shows redirects that were mementos.
                debug_history.append(response.url)
-                if is_memento:
+                if response.is_memento:
                    history.append(memento)
-                response = self.session.send(response.next, allow_redirects=False)
+                response = self.adapter.request('GET', redirect_url, follow_redirects=False)
            else:
                break
diff --git a/src/wayback/_http.py b/src/wayback/_http.py
new file mode 100644
index 0000000..4af214f
--- /dev/null
+++ b/src/wayback/_http.py
@@ -0,0 +1,591 @@
+"""
+HTTP tooling used by Wayback when making requests to and handling responses
+from Wayback Machine servers.
+"""
+
+import logging
+from numbers import Real
+import threading
+import time
+from typing import Dict, Optional, Tuple, Union
+from urllib.parse import urljoin, urlparse
+import requests
+from requests.exceptions import (ChunkedEncodingError,
+                                 ConnectionError,
+                                 ContentDecodingError,
+                                 ProxyError,
+                                 RetryError,
+                                 Timeout)
+from urllib3.connectionpool import HTTPConnectionPool
+from urllib3.exceptions import (ConnectTimeoutError,
+                                MaxRetryError,
+                                ReadTimeoutError)
+from . import __version__
+from ._utils import DisableAfterCloseAdapter, RateLimit, parse_retry_after
+from .exceptions import RateLimitError, WaybackRetryError
+
+logger = logging.getLogger(__name__)
+
+# Global default rate limits for various endpoints. Internet Archive folks have
+# asked us to set the defaults at 80% of the hard limits.
+DEFAULT_CDX_RATE_LIMIT = RateLimit(0.8 * 60 / 60)
+DEFAULT_TIMEMAP_RATE_LIMIT = RateLimit(0.8 * 100 / 60)
+DEFAULT_MEMENTO_RATE_LIMIT = RateLimit(0.8 * 600 / 60)
+
+# If a rate limit response (i.e. a response with status == 429) does not
+# include a `Retry-After` header, recommend pausing for this long.
+DEFAULT_RATE_LIMIT_DELAY = 60
+
+
+#####################################################################
+# HACK: handle malformed Content-Encoding headers from Wayback.
+# When you send `Accept-Encoding: gzip` on a request for a memento, Wayback
+# will faithfully gzip the response body. However, if the original response
+# from the web server that was snapshotted was gzipped, Wayback screws up the
+# `Content-Encoding` header on the memento response, leading any HTTP client to
+# *not* decompress the gzipped body. Wayback folks have no clear timeline for
+# a fix, hence the workaround here.
+#
+# More info in this issue:
+# https://github.com/edgi-govdata-archiving/web-monitoring-processing/issues/309
+#
+# Example broken Wayback URL:
+# http://web.archive.org/web/20181023233237id_/http://cwcgom.aoml.noaa.gov/erddap/griddap/miamiacidification.graph
+#
+if hasattr(HTTPConnectionPool, 'ResponseCls'):
+    # urllib3 v1.x:
+    #
+    # This subclass of urllib3's response class identifies the malformed headers
+    # and repairs them before instantiating the actual response object, so when
+    # it reads the body, it knows to decode it correctly.
+    #
+    # See what we're overriding from urllib3:
+    # https://github.com/urllib3/urllib3/blob/a6ec68a5c5c5743c59fe5c62c635c929586c429b/src/urllib3/response.py#L499-L526
+    class WaybackUrllib3Response(HTTPConnectionPool.ResponseCls):
+        @classmethod
+        def from_httplib(cls, httplib_response, **response_kwargs):
+            headers = httplib_response.msg
+            pairs = headers.items()
+            if ('content-encoding', '') in pairs and ('Content-Encoding', 'gzip') in pairs:
+                del headers['content-encoding']
+                headers['Content-Encoding'] = 'gzip'
+            return super().from_httplib(httplib_response, **response_kwargs)
+
+    HTTPConnectionPool.ResponseCls = WaybackUrllib3Response
+else:
+    # urllib3 v2.x:
+    #
+    # Unfortunately, we can't wrap the `HTTPConnection.getresponse` method in
+    # urllib3 v2, since it may have read the response body before returning. So
+    # we patch the HTTPHeaderDict class instead.
+    from urllib3._collections import HTTPHeaderDict as Urllib3HTTPHeaderDict
+    _urllib3_header_init = Urllib3HTTPHeaderDict.__init__
+
+    def _new_header_init(self, headers=None, **kwargs):
+        if headers:
+            if isinstance(headers, (Urllib3HTTPHeaderDict, dict)):
+                pairs = list(headers.items())
+            else:
+                pairs = list(headers)
+            if (
+                ('content-encoding', '') in pairs and
+                ('Content-Encoding', 'gzip') in pairs
+            ):
+                headers = [pair for pair in pairs
+                           if pair[0].lower() != 'content-encoding']
+                headers.append(('Content-Encoding', 'gzip'))
+
+        return _urllib3_header_init(self, headers, **kwargs)
+
+    Urllib3HTTPHeaderDict.__init__ = _new_header_init
+# END HACK
+#####################################################################
+
+
+class WaybackHttpResponse:
+    """
+    Represents an HTTP response from a server. This might be included as an
+    attribute of an exception, but should otherwise not be exposed to user
+    code in normal circumstances. It's meant to wrap the response objects of
+    whatever underlying HTTP tool is being used (e.g. requests, httpx, etc.)
+    and provide a standard, thread-safe interface to them.
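+
+    A rough usage sketch (``adapter`` stands in for any concrete
+    :class:`WaybackHttpAdapter`; the URL and values are illustrative):
+
+    >>> response = adapter.request('GET', 'http://web.archive.org/web/2019/example.com')
+    >>> response.status_code
+    200
+    >>> response.is_memento
+    True
+    >>> body = response.content
+    >>> response.close()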
+ """ + status_code: int + headers: dict + encoding: Optional[str] = None + url: str + links: dict + + def __init__(self, url: str, status_code: int, headers: dict, links: dict = None, encoding: str = None): + self.url = url + self.status_code = status_code + self.headers = headers + self.links = links or {} + self.encoding = encoding + + @property + def redirect_url(self) -> str: + """ + The absolute URL this response redirects to. It will always be a + complete URL with a scheme and host. If the response is not a redirect, + this returns an empty string. + """ + if self.status_code >= 300 and self.status_code < 400: + location = self.headers.get('location') + if location: + return urljoin(self.url, location) + + return '' + + @property + def is_success(self) -> bool: + """Whether the status code indicated success (2xx) or an error.""" + return self.status_code >= 200 and self.status_code < 300 + + @property + def is_memento(self) -> bool: + return 'Memento-Datetime' in self.headers + + @property + def content(self) -> bytes: + """ + The response body as bytes. This is the *decompressed* bytes, so + responses with `Content-Encoding: gzip` will be unzipped here. + """ + raise NotImplementedError() + + @property + def text(self) -> str: + """ + Gets the response body as a text string. it will try to decode the raw + bytes of the response based on response's declared encoding (i.e. from + the ``Content-Type`` header), falling back to sniffing the encoding or + using UTF-8. + """ + encoding = self.encoding or self.sniff_encoding() or 'utf-8' + try: + return str(self.content, encoding, errors="replace") + except (LookupError, TypeError): + return str(self.content, errors="replace") + + def sniff_encoding(self) -> Optional[str]: + """ + Sniff the text encoding from the raw bytes of the content, if possible. + """ + return None + + def close(self, cache: bool = True) -> None: + """ + Read the rest of the response off the wire and release the connection. + If the full response is not read, the connection can hang and waste + both local and server resources. + + Parameters + ---------- + cache : bool, default: True + Whether to cache the response body so it can still be used via the + ``content`` and ``text`` properties. + """ + raise NotImplementedError() + + +class WaybackRequestsResponse(WaybackHttpResponse): + """ + Wraps an HTTP response from the requests library. + """ + _read_lock: threading.RLock + _raw: requests.Response + _content: Optional[bytes] = None + + def __init__(self, raw: requests.Response) -> None: + # NOTE: if we drop down to urllib3, we need the requested URL to be + # passed in so we can join it with the response's URL (in urllib3, + # `response.url` does not include the scheme/host/etc data that belongs + # to the connection pool). + super().__init__( + url=raw.url, + status_code=raw.status_code, + headers=raw.headers, + links=raw.links, + encoding=raw.encoding + ) + self._read_lock = threading.RLock() + self._raw = raw + + @property + def content(self) -> bytes: + with self._read_lock: + if self._content is None: + self._content = self._raw.content + + return self._content + + def sniff_encoding(self) -> None: + self.content + return self._raw.apparent_encoding + + def close(self, cache: bool = True) -> None: + with self._read_lock: + # Reading bytes potentially involves decoding data from compressed + # gzip/brotli/etc. responses, so we need to handle those errors by + # continuing to just read the raw data off the socket instead. 
+ # + # This fallback behavior is inspired by requests: + # https://github.com/psf/requests/blob/eedd67462819f8dbf8c1c32e77f9070606605231/requests/sessions.py#L160-L163 + # For urllib3, the appropriate errors to handle would be: + # `(DecodeError, ProtocolError, RuntimeError)` + try: + if cache: + try: + self.content + except (ChunkedEncodingError, ContentDecodingError, RuntimeError): + self._raw.raw.read(decode_content=False) + else: + self._raw.raw.read(decode_content=False) + finally: + self._raw.close() + + +class WaybackHttpAdapter: + """ + Handles making actual HTTP requests over the network. This is an abstract + base class that defines the API an adapter must implement. + """ + + def request( + self, + method: str, + url: str, + *, + params: dict = None, + headers: dict = None, + follow_redirects: bool = True, + timeout: Union[int, Tuple[int, int]] = None + ) -> WaybackHttpResponse: + """ + Send an HTTP request and return a :class:`WaybackHttpResponse` object + representing the response. + + Parameters + ---------- + method : str + Method to use for the request, e.g. ``'GET'``, ``'POST'``. + url : str + The URL to reqeust. + params : dict, optional + For POST/PUT requests, data to be encoded in the response body. For + other methods, this should be encoded as the querystring. + headers : dict, optional + The HTTP headers to send with the request. + follow_redirects : bool, default: True + Whether to follow redirects before returning a response. + timeout : int or float or tuple of (int or float, int or float), optional + How long to wait, in seconds, before timing out. If this is a single + number, it will be used as both a connect timeout (how long to wait + for the first bit of response data) and a read timeout (how long + to wait between bits of response data). If a tuple, the values will + be used as the connect and read timeouts, respectively. + + Returns + ------- + WaybackHttpResponse + """ + raise NotImplementedError() + + def close(self) -> None: + """ + Close the adapter and release any long-lived resources, like pooled + HTTP connections. + """ + ... + + +class RequestsAdapter(WaybackHttpAdapter): + """ + Wrap the requests library for making HTTP requests. + """ + + def __init__(self) -> None: + self._session = requests.Session() + + def request( + self, + method: str, + url: str, + *, + params: dict = None, + headers: dict = None, + follow_redirects: bool = True, + timeout: Union[int, Tuple[int, int]] = None + ) -> WaybackHttpResponse: + logger.debug('Sending HTTP request <%s "%s">', method, url) + response = self._session.request( + method=method, + url=url, + params=params, + headers=headers, + allow_redirects=follow_redirects, + timeout=timeout, + stream=True + ) + return WaybackRequestsResponse(response) + + def close(self) -> None: + self._session.close() + + +class RetryAndRateLimitAdapter(WaybackHttpAdapter): + """ + Adds rate limiting and retry functionality to an HTTP adapter. This class + can't actually make HTTP requests and should usually be used in a + multiple-inheritance situation. Alternatively, you can override the + ``request_raw()`` method. + + Parameters + ---------- + retries : int, default: 6 + The maximum number of retries for failed HTTP requests. + backoff : int or float, default: 2 + Number of seconds from which to calculate how long to back off and wait + when retrying requests. 
+    """
+
+    def __init__(self) -> None:
+        self._session = requests.Session()
+
+    def request(
+        self,
+        method: str,
+        url: str,
+        *,
+        params: dict = None,
+        headers: dict = None,
+        follow_redirects: bool = True,
+        timeout: Union[int, Tuple[int, int]] = None
+    ) -> WaybackHttpResponse:
+        logger.debug('Sending HTTP request <%s "%s">', method, url)
+        response = self._session.request(
+            method=method,
+            url=url,
+            params=params,
+            headers=headers,
+            allow_redirects=follow_redirects,
+            timeout=timeout,
+            stream=True
+        )
+        return WaybackRequestsResponse(response)
+
+    def close(self) -> None:
+        self._session.close()
+
+
+class RetryAndRateLimitAdapter(WaybackHttpAdapter):
+    """
+    Adds rate limiting and retry functionality to an HTTP adapter. This class
+    can't actually make HTTP requests and should usually be used in a
+    multiple-inheritance situation. Alternatively, you can override the
+    ``request_raw()`` method.
+
+    Parameters
+    ----------
+    retries : int, default: 6
+        The maximum number of retries for failed HTTP requests.
+    backoff : int or float, default: 2
+        Number of seconds from which to calculate how long to back off and wait
+        when retrying requests. The first retry is always immediate, but
+        subsequent retries increase by powers of 2:
+
+            seconds = backoff * 2 ^ (retry number - 1)
+
+        So if this was `4`, retries would happen after the following delays:
+        0 seconds, 4 seconds, 8 seconds, 16 seconds, ...
+    rate_limits : dict of (str, RateLimit)
+        The rate limits that should be applied to different URL paths. The keys
+        are URL path prefixes, e.g. ``"/cdx/search/"``, and the values are
+        :class:`wayback.RateLimit` objects. When requests are made, the rate
+        limit from the most specific matching path is used and execution will
+        pause to ensure the rate limit is not exceeded.
+
+
+    Examples
+    --------
+
+    Usage via multiple inheritance:
+
+    >>> class CombinedAdapter(RetryAndRateLimitAdapter, SomeActualHttpAdapterWithoutRateLimits):
+    >>>     ...
+
+    Usage via override:
+
+    >>> class MyHttpAdapter(RetryAndRateLimitAdapter):
+    >>>     def request_raw(
+    >>>         self,
+    >>>         method: str,
+    >>>         url: str,
+    >>>         *,
+    >>>         params: dict = None,
+    >>>         headers: dict = None,
+    >>>         follow_redirects: bool = True,
+    >>>         timeout: Union[int, Tuple[int, int]] = None
+    >>>     ) -> WaybackHttpResponse:
+    >>>         response = urllib.urlopen(...)
+    >>>         return make_response_from_urllib(response)
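+
+    Configuring rate limits (a sketch building on ``CombinedAdapter`` from
+    above; the path prefixes and limit values are illustrative):
+
+    >>> adapter = CombinedAdapter(rate_limits={
+    >>>     '/cdx': RateLimit(1),  # at most 1 call/second for CDX search paths
+    >>>     '/': RateLimit(10),    # at most 10 calls/second everywhere else
+    >>> })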
+ # └> _request_redirectable (handle redirects) + # └> _request_retryable (handle retries for errors) + # └> _request_rate_limited (handle rate limiting/throttling) + # └> request_raw (handle actual HTTP) + def request( + self, + method: str, + url: str, + *, + params: dict = None, + headers: dict = None, + follow_redirects: bool = True, + timeout: Union[int, Tuple[int, int]] = None + ) -> WaybackHttpResponse: + return self._request_redirectable(method, + url, + params=params, + headers=headers, + follow_redirects=follow_redirects, + timeout=timeout) + + def _request_redirectable(self, *args, follow_redirects: bool = True, **kwargs) -> WaybackHttpResponse: + # FIXME: this method should implement redirect following (if + # `follow_redirects` is true), rather than passing it to the underlying + # implementation, since redirects need to be rate limited. + return self._request_retryable(*args, follow_redirects=follow_redirects, **kwargs) + + def _request_retryable(self, method: str, url: str, **kwargs) -> WaybackHttpResponse: + start_time = time.time() + maximum = self.retries + retries = 0 + + while True: + retry_delay = 0 + try: + response = self._request_rate_limited(method, url, **kwargs) + retry_delay = self.get_retry_delay(retries, response) + + if retries >= maximum or not self.should_retry(response): + if response.status_code == 429: + response.close() + raise RateLimitError(response, retry_delay) + return response + else: + logger.debug('Received error response (status: %s), will retry', response.status_code) + response.close(cache=False) + except self.handleable_errors as error: + response = getattr(error, 'response', None) + if response is not None: + response.close() + + if retries >= maximum: + raise WaybackRetryError(retries, time.time() - start_time, error) from error + elif self.should_retry_error(error): + retry_delay = self.get_retry_delay(retries, response) + logger.info('Caught exception during request, will retry: %s', error) + else: + raise + + logger.debug('Will retry after sleeping for %s seconds...', retry_delay) + time.sleep(retry_delay) + retries += 1 + + def _request_rate_limited(self, method: str, url: str, **kwargs) -> WaybackHttpResponse: + parsed_url = urlparse(url) + for path, limit in self.rate_limits.items(): + if parsed_url.path.startswith(path): + rate_limit = limit + break + else: + rate_limit = DEFAULT_MEMENTO_RATE_LIMIT + + rate_limit.wait() + return self.request_raw(method, url, **kwargs) + + def request_raw(self, *args, **kwargs) -> WaybackHttpResponse: + return super().request(*args, **kwargs) + + def should_retry(self, response: WaybackHttpResponse): + # A memento may actually be a capture of an error, so don't retry it :P + if response.is_memento: + return False + + return response.status_code in self.retryable_statuses + + def should_retry_error(self, error): + if isinstance(error, self.retryable_errors): + return True + elif isinstance(error, ConnectionError): + # ConnectionErrors from requests actually wrap a whole family of + # more detailed errors from urllib3, so we need to do some string + # checking to determine whether the error is retryable. 
+    def __init__(
+        self,
+        retries: int = 6,
+        backoff: Real = 2,
+        timeout: Union[Real, Tuple[Real, Real]] = 60,
+        user_agent: str = None,
+        memento_rate_limit: Union[RateLimit, Real] = DEFAULT_MEMENTO_RATE_LIMIT,
+        search_rate_limit: Union[RateLimit, Real] = DEFAULT_CDX_RATE_LIMIT,
+        timemap_rate_limit: Union[RateLimit, Real] = DEFAULT_TIMEMAP_RATE_LIMIT,
+    ) -> None:
+        super().__init__(
+            retries=retries,
+            backoff=backoff,
+            rate_limits={
+                '/web/timemap': RateLimit.make_limit(timemap_rate_limit),
+                '/cdx': RateLimit.make_limit(search_rate_limit),
+                # The memento limit is actually a generic Wayback limit.
+                '/': RateLimit.make_limit(memento_rate_limit),
+            }
+        )
+        self.timeout = timeout
+        self._session.headers = {
+            'User-Agent': (user_agent or
+                           f'wayback/{__version__} (+https://github.com/edgi-govdata-archiving/wayback)'),
+            'Accept-Encoding': 'gzip, deflate'
+        }
+
+    def request(
+        self,
+        method: str,
+        url: str,
+        *,
+        params: dict = None,
+        headers: dict = None,
+        follow_redirects: bool = True,
+        timeout: Union[int, Tuple[int, int]] = -1
+    ) -> WaybackHttpResponse:
+        timeout = self.timeout if timeout == -1 else timeout
+
+        return super().request(method,
+                               url,
+                               params=params,
+                               headers=headers,
+                               follow_redirects=follow_redirects,
+                               timeout=timeout)
diff --git a/src/wayback/_utils.py b/src/wayback/_utils.py
index 4d8fa05..5a2f9dd 100644
--- a/src/wayback/_utils.py
+++ b/src/wayback/_utils.py
@@ -4,13 +4,11 @@
 import email.utils
 import logging
 import re
-import requests
-import requests.adapters
 import threading
 import time
 from typing import Union
 import urllib.parse
-from .exceptions import SessionClosedError
+from .exceptions import AlreadyClosedError
 
 logger = logging.getLogger(__name__)
@@ -322,9 +320,9 @@ def __exit_all__(self, type, value, traceback):
         pass
 
 
-class DisableAfterCloseSession(requests.Session):
+class DisableAfterCloseAdapter:
     """
-    A custom session object raises a :class:`SessionClosedError` if you try to
+    An adapter that raises an :class:`AlreadyClosedError` if you try to
     use it after closing it, to help identify and avoid potentially dangerous
     code patterns. (Standard session objects continue to be usable after
     closing, even if they may not work exactly as expected.)
@@ -332,16 +330,15 @@ class DisableAfterCloseSession(requests.Session):
     _closed = False
 
     def close(self, disable=True):
-        super().close()
         if disable:
             self._closed = True
 
-    def send(self, *args, **kwargs):
+    def request(self, *args, **kwargs):
         if self._closed:
-            raise SessionClosedError('This session has already been closed '
+            raise AlreadyClosedError('This adapter has already been closed '
                                      'and cannot send new HTTP requests.')
-
-        return super().send(*args, **kwargs)
+        else:
+            return super().request(*args, **kwargs)
 
 
 class CaseInsensitiveDict(MutableMapping):
diff --git a/src/wayback/exceptions.py b/src/wayback/exceptions.py
index 2273c42..04bddf1 100644
--- a/src/wayback/exceptions.py
+++ b/src/wayback/exceptions.py
@@ -101,8 +101,8 @@ def __init__(self, response, retry_after):
         super().__init__(message)
 
 
-class SessionClosedError(Exception):
+class AlreadyClosedError(Exception):
     """
-    Raised when a Wayback session is used to make a request after it has been
+    Raised when a Wayback client is used to make a request after it has been
     closed and disabled.
     """
diff --git a/src/wayback/tests/test_client.py b/src/wayback/tests/test_client.py
index f236a76..82c9250 100644
--- a/src/wayback/tests/test_client.py
+++ b/src/wayback/tests/test_client.py
@@ -6,11 +6,11 @@
 import requests
 from unittest import mock
 from .support import create_vcr
-from .._utils import SessionClosedError
+from .._utils import AlreadyClosedError
 from .._client import (CdxRecord,
                        Mode,
-                       WaybackSession,
                        WaybackClient)
+from .._http import WaybackRequestsAdapter
 from ..exceptions import (BlockedSiteError,
                           MementoPlaybackError,
                           NoMementoError,
@@ -112,7 +112,7 @@ def test_search_multipage():
 
 @ia_vcr.use_cassette()
 def test_search_cannot_iterate_after_session_closing():
-    with pytest.raises(SessionClosedError):
+    with pytest.raises(AlreadyClosedError):
         with WaybackClient() as client:
             versions = client.search('nasa.gov',
                                      from_date=datetime(1996, 10, 1),
@@ -671,7 +671,7 @@ def test_get_memento_returns_memento_with_accurate_url():
     assert memento.url == 'https://www.fws.gov/'
 
 
-def return_timeout(self, *args, **kwargs) -> requests.Response:
+def return_timeout(request, **kwargs) -> requests.Response:
     """
     Patch requests.Session.send with this in order to return a response with
    the provided timeout value as the response body.
@@ -687,14 +687,31 @@
     return res
 
 
-class TestWaybackSession:
+def return_user_agent(request, **kwargs) -> requests.Response:
+    """
+    Patch requests.Session.send with this in order to return a response with
+    the provided ``User-Agent`` header value as the response body.
+
+    Usage:
+    >>> @mock.patch('requests.Session.send', side_effect=return_user_agent)
+    >>> def test_user_agent(self, mock_class):
+    >>>     assert requests.get('http://test.com',
+    >>>                         headers={'User-Agent': 'x'}).text == 'x'
+    """
+    response = requests.Response()
+    response.status_code = 200
+    response._content = str(request.headers.get('User-Agent', None)).encode()
+    return response
+
+
+class TestWaybackRequestsAdapter:
     def test_request_retries(self, requests_mock):
         requests_mock.get('http://test.com', [{'text': 'bad1', 'status_code': 503},
                                               {'text': 'bad2', 'status_code': 503},
                                               {'text': 'good', 'status_code': 200}])
-        session = WaybackSession(retries=2, backoff=0.1)
+        session = WaybackRequestsAdapter(retries=2, backoff=0.1)
         response = session.request('GET', 'http://test.com')
-        assert response.ok
+        assert response.is_success
         session.close()
 
@@ -702,7 +719,7 @@ def test_stops_after_given_retries(self, requests_mock):
         requests_mock.get('http://test.com', [{'text': 'bad1', 'status_code': 503},
                                               {'text': 'bad2', 'status_code': 503},
                                               {'text': 'good', 'status_code': 200}])
-        session = WaybackSession(retries=1, backoff=0.1)
+        session = WaybackRequestsAdapter(retries=1, backoff=0.1)
         response = session.request('GET', 'http://test.com')
         assert response.status_code == 503
         assert response.text == 'bad2'
@@ -710,28 +727,28 @@ def test_only_retries_some_errors(self, requests_mock):
         requests_mock.get('http://test.com', [{'text': 'bad1', 'status_code': 400},
                                               {'text': 'good', 'status_code': 200}])
-        session = WaybackSession(retries=1, backoff=0.1)
+        session = WaybackRequestsAdapter(retries=1, backoff=0.1)
         response = session.request('GET', 'http://test.com')
         assert response.status_code == 400
 
     def test_raises_rate_limit_error(self, requests_mock):
         requests_mock.get('http://test.com', [WAYBACK_RATE_LIMIT_ERROR])
         with pytest.raises(RateLimitError):
-            session = WaybackSession(retries=0)
+            session = WaybackRequestsAdapter(retries=0)
             session.request('GET', 'http://test.com')
 
     def test_rate_limit_error_includes_retry_after(self, requests_mock):
         requests_mock.get('http://test.com', [WAYBACK_RATE_LIMIT_ERROR])
         with pytest.raises(RateLimitError) as excinfo:
-            session = WaybackSession(retries=0)
+            session = WaybackRequestsAdapter(retries=0)
             session.request('GET', 'http://test.com')
         assert excinfo.value.retry_after == 10
 
     @mock.patch('requests.Session.send', side_effect=return_timeout)
     def test_timeout_applied_session(self, mock_class):
-        # Is the timeout applied through the WaybackSession
-        session = WaybackSession(timeout=1)
+        # Is the timeout applied through the adapter
+        session = WaybackRequestsAdapter(timeout=1)
         res = session.request('GET', 'http://test.com')
         assert res.text == '1'
         # Overwriting the default in the requests method
@@ -743,7 +760,7 @@ def test_timeout_applied_session(self, mock_class):
     @mock.patch('requests.Session.send', side_effect=return_timeout)
     def test_timeout_applied_request(self, mock_class):
         # Using the default value
-        session = WaybackSession()
+        session = WaybackRequestsAdapter()
         res = session.request('GET', 'http://test.com')
         assert res.text == '60'
         # Overwriting the default
@@ -755,13 +772,19 @@ def test_timeout_applied_request(self, mock_class):
     @mock.patch('requests.Session.send', side_effect=return_timeout)
     def test_timeout_empty(self, mock_class):
         # Disabling default timeout
-        session = WaybackSession(timeout=None)
+        session = WaybackRequestsAdapter(timeout=None)
         res = session.request('GET', 'http://test.com')
         assert res.text == 'None'
         # Overwriting the default
         res = session.request('GET', 'http://test.com', timeout=1)
         assert res.text == '1'
 
+    @mock.patch('requests.Session.send', side_effect=return_user_agent)
+    def test_user_agent(self, mock_class):
+        adapter = WaybackRequestsAdapter()
+        agent = adapter.request('GET', 'http://test.com').text
+        assert agent.startswith('wayback/')
+
     @ia_vcr.use_cassette()
     def test_search_rate_limits(self):
         # The timing relies on the cassettes being present,
@@ -778,7 +801,7 @@ def test_search_rate_limits(self):
 
         # Check that disabling the rate limits through the search API works.
         start_time = time.time()
-        with WaybackClient(WaybackSession(search_calls_per_second=0)) as client:
+        with WaybackClient(WaybackRequestsAdapter(search_rate_limit=0)) as client:
             for i in range(3):
                 next(client.search('zew.de'))
         duration_without_limits = time.time() - start_time
@@ -787,7 +810,7 @@ def test_search_rate_limits(self):
         # I need to sleep one half second in order to reset the rate limit.
         time.sleep(0.5)
         start_time = time.time()
-        with WaybackClient(WaybackSession(search_calls_per_second=2)) as client:
+        with WaybackClient(WaybackRequestsAdapter(search_rate_limit=2)) as client:
             for i in range(3):
                 next(client.search('zew.de'))
         duration_with_limits_custom = time.time() - start_time
@@ -798,11 +821,14 @@ def test_search_rate_limits(self):
 
     @ia_vcr.use_cassette()
     def test_memento_rate_limits(self):
-        # The timing relies on the cassettes being present,
-        # therefore the first run might fail.
-        # Since another test might run before this one,
-        # I have to wait one call before starting.
+        # NOTE: The timing relies on VCR cassettes being present (so HTTP
+        # responses come immediately). The first time you run this test, it
+        # might fail because it is recording real requests that take a while.
+
+        # Since another test might run before this one, wait a bit before
+        # starting to get past rate limits from previous tests.
         time.sleep(1/30)
+
         with WaybackClient() as client:
             cdx = next(client.search('zew.de'))
             # First test that the default rate limits are correctly applied.
@@ -813,17 +839,15 @@ def test_memento_rate_limits(self):
             duration_with_limits = time.time() - start_time
 
         # Check that disabling the rate limits through the get_memento API works.
-        time.sleep(1)  # Wait to exceed any previous rate limits.
         start_time = time.time()
-        with WaybackClient(WaybackSession(memento_calls_per_second=0)) as client:
+        with WaybackClient(WaybackRequestsAdapter(memento_rate_limit=0)) as client:
             for i in range(3):
                 client.get_memento(cdx)
         duration_without_limits = time.time() - start_time
 
         # Check that a different rate limit set through the session is applied correctly.
-        time.sleep(1)  # Wait to exceed any previous rate limits.
         start_time = time.time()
-        with WaybackClient(WaybackSession(memento_calls_per_second=10)) as client:
+        with WaybackClient(WaybackRequestsAdapter(memento_rate_limit=10)) as client:
             for i in range(6):
                 client.get_memento(cdx)
         duration_with_limits_custom = time.time() - start_time