Commit a85541e

url: fix private IP protection

half-duplex committed May 25, 2022
1 parent b4076d9, commit a85541e

Showing 2 changed files with 132 additions and 55 deletions.
sopel/modules/url.py (121 additions & 55 deletions)
@@ -10,20 +10,21 @@
 """
 from __future__ import annotations
 
+from email.message import EmailMessage
 from ipaddress import ip_address
 import logging
 import re
+from socket import getaddrinfo, IPPROTO_TCP
 from typing import Generator, List, Optional, Tuple
-from urllib.parse import urlparse
+from urllib.parse import urlparse, urlunparse
 
 import dns.resolver
 import requests
 from urllib3.exceptions import LocationValueError  # type: ignore[import]
 
 from sopel import plugin, tools
 from sopel.bot import Sopel, SopelWrapper
 from sopel.config import Config, types
-from sopel.tools import web
+from sopel.tools import SopelMemory, web
 from sopel.trigger import Trigger

@@ -68,6 +69,20 @@ class UrlSection(types.StaticSection):
     """Enable requests to private and local network IP addresses"""
 
 
+class HostAdapter(requests.adapters.HTTPAdapter):
+    """SNI and certificate validation based on Host header value.
+
+    See also: https://github.com/requests/toolbelt/pull/293
+    """
+    def send(self, request, *args, **kwargs):
+        host = request.headers.get("Host")
+        if host:
+            self.poolmanager.connection_pool_kw["server_hostname"] = host
+        elif "server_hostname" in self.poolmanager.connection_pool_kw:
+            self.poolmanager.connection_pool_kw.pop("server_hostname")
+        return super().send(request, *args, **kwargs)
+
+
 def configure(config: Config):
     """
     | name | example | purpose |
@@ -295,11 +310,6 @@ def title_auto(bot: SopelWrapper, trigger: Trigger):
     if re.match(bot.config.core.prefix + r'\S+', trigger):
         return
 
-    # Avoid fetching known malicious links
-    if 'safety_cache' in bot.memory and trigger in bot.memory['safety_cache']:
-        if bot.memory['safety_cache'][trigger]['positives'] > 1:
-            return
-
     urls = web.search_urls(
         trigger, exclusion_char=bot.config.url.exclusion_char, clean=True)
@@ -341,44 +351,29 @@ def process_urls(
         if not requested and url.startswith(bot.config.url.exclusion_char):
             continue
 
-        parsed_url = urlparse(url)
-
         # Check the URL does not match an existing URL callback
         if check_callbacks(bot, url, use_excludes=not requested):
            yield (url, None, None, None, True)
            return
 
-        # Prevent private addresses from being queried if enable_private_resolution is False
-        # FIXME: This does nothing when an attacker knows how to host a 302
-        # FIXME: This whole concept has a TOCTOU issue
-        if not bot.config.url.enable_private_resolution:
-            try:
-                ips = [ip_address(parsed_url.hostname)]
-            except ValueError:
-                ips = [ip_address(ip) for ip in dns.resolver.query(parsed_url.hostname)]
-
-            private = False
-            for ip in ips:
-                if ip.is_private or ip.is_loopback:
-                    private = True
-                    break
-            if private:
-                LOGGER.debug('Ignoring private URL: %s', url)
-                continue
-
         # Call the URL to get a title, if possible
-        title = find_title(url)
-        if not title:
+        title_results = find_title(
+            url,
+            allow_local=bot.config.url.enable_private_resolution,
+            memory=bot.memory,
+        )
+        if not title_results:
             # No title found: don't handle this URL
             LOGGER.debug('No title found; ignoring URL: %s', url)
             continue
+        title, final_hostname = title_results
 
         # If the URL is over bot.config.url.shorten_url_length, shorten the URL
         tinyurl = None
         if (shorten_url_length > 0) and (len(url) > shorten_url_length):
             tinyurl = get_or_create_shorturl(bot, url)
 
-        yield (url, title, parsed_url.hostname, tinyurl, False)
+        yield (url, title, final_hostname, tinyurl, False)
@@ -416,32 +411,103 @@ def check_callbacks(bot: SopelWrapper, url: str, use_excludes: bool = True) -> bool:
     )
 
 
-def find_title(url: str, verify: bool = True) -> Optional[str]:
-    """Return the title for the given URL.
+def find_title(
+    url: str,
+    verify: bool = True,
+    allow_local: bool = False,
+    memory: SopelMemory = {},
+) -> Optional[Tuple[str, str]]:
+    """Fetch the title for the given URL.
 
     :param verify: Whether to require a valid certificate when using https
+    :param allow_local: Allow requests to non-global addresses (RFC1918, etc.)
+    :param memory: The bot.memory to search for safety.py caches
+    :return: A tuple of the (title, final_hostname) that were found, or None
     """
-    try:
-        response = requests.get(url, stream=True, verify=verify,
-                                headers=DEFAULT_HEADERS)
-        content = b''
-        for byte in response.iter_content(chunk_size=512):
-            content += byte
-            if b'</title>' in content or len(content) > MAX_BYTES:
-                break
-        content = content.decode('utf-8', errors='ignore')
-        # Need to close the connection because we have not read all
-        # the data
-        response.close()
-    except requests.exceptions.ConnectionError as e:
-        LOGGER.debug("Unable to reach URL: %r: %s", url, e)
-        return None
-    except (
-        requests.exceptions.InvalidURL,  # e.g. http:///
-        UnicodeError,  # e.g. http://.example.com (urllib3<1.26)
-        LocationValueError,  # e.g. http://.example.com (urllib3>=1.26)
-    ):
-        LOGGER.debug('Invalid URL: %s', url)
-        return None
+    original_url = url
+    redirects_left = 5
+    session = requests.Session()
+    session.mount("https://", HostAdapter())
+    session.headers = dict(DEFAULT_HEADERS)
+    while redirects_left > 0:
+        redirects_left -= 1
+        parsed_url = urlparse(url)
+
+        # Avoid fetching known malicious links
+        if "safety_cache" in memory and parsed_url.hostname in memory["safety_cache"]:
+            if memory["safety_cache"][parsed_url.hostname]["positives"] > 1:
+                LOGGER.debug("Ignoring unsafe URL: %r", url)
+                return None
+
+        # Prevent private addresses from being queried
+        try:
+            # If the link is to a bare IP
+            ips = [ip_address(parsed_url.hostname)]
+        except ValueError:  # Nope, it's a hostname
+            try:
+                # getaddrinfo instead of dns.resolver so we use normal OS
+                # name resolution. Notably, this includes hosts files.
+                addr_info = getaddrinfo(parsed_url.hostname, 443, proto=IPPROTO_TCP)
+                ips = [ip_address(info[4][0]) for info in addr_info]
+                # dns.resolver skips the hosts file, but is simpler:
+                # answers = dns.resolver.resolve(parsed_url.hostname)
+                # ips = [ip_address(ip) for ip in answers]
+            except Exception as e:
+                LOGGER.debug("Failed to get IPs for %r: %s", url, e)
+                return None
+
+        if not allow_local and not all(ip.is_global for ip in ips):
+            LOGGER.debug("Ignoring private URL: %r", url)
+            return None
+
+        # Prevent a DNS TTL=0 TOCTOU: connect to the IP we just checked,
+        # not to the hostname, which could re-resolve to something else
+        ip_host = ("{}" if ips[0].version == 4 else "[{}]").format(ips[0].compressed)
+        ip_url = urlunparse((parsed_url.scheme, ip_host) + parsed_url[2:])
+        try:
+            response = session.get(
+                ip_url,
+                stream=True,
+                verify=verify,
+                headers={"Host": parsed_url.hostname.strip(".")},  # FQDNs like "example.com."
+                allow_redirects=False,
+            )
+            if response.is_redirect:
+                LOGGER.debug(
+                    "URL %r redirected to %r", url, response.headers.get("Location")
+                )
+                url = response.headers.get("Location")
+                if url is None:
+                    return None
+                continue
+
+            content_bytes = b''
+            for chunk in response.iter_content(chunk_size=512):
+                content_bytes += chunk
+                if b"</title>" in content_bytes or len(content_bytes) > MAX_BYTES:
+                    break
+
+            encoding = None
+            if "Content-Type" in response.headers:
+                msg = EmailMessage()
+                msg["Content-Type"] = response.headers["Content-Type"]
+                encoding = msg.get_content_charset()
+            content = content_bytes.decode(encoding or "utf-8", errors="ignore")
+
+            # Need to close the connection because we haven't read all the data
+            response.close()
+        except requests.exceptions.ConnectionError as e:
+            LOGGER.debug("Unable to reach URL: %r: %s", url, e)
+            return None
+        except (
+            requests.exceptions.InvalidURL,  # e.g. http:///
+            UnicodeError,  # e.g. http://.example.com (urllib3<1.26)
+            LocationValueError,  # e.g. http://.example.com (urllib3>=1.26)
+        ):
+            LOGGER.debug('Invalid URL: %s', url)
+            return None
+        break
+    else:
+        LOGGER.debug("Redirects exhausted for %r", original_url)
+        return None
 
     # Some cleanup that I don't really grok, but was in the original, so
@@ -461,7 +527,7 @@ def find_title(url: str, verify: bool = True) -> Optional[str]:
     # More cryptic regex substitutions. This one looks to be myano's invention.
     title = RE_DCC.sub('', title)
 
-    return title or None
+    return (title, parsed_url.hostname)
 
 
 def get_or_create_shorturl(bot: SopelWrapper, url: str) -> str:
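
A note on the technique (not part of the commit page itself): the new find_title vets the resolved addresses and then connects to that exact IP, carrying the real hostname only in the Host header, while HostAdapter copies that header into urllib3's server_hostname so SNI and certificate validation still use the hostname. A minimal standalone sketch of the same vet-then-pin pattern, assuming the HostAdapter added by this commit is importable and using a placeholder example.com target:

# Sketch only: the vet-then-pin pattern from the diff above, outside the bot.
from ipaddress import ip_address
from socket import getaddrinfo, IPPROTO_TCP
from urllib.parse import urlparse, urlunparse

import requests

from sopel.modules.url import HostAdapter  # class added by this commit

parsed = urlparse("https://example.com/some/page")  # placeholder target

# Resolve once, with the OS resolver (hosts file included), and vet the result.
infos = getaddrinfo(parsed.hostname, 443, proto=IPPROTO_TCP)
ips = [ip_address(info[4][0]) for info in infos]
if not all(ip.is_global for ip in ips):
    raise ValueError("refusing to fetch a private/loopback address")

# Rebuild the URL around the vetted IP so a TTL=0 re-resolution cannot
# swap in a different (private) address between the check and the request.
ip_host = ("{}" if ips[0].version == 4 else "[{}]").format(ips[0].compressed)
pinned_url = urlunparse((parsed.scheme, ip_host) + parsed[2:])

session = requests.Session()
session.mount("https://", HostAdapter())  # SNI/cert checks follow the Host header
response = session.get(
    pinned_url,
    headers={"Host": parsed.hostname},
    allow_redirects=False,  # every redirect hop must be re-vetted by the caller
)
print(response.status_code)

The adapter matters because requests would otherwise validate the certificate against the IP literal in the URL; moving the hostname into server_hostname keeps TLS validation honest while the TCP connection goes to the pinned address, and allow_redirects=False is what lets the while loop in the diff re-check each hop.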
test/modules/test_modules_url.py (11 additions & 0 deletions)
@@ -22,6 +22,12 @@
     "http://example..com/",  # empty label
     "http://?",  # no host
 )
+PRIVATE_URLS = (
+    # "https://httpbin.org/redirect-to?url=http://127.0.0.1/",  # online
+    "http://127.1.1.1/",
+    "http://10.1.1.1/",
+    "http://169.254.1.1/",
+)
 
 
 @pytest.fixture
@@ -76,6 +82,11 @@ def test_find_title_invalid(site):
     assert url.find_title(site) is None
 
 
+@pytest.mark.parametrize("site", PRIVATE_URLS)
+def test_find_title_private(site):
+    assert url.find_title(site) is None
+
+
 def test_check_callbacks(mockbot):
     """Test that check_callbacks works with both new & legacy URL callbacks."""
     assert url.check_callbacks(mockbot, 'https://example.com/test')
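
One more detail worth calling out from the new find_title exercised by these tests: the response body is no longer assumed to be UTF-8. The declared charset is parsed out of the Content-Type header with the stdlib EmailMessage. A tiny isolated sketch of that trick (the header value here is illustrative, not from the commit):

from email.message import EmailMessage

# EmailMessage parses MIME header parameters for us, avoiding the
# deprecated cgi.parse_header() and hand-rolled splitting on ";".
msg = EmailMessage()
msg["Content-Type"] = "text/html; charset=ISO-8859-1"  # example header value

encoding = msg.get_content_charset()  # -> "iso-8859-1"
content = b"\xe9tude".decode(encoding or "utf-8", errors="ignore")  # "étude"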
