Skip to content

Commit

Permalink
test: loki write
Browse files Browse the repository at this point in the history
  • Loading branch information
shuiyisong committed Nov 8, 2024
1 parent 74b7d75 commit 9cac101
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions src/servers/src/http/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::result::Result as StdResult;
use std::sync::Arc;
use std::time::Instant;
Expand Down Expand Up @@ -415,7 +415,8 @@ pub async fn loki_ingest(
// encoding: https://github.com/grafana/alloy/blob/be34410b9e841cc0c37c153f9550d9086a304bca/internal/component/common/loki/client/batch.go#L114-L145
// use very dirty hack to parse labels
let labels = stream.labels.replace("=", ":");
let labels: HashMap<String, String> = json5::from_str(&labels).context(ParseJson5Snafu)?;
// use btreemap to keep order
let labels: BTreeMap<String, String> = json5::from_str(&labels).context(ParseJson5Snafu)?;

// process entries
for entry in stream.entries {
Expand Down
1 change: 1 addition & 0 deletions tests-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ flow.workspace = true
frontend = { workspace = true, features = ["testing"] }
futures.workspace = true
futures-util.workspace = true
loki-api = "0.1"
meta-client.workspace = true
meta-srv = { workspace = true, features = ["mock"] }
moka.workspace = true
Expand Down
68 changes: 68 additions & 0 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,24 @@

use std::collections::BTreeMap;
use std::io::Write;
use std::str::FromStr;

use api::prom_store::remote::WriteRequest;
use auth::user_provider_from_option;
use axum::http::{HeaderName, HeaderValue, StatusCode};
use common_error::status_code::StatusCode as ErrorCode;
use flate2::write::GzEncoder;
use flate2::Compression;
use loki_api::logproto::{EntryAdapter, PushRequest, StreamAdapter};
use loki_api::prost_types::Timestamp;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics;
use prost::Message;
use serde_json::{json, Value};
use servers::http::handler::HealthResponse;
use servers::http::header::constants::GREPTIME_LOG_TABLE_NAME_HEADER_NAME;
use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME};
use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
use servers::http::result::error_result::ErrorResponse;
Expand Down Expand Up @@ -92,6 +96,7 @@ macro_rules! http_tests {
test_otlp_metrics,
test_otlp_traces,
test_otlp_logs,
test_loki_logs,
);
)*
};
Expand Down Expand Up @@ -1690,6 +1695,69 @@ pub async fn test_otlp_logs(store_type: StorageType) {
guard.remove_all().await;
}

pub async fn test_loki_logs(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_loke_logs").await;

let client = TestClient::new(app);

// init loki request
let req: PushRequest = PushRequest {
streams: vec![StreamAdapter {
labels: "{service=\"test\",source=\"integration\"}".to_string(),
entries: vec![EntryAdapter {
timestamp: Some(Timestamp::from_str("2024-11-07T10:53:50").unwrap()),
line: "this is a log message".to_string(),
}],
hash: rand::random(),
}],
};
let encode = req.encode_to_vec();
let body = prom_store::snappy_compress(&encode).unwrap();

// write to loki
let res = send_req(
&client,
vec![
(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/x-protobuf"),
),
(
HeaderName::from_static(GREPTIME_LOG_TABLE_NAME_HEADER_NAME),
HeaderValue::from_static("loki_table_name"),
),
],
"/v1/events/loki/api/v1/push",
body,
false,
)
.await;
assert_eq!(StatusCode::OK, res.status());

// test schema
let expected = "[[\"loki_table_name\",\"CREATE TABLE IF NOT EXISTS \\\"loki_table_name\\\" (\\n \\\"timestamp\\\" TIMESTAMP(9) NOT NULL,\\n \\\"line\\\" STRING NULL,\\n \\\"service\\\" STRING NULL,\\n \\\"source\\\" STRING NULL,\\n TIME INDEX (\\\"timestamp\\\"),\\n PRIMARY KEY (\\\"service\\\", \\\"source\\\")\\n)\\n\\nENGINE=mito\\nWITH(\\n append_mode = 'true'\\n)\"]]";
validate_data(
"loki_schema",
&client,
"show create table loki_table_name;",
expected,
)
.await;

// test content
let expected = r#"[[1730976830000000000,"this is a log message","test","integration"]]"#;
validate_data(
"loki_content",
&client,
"select * from loki_table_name;",
expected,
)
.await;

guard.remove_all().await;
}

async fn validate_data(test_name: &str, client: &TestClient, sql: &str, expected: &str) {
let res = client
.get(format!("/v1/sql?sql={sql}").as_str())
Expand Down

0 comments on commit 9cac101

Please sign in to comment.