Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ConnectionPool state after async cancellation #986

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [Unreleased]

- Fix `ConnectionPool` state after async cancellation.


## Version 1.0.7 (November 15th, 2024)

- Support `proxy=…` configuration on `ConnectionPool()`. (#974)
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_async/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ async def aclose(self) -> None:
async with Trace("close", logger, None, {}):
await self._connection.aclose()

def is_connected(self) -> bool:
return self._connection is not None and self._connection.is_connected()

def is_available(self) -> bool:
if self._connection is None:
# If HTTP/2 support is enabled, and the resulting connection could
Expand Down
4 changes: 4 additions & 0 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,17 @@ def _assign_requests_to_connections(self) -> list[AsyncConnectionInterface]:
those connections to be handled seperately.
"""
closing_connections = []
request_connections = {request.connection for request in self._requests}

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
for connection in list(self._connections):
if connection.is_closed():
# log: "removing closed connection"
self._connections.remove(connection)
elif not (connection.is_connected() or connection in request_connections):
# log: "removing garbage connection"
self._connections.remove(connection)
elif connection.has_expired():
# log: "closing expired connection"
self._connections.remove(connection)
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_async/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ async def aclose(self) -> None:
def can_handle_request(self, origin: Origin) -> bool:
return origin == self._origin

def is_connected(self) -> bool:
return not self.is_closed()

def is_available(self) -> bool:
# Note that HTTP/1.1 connections in the "NEW" state are not treated as
# being "available". The control flow which created the connection will
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_async/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,9 @@ async def _wait_for_outgoing_flow(self, request: Request, stream_id: int) -> int
def can_handle_request(self, origin: Origin) -> bool:
return origin == self._origin

def is_connected(self) -> bool:
return not self.is_closed()

def is_available(self) -> bool:
return (
self._state != HTTPConnectionState.CLOSED
Expand Down
6 changes: 6 additions & 0 deletions httpcore/_async/http_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ async def aclose(self) -> None:
def info(self) -> str:
return self._connection.info()

def is_connected(self) -> bool:
return self._connection.is_connected()

def is_available(self) -> bool:
return self._connection.is_available()

Expand Down Expand Up @@ -351,6 +354,9 @@ async def aclose(self) -> None:
def info(self) -> str:
return self._connection.info()

def is_connected(self) -> bool:
return self._connection.is_connected()

def is_available(self) -> bool:
return self._connection.is_available()

Expand Down
8 changes: 8 additions & 0 deletions httpcore/_async/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ def info(self) -> str:
def can_handle_request(self, origin: Origin) -> bool:
raise NotImplementedError() # pragma: nocover

def is_connected(self) -> bool:
"""
Return `True` if the connection is open.

Beware that for some implementations `is_connected() != not is_closed()`.
"""
raise NotImplementedError() # pragma: nocover

def is_available(self) -> bool:
"""
Return `True` if the connection is currently able to accept an
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_async/socks_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ async def aclose(self) -> None:
if self._connection is not None:
await self._connection.aclose()

def is_connected(self) -> bool:
return self._connection is not None and self._connection.is_connected()

def is_available(self) -> bool:
if self._connection is None: # pragma: nocover
# If HTTP/2 support is enabled, and the resulting connection could
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_sync/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ def close(self) -> None:
with Trace("close", logger, None, {}):
self._connection.close()

def is_connected(self) -> bool:
return self._connection is not None and self._connection.is_connected()

def is_available(self) -> bool:
if self._connection is None:
# If HTTP/2 support is enabled, and the resulting connection could
Expand Down
4 changes: 4 additions & 0 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,17 @@ def _assign_requests_to_connections(self) -> list[ConnectionInterface]:
those connections to be handled seperately.
"""
closing_connections = []
request_connections = {request.connection for request in self._requests}

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
for connection in list(self._connections):
if connection.is_closed():
# log: "removing closed connection"
self._connections.remove(connection)
elif not (connection.is_connected() or connection in request_connections):
# log: "removing garbage connection"
self._connections.remove(connection)
elif connection.has_expired():
# log: "closing expired connection"
self._connections.remove(connection)
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_sync/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ def close(self) -> None:
def can_handle_request(self, origin: Origin) -> bool:
return origin == self._origin

def is_connected(self) -> bool:
return not self.is_closed()

def is_available(self) -> bool:
# Note that HTTP/1.1 connections in the "NEW" state are not treated as
# being "available". The control flow which created the connection will
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_sync/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,9 @@ def _wait_for_outgoing_flow(self, request: Request, stream_id: int) -> int:
def can_handle_request(self, origin: Origin) -> bool:
return origin == self._origin

def is_connected(self) -> bool:
return not self.is_closed()

def is_available(self) -> bool:
return (
self._state != HTTPConnectionState.CLOSED
Expand Down
6 changes: 6 additions & 0 deletions httpcore/_sync/http_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ def close(self) -> None:
def info(self) -> str:
return self._connection.info()

def is_connected(self) -> bool:
return self._connection.is_connected()

def is_available(self) -> bool:
return self._connection.is_available()

Expand Down Expand Up @@ -351,6 +354,9 @@ def close(self) -> None:
def info(self) -> str:
return self._connection.info()

def is_connected(self) -> bool:
return self._connection.is_connected()

def is_available(self) -> bool:
return self._connection.is_available()

Expand Down
8 changes: 8 additions & 0 deletions httpcore/_sync/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ def info(self) -> str:
def can_handle_request(self, origin: Origin) -> bool:
raise NotImplementedError() # pragma: nocover

def is_connected(self) -> bool:
"""
Return `True` if the connection is open.

Beware that for some implementations `is_connected() != not is_closed()`.
"""
raise NotImplementedError() # pragma: nocover

def is_available(self) -> bool:
"""
Return `True` if the connection is currently able to accept an
Expand Down
3 changes: 3 additions & 0 deletions httpcore/_sync/socks_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,9 @@ def close(self) -> None:
if self._connection is not None:
self._connection.close()

def is_connected(self) -> bool:
return self._connection is not None and self._connection.is_connected()

def is_available(self) -> bool:
if self._connection is None: # pragma: nocover
# If HTTP/2 support is enabled, and the resulting connection could
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jinja2==3.1.4

# Packaging
build==1.2.1
twine==5.1.1
twine==6.0.1

# Tests & Linting
coverage[toml]==7.5.4
Expand Down
2 changes: 2 additions & 0 deletions tests/_async/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ async def test_http_connection():
origin=origin, network_backend=network_backend, keepalive_expiry=5.0
) as conn:
assert not conn.is_idle()
assert not conn.is_connected()
assert not conn.is_closed()
assert not conn.is_available()
assert not conn.has_expired()
Expand All @@ -52,6 +53,7 @@ async def test_http_connection():
assert response.content == b"Hello, world!"

