Skip to content

Commit

Permalink
WIP: plugin development flow
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldix committed Dec 23, 2024
1 parent 2a132f1 commit 301b83c
Show file tree
Hide file tree
Showing 17 changed files with 442 additions and 7 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion influxdb3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ tokio_console = ["console-subscriber", "tokio/tracing", "observability_deps/rele

# Use jemalloc as the default allocator.
jemalloc_replacing_malloc = ["influxdb3_process/jemalloc_replacing_malloc"]
system-py = ["influxdb3_write/system-py"]
system-py = ["influxdb3_write/system-py", "influxdb3_server/system-py"]

[dev-dependencies]
# Core Crates
Expand Down
21 changes: 21 additions & 0 deletions influxdb3/src/commands/plugin_test/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use std::error::Error;

pub mod wal;

#[derive(Debug, clap::Parser)]
pub(crate) struct Config {
#[clap(subcommand)]
command: Command,
}

#[derive(Debug, clap::Parser)]
enum Command {
/// Test a plugin triggered by WAL writes
Wal(wal::Config),
}

pub(crate) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
match config.command {
Command::Wal(config) => wal::command(config).await,
}
}
68 changes: 68 additions & 0 deletions influxdb3/src/commands/plugin_test/wal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::error::Error;
use secrecy::ExposeSecret;
use influxdb3_client::plugin_test::WalPluginTestRequest;
use crate::commands::common::InfluxDb3Config;

#[derive(Debug, clap::Parser)]
pub struct Config {
#[clap(flatten)]
influxdb3_config: InfluxDb3Config,

#[clap(flatten)]
wal_plugin_test: WalPluginTest,
}

#[derive(Debug, clap::Parser)]
pub struct WalPluginTest {
/// The name of the plugin, which should match its file name on the server <plugin-dir>/<name>.py
#[clap(short = 'n', long = "name")]
pub name: String,
/// If given, pass this line protocol as input
#[clap(long = "lp")]
pub input_lp: Option<String>,
/// If given, pass this file of LP as input from on the server <plugin-dir>/<name>_test/<input-file>
#[clap(long = "file")]
pub input_file: Option<String>,
/// If given, save the output to this file on the server in <plugin-dir>/<name>_test/<save-output-to-file>
#[clap(long = "save-output-to-file")]
pub save_output_to_file: Option<String>,
/// If given, validate the output against this file on the server in <plugin-dir>/<name>_test/<validate-output-file>
#[clap(long = "validate-output-file")]
pub validate_output_file: Option<String>,
}

impl Into<WalPluginTestRequest> for WalPluginTest {
fn into(self) -> WalPluginTestRequest {
WalPluginTestRequest {
name: self.name,
input_lp: self.input_lp,
input_file: self.input_file,
save_output_to_file: self.save_output_to_file,
validate_output_file: self.validate_output_file,
}
}
}

pub(super) async fn command(config: Config) -> Result<(), Box<dyn Error>> {
let InfluxDb3Config {
host_url,
auth_token,
..
} = config.influxdb3_config;

let wal_plugin_test_request: WalPluginTestRequest = config.wal_plugin_test.into();

let mut client = influxdb3_client::Client::new(host_url)?;
if let Some(t) = auth_token {
client = client.with_auth_token(t.expose_secret());
}
let resonse = client.wal_plugin_test(wal_plugin_test_request).await?;

// pretty print the response
println!(
"RESPONSE:\n{}",
serde_json::to_string_pretty(&resonse).expect("serialize wal plugin test response as JSON")
);

Ok(())
}
10 changes: 10 additions & 0 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use panic_logging::SendPanicsToTracing;
use parquet_file::storage::{ParquetStorage, StorageId};
use std::{collections::HashMap, path::Path, str::FromStr};
use std::{num::NonZeroUsize, sync::Arc};
use std::path::PathBuf;
use thiserror::Error;
use tokio::net::TcpListener;
use tokio::time::Instant;
Expand Down Expand Up @@ -293,6 +294,14 @@ pub struct Config {
action
)]
pub meta_cache_eviction_interval: humantime::Duration,

/// The local directory that has python plugins and their test files.
#[clap(
long = "plugin-dir",
env = "INFLUXDB3_PLUGIN_DIR",
action
)]
pub plugin_dir: Option<PathBuf>,
}

