Skip to content

Commit

Permalink
Merge pull request from GHSA-ggh8-mg84-m86h
Browse files Browse the repository at this point in the history
  • Loading branch information
stsewd authored Jan 23, 2024
1 parent 7f758ae commit be4990b
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 75 deletions.
31 changes: 31 additions & 0 deletions readthedocs/proxito/tests/test_old_redirects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1322,6 +1322,37 @@ def test_redirect_exact_with_wildcard_crossdomain_with_newline_chars(self):
self.assertEqual(r.status_code, 302, url)
self.assertEqual(r["Location"], expected_location, url)

def test_redirect_exact_with_wildcard_crossdomain(self):
self.project.versioning_scheme = SINGLE_VERSION_WITHOUT_TRANSLATIONS
self.project.save()

fixture.get(
Redirect,
project=self.project,
redirect_type=EXACT_REDIRECT,
from_url="/en/latest/*",
to_url="/:splat",
)
urls = [
(
"http://project.dev.readthedocs.io/en/latest/%0D/example.com/path.html",
"http://project.dev.readthedocs.io//example.com/path.html",
),
# These are caught by the slash redirect.
(
"http://project.dev.readthedocs.io/en/latest//example.com",
"/en/latest/example.com",
),
(
"http://project.dev.readthedocs.io/en/latest/https://example.com",
"/en/latest/https:/example.com",
),
]
for url, expected_location in urls:
r = self.client.get(url, headers={"host": "project.dev.readthedocs.io"})
self.assertEqual(r.status_code, 302, url)
self.assertEqual(r["Location"], expected_location, url)

def test_redirect_html_to_clean_url_crossdomain(self):
"""
Avoid redirecting to an external site unless the external site is in to_url
Expand Down
90 changes: 56 additions & 34 deletions readthedocs/proxito/views/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,73 +339,95 @@ def system_redirect(
resp["X-RTD-Redirect"] = RedirectType.system.name
return resp

def get_redirect(
self, project, lang_slug, version_slug, filename, path, forced_only=False
def get_redirect_response(
self,
request,
project,
language,
version_slug,
filename,
path,
forced_only=False,
):
"""
Check for a redirect for this project that matches ``full_path``.
Check for a redirect for this project that matches the current path, and return a response if found.
:returns: the path to redirect the request and its status code
:rtype: tuple
:returns: redirect response with the correct path
:rtype: HttpResponseRedirect or HttpResponsePermanentRedirect
"""
redirect_path, http_status = project.redirects.get_redirect_path_with_status(
language=lang_slug,
redirect, redirect_path = project.redirects.get_matching_redirect_with_path(
language=language,
version_slug=version_slug,
filename=filename,
path=path,
forced_only=forced_only,
)
return redirect_path, http_status

def get_redirect_response(self, request, redirect_path, proxito_path, http_status):
"""
Build the response for the ``redirect_path``, ``proxito_path`` and its ``http_status``.
if not redirect:
return None

