Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Loki remote write #4941

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ repos:
- id: fmt
- id: clippy
args: ["--workspace", "--all-targets", "--all-features", "--", "-D", "warnings"]
stages: [push]
stages: [pre-push]
- id: cargo-check
args: ["--workspace", "--all-targets", "--all-features"]
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/cmd/src/cli/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use base64::engine::general_purpose;
use base64::Engine;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use serde_json::Value;
use servers::http::greptime_result_v1::GreptimedbV1Response;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::GreptimeQueryOutput;
use snafu::ResultExt;

Expand Down
2 changes: 2 additions & 0 deletions src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ humantime-serde.workspace = true
hyper = { version = "0.14", features = ["full"] }
influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" }
itertools.workspace = true
json5 = "0.4"
jsonb.workspace = true
lazy_static.workspace = true
loki-api = "0.1"
mime_guess = "2.0"
notify.workspace = true
object-pool = "0.5"
Expand Down
58 changes: 56 additions & 2 deletions src/servers/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::any::Any;
use std::net::SocketAddr;
use std::string::FromUtf8Error;

use axum::http::StatusCode as HttpStatusCode;
use axum::response::{IntoResponse, Response};
use axum::{http, Json};
use base64::DecodeError;
Expand All @@ -30,8 +31,6 @@ use query::parser::PromQuery;
use serde_json::json;
use snafu::{Location, Snafu};

use crate::http::error_result::status_code_to_http_status;

