Skip to content

Commit f1032cc

Browse files
authored
feat: Impl Arrow Flight protocol for query response (#769)
1 parent bff3101 commit f1032cc

29 files changed

+794
-76
lines changed

Cargo.lock

Lines changed: 28 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/Cargo.toml

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,13 @@ build = "build.rs"
1212
arrow-schema = { version = "51.0.0", features = ["serde"] }
1313
arrow-array = { version = "51.0.0" }
1414
arrow-json = "51.0.0"
15-
arrow-ipc = "51.0.0"
15+
arrow-ipc = { version = "51.0.0", features = ["zstd"] }
1616
arrow-select = "51.0.0"
1717
datafusion = "37.1.0"
1818
object_store = { version = "0.9.1", features = ["cloud", "aws"] }
1919
parquet = "51.0.0"
20-
21-
### LiveTail server deps
22-
arrow-flight = "51.0.0"
23-
tonic = {version = "0.11.0", features = ["tls"] }
20+
arrow-flight = { version = "51.0.0", features = [ "tls" ] }
21+
tonic = {version = "0.11.0", features = ["tls", "transport", "gzip", "zstd"] }
2422
tonic-web = "0.11.0"
2523
tower-http = { version = "0.4.4", features = ["cors"] }
2624

server/src/banner.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,12 @@ fn print_ascii_art() {
4949

5050
fn status_info(config: &Config, scheme: &str, id: Uid) {
5151
let address = format!(
52-
"\"{}://{}\" ({}), \":{}\" (gRPC)",
52+
"\"{}://{}\" ({}), \":{}\" (livetail), \":{}\" (flight protocol)",
5353
scheme,
5454
config.parseable.address,
5555
scheme.to_ascii_uppercase(),
56-
config.parseable.grpc_port
56+
config.parseable.grpc_port,
57+
config.parseable.flight_port
5758
);
5859

5960
let mut credentials =

server/src/cli.rs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ pub struct Cli {
8989

9090
/// public address for the parseable server ingestor
9191
pub ingestor_endpoint: String,
92+
93+
/// port use by airplane(flight query service)
94+
pub flight_port: u16,
9295
}
9396

9497
impl Cli {
@@ -118,6 +121,7 @@ impl Cli {
118121
pub const INGESTOR_ENDPOINT: &'static str = "ingestor-endpoint";
119122
pub const DEFAULT_USERNAME: &'static str = "admin";
120123
pub const DEFAULT_PASSWORD: &'static str = "admin";
124+
pub const FLIGHT_PORT: &'static str = "flight-port";
121125

122126
pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
123127
self.local_staging_path.join(stream_name)
@@ -275,6 +279,16 @@ impl Cli {
275279
.value_parser(value_parser!(u16))
276280
.help("Port for gRPC server"),
277281
)
282+
.arg(
283+
Arg::new(Self::FLIGHT_PORT)
284+
.long(Self::FLIGHT_PORT)
285+
.env("P_FLIGHT_PORT")
286+
.value_name("PORT")
287+
.default_value("8002")
288+
.required(false)
289+
.value_parser(value_parser!(u16))
290+
.help("Port for Arrow Flight Querying Engine"),
291+
)
278292
.arg(
279293
Arg::new(Self::LIVETAIL_CAPACITY)
280294
.long(Self::LIVETAIL_CAPACITY)
@@ -317,11 +331,11 @@ impl Cli {
317331
.help("Mode of operation"),
318332
)
319333
.arg(
320-
Arg::new(Self::INGESTOR_ENDPOINT)
321-
.long(Self::INGESTOR_ENDPOINT)
322-
.env("P_INGESTOR_ENDPOINT")
323-
.value_name("URL")
324-
.required(false)
334+
Arg::new(Self::INGESTOR_ENDPOINT)
335+
.long(Self::INGESTOR_ENDPOINT)
336+
.env("P_INGESTOR_ENDPOINT")
337+
.value_name("URL")
338+
.required(false)
325339
.help("URL to connect to this specific ingestor. Default is the address of the server.")
326340
)
327341
.arg(
@@ -401,6 +415,10 @@ impl FromArgMatches for Cli {
401415
.get_one::<u16>(Self::GRPC_PORT)
402416
.cloned()
403417
.expect("default for livetail port");
418+
self.flight_port = m
419+
.get_one::<u16>(Self::FLIGHT_PORT)
420+
.cloned()
421+
.expect("default for flight port");
404422
self.livetail_channel_capacity = m
405423
.get_one::<usize>(Self::LIVETAIL_CAPACITY)
406424
.cloned()

server/src/event.rs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use std::sync::Arc;
2828

2929
use self::error::EventError;
3030
pub use self::writer::STREAM_WRITERS;
31-
use crate::metadata;
31+
use crate::{handlers::http::ingest::PostError, metadata};
3232
use chrono::NaiveDateTime;
3333

3434
pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
@@ -48,7 +48,7 @@ pub struct Event {
4848

4949
// Events holds the schema related to a each event for a single log stream
5050
impl Event {
51-
pub async fn process(self) -> Result<(), EventError> {
51+
pub async fn process(&self) -> Result<(), EventError> {
5252
let mut key = get_schema_key(&self.rb.schema().fields);
5353
if self.time_partition.is_some() {
5454
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
@@ -77,7 +77,7 @@ impl Event {
7777
crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
7878

7979
if let Err(e) = metadata::STREAM_INFO
80-
.check_alerts(&self.stream_name, self.rb)
80+
.check_alerts(&self.stream_name, &self.rb)
8181
.await
8282
{
8383
log::error!("Error checking for alerts. {:?}", e);
@@ -86,6 +86,24 @@ impl Event {
8686
Ok(())
8787
}
8888

89+
pub fn process_unchecked(self) -> Result<Self, PostError> {
90+
let key = get_schema_key(&self.rb.schema().fields);
91+
92+
Self::process_event(
93+
&self.stream_name,
94+
&key,
95+
self.rb.clone(),
96+
self.parsed_timestamp,
97+
)
98+
.map_err(PostError::Event)?;
99+
100+
Ok(self)
101+
}
102+
103+
pub fn clear(&self, stream_name: &str) {
104+
STREAM_WRITERS.clear(stream_name);
105+
}
106+
89107
// event process all events after the 1st event. Concatenates record batches
90108
// and puts them in memory store for each event.
91109
fn process_event(

server/src/event/writer.rs

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,15 @@ mod mem_writer;
2222

2323
use std::{
2424
collections::HashMap,
25-
sync::{Arc, Mutex, RwLock},
25+
sync::{Arc, Mutex, RwLock, RwLockWriteGuard},
26+
};
27+
28+
use crate::{
29+
option::{Mode, CONFIG},
30+
utils,
2631
};
2732

2833
use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
29-
use crate::utils;
3034
use arrow_array::{RecordBatch, TimestampMillisecondArray};
3135
use arrow_schema::Schema;
3236
use chrono::NaiveDateTime;
@@ -62,6 +66,11 @@ impl Writer {
6266
self.mem.push(schema_key, rb);
6367
Ok(())
6468
}
69+
70+
fn push_mem(&mut self, schema_key: &str, rb: RecordBatch) -> Result<(), StreamWriterError> {
71+
self.mem.push(schema_key, rb);
72+
Ok(())
73+
}
6574
}
6675

6776
#[derive(Deref, DerefMut, Default)]
@@ -80,7 +89,8 @@ impl WriterTable {
8089

8190
match hashmap_guard.get(stream_name) {
8291
Some(stream_writer) => {
83-
stream_writer.lock().unwrap().push(
92+
self.handle_existing_writer(
93+
stream_writer,
8494
stream_name,
8595
schema_key,
8696
record,
@@ -89,26 +99,84 @@ impl WriterTable {
8999
}
90100
None => {
91101
drop(hashmap_guard);
92-
let mut map = self.write().unwrap();
102+
let map = self.write().unwrap();
93103
// check for race condition
94104
// if map contains entry then just
95-
if let Some(writer) = map.get(stream_name) {
105+
self.handle_missing_writer(map, stream_name, schema_key, record, parsed_timestamp)?;
106+
}
107+
};
108+
Ok(())
109+
}
110+
111+
fn handle_existing_writer(
112+
&self,
113+
stream_writer: &Mutex<Writer>,
114+
stream_name: &str,
115+
schema_key: &str,
116+
record: RecordBatch,
117+
parsed_timestamp: NaiveDateTime,
118+
) -> Result<(), StreamWriterError> {
119+
if CONFIG.parseable.mode != Mode::Query {
120+
stream_writer.lock().unwrap().push(
121+
stream_name,
122+
schema_key,
123+
record,
124+
parsed_timestamp,
125+
)?;
126+
} else {
127+
stream_writer
128+
.lock()
129+
.unwrap()
130+
.push_mem(stream_name, record)?;
131+
}
132+
133+
Ok(())
134+
}
135+
136+
fn handle_missing_writer(
137+
&self,
138+
mut map: RwLockWriteGuard<HashMap<String, Mutex<Writer>>>,
139+
stream_name: &str,
140+
schema_key: &str,
141+
record: RecordBatch,
142+
parsed_timestamp: NaiveDateTime,
143+
) -> Result<(), StreamWriterError> {
144+
match map.get(stream_name) {
145+
Some(writer) => {
146+
if CONFIG.parseable.mode != Mode::Query {
96147
writer.lock().unwrap().push(
97148
stream_name,
98149
schema_key,
99150
record,
100151
parsed_timestamp,
101152
)?;
102153
} else {
154+
writer.lock().unwrap().push_mem(stream_name, record)?;
155+
}
156+
}
157+
None => {
158+
if CONFIG.parseable.mode != Mode::Query {
103159
let mut writer = Writer::default();
104160
writer.push(stream_name, schema_key, record, parsed_timestamp)?;
105161
map.insert(stream_name.to_owned(), Mutex::new(writer));
162+
} else {
163+
let mut writer = Writer::default();
164+
writer.push_mem(schema_key, record)?;
165+
map.insert(stream_name.to_owned(), Mutex::new(writer));
106166
}
107167
}
108-
};
168+
}
109169
Ok(())
110170
}
111171

172+
pub fn clear(&self, stream_name: &str) {
173+
let map = self.write().unwrap();
174+
if let Some(writer) = map.get(stream_name) {
175+
let w = &mut writer.lock().unwrap().mem;
176+
w.clear();
177+
}
178+
}
179+
112180
pub fn delete_stream(&self, stream_name: &str) {
113181
self.write().unwrap().remove(stream_name);
114182
}

server/src/event/writer/file_writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::storage::staging::StorageDir;
2929
use chrono::NaiveDateTime;
3030

3131
pub struct ArrowWriter {
32+
#[allow(dead_code)]
3233
pub file_path: PathBuf,
3334
pub writer: StreamWriter<File>,
3435
}

0 commit comments

Comments
 (0)