From b2c3d8f631baa9a6393f5810271b4a5d7c731626 Mon Sep 17 00:00:00 2001 From: Jacob Quinn Date: Thu, 27 Apr 2023 10:23:42 -0600 Subject: [PATCH] Summary of changes in this PR: (#1034) * Summary of changes in this PR: * Finally achieving my dream of moving the connection pool out of HTTP; it's going to live in the [ConcurrentUtilities.jl](https://github.com/JuliaServices/ConcurrentUtilities.jl/pull/18) package instead. In short, it had no unit tests, scant documentation, and was generally a pain to deal with in HTTP. We also discovered at least 2 major issues with the current implementation during a deep dive into performance and issue diagnosis, including: * If a ConnectError was thrown while creating a connection, a "permit" to the pool was permanently lost; get enough of them and the entire connection pool grinds to a halt :grimace: * Being a threadsafe structure, the pool had the poor choice of holding the pool lock for the duration of trying to get a permit _and making new connections_. This meant that in the case of a connection taking a long time to be made, we were essentially holding the rest of the pool hostage. This is totally unnecessary and can cause big performance issues in really heavy workloads where there's lots of contention on the pool for managing requests. The new implementation in ConcurrentUtilities.jl solves both these problems while being about 1/4th the LOC of the previous implementation. And it has unit tests! yay! All in all, I think #1033 and #1032 should both be mostly resolved by these changes/updates. * Relatedly, we're adjusting the API for connection pools to allow the user to pass in their _own_ connection pool to be used for that request (to check for a connection to reuse and to return the connection to after completion). A pool can be constructed like `HTTP.Pool(; max::Int)` and passed to any of the `HTTP.request` methods like `HTTP.get(...; pool=pool)`. 
HTTP has its own global default pool `HTTP.Connections.POOL` that it uses by default to manage connection reuse. The `HTTP.set_default_connection_limit!` will still work as long as it is called before any requests are made. Calling it _after_ requests have been made will be a no-op. The `connection_limit` keyword arg is now formally deprecated and will issue a warning if passed. I'm comfortable with a full deprecation here because it turns out it wasn't even really working before anyway (unless it was passed/used on _every_ request and never changed). So instead of "changing" things, we're really just doing a proper implementation that now actually works, has better behavior, and is actually controllable by the user. * Add a try-finally in keepalive! around our global IO lock usage just for good house-keeping * Refactored `try_with_timeout` to use a `Channel` instead of the non-threaded `@async`; it's much simpler and seems cleaner * I refactored a few of the stream IO functions so that we always know the number of bytes downloaded, whether in memory or written to an IO, so we can log them and use them in verbose logging to give bit-rate calculations * Added a new `logerrors::Bool=false` keyword arg that allows doing `@error` logs on errors that may otherwise be "swallowed" when doing retries; it can be helpful to sometimes be able to at least see what kinds of errors are happening; also cleaned up our error handling in general so we don't lose backtraces which fixes #1003. * Added lots of metrics around various time spent in various layers, read vs. write durations, etc. These can be enabled, and stored in the request context, by passing `observelayers=true` This mostly resolves #1025 and #1019. * Fixed some missing keyword args that either weren't correct in the inline docs or weren't included in the client.md docs * Removed a few client-side layers (BasicAuth, DebugRequest, Canonicalize, etc.) 
since their implementations were _really_ trivial and moved their functionality into a single HeadersRequest layer where we now do all the header-related work during the request. This has the effect of _drastically_ reducing the exploding backtraces we now have when doing requests because of our "functional style" client-side layering. * Fixed #612 by ensuring `keepalive` is included in our `connectionkey` computation so only connections that specified `keepalive` will be reused if it's passed the same value on subsequent requests * Update based on new Pool changes * Updates * cleanup * Put back in exception unwrapping * Address PR review --- Project.toml | 2 + docs/src/client.md | 24 +- docs/src/reference.md | 8 +- src/{ConnectionPool.jl => Connections.jl} | 222 ++++++++----- src/Exceptions.jl | 43 +-- src/HTTP.jl | 78 +++-- src/Messages.jl | 2 +- src/Servers.jl | 4 +- src/Streams.jl | 24 +- src/WebSockets.jl | 2 +- src/clientlayers/BasicAuthRequest.jl | 26 -- src/clientlayers/CanonicalizeRequest.jl | 27 -- src/clientlayers/ConnectionRequest.jl | 36 ++- src/clientlayers/ContentTypeRequest.jl | 27 -- src/clientlayers/CookieRequest.jl | 2 +- src/clientlayers/DebugRequest.jl | 28 -- src/clientlayers/ExceptionRequest.jl | 12 +- ...ultHeadersRequest.jl => HeadersRequest.jl} | 44 ++- src/clientlayers/MessageRequest.jl | 17 +- src/clientlayers/RedirectRequest.jl | 2 +- src/clientlayers/RetryRequest.jl | 4 +- src/clientlayers/StreamRequest.jl | 69 ++-- src/clientlayers/TimeoutRequest.jl | 22 +- src/connectionpools.jl | 296 ------------------ test/async.jl | 8 +- test/benchmark.jl | 2 +- test/client.jl | 18 +- test/loopback.jl | 10 +- test/runtests.jl | 2 +- test/server.jl | 2 +- test/try_with_timeout.jl | 33 +- test/websockets/autobahn.jl | 2 +- 32 files changed, 448 insertions(+), 650 deletions(-) rename src/{ConnectionPool.jl => Connections.jl} (72%) delete mode 100644 src/clientlayers/BasicAuthRequest.jl delete mode 100644 src/clientlayers/CanonicalizeRequest.jl delete 
mode 100644 src/clientlayers/ContentTypeRequest.jl delete mode 100644 src/clientlayers/DebugRequest.jl rename src/clientlayers/{DefaultHeadersRequest.jl => HeadersRequest.jl} (57%) delete mode 100644 src/connectionpools.jl diff --git a/Project.toml b/Project.toml index 6acf839b0..a31de49c2 100644 --- a/Project.toml +++ b/Project.toml @@ -6,6 +6,7 @@ version = "1.7.4" [deps] Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193" +ConcurrentUtilities = "f0e56b4a-5159-44fe-b623-3e5288b988bb" Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" LoggingExtras = "e6f89c97-d47a-5376-807f-9c37f3926c36" @@ -20,6 +21,7 @@ UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" [compat] CodecZlib = "0.7" +ConcurrentUtilities = "2.1" LoggingExtras = "0.4.9,1" MbedTLS = "0.6.8, 0.7, 1" OpenSSL = "1.3" diff --git a/docs/src/client.md b/docs/src/client.md index f1e23c431..4190037ab 100644 --- a/docs/src/client.md +++ b/docs/src/client.md @@ -91,18 +91,26 @@ If you're running into a real head-scratcher, don't hesitate to [open an issue]( When a connection is attempted to a remote host, sometimes the connection is unable to be established for whatever reason. Passing a non-zero `connect_timetout` value will cause `HTTP.request` to wait that many seconds before giving up and throwing an error. -#### `connection_limit` +#### `pool` -Many remote web services/APIs have rate limits or throttling in place to avoid bad actors from abusing their service. They may prevent too many requests over a time period or they may prevent too many connections being simultaneously open from the same client. By default, when `HTTP.request` opens a remote connection, it remembers the exact host:port combination and will keep the connection open to be reused by subsequent requests to the same host:port. The `connection_limit` keyword argument controls how many concurrent connections are allowed to a single remote host. 
+Many remote web services/APIs have rate limits or throttling in place to avoid bad actors from abusing their service. They may prevent too many requests over a time period or they may prevent too many connections being simultaneously open from the same client. By default, when `HTTP.request` opens a remote connection, it remembers the exact host:port combination and will keep the connection open to be reused by subsequent requests to the same host:port. The `pool` keyword argument specifies a specific `HTTP.Pool` object to be used for controlling the maximum number of concurrent connections allowed to be happening across the pool. It's constructed via `HTTP.Pool(; max::Int)`. Requests attempted when the maximum is already hit will block until previous requests finish. The `idle_timeout` keyword argument can be passed to `HTTP.request` to control how long it's been since a connection was last used in order to be considered 'valid'; otherwise, "stale" connections will be discarded. #### `readtimeout` -After a connection is established and a request is sent, a response is expected. If a non-zero value is passed to the `readtimeout` keyword argument, `HTTP.request` will wait to receive a response that many seconds before throwing an error. Note that for chunked or streaming responses, each chunk/packet of bytes received causes the timeout to reset. Passing `readtimeout = 0` disables any timeout checking and is the default. +After a connection is established and a request is sent, a response is expected. If a non-zero value is passed to the `readtimeout` keyword argument, `HTTP.request` will wait to receive a response that many seconds before throwing an error. Passing `readtimeout = 0` disables any timeout checking and is the default. ### `status_exception` When a non-2XX HTTP status code is received in a response, this is meant to convey some error condition. 
3XX responses typically deal with "redirects" where the request should actually try a different url (these are followed automatically by default in `HTTP.request`, though up to a limit; see [`redirect`](@ref)). 4XX status codes typically mean the remote server thinks something is wrong in how the request is made. 5XX typically mean something went wrong on the server-side when responding. By default, as mentioned previously, `HTTP.request` will attempt to follow redirect responses, and retry "retryable" requests (where the status code and original request method allow). If, after redirects/retries, a response still has a non-2XX response code, the default behavior is to throw an `HTTP.StatusError` exception to signal that the request didn't succeed. This behavior can be disabled by passing `status_exception=false`, where the `HTTP.Response` object will be returned with the non-2XX status code intact. +### `logerrors` + +If `true`, `HTTP.StatusError`, `HTTP.TimeoutError`, `HTTP.IOError`, and `HTTP.ConnectError` will be logged via `@error` as they happen, regardless of whether the request is then retried or not. Useful for debugging or monitoring requests where there's worry of certain errors happening but ignored because of retries. + +### `observelayers` + +If `true`, enables the `HTTP.observelayer` to wrap each client-side "layer" to track the amount of time spent in each layer as a request is processed. This can be useful for debugging performance issues. Note that when retries or redirects happen, the time spent in each layer is cumulative, as noted by the `[layer]_count`. The metrics are stored in the `Request.context` dictionary, and can be accessed like `HTTP.get(...).request.context`. + ### `basicauth` By default, if "user info" is detected in the request url, like `http://user:password@host`, the `Authorization: Basic` header will be added to the request headers before the request is sent. 
While not very common, some APIs use this form of authentication to verify requests. This automatic adding of the header can be disabled by passing `basicauth=false`. @@ -150,6 +158,16 @@ Controls the total number of retries that will be attempted. Can also disable al By default, this keyword argument is `false`, which controls whether non-idempotent requests will be retried (POST or PATCH requests). +#### `retry_delays` + +Allows providing a custom `ExponentialBackOff` object to control the delay between retries. +Default is `ExponentialBackOff(n = retries)`. + +#### `retry_check` + +Allows providing a custom function to control whether a retry should be attempted. +The function should accept 5 arguments: the delay state, exception, request, response (an `HTTP.Response` object *if* a request was successfully made, otherwise `nothing`), and `resp_body` response body (which may be `nothing` if there is no response yet, otherwise a `Vector{UInt8}`), and return `true` if a retry should be attempted. So in traditional nomenclature, the function would have the form `f(s, ex, req, resp, resp_body) -> Bool`. + ### Redirect Arguments #### `redirect` diff --git a/docs/src/reference.md b/docs/src/reference.md index 921d878b1..9a9630bc4 100644 --- a/docs/src/reference.md +++ b/docs/src/reference.md @@ -138,7 +138,7 @@ HTTP.Messages.isidempotent HTTP.Messages.retryable HTTP.Messages.defaultheader! HTTP.Messages.readheaders -HTTP.DefaultHeadersRequest.setuseragent! +HTTP.HeadersRequest.setuseragent! HTTP.Messages.readchunksize HTTP.Messages.headerscomplete(::HTTP.Messages.Response) HTTP.Messages.writestartline @@ -167,17 +167,13 @@ HTTP.poplayer! HTTP.popfirstlayer! 
HTTP.MessageRequest.messagelayer HTTP.RedirectRequest.redirectlayer -HTTP.DefaultHeadersRequest.defaultheaderslayer -HTTP.BasicAuthRequest.basicauthlayer +HTTP.HeadersRequest.headerslayer HTTP.CookieRequest.cookielayer -HTTP.CanonicalizeRequest.canonicalizelayer HTTP.TimeoutRequest.timeoutlayer HTTP.ExceptionRequest.exceptionlayer HTTP.RetryRequest.retrylayer HTTP.ConnectionRequest.connectionlayer -HTTP.DebugRequest.debuglayer HTTP.StreamRequest.streamlayer -HTTP.ContentTypeDetection.contenttypedetectionlayer ``` ### Raw Request Connection diff --git a/src/ConnectionPool.jl b/src/Connections.jl similarity index 72% rename from src/ConnectionPool.jl rename to src/Connections.jl index 1b3940cc8..e8f91b2c8 100644 --- a/src/ConnectionPool.jl +++ b/src/Connections.jl @@ -17,24 +17,35 @@ If a subsequent request matches these properties of a previous connection and limits are respected (reuse limit, idle timeout), and it wasn't otherwise remotely closed, a connection will be reused. """ -module ConnectionPool +module Connections -export Connection, newconnection, releaseconnection, getrawstream, inactiveseconds, shouldtimeout, set_default_connection_limit! +export Connection, newconnection, releaseconnection, getrawstream, inactiveseconds, shouldtimeout, default_connection_limit, set_default_connection_limit!, Pool using Sockets, LoggingExtras, NetworkOptions using MbedTLS: SSLConfig, SSLContext, setup!, associate!, hostname!, handshake! 
-using MbedTLS, OpenSSL +using MbedTLS, OpenSSL, ConcurrentUtilities using ..IOExtras, ..Conditions, ..Exceptions -const default_connection_limit = Ref(8) const nolimit = typemax(Int) -set_default_connection_limit!(n) = default_connection_limit[] = n - taskid(t=current_task()) = string(hash(t) & 0xffff, base=16, pad=4) -include("connectionpools.jl") -using .ConnectionPools +const default_connection_limit = Ref{Int}() + +function __init__() + # default connection limit is 4x the number of threads + # this was chosen after some empirical benchmarking on aws/azure machines + # where, for non-data-intensive workloads, having at least 4x ensured + # there was no artificial restriction on overall throughput + default_connection_limit[] = max(16, Threads.nthreads() * 4) + nosslcontext[] = OpenSSL.SSLContext(OpenSSL.TLSClientMethod()) + TCP_POOL[] = CPool{Sockets.TCPSocket}(default_connection_limit[]) + MBEDTLS_POOL[] = CPool{MbedTLS.SSLContext}(default_connection_limit[]) + OPENSSL_POOL[] = CPool{OpenSSL.SSLStream}(default_connection_limit[]) + return +end + +set_default_connection_limit!(n) = default_connection_limit[] = n """ Connection @@ -46,6 +57,7 @@ Fields: - `port::String`, exactly as specified in the URI (i.e. may be empty). - `idle_timeout`, No. of seconds to maintain connection after last request/response. - `require_ssl_verification`, whether ssl verification is required for an ssl connection +- `keepalive`, whether the tcp socket should have keepalive enabled - `peerip`, remote IP adress (used for debug/log messages). - `peerport`, remote TCP port number (used for debug/log messages). - `localport`, local TCP port number (used for debug messages). 
@@ -63,6 +75,7 @@ mutable struct Connection{IO_t <: IO} <: IO port::String idle_timeout::Int require_ssl_verification::Bool + keepalive::Bool peerip::IPAddr # for debugging/logging peerport::UInt16 # for debugging/logging localport::UInt16 # debug only @@ -81,22 +94,24 @@ end Used for "hashing" a Connection object on just the key properties necessary for determining connection re-useability. That is, when a new request calls `newconnection`, we take the -request parameters of what socket type, the host and port, and if ssl -verification is required, and if an existing Connection was already created with the exact +request parameters of host and port, and if ssl verification is required, if keepalive is enabled, +and if an existing Connection was already created with the exact same parameters, we can re-use it (as long as it's not already being used, obviously). """ -connectionkey(x::Connection) = (typeof(x.io), x.host, x.port, x.require_ssl_verification, x.clientconnection) +connectionkey(x::Connection) = (x.host, x.port, x.require_ssl_verification, x.keepalive, x.clientconnection) + +const ConnectionKeyType = Tuple{AbstractString, AbstractString, Bool, Bool, Bool} Connection(host::AbstractString, port::AbstractString, idle_timeout::Int, - require_ssl_verification::Bool, io::T, client=true) where {T}= + require_ssl_verification::Bool, keepalive::Bool, io::T, client=true) where {T}= Connection{T}(host, port, idle_timeout, - require_ssl_verification, + require_ssl_verification, keepalive, safe_getpeername(io)..., localport(io), io, client, PipeBuffer(), time(), false, false, IOBuffer(), nothing) -Connection(io; require_ssl_verification::Bool=true) = - Connection("", "", 0, require_ssl_verification, io, false) +Connection(io; require_ssl_verification::Bool=true, keepalive::Bool=true) = + Connection("", "", 0, require_ssl_verification, keepalive, io, false) getrawstream(c::Connection) = c.io @@ -325,35 +340,91 @@ function purge(c::Connection) @ensure bytesavailable(c) 
== 0 end -const TCP_POOL = Pool(Connection{Sockets.TCPSocket}) -const MbedTLS_SSL_POOL = Pool(Connection{MbedTLS.SSLContext}) -const OpenSSL_SSL_POOL = Pool(Connection{OpenSSL.SSLStream}) +const CPool{T} = ConcurrentUtilities.Pool{ConnectionKeyType, Connection{T}} + """ - POOLS + HTTP.Pool(max::Int=HTTP.default_connection_limit[]) -A dict of global connection pools keeping track of active connections, split by their IO type. +Connection pool for managing the reuse of HTTP connections. +`max` controls the maximum number of concurrent connections allowed +and defaults to the `HTTP.default_connection_limit` value. + +A pool can be passed to any of the `HTTP.request` methods via the `pool` keyword argument. """ -const POOLS = Dict{DataType,Pool}( - Sockets.TCPSocket => TCP_POOL, - MbedTLS.SSLContext => MbedTLS_SSL_POOL, - OpenSSL.SSLStream => OpenSSL_SSL_POOL, -) -getpool(::Type{Sockets.TCPSocket}) = TCP_POOL -getpool(::Type{MbedTLS.SSLContext}) = MbedTLS_SSL_POOL -getpool(::Type{OpenSSL.SSLStream}) = OpenSSL_SSL_POOL -# Fallback for custom connection io types -# to opt out from locking, define your own `Pool` and add a `getpool` method for your IO type -const POOLS_LOCK = Threads.ReentrantLock() -function getpool(::Type{T}) where {T} - Base.@lock POOLS_LOCK get!(() -> Pool(Connection{T}), POOLS, T)::Pool{Connection{T}} +struct Pool + lock::ReentrantLock + tcp::CPool{Sockets.TCPSocket} + mbedtls::CPool{MbedTLS.SSLContext} + openssl::CPool{OpenSSL.SSLStream} + other::IdDict{Type, CPool} + max::Int +end + +function Pool(max::Union{Int, Nothing}=nothing) + max = something(max, default_connection_limit[]) + return Pool(ReentrantLock(), + CPool{Sockets.TCPSocket}(max), + CPool{MbedTLS.SSLContext}(max), + CPool{OpenSSL.SSLStream}(max), + IdDict{Type, CPool}(), + max, + ) +end + +# Default HTTP global connection pools +const TCP_POOL = Ref{CPool{Sockets.TCPSocket}}() +const MBEDTLS_POOL = Ref{CPool{MbedTLS.SSLContext}}() +const OPENSSL_POOL = Ref{CPool{OpenSSL.SSLStream}}() 
+const OTHER_POOL = Lockable(IdDict{Type, CPool}()) + +getpool(::Nothing, ::Type{Sockets.TCPSocket}) = TCP_POOL[] +getpool(::Nothing, ::Type{MbedTLS.SSLContext}) = MBEDTLS_POOL[] +getpool(::Nothing, ::Type{OpenSSL.SSLStream}) = OPENSSL_POOL[] +getpool(::Nothing, ::Type{T}) where {T} = Base.@lock OTHER_POOL get!(OTHER_POOL[], T) do + CPool{T}(default_connection_limit[]) +end + +function getpool(pool::Pool, ::Type{T})::CPool{T} where {T} + if T === Sockets.TCPSocket + return pool.tcp + elseif T === MbedTLS.SSLContext + return pool.mbedtls + elseif T === OpenSSL.SSLStream + return pool.openssl + else + return Base.@lock pool.lock get!(() -> CPool{T}(pool.max), pool.other, T) + end end """ - closeall() + closeall(pool::HTTP.Pool=nothing) -Close all connections in `POOLS`. +Remove and close all connections in the `pool` to avoid any connection reuse. +If `pool` is not specified, the default global pools are closed. """ -closeall() = foreach(ConnectionPools.reset!, values(POOLS)) +function closeall(pool::Union{Nothing, Pool}=nothing) + if pool === nothing + drain!(TCP_POOL[]) + drain!(MBEDTLS_POOL[]) + drain!(OPENSSL_POOL[]) + Base.@lock OTHER_POOL foreach(drain!, values(OTHER_POOL[])) + else + drain!(pool.tcp) + drain!(pool.mbedtls) + drain!(pool.openssl) + Base.@lock pool.lock foreach(drain!, values(pool.other)) + end + return +end + +function connection_isvalid(c, idle_timeout) + check = isopen(c) && inactiveseconds(c) <= idle_timeout + check || close(c) + return check +end + +@noinline connection_limit_warning(cl) = cl === nothing || + @warn "connection_limit no longer supported as a keyword argument; use `HTTP.set_default_connection_limit!($cl)` or pass a connection pool like `pool=HTTP.Pool($cl)` instead." """ newconnection(type, host, port) -> Connection @@ -364,35 +435,42 @@ or create a new `Connection` if required. 
function newconnection(::Type{T}, host::AbstractString, port::AbstractString; - connection_limit=default_connection_limit[], + pool::Union{Nothing, Pool}=nothing, + connection_limit=nothing, forcenew::Bool=false, idle_timeout=typemax(Int), require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"), + keepalive::Bool=true, kw...) where {T <: IO} + connection_limit_warning(connection_limit) return acquire( - getpool(T), - (T, host, port, require_ssl_verification, true); - max_concurrent_connections=Int(connection_limit), + getpool(pool, T), + (host, port, require_ssl_verification, keepalive, true); forcenew=forcenew, - idle_timeout=Int(idle_timeout)) do + isvalid=c->connection_isvalid(c, Int(idle_timeout))) do Connection(host, port, - idle_timeout, require_ssl_verification, + idle_timeout, require_ssl_verification, keepalive, getconnection(T, host, port; - require_ssl_verification=require_ssl_verification, kw...) + require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...) ) end end -releaseconnection(c::Connection{T}, reuse) where {T} = - release(getpool(T), connectionkey(c), c; return_for_reuse=reuse) +function releaseconnection(c::Connection{T}, reuse; pool::Union{Nothing, Pool}=nothing, kw...) where {T} + c.timestamp = time() + release(getpool(pool, T), connectionkey(c), reuse ? c : nothing) +end function keepalive!(tcp) Base.iolock_begin() - Base.check_open(tcp) - err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint), - tcp.handle, 1, 1) - Base.uv_error("failed to set keepalive on tcp socket", err) - Base.iolock_end() + try + Base.check_open(tcp) + err = ccall(:uv_tcp_keepalive, Cint, (Ptr{Nothing}, Cint, Cuint), + tcp.handle, 1, 1) + Base.uv_error("failed to set keepalive on tcp socket", err) + finally + Base.iolock_end() + end return end @@ -425,25 +503,27 @@ function getconnection(::Type{TCPSocket}, addrs = Sockets.getalladdrinfo(host) connect_timeout = connect_timeout == 0 && readtimeout > 0 ? 
readtimeout : connect_timeout lasterr = ErrorException("unknown connection error") - for addr in addrs try - return if connect_timeout > 0 + if connect_timeout > 0 tcp = Sockets.TCPSocket() Sockets.connect!(tcp, addr, p) - try_with_timeout(() -> checkconnected(tcp), connect_timeout, () -> close(tcp)) do - Sockets.wait_connected(tcp) - keepalive && keepalive!(tcp) + try + try_with_timeout(connect_timeout) do + Sockets.wait_connected(tcp) + keepalive && keepalive!(tcp) + end + catch + close(tcp) + rethrow() end - return tcp else tcp = Sockets.connect(addr, p) keepalive && keepalive!(tcp) - tcp end + return tcp catch e lasterr = e isa TimeoutError ? ConnectTimeout(host, port) : e - continue end end # If no connetion could be set up, to any address, throw last error @@ -498,7 +578,6 @@ function getconnection(::Type{SSLStream}, host::AbstractString, port::AbstractString; kw...)::SSLStream - port = isempty(port) ? "443" : port @debugv 2 "SSL connect: $host:$port..." tcp = getconnection(TCPSocket, host, port; kw...) 
@@ -523,11 +602,9 @@ function sslconnection(::Type{SSLContext}, tcp::TCPSocket, host::AbstractString; require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"), sslconfig::SSLConfig=nosslconfig, kw...)::SSLContext - if sslconfig === nosslconfig sslconfig = global_sslconfig(require_ssl_verification) end - io = SSLContext() setup!(io, sslconfig) associate!(io, tcp) @@ -538,25 +615,27 @@ end function sslupgrade(::Type{IOType}, c::Connection{T}, host::AbstractString; + pool::Union{Nothing, Pool}=nothing, require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"), + keepalive::Bool=true, readtimeout::Int=0, kw...)::Connection{IOType} where {T, IOType} # initiate the upgrade to SSL # if the upgrade fails, an error will be thrown and the original c will be closed # in ConnectionRequest tls = if readtimeout > 0 - try_with_timeout(() -> shouldtimeout(c, readtimeout), readtimeout, () -> close(c)) do - sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, kw...) + try_with_timeout(readtimeout) do + sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...) end else - sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, kw...) + sslconnection(IOType, c.io, host; require_ssl_verification=require_ssl_verification, keepalive=keepalive, kw...) 
end # success, now we turn it into a new Connection - conn = Connection(host, "", 0, require_ssl_verification, tls) - # release the "old" one, but don't allow reuse since we're hijacking the socket - release(getpool(T), connectionkey(c), c; return_for_reuse=false) + conn = Connection(host, "", 0, require_ssl_verification, keepalive, tls) + # release the "old" one, but don't return the connection since we're hijacking the socket + release(getpool(pool, T), connectionkey(c)) # and return the new one - return acquire(getpool(IOType), connectionkey(conn), conn) + return acquire(() -> conn, getpool(pool, IOType), connectionkey(conn); forcenew=true) end function Base.show(io::IO, c::Connection) @@ -589,9 +668,4 @@ function tcpstatus(c::Connection) end end -function __init__() - nosslcontext[] = OpenSSL.SSLContext(OpenSSL.TLSClientMethod()) - return -end - -end # module ConnectionPool +end # module Connections diff --git a/src/Exceptions.jl b/src/Exceptions.jl index b11e68e77..5918e684a 100644 --- a/src/Exceptions.jl +++ b/src/Exceptions.jl @@ -25,41 +25,22 @@ macro $(:try)(exes...) 
end end # @eval -function try_with_timeout(f, shouldtimeout, delay, iftimeout=() -> nothing) - @assert delay > 0 - cond = Condition() - # execute f async - t = @async try - notify(cond, f()) - catch e - @debugv 1 "error executing f in try_with_timeout" - isopen(timer) && notify(cond, e, error = true) - end - # start a timer - timer = Timer(delay; interval=delay / 10) do tm +function try_with_timeout(f, timeout) + ch = Channel(0) + timer = Timer(tm -> close(ch, TimeoutError(timeout)), timeout) + @async begin try - if shouldtimeout() - @debugv 1 "❗️ Timeout: $delay" - close(tm) - iftimeout() - notify(cond, TimeoutError(delay), error = true) - end + put!(ch, $f()) catch e - @debugv 1 "callback error in try_with_timeout" - close(tm) - notify(cond, e, error = true) + if !(e isa HTTPError) + e = CapturedException(e, catch_backtrace()) + end + close(ch, e) + finally + close(timer) end end - try - res = wait(cond) - @debugv 1 "try_with_timeout finished with: $res" - res - catch e - @debugv 1 "try_with_timeout failed with: $e" - rethrow() - finally - close(timer) - end + return take!(ch) end abstract type HTTPError <: Exception end diff --git a/src/HTTP.jl b/src/HTTP.jl index d98b0b7ec..68b9fbbdb 100644 --- a/src/HTTP.jl +++ b/src/HTTP.jl @@ -37,24 +37,44 @@ include("sniff.jl") ;using .Sniff include("multipart.jl") ;using .Forms include("Parsers.jl") ;import .Parsers: Headers, Header, ParseError -include("ConnectionPool.jl") ;using .ConnectionPool +include("Connections.jl") ;using .Connections +# backwards compat +const ConnectionPool = Connections include("StatusCodes.jl") ;using .StatusCodes include("Messages.jl") ;using .Messages include("cookies.jl") ;using .Cookies include("Streams.jl") ;using .Streams + +getrequest(r::Request) = r +getrequest(s::Stream) = s.message.request + +# Wraps client-side "layer" to track the amount of time spent in it as a request is processed. +function observelayer(f) + function observation(req_or_stream; kw...) 
+ req = getrequest(req_or_stream) + nm = nameof(f) + cntnm = Symbol(nm, "_count") + durnm = Symbol(nm, "_duration_ms") + start_time = time() + req.context[cntnm] = Base.get(req.context, cntnm, 0) + 1 + try + return f(req_or_stream; kw...) + finally + req.context[durnm] = Base.get(req.context, durnm, 0) + (time() - start_time) * 1000 + # @info "observed layer = $f, count = $(req.context[cntnm]), duration = $(req.context[durnm])" + end + end +end + include("clientlayers/MessageRequest.jl"); using .MessageRequest include("clientlayers/RedirectRequest.jl"); using .RedirectRequest -include("clientlayers/DefaultHeadersRequest.jl"); using .DefaultHeadersRequest -include("clientlayers/BasicAuthRequest.jl"); using .BasicAuthRequest +include("clientlayers/HeadersRequest.jl"); using .HeadersRequest include("clientlayers/CookieRequest.jl"); using .CookieRequest -include("clientlayers/CanonicalizeRequest.jl"); using .CanonicalizeRequest include("clientlayers/TimeoutRequest.jl"); using .TimeoutRequest include("clientlayers/ExceptionRequest.jl"); using .ExceptionRequest include("clientlayers/RetryRequest.jl"); using .RetryRequest include("clientlayers/ConnectionRequest.jl"); using .ConnectionRequest -include("clientlayers/DebugRequest.jl"); using .DebugRequest include("clientlayers/StreamRequest.jl"); using .StreamRequest -include("clientlayers/ContentTypeRequest.jl"); using .ContentTypeDetection include("download.jl") include("Servers.jl") ;using .Servers; using .Servers: listen @@ -119,9 +139,10 @@ Supported optional keyword arguments: request and response process - `connect_timeout = 10`, close the connection after this many seconds if it is still attempting to connect. Use `connect_timeout = 0` to disable. - - `connection_limit = 8`, number of concurrent connections allowed to each host:port. - - `readtimeout = 0`, close the connection if no data is received for this many - seconds. Use `readtimeout = 0` to disable. 
+ - `pool = nothing`, an `HTTP.Pool` object to use for managing the reuse of connections between requests. + By default, a global pool is used, which is shared across all requests. To create a pool for a specific set of requests, + use `pool = HTTP.Pool(max::Int)`, where `max` controls the maximum number of concurrent connections allowed to be used for requests at a given time. + - `readtimeout = 0`, abort a request after this many seconds. Will trigger retries if applicable. Use `readtimeout = 0` to disable. - `status_exception = true`, throw `HTTP.StatusError` for response status >= 300. - Basic authentication is detected automatically from the provided url's `userinfo` (in the form `scheme://user:password@host`) and adds the `Authorization: Basic` header; this can be disabled by passing `basicauth=false` @@ -134,12 +155,19 @@ Supported optional keyword arguments: - `decompress = nothing`, by default, decompress the response body if the response has a "Content-Encoding" header set to "gzip". If `decompress=true`, decompress the response body regardless of `Content-Encoding` header. If `decompress=false`, do not decompress the response body. + - `logerrors = false`, if `true`, `HTTP.StatusError`, `HTTP.TimeoutError`, `HTTP.IOError`, and `HTTP.ConnectError` will be + logged via `@error` as they happen, regardless of whether the request is then retried or not. Useful for debugging or + monitoring requests where there's worry of certain errors happening but ignored because of retries. + - `observelayers = false`, if `true`, enables the `HTTP.observelayer` to wrap each client-side "layer" to track the amount of + time spent in each layer as a request is processed. This can be useful for debugging performance issues. Note that when retries + or redirects happen, the time spent in each layer is cumulative, as noted by the `[layer]_count`. 
The metrics are stored + in the `Request.context` dictionary, and can be accessed like `HTTP.get(...).request.context` Retry arguments: - `retry = true`, retry idempotent requests in case of error. - `retries = 4`, number of times to retry. - `retry_non_idempotent = false`, retry non-idempotent requests too. e.g. POST. - - `retry_delay = ExponentialBackOff(n = retries)`, provide a custom `ExponentialBackOff` object to control the delay between retries. + - `retry_delays = ExponentialBackOff(n = retries)`, provide a custom `ExponentialBackOff` object to control the delay between retries. - `retry_check = (s, ex, req, resp, resp_body) -> Bool`, provide a custom function to control whether a retry should be attempted. The function should accept 5 arguments: the delay state, exception, request, response (an `HTTP.Response` object *if* a request was successfully made, otherwise `nothing`), and `resp_body` response body (which may be `nothing` if there is no response yet, otherwise @@ -282,12 +310,13 @@ end ``` """ function request(method, url, h=nothing, b=nobody; - headers=h, body=b, query=nothing, kw...)::Response - return request(HTTP.stack(), method, url, headers, body, query; kw...) + headers=h, body=b, query=nothing, observelayers::Bool=false, kw...)::Response + return request(HTTP.stack(observelayers), method, url, headers, body, query; kw...) end +# layers are applied from left to right, i.e. the first layer is the outermost that is called first, which then calls into the second layer, etc. const STREAM_LAYERS = [timeoutlayer, exceptionlayer] -const REQUEST_LAYERS = [redirectlayer, defaultheaderslayer, basicauthlayer, contenttypedetectionlayer, cookielayer, retrylayer, canonicalizelayer] +const REQUEST_LAYERS = [redirectlayer, headerslayer, cookielayer, retrylayer] """ Layer @@ -386,10 +415,12 @@ If `request=false`, will remove the bottom "stream" layer as opposed to bottom " popfirstlayer!(; request::Bool=true) = popfirst!(request ? 
REQUEST_LAYERS : STREAM_LAYERS) function stack( + observelayers::Bool=false, # custom layers requestlayers=(), streamlayers=()) + obs = observelayers ? observelayer : identity # stream layers if streamlayers isa NamedTuple inner_stream_layers = haskey(streamlayers, :last) ? streamlayers.last : () @@ -398,14 +429,13 @@ function stack( inner_stream_layers = streamlayers outer_stream_layers = () end - layers = foldr((x, y) -> x(y), inner_stream_layers, init=streamlayer) - layers2 = foldr((x, y) -> x(y), STREAM_LAYERS, init=layers) + layers = foldr((x, y) -> obs(x(y)), inner_stream_layers, init=obs(streamlayer)) + layers2 = foldr((x, y) -> obs(x(y)), STREAM_LAYERS, init=layers) if !isempty(outer_stream_layers) - layers2 = foldr((x, y) -> x(y), outer_stream_layers, init=layers2) + layers2 = foldr((x, y) -> obs(x(y)), outer_stream_layers, init=layers2) end # request layers # messagelayer must be the 1st/outermost layer to convert initial args to Request - # we also want debuglayer to be early to ensure any debug logging is handled correctly in other layers if requestlayers isa NamedTuple inner_request_layers = haskey(requestlayers, :last) ? requestlayers.last : () outer_request_layers = haskey(requestlayers, :first) ? 
requestlayers.first : () @@ -413,12 +443,12 @@ function stack( inner_request_layers = requestlayers outer_request_layers = () end - layers3 = foldr((x, y) -> x(y), inner_request_layers; init=connectionlayer(layers2)) - layers4 = foldr((x, y) -> x(y), REQUEST_LAYERS; init=layers3) + layers3 = foldr((x, y) -> obs(x(y)), inner_request_layers; init=obs(connectionlayer(layers2))) + layers4 = foldr((x, y) -> obs(x(y)), REQUEST_LAYERS; init=layers3) if !isempty(outer_request_layers) - layers4 = foldr((x, y) -> x(y), outer_request_layers, init=layers4) + layers4 = foldr((x, y) -> obs(x(y)), outer_request_layers, init=layers4) end - return messagelayer(debuglayer(layers4)) + return messagelayer(layers4) end function request(stack::Base.Callable, method, url, h=nothing, b=nobody, q=nothing; @@ -472,9 +502,9 @@ macro client(requestlayers, streamlayers=[]) head(a...; kw...) = ($__source__; request("HEAD", a...; kw...)) delete(a...; kw...) = ($__source__; request("DELETE", a...; kw...)) open(f, a...; kw...) = ($__source__; request(a...; iofunction=f, kw...)) - function request(method, url, h=HTTP.Header[], b=HTTP.nobody; headers=h, body=b, query=nothing, kw...)::HTTP.Response + function request(method, url, h=HTTP.Header[], b=HTTP.nobody; headers=h, body=b, query=nothing, observelayers::Bool=false, kw...)::HTTP.Response $__source__ - HTTP.request(HTTP.stack($requestlayers, $streamlayers), method, url, headers, body, query; kw...) + HTTP.request(HTTP.stack(observelayers, $requestlayers, $streamlayers), method, url, headers, body, query; kw...) end end) end @@ -572,7 +602,7 @@ write(socket, frame) """ function openraw(method::Union{String,Symbol}, url, headers=Header[]; kw...)::Tuple{IO, Response} socketready = Channel{Tuple{IO, Response}}(0) - @async HTTP.open(method, url, headers; kw...) do http + Threads.@spawn HTTP.open(method, url, headers; kw...) 
do http HTTP.startread(http) socket = http.stream put!(socketready, (socket, http.message)) diff --git a/src/Messages.jl b/src/Messages.jl index afafcd5ca..384b0d36a 100644 --- a/src/Messages.jl +++ b/src/Messages.jl @@ -61,7 +61,7 @@ export Message, Request, Response, using URIs, CodecZlib using ..Pairs, ..IOExtras, ..Parsers, ..Strings, ..Forms, ..Conditions -using ..ConnectionPool, ..StatusCodes +using ..Connections, ..StatusCodes const nobody = UInt8[] const unknown_length = typemax(Int) diff --git a/src/Servers.jl b/src/Servers.jl index 0d4ac6c72..6afb16590 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -13,7 +13,7 @@ export listen, listen!, Server, forceclose, port using Sockets, Logging, LoggingExtras, MbedTLS, Dates using MbedTLS: SSLContext, SSLConfig -using ..IOExtras, ..Streams, ..Messages, ..Parsers, ..ConnectionPool, ..Exceptions +using ..IOExtras, ..Streams, ..Messages, ..Parsers, ..Connections, ..Exceptions import ..access_threaded, ..SOCKET_TYPE_TLS, ..@logfmt_str TRUE(x) = true @@ -282,7 +282,7 @@ Whatever you type on the client will be displayed on the server and vis-versa. using HTTP function chat(io::HTTP.Stream) - @async while !eof(io) + Threads.@spawn while !eof(io) write(stdout, readavailable(io), "\\n") end while isopen(io) diff --git a/src/Streams.jl b/src/Streams.jl index 1c2f22117..80b92720f 100644 --- a/src/Streams.jl +++ b/src/Streams.jl @@ -3,7 +3,7 @@ module Streams export Stream, closebody, isaborted, setstatus, readall! using Sockets, LoggingExtras -using ..IOExtras, ..Messages, ..ConnectionPool, ..Conditions, ..Exceptions +using ..IOExtras, ..Messages, ..Connections, ..Conditions, ..Exceptions import ..HTTP # for doc references mutable struct Stream{M <: Message, S <: IO} <: IO @@ -31,7 +31,7 @@ Creates a `HTTP.Stream` that wraps an existing `IO` stream. - `eof(::Stream)` and `readavailable(::Stream)` parse the body from the `IO` stream. - `closeread(::Stream)` reads the trailers and calls `closeread` on the `IO` - stream. 
When the `IO` stream is a [`HTTP.ConnectionPool.Connection`](@ref), + stream. When the `IO` stream is a [`HTTP.Connections.Connection`](@ref), calling `closeread` releases the connection back to the connection pool for reuse. If a complete response has not been received, `closeread` throws `EOFError`. @@ -41,7 +41,7 @@ Stream(r::M, io::S) where {M, S} = Stream{M, S}(r, io, false, false, true, 0, -1 Messages.header(http::Stream, a...) = header(http.message, a...) setstatus(http::Stream, status) = (http.message.response.status = status) Messages.setheader(http::Stream, a...) = setheader(http.message.response, a...) -ConnectionPool.getrawstream(http::Stream) = getrawstream(http.stream) +Connections.getrawstream(http::Stream) = getrawstream(http.stream) Sockets.getsockname(http::Stream) = Sockets.getsockname(IOExtras.tcpsocket(getrawstream(http))) function Sockets.getpeername(http::Stream) @@ -281,7 +281,7 @@ end @noinline function bufcheck(buf::Base.GenericIOBuffer, n) requested_buffer_capacity = (buf.append ? 
buf.size : (buf.ptr - 1)) + n - requested_buffer_capacity > length(buf.data) && throw(ArgumentError("Unable to grow response stream IOBuffer large enough for response body size")) + requested_buffer_capacity > length(buf.data) && throw(ArgumentError("Unable to grow response stream IOBuffer $(length(buf.data)) large enough for response body size: $requested_buffer_capacity")) end function Base.readbytes!(http::Stream, buf::Base.GenericIOBuffer, n=bytesavailable(http)) @@ -299,23 +299,29 @@ function Base.readbytes!(http::Stream, buf::Base.GenericIOBuffer, n=bytesavailab return n end -Base.read(http::Stream, buf::Base.GenericIOBuffer=PipeBuffer()) = take!(readall!(http, buf)) +function Base.read(http::Stream, buf::Base.GenericIOBuffer=PipeBuffer()) + readall!(http, buf) + return take!(buf) +end function readall!(http::Stream, buf::Base.GenericIOBuffer=PipeBuffer()) + n = 0 if ntoread(http) == unknown_length while !eof(http) - readbytes!(http, buf) + n += readbytes!(http, buf) end else + # even if we know the length, we still need to read until eof + # because Transfer-Encoding: chunked comes in piece-by-piece while !eof(http) - readbytes!(http, buf, ntoread(http)) + n += readbytes!(http, buf, ntoread(http)) end end - return buf + return n end function Base.readuntil(http::Stream, f::Function)::ByteView - UInt(ntoread(http)) == 0 && return ConnectionPool.nobytes + UInt(ntoread(http)) == 0 && return Connections.nobytes try bytes = readuntil(http.stream, f) update_ntoread(http, length(bytes)) diff --git a/src/WebSockets.jl b/src/WebSockets.jl index 14342f49d..85fa0dc42 100644 --- a/src/WebSockets.jl +++ b/src/WebSockets.jl @@ -2,7 +2,7 @@ module WebSockets using Base64, LoggingExtras, UUIDs, Sockets, Random using MbedTLS: digest, MD_SHA1, SSLContext -using ..IOExtras, ..Streams, ..ConnectionPool, ..Messages, ..Conditions, ..Servers +using ..IOExtras, ..Streams, ..Connections, ..Messages, ..Conditions, ..Servers import ..open import ..HTTP # for doc references diff --git 
a/src/clientlayers/BasicAuthRequest.jl b/src/clientlayers/BasicAuthRequest.jl deleted file mode 100644 index fab25b280..000000000 --- a/src/clientlayers/BasicAuthRequest.jl +++ /dev/null @@ -1,26 +0,0 @@ -module BasicAuthRequest - -using Base64, URIs, LoggingExtras -import ..Messages: setheader, hasheader - -export basicauthlayer - -""" - basicauthlayer(handler) -> handler - -Add `Authorization: Basic` header using credentials from url userinfo. -""" -function basicauthlayer(handler) - return function(req; basicauth::Bool=true, kw...) - if basicauth - userinfo = unescapeuri(req.url.userinfo) - if !isempty(userinfo) && !hasheader(req.headers, "Authorization") - @debugv 1 "Adding Authorization: Basic header." - setheader(req.headers, "Authorization" => "Basic $(base64encode(userinfo))") - end - end - return handler(req; kw...) - end -end - -end # module BasicAuthRequest diff --git a/src/clientlayers/CanonicalizeRequest.jl b/src/clientlayers/CanonicalizeRequest.jl deleted file mode 100644 index 7a9f154e3..000000000 --- a/src/clientlayers/CanonicalizeRequest.jl +++ /dev/null @@ -1,27 +0,0 @@ -module CanonicalizeRequest - -using ..Messages, ..Strings - -export canonicalizelayer - -""" - canonicalizelayer(handler) -> handler - -Rewrite request and response headers in Canonical-Camel-Dash-Format. -""" -function canonicalizelayer(handler) - return function(req; canonicalize_headers::Bool=false, kw...) - if canonicalize_headers - req.headers = canonicalizeheaders(req.headers) - end - res = handler(req; kw...) 
- if canonicalize_headers - res.headers = canonicalizeheaders(res.headers) - end - return res - end -end - -canonicalizeheaders(h::T) where {T} = T([tocameldash(k) => v for (k,v) in h]) - -end # module CanonicalizeRequest diff --git a/src/clientlayers/ConnectionRequest.jl b/src/clientlayers/ConnectionRequest.jl index 539c72eca..19c324ed4 100644 --- a/src/clientlayers/ConnectionRequest.jl +++ b/src/clientlayers/ConnectionRequest.jl @@ -3,7 +3,7 @@ module ConnectionRequest using URIs, Sockets, Base64, LoggingExtras using MbedTLS: SSLContext, SSLConfig using OpenSSL: SSLStream -using ..Messages, ..IOExtras, ..ConnectionPool, ..Streams, ..Exceptions +using ..Messages, ..IOExtras, ..Connections, ..Streams, ..Exceptions import ..SOCKET_TYPE_TLS islocalhost(host::AbstractString) = host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "0000:0000:0000:0000:0000:0000:0000:0001" || host == "0:0:0:0:0:0:0:1" @@ -49,13 +49,13 @@ export connectionlayer """ connectionlayer(handler) -> handler -Retrieve an `IO` connection from the ConnectionPool. +Retrieve an `IO` connection from the Connections. Close the connection if the request throws an exception. Otherwise leave it open so that it can be reused. """ function connectionlayer(handler) - return function(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, kw...) + return function connections(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, logerrors::Bool=false, kw...) local io, stream if proxy !== nothing target_url = req.url @@ -74,10 +74,17 @@ function connectionlayer(handler) end IOType = sockettype(url, socket_type, socket_type_tls) + start_time = time() try io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, kw...) 
catch e + if logerrors + @error "HTTP.ConnectError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context + end + req.context[:connect_errors] = get(req.context, :connect_errors, 0) + 1 throw(ConnectError(string(url), e)) + finally + req.context[:connect_duration_ms] = get(req.context, :connect_duration_ms, 0.0) + (time() - start_time) * 1000 end shouldreuse = !(target_url.scheme in ("ws", "wss")) @@ -91,7 +98,7 @@ function connectionlayer(handler) target_url = URI(target_url, port=80) # if there is no port info, connect_tunnel will fail end r = if readtimeout > 0 - try_with_timeout(() -> shouldtimeout(io, readtimeout), readtimeout, () -> close(io)) do + try_with_timeout(readtimeout) do connect_tunnel(io, target_url, req) end else @@ -102,26 +109,37 @@ function connectionlayer(handler) return r end if target_url.scheme in ("https", "wss") - io = ConnectionPool.sslupgrade(socket_type_tls, io, target_url.host; readtimeout=readtimeout, kw...) + io = Connections.sslupgrade(socket_type_tls, io, target_url.host; readtimeout=readtimeout, kw...) end req.headers = filter(x->x.first != "Proxy-Authorization", req.headers) end stream = Stream(req.response, io) - return handler(stream; readtimeout=readtimeout, kw...) + return handler(stream; readtimeout=readtimeout, logerrors=logerrors, kw...) catch e + while true + if e isa CompositeException + e = e.exceptions[1] + elseif e isa TaskFailedException + e = e.task.result + else + break + end + end + if logerrors && !(e isa StatusError || e isa TimeoutError) + @error "HTTP.ConnectionRequest" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context + end @debugv 1 "❗️ ConnectionLayer $e. 
Closing: $io" shouldreuse = false - @try Base.IOError close(io) if @isdefined(stream) && stream.nwritten == -1 # we didn't write anything, so don't need to worry about # idempotency of the request req.context[:nothingwritten] = true end e isa HTTPError || throw(RequestError(req, e)) - rethrow() + rethrow(e) finally - releaseconnection(io, shouldreuse) + releaseconnection(io, shouldreuse; kw...) if !shouldreuse @try Base.IOError close(io) end diff --git a/src/clientlayers/ContentTypeRequest.jl b/src/clientlayers/ContentTypeRequest.jl deleted file mode 100644 index 9cd6dbe45..000000000 --- a/src/clientlayers/ContentTypeRequest.jl +++ /dev/null @@ -1,27 +0,0 @@ -module ContentTypeDetection - -using URIs, LoggingExtras -using ..Sniff, ..Forms, ..Messages, ..IOExtras - -export contenttypedetectionlayer - -""" - contenttypedetectionlayer(handler) -> handler - -Try and detect the content type of the request body and add the "Content-Type" header. -""" -function contenttypedetectionlayer(handler) - return function(req; detect_content_type::Bool=false, kw...) - if detect_content_type && (!hasheader(req.headers, "Content-Type") - && !isa(req.body, Form) - && isbytes(req.body)) - - sn = sniff(bytes(req.body)) - setheader(req.headers, "Content-Type" => sn) - @debugv 1 "setting Content-Type header to: $sn" - end - return handler(req; kw...) - end -end - -end # module diff --git a/src/clientlayers/CookieRequest.jl b/src/clientlayers/CookieRequest.jl index 5bbc419c2..a740db5bc 100644 --- a/src/clientlayers/CookieRequest.jl +++ b/src/clientlayers/CookieRequest.jl @@ -16,7 +16,7 @@ from the `cookiejar` keyword argument (by default, a global cookiejar is used). Store "Set-Cookie" cookies from the response headers. """ function cookielayer(handler) - return function(req::Request; cookies=true, cookiejar::CookieJar=COOKIEJAR, kw...) + return function managecookies(req::Request; cookies=true, cookiejar::CookieJar=COOKIEJAR, kw...) 
if cookies === true || (cookies isa AbstractDict && !isempty(cookies)) url = req.url cookiestosend = Cookies.getcookies!(cookiejar, url) diff --git a/src/clientlayers/DebugRequest.jl b/src/clientlayers/DebugRequest.jl deleted file mode 100644 index 217b66e06..000000000 --- a/src/clientlayers/DebugRequest.jl +++ /dev/null @@ -1,28 +0,0 @@ -module DebugRequest - -using Logging, LoggingExtras -import ..DEBUG_LEVEL - -export debuglayer - -""" - debuglayer(handler) -> handler - -If the positive `verbose` keyword arg passed to the `handler`, then enable -debug logging with verbosity `verbose` for the lifetime of the request. -If no `verbose` is specified, it defaults to the *HTTP.jl* global `DEBUG_LEVEL[]`. -""" -function debuglayer(handler) - return function(request; verbose=DEBUG_LEVEL[], kw...) - # if debugging, enable by wrapping request in custom logger logic - if verbose > 0 - LoggingExtras.withlevel(Logging.Debug; verbosity=verbose) do - handler(request; verbose, kw...) - end - else - return handler(request; verbose, kw...) - end - end -end - -end # module DebugRequest diff --git a/src/clientlayers/ExceptionRequest.jl b/src/clientlayers/ExceptionRequest.jl index f91c84754..7c1bb3dab 100644 --- a/src/clientlayers/ExceptionRequest.jl +++ b/src/clientlayers/ExceptionRequest.jl @@ -10,10 +10,16 @@ using ..IOExtras, ..Messages, ..Exceptions Throw a `StatusError` if the request returns an error response status. """ function exceptionlayer(handler) - return function(stream; status_exception::Bool=true, kw...) - res = handler(stream; kw...) + return function exceptions(stream; status_exception::Bool=true, logerrors::Bool=false, kw...) + res = handler(stream; logerrors=logerrors, kw...) 
if status_exception && iserror(res) - throw(StatusError(res.status, res.request.method, res.request.target, res)) + req = res.request + req.context[:status_errors] = get(req.context, :status_errors, 0) + 1 + e = StatusError(res.status, req.method, req.target, res) + if logerrors + @error "HTTP.StatusError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context + end + throw(e) else return res end diff --git a/src/clientlayers/DefaultHeadersRequest.jl b/src/clientlayers/HeadersRequest.jl similarity index 57% rename from src/clientlayers/DefaultHeadersRequest.jl rename to src/clientlayers/HeadersRequest.jl index 82726152c..6c6f7e324 100644 --- a/src/clientlayers/DefaultHeadersRequest.jl +++ b/src/clientlayers/HeadersRequest.jl @@ -1,17 +1,37 @@ -module DefaultHeadersRequest +module HeadersRequest -export defaultheaderslayer, setuseragent! +export headerslayer, setuseragent! -using ..Messages, ..Forms, ..IOExtras +using Base64, URIs, LoggingExtras +using ..Messages, ..Forms, ..IOExtras, ..Sniff, ..Forms, ..Strings """ - defaultheaderslayer(handler) -> handler + headerslayer(handler) -> handler Sets default expected headers. """ -function defaultheaderslayer(handler) - return function(req; iofunction=nothing, decompress=nothing, kw...) +function headerslayer(handler) + return function defaultheaders(req; iofunction=nothing, decompress=nothing, + basicauth::Bool=true, detect_content_type::Bool=false, canonicalize_headers::Bool=false, kw...) headers = req.headers + ## basicauth + if basicauth + userinfo = unescapeuri(req.url.userinfo) + if !isempty(userinfo) && !hasheader(headers, "Authorization") + @debugv 1 "Adding Authorization: Basic header." 
+ setheader(headers, "Authorization" => "Basic $(base64encode(userinfo))") + end + end + ## content type detection + if detect_content_type && (!hasheader(headers, "Content-Type") + && !isa(req.body, Form) + && isbytes(req.body)) + + sn = sniff(bytes(req.body)) + setheader(headers, "Content-Type" => sn) + @debugv 1 "setting Content-Type header to: $sn" + end + ## default headers if isempty(req.url.port) || (req.url.scheme == "http" && req.url.port == "80") || (req.url.scheme == "https" && req.url.port == "443") @@ -44,10 +64,20 @@ function defaultheaderslayer(handler) if decompress === nothing || decompress defaultheader!(headers, "Accept-Encoding" => "gzip") end - return handler(req; iofunction, decompress, kw...) + ## canonicalize headers + if canonicalize_headers + req.headers = canonicalizeheaders(headers) + end + res = handler(req; iofunction, decompress, kw...) + if canonicalize_headers + res.headers = canonicalizeheaders(res.headers) + end + return res end end +canonicalizeheaders(h::T) where {T} = T([tocameldash(k) => v for (k,v) in h]) + const USER_AGENT = Ref{Union{String, Nothing}}("HTTP.jl/$VERSION") """ diff --git a/src/clientlayers/MessageRequest.jl b/src/clientlayers/MessageRequest.jl index 8e3c05070..de467c2d9 100644 --- a/src/clientlayers/MessageRequest.jl +++ b/src/clientlayers/MessageRequest.jl @@ -1,9 +1,10 @@ module MessageRequest -using URIs +using URIs, LoggingExtras using ..IOExtras, ..Messages, ..Parsers, ..Exceptions using ..Messages, ..Parsers using ..Strings: HTTPVersion +import ..DEBUG_LEVEL export messagelayer @@ -20,17 +21,27 @@ Construct a [`Request`](@ref) object from method, url, headers, and body. Hard-coded as the first layer in the request pipeline. """ function messagelayer(handler) - return function(method::String, url::URI, headers, body; copyheaders::Bool=true, response_stream=nothing, http_version=HTTPVersion(1, 1), kw...) 
+ return function makerequest(method::String, url::URI, headers, body; copyheaders::Bool=true, response_stream=nothing, http_version=HTTPVersion(1, 1), verbose=DEBUG_LEVEL[], kw...) req = Request(method, resource(url), mkreqheaders(headers, copyheaders), body; url=url, version=http_version, responsebody=response_stream) local resp + start_time = time() try - resp = handler(req; response_stream=response_stream, kw...) + # if debugging, enable by wrapping request in custom logger logic + resp = if verbose > 0 + LoggingExtras.withlevel(Logging.Debug; verbosity=verbose) do + handler(req; verbose, response_stream, kw...) + end + else + handler(req; verbose, response_stream, kw...) + end catch e if e isa StatusError resp = e.response end rethrow(e) finally + dur = (time() - start_time) * 1000 + req.context[:total_request_duration_ms] = dur if @isdefined(resp) && iserror(resp) && haskey(resp.request.context, :response_body) if isbytes(resp.body) resp.body = resp.request.context[:response_body] diff --git a/src/clientlayers/RedirectRequest.jl b/src/clientlayers/RedirectRequest.jl index 60303833c..53a9f0ae3 100644 --- a/src/clientlayers/RedirectRequest.jl +++ b/src/clientlayers/RedirectRequest.jl @@ -11,7 +11,7 @@ export redirectlayer, nredirects Redirects the request in the case of 3xx response status. """ function redirectlayer(handler) - return function(req; redirect::Bool=true, redirect_limit::Int=3, redirect_method=nothing, forwardheaders::Bool=true, response_stream=nothing, kw...) + return function redirects(req; redirect::Bool=true, redirect_limit::Int=3, redirect_method=nothing, forwardheaders::Bool=true, response_stream=nothing, kw...) if !redirect || redirect_limit == 0 # no redirecting return handler(req; kw...) 
diff --git a/src/clientlayers/RetryRequest.jl b/src/clientlayers/RetryRequest.jl index 664279399..506ccdd60 100644 --- a/src/clientlayers/RetryRequest.jl +++ b/src/clientlayers/RetryRequest.jl @@ -27,7 +27,7 @@ retry check _wouldn't_ retry, if `retry_check` returns true, then the request will be retried anyway. """ function retrylayer(handler) - return function(req::Request; retry::Bool=true, retries::Int=4, + return function manageretries(req::Request; retry::Bool=true, retries::Int=4, retry_delays=ExponentialBackOff(n = retries, factor=3.0), retry_check=FALSE, retry_non_idempotent::Bool=false, kw...) if !retry || retries == 0 @@ -76,7 +76,9 @@ function retrylayer(handler) end end +const EAI_AGAIN = 2 isrecoverable(ex) = true +isrecoverable(ex::ConnectError) = ex.error isa Sockets.DNSError && ex.error.code == EAI_AGAIN ? false : true isrecoverable(ex::StatusError) = retryable(ex.status) function _retry_check(s, ex, req, check) diff --git a/src/clientlayers/StreamRequest.jl b/src/clientlayers/StreamRequest.jl index 269b22cf8..8c5e64ec0 100644 --- a/src/clientlayers/StreamRequest.jl +++ b/src/clientlayers/StreamRequest.jl @@ -1,6 +1,6 @@ module StreamRequest -using ..IOExtras, ..Messages, ..Streams, ..ConnectionPool, ..Strings, ..RedirectRequest, ..Exceptions +using ..IOExtras, ..Messages, ..Streams, ..Connections, ..Strings, ..RedirectRequest, ..Exceptions using LoggingExtras, CodecZlib, URIs using SimpleBufferStream: BufferStream @@ -17,12 +17,12 @@ immediately so that the transmission can be aborted if the `Response` status indicates that the server does not wish to receive the message body. [RFC7230 6.5](https://tools.ietf.org/html/rfc7230#section-6.5). 
""" -function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, kw...)::Response +function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothing, Bool}=nothing, logerrors::Bool=false, kw...)::Response response = stream.message req = response.request - io = stream.stream @debugv 1 sprintcompact(req) @debugv 2 "client startwrite" + write_start = time() startwrite(stream) @debugv 2 sprint(show, req) @@ -30,7 +30,6 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi @debugv 2 "$(typeof(req)).body: $(sprintcompact(req.body))" end - write_error = nothing try @sync begin if iofunction === nothing @@ -38,29 +37,32 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi writebody(stream, req) @debugv 2 "client closewrite" closewrite(stream) - catch e - # @error "error" exception=(e, catch_backtrace()) - write_error = e - isopen(io) && @try Base.IOError close(io) + finally + req.context[:write_duration_ms] = get(req.context, :write_duration_ms, 0.0) + ((time() - write_start) * 1000) + end + read_start = time() + @async try + @debugv 2 "client startread" + startread(stream) + if isaborted(stream) + # The server may have closed the connection. + # Don't propagate such errors. + @try Base.IOError close(stream.stream) + end + readbody(stream, response, decompress) + finally + req.context[:read_duration_ms] = get(req.context, :read_duration_ms, 0.0) + ((time() - read_start) * 1000) end - @debugv 2 "client startread" - startread(stream) - readbody(stream, response, decompress) else iofunction(stream) end - if isaborted(stream) - # The server may have closed the connection. - # Don't propagate such errors. 
- @try Base.IOError close(io) - end end catch e - if write_error !== nothing - throw(write_error) - else - rethrow(e) + if logerrors + @error "HTTP.IOError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context end + req.context[:io_errors] = get(req.context, :io_errors, 0) + 1 + rethrow() end @debugv 2 "client closewrite" @@ -70,33 +72,35 @@ function streamlayer(stream::Stream; iofunction=nothing, decompress::Union{Nothi @debugv 1 sprintcompact(response) @debugv 2 sprint(show, response) - return response end function writebody(stream::Stream, req::Request) if !isbytes(req.body) - writebodystream(stream, req.body) + n = writebodystream(stream, req.body) closebody(stream) else - write(stream, req.body) + n = write(stream, req.body) end - return + stream.message.request.context[:nbytes_written] = n + return n end function writebodystream(stream, body) + n = 0 for chunk in body - writechunk(stream, chunk) + n += writechunk(stream, chunk) end + return n end function writebodystream(stream, body::IO) - write(stream, body) + return write(stream, body) end function writebodystream(stream, body::Union{AbstractDict, NamedTuple}) # application/x-www-form-urlencoded - write(stream, URIs.escapeuri(body)) + return write(stream, URIs.escapeuri(body)) end writechunk(stream, body::IO) = writebodystream(stream, body) @@ -132,6 +136,7 @@ end const IOBuffers = Union{IOBuffer, Base.GenericIOBuffer{SubArray{UInt8, 1, Vector{UInt8}, Tuple{UnitRange{Int64}}, true}}} function readbody!(stream::Stream, res::Response, buf_or_stream) + n = 0 if !iserror(res) if isbytes(res.body) if length(res.body) > 0 @@ -144,22 +149,23 @@ function readbody!(stream::Stream, res::Response, buf_or_stream) # if it's a BufferStream, the response body was gzip encoded # so using the default write is fastest because it utilizes # readavailable under the hood, for which BufferStream is optimized - write(body, buf_or_stream) + n = write(body, buf_or_stream) elseif buf_or_stream isa 
Stream # for HTTP.Stream, there's already an optimized read method # that just needs an IOBuffer to write into - readall!(buf_or_stream, body) + n = readall!(buf_or_stream, body) else error("unreachable") end else res.body = read(buf_or_stream) + n = length(res.body) end elseif res.body isa Base.GenericIOBuffer && buf_or_stream isa Stream # optimization for IOBuffer response_stream to avoid temporary allocations - readall!(buf_or_stream, res.body) + n = readall!(buf_or_stream, res.body) else - write(res.body, buf_or_stream) + n = write(res.body, buf_or_stream) end else # read the response body into the request context so that it can be @@ -167,6 +173,7 @@ function readbody!(stream::Stream, res::Response, buf_or_stream) # we end up not retrying/redirecting/etc. res.request.context[:response_body] = read(buf_or_stream) end + res.request.context[:nbytes] = n end end # module StreamRequest diff --git a/src/clientlayers/TimeoutRequest.jl b/src/clientlayers/TimeoutRequest.jl index 96b650007..770a6319f 100644 --- a/src/clientlayers/TimeoutRequest.jl +++ b/src/clientlayers/TimeoutRequest.jl @@ -1,6 +1,6 @@ module TimeoutRequest -using ..ConnectionPool, ..Streams, ..Exceptions +using ..Connections, ..Streams, ..Exceptions using LoggingExtras export timeoutlayer @@ -11,14 +11,24 @@ export timeoutlayer Close the `HTTP.Stream` if no data has been received for `readtimeout` seconds. """ function timeoutlayer(handler) - return function(stream::Stream; readtimeout::Int=0, kw...) + return function timeouts(stream::Stream; readtimeout::Int=0, logerrors::Bool=false, kw...) if readtimeout <= 0 # skip - return handler(stream; kw...) + return handler(stream; logerrors=logerrors, kw...) end - io = stream.stream - return try_with_timeout(() -> shouldtimeout(io, readtimeout), readtimeout, () -> close(io)) do - handler(stream; kw...) + return try + try_with_timeout(readtimeout) do + handler(stream; logerrors=logerrors, kw...) 
+ end + catch e + if e isa TimeoutError + req = stream.message.request + if logerrors + @error "HTTP.TimeoutError" exception=(e, catch_backtrace()) method=req.method url=req.url context=req.context + end + req.context[:timeout_errors] = get(req.context, :timeout_errors, 0) + 1 + end + rethrow() end end end diff --git a/src/connectionpools.jl b/src/connectionpools.jl deleted file mode 100644 index e81cc0410..000000000 --- a/src/connectionpools.jl +++ /dev/null @@ -1,296 +0,0 @@ -module ConnectionPools - -export Pod, Pool, acquire, release - -import Base: acquire, release - -connectionid(x) = objectid(x) -_id(x) = string(connectionid(x), base=16, pad=16) - -""" - ConnectionTracker(conn::T) - -Wraps a `Connection` of type `T`. -A `Connection` object must support the following interface: - * `isopen(x)`: check if a `Connection` object is still open and can be used - * `close(x)`: close a `Connection` object; `isopen(x)` should return false after calling `close` - * `ConnectionPools.connectionid(x)`: optional method to distinguish `Connection` objects from each other; by default, calls `objectid(x)`, which is valid for `mutable struct` objects - -The `idle_timestamp` field is a timestamp to track when a `Connection` was returned to a `Pod` and became idle_timestamp. - -The `times_used` field keeps track of how many times the connection has been "used" (i.e. acquired then released). -""" -mutable struct ConnectionTracker{T} - conn::T - idle_timestamp::Float64 - times_used::Int -end - -ConnectionTracker(conn::T) where {T} = ConnectionTracker(conn, time(), 0) - -""" - Pod(T; max_concurrent_connections::Int, idle_timeout::Int) - -A threadsafe object for managing a pool of and the reuse of `Connection` objects (see [`ConnectionTracker`](@ref)). 
- -A Pod manages a collection of `Connection`s and the following keyword arguments allow configuring the management thereof: - - * `max_concurrent_connections::Int=typemax(Int)`: controls the max # of currently acquired `Connection`s allowed - * `idle_timeout::Int=typemax(Int)`: controls the max # of seconds a `Connection` may be idle_timeout before it should be closed and not reused - -After creating a `Pod`, `Connection`s can be acquired by calling [`acquire`](@ref) and MUST -be subsequently released by calling [`release`](@ref). -""" -struct Pod{T} - # this lock/condition protects the `conns` Vector and `active` Dict - # no changes to either field should be made without holding this lock - lock::Threads.Condition - conns::Vector{ConnectionTracker{T}} - active::Dict{Any, ConnectionTracker{T}} - max_concurrent_connections::Int - idle_timeout::Int -end - -const MAX = typemax(Int) - -function Pod(T; max_concurrent_connections::Int=MAX, idle_timeout::Int=MAX) - return Pod(Threads.Condition(), ConnectionTracker{T}[], Dict{Any, ConnectionTracker{T}}(), max_concurrent_connections, idle_timeout) -end - -# check if an idle_timeout `Connection` is still valid to be reused -function isvalid(pod::Pod{C}, conn::ConnectionTracker{C}) where {C} - if (time() - conn.idle_timestamp) > pod.idle_timeout - # println("$(taskid()): connection idle_timeout timeout") - # if the connection has been idle_timeout too long, close it - close(conn.conn) - elseif isopen(conn.conn) - # println("$(taskid()): found a valid connection to reuse") - # dump(conn.conn) - # otherwise, if the connection is open, this is a valid connection we can use! 
- return true - else - # println("$(taskid()): connection no longer open") - end - return false -end - -function trackconnection!(pod::Pod{C}, conn::ConnectionTracker{C}) where {C} - conn.times_used += 1 - id = connectionid(conn.conn) - if haskey(pod.active, id) - error("connection to be acquired is already an active, tracked connection from the pod according to the `connectionid(conn)`") - end - pod.active[id] = conn - return conn.conn -end - -""" - acquire(f, pod::Pod{C}) -> C - -Check first for existing `Connection`s in a `Pod` still valid to reuse, -and if so, return one. If no existing `Connection` is available for reuse, -call the provided function `f()`, which must return a new connection instance of type `C`. -This new connection instance will be tracked by the `Pod` and MUST be returned to the `Pod` -after use by calling `release(pod, conn)`. -""" -function acquire(f, pod::Pod, forcenew::Bool=false) - lock(pod.lock) - try - # if there are idle connections in the pod, - # let's check if they're still valid and can be used again - while !forcenew && !isempty(pod.conns) - # Pod connections are FIFO, so grab the earliest returned connection - # println("$(taskid()): checking idle_timeout connections for reuse") - conn = popfirst!(pod.conns) - if isvalid(pod, conn) - # println("$(taskid()): found a valid connection to reuse") - return trackconnection!(pod, conn) - else - # nothing, let the non-valid connection fall into GC oblivion - end - end - # There were no idle connections able to be reused - # If there are not too many already-active connections, create new - if length(pod.active) < pod.max_concurrent_connections - # println("$(taskid()): no idle_timeout connections to reuse; creating new") - return trackconnection!(pod, ConnectionTracker(f())) - end - # If we reach here, there were no valid idle connections and too many - # currently-active connections, so we need to wait until for a "release" - # event, which will mean a connection has been returned 
that can be reused, - # or a "slot" has opened up so we can create a new connection, otherwise, - # we'll just need to start the loop back over and wait again - while true - # this `wait` call will block on our Pod `lock` condition - # until a connection is `release`ed and the condition - # is notified - # println("$(taskid()): connection pool maxxed out; waiting for connection to be released to the pod") - conn = wait(pod.lock) - if !forcenew && conn !== nothing - # println("$(taskid()): checking recently released connection validity for reuse") - if isvalid(pod, conn) - return trackconnection!(pod, conn) - end - end - # if the Connection just returned to the Pod wasn't valid, the active - # count should have at least went down, so we should be able to create a new one - if length(pod.active) < pod.max_concurrent_connections - return trackconnection!(pod, ConnectionTracker(f())) - end - # If for some reason there were still too many active connections, let's - # start the loop back over waiting for connections to be returned - end - finally - unlock(pod.lock) - end -end - -taskid() = string(objectid(current_task()) % UInt16, base=16, pad=4) - -# ability to provide an already created connection object to insert into the Pod -# if Pod is already at max_concurrent_connections, acquire will wait until an -# active connection is released back to the pod -# it will be tracked among active connections and must be released -function acquire(pod::Pod{C}, c::C) where {C} - lock(pod.lock) - try - if length(pod.active) < pod.max_concurrent_connections - return trackconnection!(pod, ConnectionTracker(c)) - else - while true - # wait until pod gets a connection released - conn = wait(pod.lock) - if conn !== nothing - push!(pod.conns, conn) - else - conn = ConnectionTracker(c) - end - return trackconnection!(pod, conn) - end - end - finally - unlock(pod.lock) - end -end - -function release(pod::Pod{C}, conn::C; return_for_reuse::Bool=true) where {C} - lock(pod.lock) - try - # We 
first want to look up the corresponding ConnectionTracker object in our - # Pod `active` Dict that tracks active connections - id = connectionid(conn) - # if, for some reason, it's not in our `active` tracking Dict - # then something is wrong; you're trying to release a `Connection` - # that this Pod currently doesn't think is active - if !haskey(pod.active, id) - error("couldn't find connection id in pod's current list of active connections; invalid release call; each acquired connection should be `release`ed ONLY once") - end - conn_tracker = pod.active[id] - # remove the ConnectionTracker from our `active` Dict tracker - delete!(pod.active, id) - if return_for_reuse && isopen(conn) - # reset the idle_timestamp of the ConnectionTracker - conn_tracker.idle_timestamp = time() - # check if there are any tasks waiting to acquire a connection - if isempty(pod.lock) - # if not, we put the connection back in the pod idle queue - # in this case, there's no need to notify the pod lock/condition - # since there's no one waiting to be notified anyway - # println("$(taskid()): returning connection (id='$(_id(id))') to pod idle_timeout queue for reuse") - push!(pod.conns, conn_tracker) - else - # if there are waiters, we notify the pod condition and pass the - # ConnectionTracker object in the notification; we ensure to pass - # all=false, so only one waiter is woken up and receives the - # ConnectionTracker - # println("$(taskid()): returning connection (id='$(_id(id))') to a waiting task for reuse") - notify(pod.lock, conn_tracker; all=false) - end - else - # if the user has, for whatever reason, requested this connection not be reused - # anymore by passing `return_for_reuse=false`, then we've still removed it from - # the `active` tracking and want to notify the pod in case there are waiting - # acquire tasks that can now create a new connection - # println("$(taskid()): connection not reuseable; notifying pod that a connection has been released though") - 
notify(pod.lock, nothing; all=false) - end - finally - unlock(pod.lock) - end - return -end - -""" - Pool(T) - -A threadsafe convenience object for managing multiple [`Pod`](@ref)s of connections. -A `Pod` of reuseable connections will be looked up by the `key` when calling `acquire(f, pool, key)`. -""" -struct Pool{C} - lock::ReentrantLock - pods::Dict{Any, Pod{C}} -end - -Pool(C) = Pool(ReentrantLock(), Dict{Any, Pod{C}}()) - -""" - acquire(f, pool::Pool{C}, key; max_concurrent_connections::Int, idle_timeout::Int, reuse::Int) -> C - -Get a connection from a `pool`, looking up a `Pod` of reuseable connections -by the provided `key`. If no `Pod` exists for the given key yet, one will be -created and passed the `max`, `idle_timeout`, and `reuse` keyword arguments if provided. -The provided function `f` must create a new connection instance of type `C`. -The acquired connection MUST be returned to the pool by calling `release(pool, key, conn)` exactly once. -""" -function acquire(f, pool::Pool{C}, key; forcenew::Bool=false, kw...) where {C} - pod = lock(pool.lock) do - get!(() -> Pod(C; kw...), pool.pods, key) - end - return acquire(f, pod, forcenew) -end - -function acquire(pool::Pool{C}, key, conn::C; kw...) where {C} - pod = lock(pool.lock) do - get!(() -> Pod(C; kw...), pool.pods, key) - end - return acquire(pod, conn) -end - -""" - release(pool::Pool{C}, key, conn::C) - -Return an acquired connection to a `pool` with the same `key` provided when it was acquired. -""" -function release(pool::Pool{C}, key, conn::C; kw...) where {C} - pod = lock(pool.lock) do - pool.pods[key] - end - release(pod, conn; kw...) - return -end - -""" - reset!(pool) -> nothing - -Close all connections in a `Pool`. 
-""" -function reset!(pool::Pool) - lock(pool.lock) do - for pod in values(pool.pods) - lock(pod.lock) do - foreach(pod.conns) do conn - close(conn.conn) - end - empty!(pod.conns) - for conn in values(pod.active) - close(conn.conn) - end - empty!(pod.active) - end - end - empty!(pool.pods) - end - return -end - -end # module diff --git a/test/async.jl b/test/async.jl index 1162417ec..c29109eef 100644 --- a/test/async.jl +++ b/test/async.jl @@ -32,7 +32,7 @@ using Test, HTTP, JSON @test a == b end - HTTP.ConnectionPool.closeall() + HTTP.Connections.closeall() end @testset "HTTP.request - Body - $config - https" for config in configs @@ -56,7 +56,7 @@ using Test, HTTP, JSON @test a == b end - HTTP.ConnectionPool.closeall() + HTTP.Connections.closeall() end @testset "HTTP.open - $config - https" for config in configs @@ -85,7 +85,7 @@ using Test, HTTP, JSON @test a == b end - HTTP.ConnectionPool.closeall() + HTTP.Connections.closeall() end @testset "HTTP.request - Response Stream - $config - https" for config in configs @@ -126,7 +126,7 @@ using Test, HTTP, JSON @test a == b end - HTTP.ConnectionPool.closeall() + HTTP.Connections.closeall() end end diff --git a/test/benchmark.jl b/test/benchmark.jl index bd1cbd510..05978c9b1 100644 --- a/test/benchmark.jl +++ b/test/benchmark.jl @@ -4,7 +4,7 @@ include("http_parser_benchmark.jl") using HTTP using HTTP.IOExtras -using HTTP.ConnectionPool +using HTTP.Connections using HTTP.Streams using HTTP.Messages diff --git a/test/client.jl b/test/client.jl index bcf28a791..ed1bfd1a0 100644 --- a/test/client.jl +++ b/test/client.jl @@ -303,7 +303,7 @@ end finally # Shutdown @try Base.IOError close(server) - HTTP.ConnectionPool.closeall() + HTTP.Connections.closeall() end end @@ -338,7 +338,7 @@ end @test String(resp.body) == "hello, world" finally @try Base.IOError close(server) - HTTP.ConnectionPool.closeall() + HTTP.Connections.closeall() end end end @@ -376,7 +376,7 @@ end end finally @try Base.IOError close(server) - 
HTTP.ConnectionPool.closeall() + HTTP.Connections.closeall() end # Tests for Stream{SSLContext} @@ -410,7 +410,7 @@ end HTTP.startwrite(http) HTTP.write(http, sprint(JSON.print, data)) end - old_user_agent = HTTP.DefaultHeadersRequest.USER_AGENT[] + old_user_agent = HTTP.HeadersRequest.USER_AGENT[] default_user_agent = "HTTP.jl/$VERSION" # Default values HTTP.setuseragent!(default_user_agent) @@ -433,7 +433,7 @@ end HTTP.setuseragent!(old_user_agent) finally @try Base.IOError close(server) - HTTP.ConnectionPool.closeall() + HTTP.Connections.closeall() end end @@ -474,7 +474,7 @@ import NetworkOptions, MbedTLS end finally @try Base.IOError close(server) - HTTP.ConnectionPool.closeall() + HTTP.Connections.closeall() end end @@ -549,7 +549,7 @@ end @test "Host: example.com:443" in req finally close(proxy) - HTTP.ConnectionPool.closeall() + HTTP.Connections.closeall() end end end @@ -585,7 +585,7 @@ end seekstart(req_body) resp = HTTP.get("http://localhost:8080/retry"; body=req_body, response_stream=res_body, retry=false, status_exception=false) @test String(take!(res_body)) == "500 unexpected error" - # even if status_exception=true, we should still get the right response body + # even if StatusError, we should still get the right response body shouldfail[] = true seekstart(req_body) try @@ -617,7 +617,7 @@ end @test checked[] >= 1 finally close(server) - HTTP.ConnectionPool.closeall() + HTTP.Connections.closeall() end end diff --git a/test/loopback.jl b/test/loopback.jl index d7199aca9..df0edcbda 100644 --- a/test/loopback.jl +++ b/test/loopback.jl @@ -22,10 +22,12 @@ mutable struct Loopback <: IO end Loopback() = Loopback(false, IOBuffer(), Base.BufferStream()) +pool = HTTP.Pool(1) + config = [ :socket_type => Loopback, :retry => false, - :connection_limit => 1 + :pool => pool, ] server_events = [] @@ -158,7 +160,7 @@ function Base.unsafe_write(lb::Loopback, p::Ptr{UInt8}, n::UInt) return n end -function HTTP.ConnectionPool.getconnection(::Type{Loopback}, +function 
HTTP.Connections.getconnection(::Type{Loopback}, host::AbstractString, port::AbstractString; kw...)::Loopback @@ -295,9 +297,9 @@ end hello_sent = Ref(false) world_sent = Ref(false) - @test_throws HTTP.StatusError begin + @test_throws HTTP.RequestError begin r = lbreq("abort", [], [ - FunctionIO(()->(hello_sent[] = true; sleep(0.5); "Hello")), + FunctionIO(()->(hello_sent[] = true; sleep(1.0); "Hello")), FunctionIO(()->(world_sent[] = true; " World!"))]) end @test hello_sent[] diff --git a/test/runtests.jl b/test/runtests.jl index c0374330d..af84b736f 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -21,7 +21,6 @@ include(joinpath(dir, "resources/TestRequest.jl")) "parser.jl", "loopback.jl", "websockets/deno_client/server.jl", - "websockets/autobahn.jl", "messages.jl", "handlers.jl", "server.jl", @@ -29,6 +28,7 @@ include(joinpath(dir, "resources/TestRequest.jl")) "mwe.jl", "try_with_timeout.jl", "httpversion.jl", + "websockets/autobahn.jl", ] file = joinpath(dir, f) println("Running $file tests...") diff --git a/test/server.jl b/test/server.jl index 95c7254ee..7b35396f9 100644 --- a/test/server.jl +++ b/test/server.jl @@ -284,7 +284,7 @@ end # @testset logs = with_testserver(combined_logfmt) do HTTP.get("http://localhost:32612", ["Referer" => "julialang.org"]) HTTP.get("http://localhost:32612/index.html") - useragent = HTTP.DefaultHeadersRequest.USER_AGENT[] + useragent = HTTP.HeadersRequest.USER_AGENT[] HTTP.setuseragent!(nothing) HTTP.get("http://localhost:32612/index.html?a=b") HTTP.setuseragent!(useragent) diff --git a/test/try_with_timeout.jl b/test/try_with_timeout.jl index 54b52cbe3..6adfbf742 100644 --- a/test/try_with_timeout.jl +++ b/test/try_with_timeout.jl @@ -1,39 +1,48 @@ @testset "try_with_timeout $warmup" for warmup in [true, false] - nevertimeout() = false - timeoutafterfirstdelay() = true throwerrorexception() = throw(ErrorException("error as expected")) throwargumenterror() = throw(ArgumentError("unexpected error")) @testset "rethrow 
exceptions" begin t = @elapsed begin - @test_throws ErrorException HTTP.try_with_timeout(nevertimeout, 1) do - throwerrorexception() + err = try + HTTP.try_with_timeout(1) do + throwerrorexception() + end + catch e + e end + @test err.ex isa ErrorException end if !warmup @test t < 1 end end - @testset "rethrow exceptions from shouldtimeout callback" begin + @testset "TimeoutError is thrown" begin t = @elapsed begin - @test_throws ErrorException HTTP.try_with_timeout(throwerrorexception, 1) do - sleep(5) - throwargumenterror() + err = try + HTTP.try_with_timeout(1) do + sleep(5) + throwargumenterror() + end + catch e + e end + @test err isa HTTP.TimeoutError end if !warmup @test 1 < t < 2 end end - @testset "rethrow exceptions from iftimeout callback" begin + @testset "value is successfully returned under timeout" begin t = @elapsed begin - @test_throws ErrorException HTTP.try_with_timeout(timeoutafterfirstdelay, 1, throwerrorexception) do - sleep(5) - throwargumenterror() + ret = HTTP.try_with_timeout(5) do + sleep(1) + return 1 end end + @test ret == 1 if !warmup @test 1 < t < 2 end diff --git a/test/websockets/autobahn.jl b/test/websockets/autobahn.jl index 53609acf6..c574b9b53 100644 --- a/test/websockets/autobahn.jl +++ b/test/websockets/autobahn.jl @@ -2,7 +2,7 @@ using Test, Sockets, HTTP, HTTP.WebSockets, JSON const DIR = abspath(joinpath(dirname(pathof(HTTP)), "../test/websockets")) -havedocker = success(`which docker`) +havedocker = success(`which docker`) && success(`docker images crossbario/autobahn-testsuite`) !havedocker && @warn "Docker not found, skipping Autobahn tests" # 32-bit not supported by autobahn