From 52da49ac0d2ddee458cc3279a20f5079b8b754f4 Mon Sep 17 00:00:00 2001 From: Dominik Liebler Date: Fri, 19 Apr 2024 16:14:35 +0200 Subject: [PATCH] fetch results for Trino more efficiently --- connectorx/src/sources/trino/mod.rs | 39 ++++++++++++++++++---- connectorx/src/sources/trino/typesystem.rs | 4 +-- 2 files changed, 35 insertions(+), 8 deletions(-) diff --git a/connectorx/src/sources/trino/mod.rs b/connectorx/src/sources/trino/mod.rs index 4f463fb28..f0bdfe6d3 100644 --- a/connectorx/src/sources/trino/mod.rs +++ b/connectorx/src/sources/trino/mod.rs @@ -129,7 +129,6 @@ where fn fetch_metadata(&mut self) { assert!(!self.queries.is_empty()); - // TODO: prevent from running the same query multiple times (limit1 + no limit) let first_query = &self.queries[0]; let cxq = limit1_query(first_query, &GenericDialect {})?; @@ -238,6 +237,9 @@ impl SourcePartition for TrinoSourcePartition { } pub struct TrinoSourcePartitionParser<'a> { + rt: Arc, + client: Arc, + next_uri: Option, rows: Vec, ncols: usize, current_col: usize, @@ -253,11 +255,19 @@ impl<'a> TrinoSourcePartitionParser<'a> { query: CXQuery, schema: &[TrinoTypeSystem], ) -> Self { - let rows = client.get_all::(query.to_string()); - let data = rt.block_on(rows).map_err(TrinoSourceError::PrustoError)?; - let rows = data.clone().into_vec(); + let results = rt + .block_on(client.get::(query.to_string())) + .map_err(TrinoSourceError::PrustoError)?; + + let rows = match results.data_set { + Some(x) => x.into_vec(), + _ => vec![], + }; Self { + rt, + client, + next_uri: results.next_uri, rows, ncols: schema.len(), current_row: 0, @@ -283,8 +293,25 @@ impl<'a> PartitionParser<'a> for TrinoSourcePartitionParser<'a> { fn fetch_next(&mut self) -> (usize, bool) { assert!(self.current_col == 0); - // results are always fetched in a single batch for Prusto - (self.rows.len(), true) + match self.next_uri.clone() { + Some(uri) => { + let results = self + .rt + .block_on(self.client.get_next::(&uri)) + .map_err(TrinoSourceError::PrustoError)?; + + self.rows = match results.data_set { + Some(x) => x.into_vec(), + _ => vec![], + }; + + self.current_row = 0; + self.next_uri = results.next_uri; + + (self.rows.len(), false) + } + None => return (self.rows.len(), true), + } } } diff --git a/connectorx/src/sources/trino/typesystem.rs b/connectorx/src/sources/trino/typesystem.rs index f739c9a42..c21c8cf45 100644 --- a/connectorx/src/sources/trino/typesystem.rs +++ b/connectorx/src/sources/trino/typesystem.rs @@ -1,10 +1,10 @@ use super::errors::TrinoSourceError; use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; use fehler::{throw, throws}; -use prusto::{Presto, PrestoFloat, PrestoInt, PrestoTy}; +use prusto::{PrestoFloat, PrestoInt, PrestoTy}; use std::convert::TryFrom; -// TODO: implement Tuple, Row, Array and Map as well as UUID +// TODO: implement Tuple, Row, Array and Map #[derive(Copy, Clone, Debug, PartialEq)] pub enum TrinoTypeSystem { Date(bool),