diff --git a/sopel/modules/url.py b/sopel/modules/url.py
index 92126cfe13..7d808b67dd 100644
--- a/sopel/modules/url.py
+++ b/sopel/modules/url.py
@@ -10,20 +10,21 @@
 """
 from __future__ import annotations
 
+from email.message import EmailMessage
 from ipaddress import ip_address
 import logging
 import re
+from socket import getaddrinfo, IPPROTO_TCP
 from typing import Generator, List, Optional, Tuple
-from urllib.parse import urlparse
+from urllib.parse import urlparse, urlunparse
 
-import dns.resolver
 import requests
 from urllib3.exceptions import LocationValueError  # type: ignore[import]
 
 from sopel import plugin, tools
 from sopel.bot import Sopel, SopelWrapper
 from sopel.config import Config, types
-from sopel.tools import web
+from sopel.tools import SopelMemory, web
 from sopel.trigger import Trigger
 
 
@@ -68,6 +69,20 @@ class UrlSection(types.StaticSection):
     """Enable requests to private and local network IP addresses"""
 
 
+class HostAdapter(requests.adapters.HTTPAdapter):
+    """SNI and certificate validation based on Host header value.
+
+    See also: https://github.com/requests/toolbelt/pull/293
+    """
+    def send(self, request, *args, **kwargs):
+        host = request.headers.get("Host")
+        if host:
+            self.poolmanager.connection_pool_kw["server_hostname"] = host
+        elif "server_hostname" in self.poolmanager.connection_pool_kw:
+            self.poolmanager.connection_pool_kw.pop("server_hostname")
+        return super().send(request, *args, **kwargs)
+
+
 def configure(config: Config):
     """
     | name | example | purpose |
@@ -295,11 +310,6 @@ def title_auto(bot: SopelWrapper, trigger: Trigger):
     if re.match(bot.config.core.prefix + r'\S+', trigger):
         return
 
-    # Avoid fetching known malicious links
-    if 'safety_cache' in bot.memory and trigger in bot.memory['safety_cache']:
-        if bot.memory['safety_cache'][trigger]['positives'] > 1:
-            return
-
     urls = web.search_urls(
         trigger, exclusion_char=bot.config.url.exclusion_char, clean=True)
 
@@ -341,44 +351,29 @@ def process_urls(
         if not requested and url.startswith(bot.config.url.exclusion_char):
             continue
 
-        parsed_url = urlparse(url)
-
         # Check the URL does not match an existing URL callback
         if check_callbacks(bot, url, use_excludes=not requested):
             yield (url, None, None, None, True)
             return
 
-        # Prevent private addresses from being queried if enable_private_resolution is False
-        # FIXME: This does nothing when an attacker knows how to host a 302
-        # FIXME: This whole concept has a TOCTOU issue
-        if not bot.config.url.enable_private_resolution:
-            try:
-                ips = [ip_address(parsed_url.hostname)]
-            except ValueError:
-                ips = [ip_address(ip) for ip in dns.resolver.query(parsed_url.hostname)]
-
-            private = False
-            for ip in ips:
-                if ip.is_private or ip.is_loopback:
-                    private = True
-                    break
-            if private:
-                LOGGER.debug('Ignoring private URL: %s', url)
-                continue
-
         # Call the URL to get a title, if possible
-        title = find_title(url)
-        if not title:
+        title_results = find_title(
+            url,
+            allow_local=bot.config.url.enable_private_resolution,
+            memory=bot.memory,
+        )
+        if not title_results:
             # No title found: don't handle this URL
             LOGGER.debug('No title found; ignoring URL: %s', url)
             continue
+        title, final_hostname = title_results
 
         # If the URL is over bot.config.url.shorten_url_length, shorten the URL
         tinyurl = None
         if (shorten_url_length > 0) and (len(url) > shorten_url_length):
             tinyurl = get_or_create_shorturl(bot, url)
 
-        yield (url, title, parsed_url.hostname, tinyurl, False)
+        yield (url, title, final_hostname, tinyurl, False)
 
 
 def check_callbacks(bot: SopelWrapper, url: str, use_excludes: bool = True) -> bool:
@@ -416,32 +411,103 @@ def check_callbacks(bot: SopelWrapper, url: str, use_excludes: bool = True) -> b
     )
 
 
-def find_title(url: str, verify: bool = True) -> Optional[str]:
-    """Return the title for the given URL.
+def find_title(
+    url: str,
+    verify: bool = True,
+    allow_local: bool = False,
+    memory: SopelMemory = {},
+) -> Optional[Tuple[str, str]]:
+    """Fetch the title for the given URL.
 
     :param verify: Whether to require a valid certificate when using https
+    :param allow_local: Allow requests to non-global addresses (RFC1918, etc.)
+    :param memory: The bot.memory to search for safety.py caches
+    :return: A tuple of the (title, final_hostname) that were found, or None
     """
-    try:
-        response = requests.get(url, stream=True, verify=verify,
-                                headers=DEFAULT_HEADERS)
-        content = b''
-        for byte in response.iter_content(chunk_size=512):
-            content += byte
-            if b'</title>' in content or len(content) > MAX_BYTES:
-                break
-        content = content.decode('utf-8', errors='ignore')
-        # Need to close the connection because we have not read all
-        # the data
-        response.close()
-    except requests.exceptions.ConnectionError as e:
-        LOGGER.debug("Unable to reach URL: %r: %s", url, e)
-        return None
-    except (
-        requests.exceptions.InvalidURL,  # e.g. http:///
-        UnicodeError,  # e.g. http://.example.com (urllib3<1.26)
-        LocationValueError,  # e.g. http://.example.com (urllib3>=1.26)
-    ):
-        LOGGER.debug('Invalid URL: %s', url)
+    original_url = url
+    redirects_left = 5
+    session = requests.Session()
+    session.mount("https://", HostAdapter())
+    session.headers = dict(DEFAULT_HEADERS)
+    while redirects_left > 0:
+        redirects_left -= 1
+        parsed_url = urlparse(url)
+
+        # Avoid fetching known malicious links
+        if "safety_cache" in memory and parsed_url.hostname in memory["safety_cache"]:
+            if memory["safety_cache"][parsed_url.hostname]["positives"] > 1:
+                LOGGER.debug("Ignoring unsafe URL: %r", url)
+                return None
+
+        # Prevent private addresses from being queried
+        try:
+            # If link is to an IP
+            ips = [ip_address(parsed_url.hostname)]
+        except ValueError:  # Nope, hostname
+            try:
+                # getaddrinfo instead of dns.resolver so we use normal OS
+                # name resolution. notably, this includes hosts files
+                addr_info = getaddrinfo(parsed_url.hostname, 443, proto=IPPROTO_TCP)
+                ips = [ip_address(info[4][0]) for info in addr_info]
+                # dns.resolver skips hosts file, but is simpler
+                #answers = dns.resolver.resolve(parsed_url.hostname)
+                #ips = [ip_address(ip) for ip in answers]
+            except Exception as e:
+                LOGGER.debug("Failed to get IPs for %r: %s", url, e)
+                return None
+
+        if not allow_local and not all(ip.is_global for ip in ips):
+            LOGGER.debug("Ignoring private URL: %r", url)
+            return None
+
+        # Prevent DNS TTL=0 TOCTOU from making us talk to unchecked IP
+        ip_host = ("{}" if ips[0].version == 4 else "[{}]").format(ips[0].compressed)
+        ip_url = urlunparse((parsed_url.scheme, ip_host) + parsed_url[2:])
+        try:
+            response = session.get(
+                ip_url,
+                stream=True,
+                verify=verify,
+                headers={"Host": parsed_url.hostname.strip(".")},  # "example.com."
+                allow_redirects=False,
+            )
+            if response.is_redirect:
+                LOGGER.debug(
+                    "URL %r redirected to %r", url, response.headers.get("Location")
+                )
+                url = response.headers.get("Location")
+                if url is None:
+                    return None
+                continue
+
+            content_bytes = b''
+            for chunk in response.iter_content(chunk_size=512):
+                content_bytes += chunk
+                if b"</title>" in content_bytes or len(content_bytes) > MAX_BYTES:
+                    break
+
+            encoding = None
+            if "Content-Type" in response.headers:
+                msg = EmailMessage()
+                msg["Content-Type"] = response.headers["Content-Type"]
+                encoding = msg.get_content_charset()
+            content = content_bytes.decode(encoding or "utf-8", errors="ignore")
+
+            # Need to close the connection because we haven't read all the data
+            response.close()
+        except requests.exceptions.ConnectionError as e:
+            LOGGER.debug("Unable to reach URL: %r: %s", url, e)
+            return None
+        except (
+            requests.exceptions.InvalidURL,  # e.g. http:///
+            UnicodeError,  # e.g. http://.example.com (urllib3<1.26)
+            LocationValueError,  # e.g. http://.example.com (urllib3>=1.26)
+        ):
+            LOGGER.debug('Invalid URL: %s', url)
+            return None
+        break
+    else:
+        LOGGER.debug("Redirects exhausted for %r", original_url)
        return None
 
     # Some cleanup that I don't really grok, but was in the original, so
@@ -461,7 +527,7 @@ def find_title(url: str, verify: bool = True) -> Optional[str]:
     # More cryptic regex substitutions. This one looks to be myano's invention.
     title = RE_DCC.sub('', title)
 
-    return title or None
+    return (title, parsed_url.hostname)
 
 
 def get_or_create_shorturl(bot: SopelWrapper, url: str) -> str:
diff --git a/test/modules/test_modules_url.py b/test/modules/test_modules_url.py
index d8a6d087c1..56b6acf8ae 100644
--- a/test/modules/test_modules_url.py
+++ b/test/modules/test_modules_url.py
@@ -22,6 +22,12 @@
     "http://example..com/",  # empty label
     "http://?",  # no host
 )
+PRIVATE_URLS = (
+    # "https://httpbin.org/redirect-to?url=http://127.0.0.1/",  # online
+    "http://127.1.1.1/",
+    "http://10.1.1.1/",
+    "http://169.254.1.1/",
+)
 
 
 @pytest.fixture
@@ -76,6 +82,11 @@ def test_find_title_invalid(site):
     assert url.find_title(site) is None
 
 
+@pytest.mark.parametrize("site", PRIVATE_URLS)
+def test_find_title_private(site):
+    assert url.find_title(site) is None
+
+
 def test_check_callbacks(mockbot):
     """Test that check_callbacks works with both new & legacy URL callbacks."""
     assert url.check_callbacks(mockbot, 'https://example.com/test')
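
Reviewer note: below is a minimal, self-contained sketch of the resolve-once-then-pin pattern the new find_title uses, for anyone who wants to poke at the TOCTOU mitigation outside the plugin. It assumes only that requests is installed; the fetch_pinned helper name and the example.com target are illustrative and not part of this patch.

# sketch.py -- illustration only, not part of this patch
from ipaddress import ip_address
from socket import getaddrinfo, IPPROTO_TCP
from urllib.parse import urlparse, urlunparse

import requests


class HostAdapter(requests.adapters.HTTPAdapter):
    """Validate SNI/certificate against the Host header, not the URL host."""
    def send(self, request, *args, **kwargs):
        host = request.headers.get("Host")
        if host:
            self.poolmanager.connection_pool_kw["server_hostname"] = host
        elif "server_hostname" in self.poolmanager.connection_pool_kw:
            self.poolmanager.connection_pool_kw.pop("server_hostname")
        return super().send(request, *args, **kwargs)


def fetch_pinned(url: str) -> requests.Response:
    # Hypothetical helper; mirrors the flow inside find_title above.
    parsed = urlparse(url)
    # Resolve exactly once; the same answer feeds both the policy check and
    # the connection, so a TTL=0 rebinding server can't swap IPs in between.
    addr_info = getaddrinfo(parsed.hostname, 443, proto=IPPROTO_TCP)
    ips = [ip_address(info[4][0]) for info in addr_info]
    if not all(ip.is_global for ip in ips):
        raise ValueError("refusing non-global address for %r" % url)
    # Rebuild the URL around the literal IP (bracketed if IPv6)...
    ip_host = ("{}" if ips[0].version == 4 else "[{}]").format(ips[0].compressed)
    ip_url = urlunparse((parsed.scheme, ip_host) + parsed[2:])
    # ...and restore the real name via the Host header, which HostAdapter
    # copies into urllib3's server_hostname for the TLS handshake.
    session = requests.Session()
    session.mount("https://", HostAdapter())
    return session.get(
        ip_url,
        headers={"Host": parsed.hostname.strip(".")},
        allow_redirects=False,
    )


if __name__ == "__main__":
    print(fetch_pinned("https://example.com/").status_code)  # expect 200

Without the adapter, an https request to a literal-IP URL would fail certificate validation; with it, SNI and hostname verification stay tied to the original name while the socket goes to the pinned address.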