From f804214f90cf132f2aeb89c55ef42f30649f3171 Mon Sep 17 00:00:00 2001 From: Panagiotis Georgakopoulos Date: Sat, 2 Sep 2023 10:07:23 +0300 Subject: [PATCH] feat(server): spawn task sooner in listenloop --- docs/examples/cors_server.jl | 6 +-- src/Servers.jl | 78 ++++++++++++++++++++++++++---------- src/WebSockets.jl | 2 +- 3 files changed, 60 insertions(+), 26 deletions(-) diff --git a/docs/examples/cors_server.jl b/docs/examples/cors_server.jl index 2275b3619..ad6765cbc 100644 --- a/docs/examples/cors_server.jl +++ b/docs/examples/cors_server.jl @@ -39,7 +39,7 @@ const CORS_RES_HEADERS = ["Access-Control-Allow-Origin" => "*"] #= JSONMiddleware minimizes code by automatically converting the request body to JSON to pass to the other service functions automatically. JSONMiddleware -recieves the body of the response from the other service funtions and sends +receives the body of the response from the other service funtions and sends back a success response code =# function JSONMiddleware(handler) @@ -65,9 +65,9 @@ function JSONMiddleware(handler) end #= CorsMiddleware: handles preflight request with the OPTIONS flag -If a request was recieved with the correct headers, then a response will be +If a request was received with the correct headers, then a response will be sent back with a 200 code, if the correct headers were not specified in the request, -then a CORS error will be recieved on the client side +then a CORS error will be received on the client side Since each request passes throught the CORS Handler, then if the request is not a preflight request, it will simply go to the JSONMiddleware to be passed to the diff --git a/src/Servers.jl b/src/Servers.jl index 6afb16590..7f5ea9eaa 100644 --- a/src/Servers.jl +++ b/src/Servers.jl @@ -13,6 +13,7 @@ export listen, listen!, Server, forceclose, port using Sockets, Logging, LoggingExtras, MbedTLS, Dates using MbedTLS: SSLContext, SSLConfig +using ConcurrentUtilities: Lockable, lock using ..IOExtras, ..Streams, ..Messages, ..Parsers, ..Connections, ..Exceptions import ..access_threaded, ..SOCKET_TYPE_TLS, ..@logfmt_str @@ -83,10 +84,19 @@ accept(s::Listener{SSLConfig}) = getsslcontext(Sockets.accept(s.server), s.ssl) function getsslcontext(tcp, sslconfig) try + handshake_done = Ref{Bool}(false) ssl = MbedTLS.SSLContext() MbedTLS.setup!(ssl, sslconfig) MbedTLS.associate!(ssl, tcp) - MbedTLS.handshake!(ssl) + handshake_task = @async begin + MbedTLS.handshake!(ssl) + handshake_done[] = true + end + timedwait(5.0) do + handshake_done[] || istaskdone(handshake_task) + end + !istaskdone(handshake_task) && wait(handshake_task) + handshake_done[] || throw(Base.IOError("SSL handshake timed out", Base.ETIMEDOUT)) return ssl catch e @try Base.IOError close(tcp) @@ -363,31 +373,55 @@ Accepts new tcp connections and spawns async tasks to handle them." function listenloop(f, listener, conns, tcpisvalid, max_connections, readtimeout, access_log, ready_to_accept, verbose) sem = Base.Semaphore(max_connections) + ssl = Lockable(listener.ssl) + connections = Lockable(conns) verbose >= 0 && @infov 1 "Listening on: $(listener.hostname):$(listener.hostport), thread id: $(Threads.threadid())" notify(ready_to_accept) while isopen(listener) try Base.acquire(sem) - io = accept(listener) - if io === nothing - @warnv 1 "unable to accept new connection" - continue - elseif !tcpisvalid(io) - @warnv 1 "!tcpisvalid: $io" - close(io) - continue - end - conn = Connection(io) - conn.state = IDLE - push!(conns, conn) - conn.host, conn.port = listener.hostname, listener.hostport - @async try - handle_connection(f, conn, listener, readtimeout, access_log) - finally - # handle_connection is in charge of closing the underlying io - delete!(conns, conn) - Base.release(sem) - end + io = Sockets.accept(listener.server) + Threads.@spawn begin + local conn = nothing + isssl = !isnothing(listener.ssl) + try + if io === nothing + @warnv 1 "unable to accept new connection" + return + end + if isssl + io = lock(ssl) do ssl + return getsslcontext(io, ssl) + end + end + if !tcpisvalid(io) + close(io) + return + end + conn = Connection(io) + conn.state = IDLE + lock(connections) do conns + push!(conns, conn) + end + conn.host, conn.port = listener.hostname, listener.hostport + handle_connection(f, conn, listener, readtimeout, access_log) + catch e + if e isa Base.IOError && e.code == Base.UV_ECONNABORTED + verbose >= 0 && @infov 1 "Server on $(listener.hostname):$(listener.hostport) closing" + else + @errorv 2 "Server on $(listener.hostname):$(listener.hostport) errored" exception=(e, catch_backtrace()) + # quick little sleep in case there's a temporary + # local error accepting and this might help avoid quickly re-erroring + sleep(0.05 + rand() * 0.05) + end + # handle_connection is in charge of closing the underlying io, but it may not get there + finally + !isnothing(conn) && lock(connections) do conns + delete!(conns, conn) + end + Base.release(sem) + end + end # Task.@spawn catch e if e isa Base.IOError && e.code == Base.UV_ECONNABORTED verbose >= 0 && @infov 1 "Server on $(listener.hostname):$(listener.hostport) closing" @@ -442,7 +476,7 @@ function handle_connection(f, c::Connection, listener, readtimeout, access_log) request.response.status = 200 try - # invokelatest becuase the perf is negligible, but this makes live-editing handlers more Revise friendly + # invokelatest because the perf is negligible, but this makes live-editing handlers more Revise friendly @debugv 1 "invoking handler" Base.invokelatest(f, http) # If `startwrite()` was never called, throw an error so we send a 500 and log this diff --git a/src/WebSockets.jl b/src/WebSockets.jl index 56b6933a9..2fbf6a07c 100644 --- a/src/WebSockets.jl +++ b/src/WebSockets.jl @@ -587,7 +587,7 @@ function Base.close(ws::WebSocket, body::CloseFrameBody=CloseFrameBody(1000, "") ws.readclosed = true end end - # we either recieved the responding CLOSE frame and readclosed was set + # we either received the responding CLOSE frame and readclosed was set # or there was an error/timeout reading it; in any case, readclosed should be closed now @assert ws.readclosed # if we're the server, it's our job to close the underlying socket