|
1 |
| -use hyper::service::{make_service_fn, service_fn}; |
2 |
| -use hyper::{Body, Method, Request, Response, Server, StatusCode}; |
| 1 | +use http_body_util::Full; |
| 2 | +use hyper::body; |
| 3 | +use hyper::body::Bytes; |
| 4 | + |
| 5 | +use hyper::server::conn::http1; |
| 6 | +use hyper::service::service_fn; |
| 7 | +use hyper::{Method, Request, Response, StatusCode}; |
| 8 | +use hyper_util::rt::TokioIo; |
3 | 9 | use log::{debug, error, info};
|
4 | 10 | use phf::phf_map;
|
5 | 11 | use std::collections::HashMap;
|
6 | 12 | use std::fmt;
|
7 | 13 | use std::net::SocketAddr;
|
8 | 14 | use std::sync::atomic::Ordering;
|
9 | 15 | use std::sync::Arc;
|
| 16 | +use tokio::net::TcpListener; |
10 | 17 |
|
11 | 18 | use crate::config::Address;
|
12 | 19 | use crate::pool::{get_all_pools, PoolIdentifier};
|
@@ -243,7 +250,9 @@ impl<Value: fmt::Display> PrometheusMetric<Value> {
|
243 | 250 | }
|
244 | 251 | }
|
245 | 252 |
|
246 |
| -async fn prometheus_stats(request: Request<Body>) -> Result<Response<Body>, hyper::http::Error> { |
| 253 | +async fn prometheus_stats( |
| 254 | + request: Request<body::Incoming>, |
| 255 | +) -> Result<Response<Full<Bytes>>, hyper::http::Error> { |
247 | 256 | match (request.method(), request.uri().path()) {
|
248 | 257 | (&Method::GET, "/metrics") => {
|
249 | 258 | let mut lines = Vec::new();
|
@@ -374,14 +383,35 @@ fn push_server_stats(lines: &mut Vec<String>) {
|
374 | 383 | }
|
375 | 384 |
|
376 | 385 | pub async fn start_metric_server(http_addr: SocketAddr) {
|
377 |
| - let http_service_factory = |
378 |
| - make_service_fn(|_conn| async { Ok::<_, hyper::Error>(service_fn(prometheus_stats)) }); |
379 |
| - let server = Server::bind(&http_addr).serve(http_service_factory); |
| 386 | + let listener = TcpListener::bind(http_addr); |
| 387 | + let listener = match listener.await { |
| 388 | + Ok(listener) => listener, |
| 389 | + Err(e) => { |
| 390 | + error!("Failed to bind prometheus server to HTTP address: {}.", e); |
| 391 | + return; |
| 392 | + } |
| 393 | + }; |
380 | 394 | info!(
|
381 | 395 | "Exposing prometheus metrics on http://{}/metrics.",
|
382 | 396 | http_addr
|
383 | 397 | );
|
384 |
| - if let Err(e) = server.await { |
385 |
| - error!("Failed to run HTTP server: {}.", e); |
| 398 | + loop { |
| 399 | + let stream = match listener.accept().await { |
| 400 | + Ok((stream, _)) => stream, |
| 401 | + Err(e) => { |
| 402 | + error!("Error accepting connection: {}", e); |
| 403 | + continue; |
| 404 | + } |
| 405 | + }; |
| 406 | + let io = TokioIo::new(stream); |
| 407 | + |
| 408 | + tokio::task::spawn(async move { |
| 409 | + if let Err(err) = http1::Builder::new() |
| 410 | + .serve_connection(io, service_fn(prometheus_stats)) |
| 411 | + .await |
| 412 | + { |
| 413 | + eprintln!("Error serving HTTP connection for metrics: {:?}", err); |
| 414 | + } |
| 415 | + }); |
386 | 416 | }
|
387 | 417 | }
|
0 commit comments