diff --git a/CHANGELOG.md b/CHANGELOG.md
index de62ceea..193295fd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -37,6 +37,7 @@ The project versioning policy is now explicitly governed by SEMVER. See https://
 
 - Support async cancellations, ensuring that the connection pool is left in a clean state when cancellations occur. (#726)
 - The networking backend interface has [been added to the public API](https://www.encode.io/httpcore/network-backends). Some classes which were previously private implementation detail are now part of the top-level public API. (#699)
+- Add some connection pool events for trace extension. (#702)
 - Graceful handling of HTTP/2 GoAway frames, with requests being transparently retried on a new connection. (#730)
 - Add exceptions when a synchronous `trace callback` is passed to an asynchronous request or an asynchronous `trace callback` is passed to a synchronous request. (#717)
 - Drop Python 3.7 support. (#727)
diff --git a/docs/extensions.md b/docs/extensions.md
index 2bec844e..145a9204 100644
--- a/docs/extensions.md
+++ b/docs/extensions.md
@@ -119,6 +119,7 @@ The `event_name` and `info` arguments here will be one of the following:
 * `{event_type}.{event_name}.started`, `<dictionary of keyword arguments>`
 * `{event_type}.{event_name}.complete`, `{"return_value": <...>}`
 * `{event_type}.{event_name}.failed`, `{"exception": <...>}`
+* `{event_type}.{event_name}`, `<dictionary of keyword arguments>`
 
 Note that when using the async variant of `httpcore` the handler function passed to `"trace"` must be an `async def ...` function.
 
@@ -130,6 +131,16 @@ The following event types are currently exposed...
 * `"connection.connect_unix_socket"`
 * `"connection.start_tls"`
 
+**Connection pool events**
+
+* `"connection_pool.add_request"`
+* `"connection_pool.remove_request"`
+* `"connection_pool.add_connection"`
+* `"connection_pool.reuse_connection"`
+* `"connection_pool.assign_request_to_connection"`
+* `"connection_pool.unassign_request_from_connection"`
+* `"connection_pool.timeout_waiting_for_connection"`
+
 **HTTP/1.1 events**
 
 * `"http11.send_request_headers"`
diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py
index 0320c6d8..b24bcd7e 100644
--- a/httpcore/_async/connection_pool.py
+++ b/httpcore/_async/connection_pool.py
@@ -1,3 +1,4 @@
+import logging
 import ssl
 import sys
 import time
@@ -9,9 +10,12 @@
 from .._exceptions import ConnectionNotAvailable, PoolTimeout, UnsupportedProtocol
 from .._models import Origin, Request, Response
 from .._synchronization import AsyncEvent, AsyncLock, AsyncShieldCancellation
+from .._trace import atrace
 from .connection import AsyncHTTPConnection
 from .interfaces import AsyncConnectionInterface, AsyncRequestInterface
 
+logger = logging.getLogger("httpcore.connection_pool")
+
 
 class RequestStatus:
     def __init__(self, request: Request):
@@ -162,8 +166,14 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
         # Reuse an existing connection if one is currently available.
         for idx, connection in enumerate(self._pool):
             if connection.can_handle_request(origin) and connection.is_available():
+                kwargs = {"connection": connection, "request": status.request}
+                await atrace("reuse_connection", logger, status.request, kwargs)
                 self._pool.pop(idx)
                 self._pool.insert(0, connection)
+                kwargs = {"request": status.request, "connection": connection}
+                await atrace(
+                    "assign_request_to_connection", logger, status.request, kwargs
+                )
                 status.set_connection(connection)
                 return True
 
@@ -172,6 +182,12 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
             for idx, connection in reversed(list(enumerate(self._pool))):
                 if connection.is_idle():
                     await connection.aclose()
+                    await atrace(
+                        "remove_connection",
+                        logger,
+                        None,
+                        kwargs={"connection": connection},
+                    )
                     self._pool.pop(idx)
                     break
 
@@ -181,7 +197,12 @@ async def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
 
         # Otherwise create a new connection.
         connection = self.create_connection(origin)
+        await atrace(
+            "add_connection", logger, status.request, kwargs={"connection": connection}
+        )
         self._pool.insert(0, connection)
+        kwargs = {"request": status.request, "connection": connection}
+        await atrace("assign_request_to_connection", logger, status.request, kwargs)
         status.set_connection(connection)
         return True
 
@@ -193,6 +214,9 @@ async def _close_expired_connections(self) -> None:
         for idx, connection in reversed(list(enumerate(self._pool))):
             if connection.has_expired():
                 await connection.aclose()
+                await atrace(
+                    "remove_connection", logger, None, kwargs={"connection": connection}
+                )
                 self._pool.pop(idx)
 
         # If the pool size exceeds the maximum number of allowed keep-alive connections,
@@ -201,6 +225,9 @@ async def _close_expired_connections(self) -> None:
         for idx, connection in reversed(list(enumerate(self._pool))):
             if connection.is_idle() and pool_size > self._max_keepalive_connections:
                 await connection.aclose()
+                await atrace(
+                    "remove_connection", logger, None, kwargs={"connection": connection}
+                )
                 self._pool.pop(idx)
                 pool_size -= 1
 
@@ -230,6 +257,7 @@ async def handle_async_request(self, request: Request) -> Response:
             deadline = float("inf")
 
         async with self._pool_lock:
+            await atrace("add_request", logger, request, {"request": status.request})
             self._requests.append(status)
             await self._close_expired_connections()
             await self._attempt_to_acquire_connection(status)
@@ -241,9 +269,22 @@ async def handle_async_request(self, request: Request) -> Response:
                 # If we timeout here, or if the task is cancelled, then make
                 # sure to remove the request from the queue before bubbling
                 # up the exception.
+                if isinstance(exc, PoolTimeout):
+                    await atrace(
+                        "timeout_waiting_for_connection",
+                        logger,
+                        status.request,
+                        {"request": status.request},
+                    )
                 async with self._pool_lock:
                     # Ensure only remove when task exists.
                     if status in self._requests:
+                        await atrace(
+                            "remove_request",
+                            logger,
+                            status.request,
+                            {"request": status.request},
+                        )
                         self._requests.remove(status)
                     raise exc
 
@@ -260,6 +301,13 @@ async def handle_async_request(self, request: Request) -> Response:
                 async with self._pool_lock:
                     # Maintain our position in the request queue, but reset the
                     # status so that the request becomes queued again.
+                    kwargs = {"request": status.request, "connection": connection}
+                    await atrace(
+                        "unassign_request_from_connection",
+                        logger,
+                        status.request,
+                        kwargs,
+                    )
                     status.unset_connection()
                     await self._attempt_to_acquire_connection(status)
             except BaseException as exc:
@@ -296,9 +344,21 @@ async def response_closed(self, status: RequestStatus) -> None:
         async with self._pool_lock:
             # Update the state of the connection pool.
             if status in self._requests:
+                await atrace(
+                    "remove_request",
+                    logger,
+                    status.request,
+                    {"request": status.request},
+                )
                 self._requests.remove(status)
 
             if connection.is_closed() and connection in self._pool:
+                await atrace(
+                    "remove_connection",
+                    logger,
+                    None,
+                    kwargs={"connection": connection},
+                )
                 self._pool.remove(connection)
 
             # Since we've had a response closed, it's possible we'll now be able
diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py
index ccfb8d22..5f33d930 100644
--- a/httpcore/_sync/connection_pool.py
+++ b/httpcore/_sync/connection_pool.py
@@ -1,3 +1,4 @@
+import logging
 import ssl
 import sys
 import time
@@ -9,9 +10,12 @@
 from .._exceptions import ConnectionNotAvailable, PoolTimeout, UnsupportedProtocol
 from .._models import Origin, Request, Response
 from .._synchronization import Event, Lock, ShieldCancellation
+from .._trace import trace
 from .connection import HTTPConnection
 from .interfaces import ConnectionInterface, RequestInterface
 
+logger = logging.getLogger("httpcore.connection_pool")
+
 
 class RequestStatus:
     def __init__(self, request: Request):
@@ -162,8 +166,14 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
         # Reuse an existing connection if one is currently available.
         for idx, connection in enumerate(self._pool):
             if connection.can_handle_request(origin) and connection.is_available():
+                kwargs = {"connection": connection, "request": status.request}
+                trace("reuse_connection", logger, status.request, kwargs)
                 self._pool.pop(idx)
                 self._pool.insert(0, connection)
+                kwargs = {"request": status.request, "connection": connection}
+                trace(
+                    "assign_request_to_connection", logger, status.request, kwargs
+                )
                 status.set_connection(connection)
                 return True
 
@@ -172,6 +182,12 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
             for idx, connection in reversed(list(enumerate(self._pool))):
                 if connection.is_idle():
                     connection.close()
+                    trace(
+                        "remove_connection",
+                        logger,
+                        None,
+                        kwargs={"connection": connection},
+                    )
                     self._pool.pop(idx)
                     break
 
@@ -181,7 +197,12 @@ def _attempt_to_acquire_connection(self, status: RequestStatus) -> bool:
 
         # Otherwise create a new connection.
         connection = self.create_connection(origin)
+        trace(
+            "add_connection", logger, status.request, kwargs={"connection": connection}
+        )
         self._pool.insert(0, connection)
+        kwargs = {"request": status.request, "connection": connection}
+        trace("assign_request_to_connection", logger, status.request, kwargs)
         status.set_connection(connection)
         return True
 
@@ -193,6 +214,9 @@ def _close_expired_connections(self) -> None:
         for idx, connection in reversed(list(enumerate(self._pool))):
             if connection.has_expired():
                 connection.close()
+                trace(
+                    "remove_connection", logger, None, kwargs={"connection": connection}
+                )
                 self._pool.pop(idx)
 
         # If the pool size exceeds the maximum number of allowed keep-alive connections,
@@ -201,6 +225,9 @@ def _close_expired_connections(self) -> None:
         for idx, connection in reversed(list(enumerate(self._pool))):
             if connection.is_idle() and pool_size > self._max_keepalive_connections:
                 connection.close()
+                trace(
+                    "remove_connection", logger, None, kwargs={"connection": connection}
+                )
                 self._pool.pop(idx)
                 pool_size -= 1
 
@@ -230,6 +257,7 @@ def handle_request(self, request: Request) -> Response:
             deadline = float("inf")
 
         with self._pool_lock:
+            trace("add_request", logger, request, {"request": status.request})
             self._requests.append(status)
             self._close_expired_connections()
             self._attempt_to_acquire_connection(status)
@@ -241,9 +269,22 @@ def handle_request(self, request: Request) -> Response:
                 # If we timeout here, or if the task is cancelled, then make
                 # sure to remove the request from the queue before bubbling
                 # up the exception.
+                if isinstance(exc, PoolTimeout):
+                    trace(
+                        "timeout_waiting_for_connection",
+                        logger,
+                        status.request,
+                        {"request": status.request},
+                    )
                 with self._pool_lock:
                     # Ensure only remove when task exists.
                     if status in self._requests:
+                        trace(
+                            "remove_request",
+                            logger,
+                            status.request,
+                            {"request": status.request},
+                        )
                         self._requests.remove(status)
                     raise exc
 
@@ -260,6 +301,13 @@ def handle_request(self, request: Request) -> Response:
                 with self._pool_lock:
                     # Maintain our position in the request queue, but reset the
                     # status so that the request becomes queued again.
+                    kwargs = {"request": status.request, "connection": connection}
+                    trace(
+                        "unassign_request_from_connection",
+                        logger,
+                        status.request,
+                        kwargs,
+                    )
                     status.unset_connection()
                     self._attempt_to_acquire_connection(status)
             except BaseException as exc:
@@ -296,9 +344,21 @@ def response_closed(self, status: RequestStatus) -> None:
         with self._pool_lock:
             # Update the state of the connection pool.
             if status in self._requests:
+                trace(
+                    "remove_request",
+                    logger,
+                    status.request,
+                    {"request": status.request},
+                )
                 self._requests.remove(status)
 
             if connection.is_closed() and connection in self._pool:
+                trace(
+                    "remove_connection",
+                    logger,
+                    None,
+                    kwargs={"connection": connection},
+                )
                 self._pool.remove(connection)
 
             # Since we've had a response closed, it's possible we'll now be able
diff --git a/httpcore/_trace.py b/httpcore/_trace.py
index b122a53e..4ef41121 100644
--- a/httpcore/_trace.py
+++ b/httpcore/_trace.py
@@ -6,6 +6,56 @@
 from ._models import Request
 
 
+def trace(
+    name: str,
+    logger: logging.Logger,
+    request: Optional[Request] = None,
+    kwargs: Optional[Dict[str, Any]] = None,
+) -> None:
+    trace_extension = None if request is None else request.extensions.get("trace")
+    prefix = logger.name.split(".")[-1]
+    info = kwargs or {}
+    debug = logger.isEnabledFor(logging.DEBUG)
+
+    if debug or trace_extension:
+        if trace_extension is not None:
+            prefix_and_name = f"{prefix}.{name}"
+            trace_extension(prefix_and_name, info)
+
+        if debug:
+            if not info:  # pragma: no cover
+                message = name
+            else:
+                args = " ".join([f"{key}={value!r}" for key, value in info.items()])
+                message = f"{name} {args}"
+            logger.debug(message)
+
+
+async def atrace(
+    name: str,
+    logger: logging.Logger,
+    request: Optional[Request] = None,
+    kwargs: Optional[Dict[str, Any]] = None,
+) -> None:
+    trace_extension = None if request is None else request.extensions.get("trace")
+    prefix = logger.name.split(".")[-1]
+    info = kwargs or {}
+    debug = logger.isEnabledFor(logging.DEBUG)
+
+    if debug or trace_extension:
+        if trace_extension is not None:
+            prefix_and_name = f"{prefix}.{name}"
+            await trace_extension(prefix_and_name, info)
+
+        if debug:
+            if not info:  # pragma: no cover
+                message = name
+            else:
+                args = " ".join([f"{key}={value!r}" for key, value in info.items()])
+                message = f"{name} {args}"
+            logger.debug(message)
+
+
 class Trace:
     def __init__(
         self,
diff --git a/tests/_async/test_connection_pool.py b/tests/_async/test_connection_pool.py
index 2392ca17..fcb33a92 100644
--- a/tests/_async/test_connection_pool.py
+++ b/tests/_async/test_connection_pool.py
@@ -255,6 +255,9 @@ async def trace(name, kwargs):
         await pool.request("GET", "https://example.com/", extensions={"trace": trace})
 
     assert called == [
+        "connection_pool.add_request",
+        "connection_pool.add_connection",
+        "connection_pool.assign_request_to_connection",
         "connection.connect_tcp.started",
         "connection.connect_tcp.complete",
         "connection.start_tls.started",
@@ -269,6 +272,7 @@ async def trace(name, kwargs):
         "http11.receive_response_body.complete",
         "http11.response_closed.started",
         "http11.response_closed.complete",
+        "connection_pool.remove_request",
     ]
 
 
@@ -294,10 +298,27 @@ async def test_debug_request(caplog):
         await pool.request("GET", "http://example.com/")
 
     assert caplog.record_tuples == [
+        (
+            "httpcore.connection_pool",
+            logging.DEBUG,
+            "add_request request=<Request [b'GET']>",
+        ),
+        (
+            "httpcore.connection_pool",
+            logging.DEBUG,
+            "add_connection connection=<AsyncHTTPConnection [CONNECTING]>",
+        ),
+        (
+            "httpcore.connection_pool",
+            logging.DEBUG,
+            "assign_request_to_connection request=<Request [b'GET']> "
+            "connection=<AsyncHTTPConnection [CONNECTING]>",
+        ),
         (
             "httpcore.connection",
             logging.DEBUG,
-            "connect_tcp.started host='example.com' port=80 local_address=None timeout=None socket_options=None",
+            "connect_tcp.started host='example.com' port=80 local_address=None "
+            "timeout=None socket_options=None",
         ),
         (
             "httpcore.connection",
@@ -324,8 +345,8 @@ async def test_debug_request(caplog):
         (
             "httpcore.http11",
             logging.DEBUG,
-            "receive_response_headers.complete return_value="
-            "(b'HTTP/1.1', 200, b'OK', [(b'Content-Type', b'plain/text'), (b'Content-Length', b'13')])",
+            "receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', "
+            "[(b'Content-Type', b'plain/text'), (b'Content-Length', b'13')])",
         ),
         (
             "httpcore.http11",
@@ -335,6 +356,11 @@ async def test_debug_request(caplog):
         ("httpcore.http11", logging.DEBUG, "receive_response_body.complete"),
         ("httpcore.http11", logging.DEBUG, "response_closed.started"),
         ("httpcore.http11", logging.DEBUG, "response_closed.complete"),
+        (
+            "httpcore.connection_pool",
+            logging.DEBUG,
+            "remove_request request=<Request [b'GET']>",
+        ),
         ("httpcore.connection", logging.DEBUG, "close.started"),
         ("httpcore.connection", logging.DEBUG, "close.complete"),
     ]
@@ -364,6 +390,9 @@ async def trace(name, kwargs):
         assert info == []
 
     assert called == [
+        "connection_pool.add_request",
+        "connection_pool.add_connection",
+        "connection_pool.assign_request_to_connection",
         "connection.connect_tcp.started",
         "connection.connect_tcp.complete",
         "connection.start_tls.started",
@@ -376,6 +405,7 @@ async def trace(name, kwargs):
         "http11.receive_response_headers.failed",
         "http11.response_closed.started",
         "http11.response_closed.complete",
+        "connection_pool.remove_request",
     ]
 
 
@@ -417,8 +447,12 @@ async def trace(name, kwargs):
         assert info == []
 
     assert called == [
+        "connection_pool.add_request",
+        "connection_pool.add_connection",
+        "connection_pool.assign_request_to_connection",
         "connection.connect_tcp.started",
         "connection.connect_tcp.failed",
+        "connection_pool.remove_request",
     ]
 
 
diff --git a/tests/_sync/test_connection_pool.py b/tests/_sync/test_connection_pool.py
index 287c2bcc..ad2b20de 100644
--- a/tests/_sync/test_connection_pool.py
+++ b/tests/_sync/test_connection_pool.py
@@ -255,6 +255,9 @@ def trace(name, kwargs):
         pool.request("GET", "https://example.com/", extensions={"trace": trace})
 
     assert called == [
+        "connection_pool.add_request",
+        "connection_pool.add_connection",
+        "connection_pool.assign_request_to_connection",
         "connection.connect_tcp.started",
         "connection.connect_tcp.complete",
         "connection.start_tls.started",
@@ -269,6 +272,7 @@ def trace(name, kwargs):
         "http11.receive_response_body.complete",
         "http11.response_closed.started",
         "http11.response_closed.complete",
+        "connection_pool.remove_request",
     ]
 
 
@@ -294,10 +298,27 @@ def test_debug_request(caplog):
         pool.request("GET", "http://example.com/")
 
     assert caplog.record_tuples == [
+        (
+            "httpcore.connection_pool",
+            logging.DEBUG,
+            "add_request request=<Request [b'GET']>",
+        ),
+        (
+            "httpcore.connection_pool",
+            logging.DEBUG,
+            "add_connection connection=<HTTPConnection [CONNECTING]>",
+        ),
+        (
+            "httpcore.connection_pool",
+            logging.DEBUG,
+            "assign_request_to_connection request=<Request [b'GET']> "
+            "connection=<HTTPConnection [CONNECTING]>",
+        ),
         (
             "httpcore.connection",
             logging.DEBUG,
-            "connect_tcp.started host='example.com' port=80 local_address=None timeout=None socket_options=None",
+            "connect_tcp.started host='example.com' port=80 local_address=None "
+            "timeout=None socket_options=None",
         ),
         (
             "httpcore.connection",
@@ -324,8 +345,8 @@ def test_debug_request(caplog):
         (
             "httpcore.http11",
             logging.DEBUG,
-            "receive_response_headers.complete return_value="
-            "(b'HTTP/1.1', 200, b'OK', [(b'Content-Type', b'plain/text'), (b'Content-Length', b'13')])",
+            "receive_response_headers.complete return_value=(b'HTTP/1.1', 200, b'OK', "
+            "[(b'Content-Type', b'plain/text'), (b'Content-Length', b'13')])",
         ),
         (
             "httpcore.http11",
@@ -335,6 +356,11 @@ def test_debug_request(caplog):
         ("httpcore.http11", logging.DEBUG, "receive_response_body.complete"),
         ("httpcore.http11", logging.DEBUG, "response_closed.started"),
         ("httpcore.http11", logging.DEBUG, "response_closed.complete"),
+        (
+            "httpcore.connection_pool",
+            logging.DEBUG,
+            "remove_request request=<Request [b'GET']>",
+        ),
         ("httpcore.connection", logging.DEBUG, "close.started"),
         ("httpcore.connection", logging.DEBUG, "close.complete"),
     ]
@@ -364,6 +390,9 @@ def trace(name, kwargs):
         assert info == []
 
     assert called == [
+        "connection_pool.add_request",
+        "connection_pool.add_connection",
+        "connection_pool.assign_request_to_connection",
         "connection.connect_tcp.started",
         "connection.connect_tcp.complete",
         "connection.start_tls.started",
@@ -376,6 +405,7 @@ def trace(name, kwargs):
         "http11.receive_response_headers.failed",
         "http11.response_closed.started",
         "http11.response_closed.complete",
+        "connection_pool.remove_request",
     ]
 
 
@@ -417,8 +447,12 @@ def trace(name, kwargs):
         assert info == []
 
     assert called == [
+        "connection_pool.add_request",
+        "connection_pool.add_connection",
+        "connection_pool.assign_request_to_connection",
         "connection.connect_tcp.started",
         "connection.connect_tcp.failed",
+        "connection_pool.remove_request",
     ]
 
 
diff --git a/unasync.py b/unasync.py
index 5a5627d7..0dd0b9d0 100755
--- a/unasync.py
+++ b/unasync.py
@@ -24,6 +24,7 @@
     ('@pytest.mark.anyio', ''),
     ('@pytest.mark.trio', ''),
     ('AutoBackend', 'SyncBackend'),
+    ('atrace', 'trace'),
 ]
 COMPILED_SUBS = [
     (re.compile(r'(^|\b)' + regex + r'($|\b)'), repl)