assert conn.is_idle()
assert conn.is_connected()
assert not conn.is_closed()
assert conn.is_available()
assert not conn.has_expired()
Expand Down
1 change: 1 addition & 0 deletions tests/_async/test_http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ async def test_http11_connection():
assert response.content == b"Hello, world!"

assert conn.is_idle()
assert conn.is_connected()
assert not conn.is_closed()
assert conn.is_available()
assert not conn.has_expired()
Expand Down
1 change: 1 addition & 0 deletions tests/_async/test_http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ async def test_http2_connection():

assert conn.is_idle()
assert conn.is_available()
assert conn.is_connected()
assert not conn.is_closed()
assert not conn.has_expired()
assert (
Expand Down
2 changes: 2 additions & 0 deletions tests/_sync/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def test_http_connection():
origin=origin, network_backend=network_backend, keepalive_expiry=5.0
) as conn:
assert not conn.is_idle()
assert not conn.is_connected()
assert not conn.is_closed()
assert not conn.is_available()
assert not conn.has_expired()
Expand All @@ -52,6 +53,7 @@ def test_http_connection():
assert response.content == b"Hello, world!"

assert conn.is_idle()
assert conn.is_connected()
assert not conn.is_closed()
assert conn.is_available()
assert not conn.has_expired()
Expand Down
1 change: 1 addition & 0 deletions tests/_sync/test_http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def test_http11_connection():
assert response.content == b"Hello, world!"

assert conn.is_idle()
assert conn.is_connected()
assert not conn.is_closed()
assert conn.is_available()
assert not conn.has_expired()
Expand Down
1 change: 1 addition & 0 deletions tests/_sync/test_http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def test_http2_connection():

assert conn.is_idle()
assert conn.is_available()
assert conn.is_connected()
assert not conn.is_closed()
assert not conn.has_expired()
assert (
Expand Down
24 changes: 24 additions & 0 deletions tests/test_cancellations.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import typing
from unittest.mock import patch

import anyio
import hpack
Expand Down Expand Up @@ -135,6 +136,29 @@ async def test_connection_pool_timeout_during_response():
assert not pool.connections


@pytest.mark.anyio
async def test_connection_pool_cancellation_during_waiting_for_connection():
"""
A cancellation of ongoing request waiting for a connection should leave
the pool in a consistent state.

In this case, that means the connection will become closed, and no
longer remain in the pool.
"""

async def wait_for_connection(self, *args, **kwargs):
await anyio.sleep(999)

with patch(
"httpcore._async.connection_pool.AsyncPoolRequest.wait_for_connection",
new=wait_for_connection,
):
async with httpcore.AsyncConnectionPool() as pool:
with anyio.move_on_after(0.01):
await pool.request("GET", "http://example.com")
assert not pool.connections


@pytest.mark.anyio
async def test_h11_timeout_during_request():
"""
Expand Down
Loading