Skip to content

Commit

Permalink
chore: add greptime log sink
Browse files Browse the repository at this point in the history
  • Loading branch information
paomian committed Jul 9, 2024
1 parent e144ac6 commit 739c4c8
Show file tree
Hide file tree
Showing 17 changed files with 1,200 additions and 379 deletions.
65 changes: 58 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ ratatui = { version = "0.26.3", optional = true, default-features = false, featu
hex = { version = "0.4.3", default-features = false, optional = true }

# GreptimeDB
greptimedb-client = { git = "https://github.com/GreptimeTeam/greptimedb-ingester-rust.git", rev = "d21dbcff680139ed2065b62100bac3123da7c789", optional = true }
greptimedb-ingester = { git = "https://github.com/GreptimeTeam/greptimedb-ingester-rust", rev = "2e6b0c5eb6a5e7549c3100e4d356b07d15cce66d", optional = true }

# External libs
arc-swap = { version = "1.7", default-features = false, optional = true }
Expand Down Expand Up @@ -674,6 +674,7 @@ sinks-logs = [
"sinks-elasticsearch",
"sinks-file",
"sinks-gcp",
"sinks-greptimedb_logs",
"sinks-honeycomb",
"sinks-http",
"sinks-humio",
Expand Down Expand Up @@ -737,7 +738,8 @@ sinks-elasticsearch = ["transforms-metric_to_log"]
sinks-file = ["dep:async-compression"]
sinks-gcp = ["sinks-gcp-chronicle", "dep:base64", "gcp"]
sinks-gcp-chronicle = ["gcp"]
sinks-greptimedb = ["dep:greptimedb-client"]
sinks-greptimedb = ["dep:greptimedb-ingester"]
sinks-greptimedb_logs = []
sinks-honeycomb = []
sinks-http = []
sinks-humio = ["sinks-splunk_hec", "transforms-metric_to_log"]
Expand Down Expand Up @@ -787,6 +789,7 @@ all-integration-tests = [
"gcp-integration-tests",
"gcp-pubsub-integration-tests",
"greptimedb-integration-tests",
"greptimedb-logs-integration-tests",
"http-client-integration-tests",
"humio-integration-tests",
"influxdb-integration-tests",
Expand Down Expand Up @@ -851,6 +854,7 @@ gcp-cloud-storage-integration-tests = ["sinks-gcp"]
gcp-integration-tests = ["sinks-gcp"]
gcp-pubsub-integration-tests = ["sinks-gcp", "sources-gcp_pubsub"]
greptimedb-integration-tests = ["sinks-greptimedb"]
greptimedb-logs-integration-tests = ["sinks-greptimedb_logs"]
humio-integration-tests = ["sinks-humio"]
http-client-integration-tests = ["sources-http_client"]
influxdb-integration-tests = ["sinks-influxdb"]
Expand Down
171 changes: 171 additions & 0 deletions src/sinks/greptimedb/logs/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use std::collections::HashMap;

use vector_lib::configurable::configurable_component;
use vector_lib::sensitive_string::SensitiveString;

use crate::http::{Auth, HttpClient};
use crate::sinks::greptimedb::logs::http_request_builder::{
http_healthcheck, GreptimeDBHttpRetryLogic, GreptimeDBLogsHttpRequestBuilder, PartitionKey,
};
use crate::sinks::greptimedb::logs::sink::GreptimeDBLogsHttpSink;
use crate::sinks::util::http::HttpService;
use crate::sinks::{
greptimedb::{default_dbname_template, GreptimeDBDefaultBatchSettings},
prelude::*,
};
use vector_lib::codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig};

fn extra_params_examples() -> HashMap<String, String> {
HashMap::<_, _>::from_iter([("source".to_owned(), "vector".to_owned())])
}

/// Configuration for the `greptimedb_logs` sink.
#[configurable_component(sink("greptimedb_logs", "Ingest logs data into GreptimeDB."))]
#[derive(Clone, Debug, Default, Derivative)]
#[serde(deny_unknown_fields)]
pub struct GreptimeDBLogsConfig {
/// The endpoint of the GreptimeDB server.
#[serde(alias = "host")]
#[configurable(metadata(docs::examples = "http://localhost:4000"))]
pub endpoint: String,

/// The table that data is inserted into.
#[configurable(metadata(docs::examples = "mytable"))]
pub table: Template,

/// The GreptimeDB [database][database] name to connect.
///
/// Default to `public`, the default database of GreptimeDB.
///
/// Database can be created via `create database` statement on
/// GreptimeDB. If you are using GreptimeCloud, use `dbname` from the
/// connection information of your instance.
///
/// [database]: https://docs.greptime.com/user-guide/concepts/key-concepts#database
#[configurable(metadata(docs::examples = "public"))]
#[derivative(Default(value = "default_dbname_template()"))]
#[serde(default = "default_dbname_template")]
pub dbname: Template,

/// pipeline name to be used for the logs
#[configurable(metadata(docs::examples = "pipeline_name"))]
pub pipeline_name: Template,

/// pipeline version to be used for the logs
#[configurable(metadata(docs::examples = "2024-06-07 06:46:23.858293"))]
pub pipeline_version: Option<Template>,

/// The username for your GreptimeDB instance.
///
/// This is required if your instance has authentication enabled.
#[configurable(metadata(docs::examples = "username"))]
#[serde(default)]
pub username: Option<String>,
/// The password for your GreptimeDB instance.
///
/// This is required if your instance has authentication enabled.
#[configurable(metadata(docs::examples = "password"))]
#[serde(default)]
pub password: Option<SensitiveString>,
/// Set http compression encoding for the request
/// Default to none, `gzip` or `zstd` is supported.
#[configurable(derived)]
#[serde(default = "Compression::gzip_default")]
pub compression: Compression,

#[configurable(derived)]
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
pub encoding: Transformer,

/// Custom parameters to add to the query string for each HTTP request sent to GreptimeDB.
#[serde(default)]
#[configurable(metadata(docs::advanced))]
#[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
#[configurable(metadata(docs::examples = "extra_params_examples()"))]
pub extra_params: Option<HashMap<String, String>>,

#[configurable(derived)]
#[serde(default)]
pub(crate) batch: BatchConfig<GreptimeDBDefaultBatchSettings>,

#[configurable(derived)]
#[serde(default)]
pub request: TowerRequestConfig,

#[configurable(derived)]
pub tls: Option<TlsConfig>,

#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
pub acknowledgements: AcknowledgementsConfig,
}

impl_generate_config_from_default!(GreptimeDBLogsConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "greptimedb_logs")]
impl SinkConfig for GreptimeDBLogsConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let tls_settings = TlsSettings::from_options(&self.tls)?;
let client = HttpClient::new(tls_settings, &cx.proxy)?;

let auth = match (self.username.clone(), self.password.clone()) {
(Some(username), Some(password)) => Some(Auth::Basic {
user: username,
password,
}),
_ => None,
};
let request_builder = GreptimeDBLogsHttpRequestBuilder {
endpoint: self.endpoint.clone(),
auth: auth.clone(),
encoder: (
self.encoding.clone(),
Encoder::<Framer>::new(
NewlineDelimitedEncoderConfig.build().into(),
JsonSerializerConfig::default().build().into(),
),
),
compression: self.compression,
extra_params: self.extra_params.clone(),
};

let service: HttpService<GreptimeDBLogsHttpRequestBuilder, PartitionKey> =
HttpService::new(client.clone(), request_builder.clone());

let request_limits = self.request.into_settings();

let service = ServiceBuilder::new()
.settings(request_limits, GreptimeDBHttpRetryLogic::default())
.service(service);

let sink = GreptimeDBLogsHttpSink::new(
self.batch.into_batcher_settings()?,
service,
self.dbname.clone(),
self.table.clone(),
self.pipeline_name.clone(),
self.pipeline_version.clone(),
request_builder,
);

let healthcheck = Box::pin(http_healthcheck(
client,
self.endpoint.clone(),
auth.clone(),
));
Ok((VectorSink::from_event_streamsink(sink), healthcheck))
}

fn input(&self) -> Input {
Input::log()
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}
Loading

0 comments on commit 739c4c8

Please sign in to comment.