/// Specified size of the Parquet cache in megabytes (MB)
Expand Down Expand Up @@ -485,6 +494,7 @@ pub async fn command(config: Config) -> Result<()> {
wal_config,
parquet_cache,
metric_registry: Arc::clone(&metrics),
plugin_dir: config.plugin_dir,
})
.await
.map_err(|e| Error::WriteBufferInit(e.into()))?;
Expand Down
10 changes: 10 additions & 0 deletions influxdb3/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ mod commands {
pub mod serve;
pub mod token;
pub mod write;
pub mod plugin_test;
}

enum ReturnCode {
Expand Down Expand Up @@ -101,6 +102,9 @@ enum Command {

/// Manage table (delete only for the moment)
Table(commands::manage::table::Config),

/// Test Python plugins for processing WAL writes, persistence Snapshots, requests, or scheduled tasks.
PluginTest(commands::plugin_test::Config),
}

fn main() -> Result<(), std::io::Error> {
Expand Down Expand Up @@ -177,6 +181,12 @@ fn main() -> Result<(), std::io::Error> {
std::process::exit(ReturnCode::Failure as _)
}
}
Some(Command::PluginTest(config)) => {
if let Err(e) = commands::plugin_test::command(config).await {
eprintln!("Plugin Test command failed: {e}");
std::process::exit(ReturnCode::Failure as _)
}
}
}
});

Expand Down
1 change: 1 addition & 0 deletions influxdb3_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ bytes.workspace = true
reqwest.workspace = true
secrecy.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
url.workspace = true

Expand Down
29 changes: 29 additions & 0 deletions influxdb3_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod plugin_test;

use std::{
collections::HashMap, fmt::Display, num::NonZeroUsize, string::FromUtf8Error, time::Duration,
};
Expand All @@ -8,6 +10,7 @@ use reqwest::{Body, IntoUrl, Method, StatusCode};
use secrecy::{ExposeSecret, Secret};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::plugin_test::{WalPluginTestRequest, WalPluginTestResponse};

/// Primary error type for the [`Client`]
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -466,6 +469,32 @@ impl Client {
}
}

/// Make a request to the `POST /api/v3/plugin_test/wal` API
pub async fn wal_plugin_test(&self, wal_plugin_test_request: WalPluginTestRequest) -> Result<WalPluginTestResponse> {
let api_path = "/api/v3/plugin_test/wal";

let url = self.base_url.join(api_path)?;

let mut req = self.http_client.post(url).json(&wal_plugin_test_request);

if let Some(token) = &self.auth_token {
req = req.bearer_auth(token.expose_secret());
}
let resp = req
.send()
.await
.map_err(|src| Error::request_send(Method::POST, api_path, src))?;

if resp.status().is_success() {
resp.json().await.map_err(Error::Json)
} else {
Err(Error::ApiError {
code: resp.status(),
message: resp.text().await.map_err(Error::Text)?,
})
}
}

/// Send a `/ping` request to the target `influxdb3` server to check its
/// status and gather `version` and `revision` information
pub async fn ping(&self) -> Result<PingResponse> {
Expand Down
22 changes: 22 additions & 0 deletions influxdb3_client/src/plugin_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//! Request structs for the /api/v3/plugin_test API
use std::collections::HashMap;
use serde::{Deserialize, Serialize};

/// Request definition for `POST /api/v3/plugin_test/wal` API
#[derive(Debug, Serialize, Deserialize)]
pub struct WalPluginTestRequest {
pub name: String,
pub input_lp: Option<String>,
pub input_file: Option<String>,
pub save_output_to_file: Option<String>,
pub validate_output_file: Option<String>,
}

/// Response definition for `POST /api/v3/plugin_test/wal` API
#[derive(Debug, Serialize, Deserialize)]
pub struct WalPluginTestResponse {
pub log_lines: Vec<String>,
pub database_writes: HashMap<String, Vec<String>>,
pub errors: Vec<String>,
}
1 change: 1 addition & 0 deletions influxdb3_py_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ influxdb3_catalog = {path = "../influxdb3_catalog"}
async-trait.workspace = true
schema.workspace = true
parking_lot.workspace = true
log = "0.4.22"

[dependencies.pyo3]
version = "0.23.3"
Expand Down
2 changes: 1 addition & 1 deletion influxdb3_py_api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#[cfg(feature = "system-py")]
pub mod system_py;
pub mod system_py;
Loading

0 comments on commit 301b83c

Please sign in to comment.