:returns: redirect response with the correct path
:rtype: HttpResponseRedirect or HttpResponsePermanentRedirect
"""
# `proxito_path` doesn't include query params.
# `path` doesn't include query params.
query_list = parse_qsl(
urlparse(request.get_full_path()).query,
keep_blank_values=True,
)

current_url_parsed = urlparse(request.build_absolute_uri())
current_url = current_url_parsed.geturl()

if redirect.redirects_to_external_domain:
# If the redirect is to an external domain, we use it as is.
new_url_parsed = urlparse(redirect_path)

# TODO: Maybe exclude some query params from the redirect,
# like `ticket` (used by our CAS client) if it's to an external domain.
# We are logging a warning for now.
has_ticket_param = any(param == "ticket" for param, _ in query_list)
if has_ticket_param:
log.warning(
"Redirecting to an external domain with a ticket param.",
from_url=current_url,
to_url=new_url_parsed.geturl(),
forced_only=forced_only,
)
else:
# SECURITY: If the redirect doesn't explicitly redirect to an external domain,
# we force the final redirect to be to the same domain as the current request
# to avoid open redirects vulnerabilities.
new_url_parsed = current_url_parsed._replace(path=redirect_path)

# Combine the query params from the original request with the ones from the redirect.
redirect_parsed = urlparse(redirect_path)
query_list.extend(parse_qsl(redirect_parsed.query, keep_blank_values=True))
query_list.extend(parse_qsl(new_url_parsed.query, keep_blank_values=True))
query = urlencode(query_list)
new_path = redirect_parsed._replace(query=query).geturl()
new_url_parsed = new_url_parsed._replace(query=query)
new_url = new_url_parsed.geturl()

# Re-use the domain and protocol used in the current request.
# Redirects shouldn't change the domain, version or language.
# However, if the new_path is already an absolute URI, just use it
new_path = request.build_absolute_uri(new_path)
log.debug(
"Redirecting...",
from_url=request.build_absolute_uri(proxito_path),
to_url=new_path,
http_status_code=http_status,
from_url=current_url,
to_url=new_url,
http_status_code=redirect.http_status,
forced_only=forced_only,
)

new_path_parsed = urlparse(new_path)
old_path_parsed = urlparse(request.build_absolute_uri(proxito_path))
# Check explicitly only the path and hostname, since a different
# protocol or query parameters could lead to a infinite redirect.
if (
new_path_parsed.hostname == old_path_parsed.hostname
and new_path_parsed.path == old_path_parsed.path
new_url_parsed.hostname == current_url_parsed.hostname
and new_url_parsed.path == current_url_parsed.path
):
# check that we do have a response and avoid infinite redirect
log.debug(
"Infinite Redirect: FROM URL is the same than TO URL.",
url=new_path,
from_url=current_url,
to_url=new_url,
forced_only=forced_only,
)
raise InfiniteRedirectException()

if http_status and http_status == 301:
resp = HttpResponsePermanentRedirect(new_path)
if redirect.http_status == 301:
resp = HttpResponsePermanentRedirect(new_url)
else:
resp = HttpResponseRedirect(new_path)
resp = HttpResponseRedirect(new_url)

# Add a user-visible header to make debugging easier
resp["X-RTD-Redirect"] = RedirectType.user.name
Expand Down
68 changes: 31 additions & 37 deletions readthedocs/proxito/views/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,26 +333,21 @@ def serve_path(self, request, path):

# Check for forced redirects on non-external domains only.
if not unresolved_domain.is_from_external_domain:
redirect_path, http_status = self.get_redirect(
project=project,
lang_slug=project.language,
version_slug=version.slug,
filename=filename,
path=request.path,
forced_only=True,
)
if redirect_path and http_status:
log.bind(forced_redirect=True)
try:
return self.get_redirect_response(
request=request,
redirect_path=redirect_path,
proxito_path=request.path,
http_status=http_status,
)
except InfiniteRedirectException:
# Continue with our normal serve.
pass
try:
redirect_response = self.get_redirect_response(
request=request,
project=project,
language=project.language,
version_slug=version.slug,
filename=filename,
path=request.path,
forced_only=True,
)
if redirect_response:
return redirect_response
except InfiniteRedirectException:
# Continue with our normal serve.
pass

# Check user permissions and return an unauthed response if needed.
if not self.allowed_user(request, version):
Expand Down Expand Up @@ -485,23 +480,22 @@ def get(self, request, proxito_path):
# ``index.html`` and ``README.html`` to emulate the behavior we had when
# serving directly from NGINX without passing through Python.
if not unresolved_domain.is_from_external_domain:
redirect_path, http_status = self.get_redirect(
project=project,
lang_slug=lang_slug,
version_slug=version_slug,
filename=filename,
path=proxito_path,
)
if redirect_path and http_status:
try:
return self.get_redirect_response(
request, redirect_path, proxito_path, http_status
)
except InfiniteRedirectException:
# ``get_redirect_response`` raises this when it's redirecting back to itself.
# We can safely ignore it here because it's logged in ``canonical_redirect``,
# and we don't want to issue infinite redirects.
pass
try:
redirect_response = self.get_redirect_response(
request=request,
project=project,
language=lang_slug,
version_slug=version_slug,
filename=filename,
path=proxito_path,
)
if redirect_response:
return redirect_response
except InfiniteRedirectException:
# ``get_redirect_response`` raises this when it's redirecting back to itself.
# We can safely ignore it here because it's logged in ``canonical_redirect``,
# and we don't want to issue infinite redirects.
pass

# Register 404 pages into our database for user's analytics
self._register_broken_link(
Expand Down
7 changes: 6 additions & 1 deletion readthedocs/redirects/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ def clean(self):
to_url=self.to_url,
)

@property
def redirects_to_external_domain(self):
"""Check if the redirect is to an external domain."""
return bool(re.match("^https?://", self.to_url))

def __str__(self):
redirect_text = "{type}: {from_to_url}"
if self.redirect_type in [PAGE_REDIRECT, EXACT_REDIRECT]:
Expand Down Expand Up @@ -241,7 +246,7 @@ def get_redirect_path(self, filename, path=None, language=None, version_slug=Non
This method doesn't check if the current path matches ``from_url``,
that should be done before calling this method
using ``Redirect.objects.get_redirect_path_with_status``.
using ``Redirect.objects.get_matching_redirect_with_path``.
:param filename: The filename being served.
:param path: The whole path from the request.
Expand Down
7 changes: 4 additions & 3 deletions readthedocs/redirects/querysets.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@ def api(self, user=None):
queryset = self._add_from_user_projects(queryset, user)
return queryset

def get_redirect_path_with_status(
def get_matching_redirect_with_path(
self, filename, path=None, language=None, version_slug=None, forced_only=False
):
"""
Get the final redirect with its status code.
Get the matching redirect with the path to redirect to.
:param filename: The filename being served.
:param path: The whole path from the request.
:param forced_only: Include only forced redirects in the results.
:returns: A tuple with the matching redirect and new path.
"""
# Small optimization to skip executing the big query below.
# TODO: use filter(enabled=True) once we have removed the null option from the field.
Expand Down Expand Up @@ -132,7 +133,7 @@ def get_redirect_path_with_status(
language=language,
version_slug=version_slug,
)
return new_path, redirect.http_status
return redirect, new_path
return None, None

def _normalize_path(self, path):
Expand Down

0 comments on commit be4990b

Please sign in to comment.