From 0a89f3082d39fcff35aaef1e56a168e1bef1567c Mon Sep 17 00:00:00 2001 From: Delyan Angelov Date: Wed, 18 Oct 2023 09:00:42 +0300 Subject: [PATCH] net.http: support passing on_running, on_stopped, on_closed callback functions to http.Server{}, as well as show_startup_message: false. (#19591) --- vlib/net/http/server.v | 67 +++++++++++++++++--- vlib/net/http/server_test.v | 120 ++++++++++++++++++++++++++++++------ vlib/net/tcp.v | 12 +++- 3 files changed, 172 insertions(+), 27 deletions(-) diff --git a/vlib/net/http/server.v b/vlib/net/http/server.v index fbb185bf3dc071..ed5aefd1bc5f2b 100644 --- a/vlib/net/http/server.v +++ b/vlib/net/http/server.v @@ -8,14 +8,14 @@ import net import time import runtime // ServerStatus is the current status of the server. -// .running means that the server is active and serving. -// .stopped means that the server is not active but still listening. -// .closed means that the server is completely inactive. +// .closed means that the server is completely inactive (the default on creation, and after calling .close()). +// .running means that the server is active and serving (after .listen_and_serve()). +// .stopped means that the server is not active but still listening (after .stop() ). pub enum ServerStatus { + closed running stopped - closed } interface Handler { @@ -36,6 +36,12 @@ pub mut: pool_channel_slots int = 1024 worker_num int = runtime.nr_jobs() listener net.TcpListener + // + on_running fn (mut s Server) = unsafe { nil } // Blocking cb. If set, ran by the web server on transitions to its .running state. + on_stopped fn (mut s Server) = unsafe { nil } // Blocking cb. If set, ran by the web server on transitions to its .stopped state. + on_closed fn (mut s Server) = unsafe { nil } // Blocking cb. If set, ran by the web server on transitions to its .closed state. + // + show_startup_message bool = true // set to false, to remove the default `Listening on ...` message. } // listen_and_serve listens on the server port `s.port` over TCP network and @@ -55,9 +61,16 @@ pub fn (mut s Server) listen_and_serve() { eprintln('Failed getting listener address, err: ${err}') return } - listening_address := s.addr.clone() + mut listening_address := s.addr.clone() if l.family() == net.AddrFamily.unspec { - s.listener = net.listen_tcp(.ip6, listening_address) or { + if listening_address == ':0' { + listening_address = 'localhost:0' + } + mut listen_family := net.AddrFamily.ip + // $if !windows { + // listen_family = net.AddrFamily.ip6 + // } + s.listener = net.listen_tcp(listen_family, listening_address) or { eprintln('Listening on ${s.addr} failed, err: ${err}') return } @@ -78,9 +91,16 @@ pub fn (mut s Server) listen_and_serve() { ws << new_handler_worker(wid, ch, s.handler) } - println('Listening on http://${listening_address}/') - flush_stdout() + if s.show_startup_message { + println('Listening on http://${s.addr}/') + flush_stdout() + } + + time.sleep(20 * time.millisecond) s.state = .running + if s.on_running != unsafe { nil } { + s.on_running(mut s) + } for { // break if we have a stop signal if s.state != .running { @@ -107,6 +127,9 @@ pub fn (mut s Server) listen_and_serve() { [inline] pub fn (mut s Server) stop() { s.state = .stopped + if s.on_stopped != unsafe { nil } { + s.on_stopped(mut s) + } } // close immediately closes the port and signals the server that it has been closed. @@ -114,6 +137,9 @@ pub fn (mut s Server) stop() { pub fn (mut s Server) close() { s.state = .closed s.listener.close() or { return } + if s.on_closed != unsafe { nil } { + s.on_closed(mut s) + } } // status indicates whether the server is running, stopped, or closed. @@ -122,6 +148,31 @@ pub fn (s &Server) status() ServerStatus { return s.state } +// WaitTillRunningParams allows for parametrising the calls to s.wait_till_running() +[params] +pub struct WaitTillRunningParams { +pub: + max_retries int = 100 // how many times to check for the status, for each single s.wait_till_running() call + retry_period_ms int = 10 // how much time to wait between each check for the status, in milliseconds +} + +// wait_till_running allows you to synchronise your calling (main) thread, with the state of the server +// (when the server is running in another thread). +// It returns an error, after params.max_retries * params.retry_period_ms +// milliseconds have passed, without that expected server transition. +pub fn (mut s Server) wait_till_running(params WaitTillRunningParams) !int { + mut i := 0 + for s.status() != .running && i < params.max_retries { + time.sleep(params.retry_period_ms * time.millisecond) + i++ + } + if i >= params.max_retries { + return error('maximum retries reached') + } + time.sleep(params.retry_period_ms) + return i +} + struct HandlerWorker { id int ch chan &net.TcpConn diff --git a/vlib/net/http/server_test.v b/vlib/net/http/server_test.v index b8e752fb3997de..098e3260513e1c 100644 --- a/vlib/net/http/server_test.v +++ b/vlib/net/http/server_test.v @@ -1,13 +1,28 @@ +import log import net import net.http import time +const atimeout = 500 * time.millisecond + +fn testsuite_begin() { + log.info(@FN) +} + +fn testsuite_end() { + log.info(@FN) +} + fn test_server_stop() { + log.warn('${@FN} started') + defer { + log.warn('${@FN} finished') + } mut server := &http.Server{ - accept_timeout: 1 * time.second + accept_timeout: atimeout } t := spawn server.listen_and_serve() - time.sleep(250 * time.millisecond) + server.wait_till_running()! mut watch := time.new_stopwatch() server.stop() assert server.status() == .stopped @@ -17,12 +32,17 @@ fn test_server_stop() { } fn test_server_close() { + log.warn('${@FN} started') + defer { + log.warn('${@FN} finished') + } mut server := &http.Server{ - accept_timeout: 1 * time.second + accept_timeout: atimeout handler: MyHttpHandler{} + show_startup_message: false } t := spawn server.listen_and_serve() - time.sleep(250 * time.millisecond) + server.wait_till_running()! mut watch := time.new_stopwatch() server.close() assert server.status() == .closed @@ -32,13 +52,18 @@ fn test_server_close() { } fn test_server_custom_listener() { + log.warn('${@FN} started') + defer { + log.warn('${@FN} finished') + } listener := net.listen_tcp(.ip6, ':8081')! mut server := &http.Server{ - accept_timeout: 1 * time.second + accept_timeout: atimeout listener: listener + show_startup_message: false } t := spawn server.listen_and_serve() - time.sleep(250 * time.millisecond) + server.wait_till_running()! mut watch := time.new_stopwatch() server.close() assert server.status() == .closed @@ -74,7 +99,7 @@ fn (mut handler MyHttpHandler) handle(req http.Request) http.Response { handler.redirects++ } '/big' { - r.body = 'xyz def '.repeat(10_000) + r.body = 'xyz def '.repeat(5_000) r.set_status(.ok) handler.oks++ } @@ -87,34 +112,36 @@ fn (mut handler MyHttpHandler) handle(req http.Request) http.Response { return r } -const cport = 8198 +const cport = 18197 fn test_server_custom_handler() { + log.warn('${@FN} started') + defer { + log.warn('${@FN} finished') + } mut handler := MyHttpHandler{} mut server := &http.Server{ - accept_timeout: 1 * time.second + accept_timeout: atimeout handler: handler port: cport } t := spawn server.listen_and_serve() - for server.status() != .running { - time.sleep(10 * time.millisecond) - } - x := http.fetch(url: 'http://localhost:${cport}/endpoint?abc=xyz', data: 'my data')! + server.wait_till_running()! + x := http.fetch(url: 'http://${server.addr}/endpoint?abc=xyz', data: 'my data')! assert x.body == 'my data, /endpoint?abc=xyz' assert x.status_code == 200 assert x.status_msg == 'OK' assert x.http_version == '1.1' - y := http.fetch(url: 'http://localhost:${cport}/another/endpoint', data: 'abcde')! + y := http.fetch(url: 'http://${server.addr}/another/endpoint', data: 'abcde')! assert y.body == 'abcde, /another/endpoint' assert y.status_code == 200 assert x.status_msg == 'OK' assert y.status() == .ok assert y.http_version == '1.1' // - http.fetch(url: 'http://localhost:${cport}/something/else')! + http.fetch(url: 'http://${server.addr}/something/else')! // - big_url := 'http://localhost:${cport}/redirect_to_big' + big_url := 'http://${server.addr}/redirect_to_big' mut progress_calls := &ProgressCalls{} z := http.fetch( url: big_url @@ -140,14 +167,14 @@ fn test_server_custom_handler() { assert z.status_code == 200 assert z.body.starts_with('xyz') assert z.body.len > 10000 - assert progress_calls.final_size > 80_000 + assert progress_calls.final_size > 40_000 assert progress_calls.finished_was_called assert progress_calls.chunks.len > 1 assert progress_calls.reads.len > 1 assert progress_calls.chunks[0].bytestr().starts_with('HTTP/1.1 301 Moved permanently') assert progress_calls.chunks[1].bytestr().starts_with('HTTP/1.1 200 OK') assert progress_calls.chunks.last().bytestr().contains('xyz def') - assert progress_calls.redirected_to == ['http://localhost:8198/big'] + assert progress_calls.redirected_to == ['http://${server.addr}/big'] // server.stop() t.wait() @@ -166,3 +193,60 @@ mut: redirected_to []string final_size u64 } + +// + +struct MyCountingHandler { +mut: + counter int +} + +fn (mut handler MyCountingHandler) handle(req http.Request) http.Response { + handler.counter++ + mut r := http.Response{ + body: req.data + ', ${req.url}, counter: ${handler.counter}' + header: req.header + } + match req.url.all_before('?') { + '/count' { + r.set_status(.ok) + } + else { + r.set_status(.not_found) + } + } + r.set_version(req.version) + return r +} + +fn test_my_counting_handler_on_random_port() { + log.warn('${@FN} started') + defer { + log.warn('${@FN} finished') + } + mut server := &http.Server{ + show_startup_message: false + port: 0 + accept_timeout: atimeout + handler: MyCountingHandler{} + on_running: fn (mut server http.Server) { + spawn fn (mut server http.Server) { + log.warn('server started') + url := 'http://${server.addr}/count' + log.info('fetching from url: ${url}') + for _ in 0 .. 5 { + x := http.fetch(url: url, data: 'my data') or { panic(err) } + log.info(x.body) + } + server.stop() + log.warn('server stopped') + }(mut server) + } + } + server.listen_and_serve() + if mut server.handler is MyCountingHandler { + dump(server.handler.counter) + assert server.handler.counter == 5 + } + assert true +} diff --git a/vlib/net/tcp.v b/vlib/net/tcp.v index 697f0dfd3ecc2f..b38c3a2f4bbbd3 100644 --- a/vlib/net/tcp.v +++ b/vlib/net/tcp.v @@ -22,7 +22,17 @@ mut: is_blocking bool } -pub fn dial_tcp(address string) !&TcpConn { +pub fn dial_tcp(oaddress string) !&TcpConn { + mut address := oaddress + $if windows { + // resolving 0.0.0.0 to localhost, works on linux and macos, but not on windows, so try to emulate it: + if address.starts_with(':::') { + address = address.replace_once(':::', 'localhost:') + } + if address.starts_with('0.0.0.0:') { + address = address.replace_once('0.0.0.0:', 'localhost:') + } + } addrs := resolve_addrs_fuzzy(address, .tcp) or { return error('${err.msg()}; could not resolve address ${address} in dial_tcp') }