#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
Expand Down Expand Up @@ -499,6 +498,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to parse payload as json5"))]
ParseJson5 {
#[snafu(source)]
error: json5::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unsupported content type: {:?}", content_type))]
UnsupportedContentType {
content_type: ContentType,
Expand Down Expand Up @@ -625,6 +632,7 @@ impl ErrorExt for Error {
| MissingQueryContext { .. }
| MysqlValueConversion { .. }
| ParseJson { .. }
| ParseJson5 { .. }
| UnsupportedContentType { .. }
| TimestampOverflow { .. }
| OpenTelemetryLog { .. }
Expand Down Expand Up @@ -711,3 +719,49 @@ impl IntoResponse for Error {
(status, body).into_response()
}
}

pub fn status_code_to_http_status(status_code: &StatusCode) -> HttpStatusCode {
match status_code {
StatusCode::Success | StatusCode::Cancelled => HttpStatusCode::OK,

StatusCode::Unsupported
| StatusCode::InvalidArguments
| StatusCode::InvalidSyntax
| StatusCode::RequestOutdated
| StatusCode::RegionAlreadyExists
| StatusCode::TableColumnExists
| StatusCode::TableAlreadyExists
| StatusCode::RegionNotFound
| StatusCode::DatabaseNotFound
| StatusCode::TableNotFound
| StatusCode::TableColumnNotFound
| StatusCode::PlanQuery
| StatusCode::DatabaseAlreadyExists
| StatusCode::FlowNotFound
| StatusCode::FlowAlreadyExists => HttpStatusCode::BAD_REQUEST,

StatusCode::AuthHeaderNotFound
| StatusCode::InvalidAuthHeader
| StatusCode::UserNotFound
| StatusCode::UnsupportedPasswordType
| StatusCode::UserPasswordMismatch
| StatusCode::RegionReadonly => HttpStatusCode::UNAUTHORIZED,

StatusCode::PermissionDenied | StatusCode::AccessDenied => HttpStatusCode::FORBIDDEN,

StatusCode::RateLimited => HttpStatusCode::TOO_MANY_REQUESTS,

StatusCode::RegionNotReady
| StatusCode::TableUnavailable
| StatusCode::RegionBusy
| StatusCode::StorageUnavailable
| StatusCode::External => HttpStatusCode::SERVICE_UNAVAILABLE,

StatusCode::Internal
| StatusCode::Unexpected
| StatusCode::IllegalState
| StatusCode::Unknown
| StatusCode::RuntimeResourcesExhausted
| StatusCode::EngineExecuteQuery => HttpStatusCode::INTERNAL_SERVER_ERROR,
}
}
63 changes: 33 additions & 30 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,20 @@ use tower_http::decompression::RequestDecompressionLayer;
use tower_http::trace::TraceLayer;

use self::authorize::AuthState;
use self::table_result::TableResponse;
use self::result::table_result::TableResponse;
use crate::configurator::ConfiguratorRef;
use crate::error::{AddressBindSnafu, AlreadyStartedSnafu, Error, HyperSnafu, Result, ToJsonSnafu};
use crate::http::arrow_result::ArrowResponse;
use crate::http::csv_result::CsvResponse;
use crate::http::error_result::ErrorResponse;
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::influxdb::{influxdb_health, influxdb_ping, influxdb_write_v1, influxdb_write_v2};
use crate::http::influxdb_result_v1::InfluxdbV1Response;
use crate::http::json_result::JsonResponse;
use crate::http::prometheus::{
build_info_query, format_query, instant_query, label_values_query, labels_query, parse_query,
range_query, series_query,
};
use crate::http::result::arrow_result::ArrowResponse;
use crate::http::result::csv_result::CsvResponse;
use crate::http::result::error_result::ErrorResponse;
use crate::http::result::greptime_result_v1::GreptimedbV1Response;
use crate::http::result::influxdb_result_v1::InfluxdbV1Response;
use crate::http::result::json_result::JsonResponse;
use crate::interceptor::LogIngestInterceptorRef;
use crate::metrics::http_metrics_layer;
use crate::metrics_handler::MetricsHandler;
Expand All @@ -77,8 +77,11 @@ use crate::query_handler::{
use crate::server::Server;

pub mod authorize;
#[cfg(feature = "dashboard")]
mod dashboard;
pub mod dyn_log;
pub mod event;
mod extractor;
pub mod handler;
pub mod header;
pub mod influxdb;
Expand All @@ -88,20 +91,9 @@ pub mod otlp;
pub mod pprof;
pub mod prom_store;
pub mod prometheus;
mod prometheus_resp;
pub mod result;
pub mod script;

pub mod arrow_result;
pub mod csv_result;
#[cfg(feature = "dashboard")]
mod dashboard;
pub mod error_result;
pub mod greptime_manage_resp;
pub mod greptime_result_v1;
pub mod influxdb_result_v1;
pub mod json_result;
pub mod table_result;

#[cfg(any(test, feature = "testing"))]
pub mod test_helpers;

Expand Down Expand Up @@ -619,17 +611,22 @@ impl HttpServerBuilder {
validator: Option<LogValidatorRef>,
ingest_interceptor: Option<LogIngestInterceptorRef<Error>>,
) -> Self {
Self {
router: self.router.nest(
&format!("/{HTTP_API_VERSION}/events"),
HttpServer::route_log(LogState {
log_handler: handler,
log_validator: validator,
ingest_interceptor,
}),
),
..self
}
let log_state = LogState {
log_handler: handler,
log_validator: validator,
ingest_interceptor,
};
let router = self.router.nest(
&format!("/{HTTP_API_VERSION}/events"),
HttpServer::route_log(log_state.clone()),
);

let router = router.nest(
&format!("/{HTTP_API_VERSION}/loki"),
HttpServer::route_loki(log_state),
);

Self { router, ..self }
}

pub fn with_plugins(self, plugins: Plugins) -> Self {
Expand Down Expand Up @@ -766,6 +763,12 @@ impl HttpServer {
.with_state(metrics_handler)
}

fn route_loki<S>(log_state: LogState) -> Router<S> {
Router::new()
.route("/api/v1/push", routing::post(event::loki_ingest))
.with_state(log_state)
}

fn route_log<S>(log_state: LogState) -> Router<S> {
Router::new()
.route("/logs", routing::post(event::log_ingester))
Expand Down
2 changes: 1 addition & 1 deletion src/servers/src/http/authorize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::error::{
self, InvalidAuthHeaderInvisibleASCIISnafu, InvalidAuthHeaderSnafu, InvalidParameterSnafu,
NotFoundInfluxAuthSnafu, Result, UnsupportedAuthSchemeSnafu, UrlDecodeSnafu,
};
use crate::http::error_result::ErrorResponse;
use crate::http::result::error_result::ErrorResponse;
use crate::http::HTTP_API_PREFIX;
use crate::influxdb::{is_influxdb_request, is_influxdb_v2_request};

Expand Down
Loading