Skip to content

Commit

Permalink
nexus: EXECUTE peer($$query$$)
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Nov 8, 2024
1 parent b7ca715 commit 3d378ae
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 57 deletions.
30 changes: 29 additions & 1 deletion nexus/analyzer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use qrep::process_options;
use sqlparser::ast::{
self, visit_relations, visit_statements,
CreateMirror::{Select, CDC},
Expr, FetchDirection, SqlOption, Statement,
DollarQuotedString, Expr, FetchDirection, SqlOption, Statement, Value,
};

mod qrep;
Expand Down Expand Up @@ -116,6 +116,10 @@ pub enum PeerDDL {
peer_name: String,
if_exists: bool,
},
ExecutePeer {
peer_name: String,
query: String,
},
CreateMirrorForCDC {
if_not_exists: bool,
flow_job: Box<FlowJob>,
Expand Down Expand Up @@ -388,6 +392,30 @@ impl StatementAnalyzer for PeerDDLAnalyzer {
}
}
}
// `EXECUTE <name>(<string literal>)`: produce `PeerDDL::ExecutePeer` carrying
// the literal's text as the query to run on the named peer.
Statement::Execute {
    name, parameters, ..
} => {
    // Only the first parameter is inspected, and only when it is one of the
    // string-literal forms below; every other shape yields `Ok(None)`.
    let query = match parameters.first() {
        Some(Expr::Value(literal)) => match literal {
            Value::DoubleQuotedString(text)
            | Value::SingleQuotedString(text)
            | Value::EscapedStringLiteral(text) => Some(text.clone()),
            Value::DollarQuotedString(DollarQuotedString { value, .. }) => {
                Some(value.clone())
            }
            _ => None,
        },
        _ => None,
    };

    // The cloned text is moved into the variant directly, so the literal is
    // copied exactly once. The peer name is stored lower-cased.
    Ok(query.map(|query| PeerDDL::ExecutePeer {
        peer_name: name.to_string().to_lowercase(),
        query,
    }))
}
Statement::ExecuteMirror { mirror_name } => Ok(Some(PeerDDL::ExecuteMirrorForSelect {
flow_job_name: mirror_name.to_string().to_lowercase(),
})),
Expand Down
6 changes: 5 additions & 1 deletion nexus/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl Catalog {
let stmt = self
.pg
.prepare_typed(
"SELECT id, name, type, options, enc_key_id FROM public.peers WHERE name = $1",
"SELECT name, type, options, enc_key_id FROM public.peers WHERE name = $1",
&[],
)
.await?;
Expand Down Expand Up @@ -516,6 +516,10 @@ impl Catalog {

#[async_trait::async_trait]
impl QueryExecutor for Catalog {
/// Runs `query` through the catalog's `pg` client by delegating to
/// `peer_postgres::pg_execute_raw`. The text is forwarded unchanged.
async fn execute_raw(&self, query: &str) -> PgWireResult<QueryOutput> {
peer_postgres::pg_execute_raw(&self.pg, query).await
}

#[tracing::instrument(skip(self, stmt), fields(stmt = %stmt))]
async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput> {
peer_postgres::pg_execute(&self.pg, ast::PostgresAst { peername: None }, stmt).await
Expand Down
21 changes: 12 additions & 9 deletions nexus/peer-bigquery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ impl BigQueryQueryExecutor {

#[async_trait::async_trait]
impl QueryExecutor for BigQueryQueryExecutor {
/// Runs `query` through `run_tracked` exactly as given and returns the
/// response wrapped in a `QueryOutput::Stream`.
async fn execute_raw(&self, query: &str) -> PgWireResult<QueryOutput> {
    // Convert the response into a record stream immediately; the stream is
    // what exposes the row count used in the log line below.
    let records = BqRecordStream::from(self.run_tracked(query).await?);
    tracing::info!(
        "retrieved {} rows for query {}",
        records.get_num_records(),
        query
    );

    let output = QueryOutput::Stream(Box::pin(records));
    Ok(output)
}

#[tracing::instrument(skip(self, stmt), fields(stmt = %stmt))]
async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput> {
// only support SELECT statements
Expand All @@ -105,15 +116,7 @@ impl QueryExecutor for BigQueryQueryExecutor {
let query = query.to_string();
tracing::info!("bq rewritten query: {}", query);

let query_response = self.run_tracked(&query).await?;

let cursor = BqRecordStream::from(query_response);
tracing::info!(
"retrieved {} rows for query {}",
cursor.get_num_records(),
query
);
Ok(QueryOutput::Stream(Box::pin(cursor)))
self.execute_raw(&query).await
}
Statement::Declare { stmts } => {
if stmts.len() != 1 {
Expand Down
4 changes: 3 additions & 1 deletion nexus/peer-bigquery/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use std::{
use chrono::DateTime;
use futures::Stream;
use gcp_bigquery_client::model::{
field_type::FieldType, query_response::{ResultSet, QueryResponse}, table_field_schema::TableFieldSchema,
field_type::FieldType,
query_response::{QueryResponse, ResultSet},
table_field_schema::TableFieldSchema,
};
use peer_cursor::{Record, RecordStream, Schema};
use pgwire::{
Expand Down
1 change: 1 addition & 0 deletions nexus/peer-cursor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub enum QueryOutput {

/// Interface for running queries against a backend and describing their
/// results. Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait QueryExecutor: Send + Sync {
/// Executes a query supplied as plain text rather than as a parsed
/// `Statement`.
///
/// NOTE(review): the parameter is named `stmt` here while implementors name
/// it `query`; it looks like the text is meant to reach the backend without
/// being parsed or rewritten — confirm against each implementation.
async fn execute_raw(&self, stmt: &str) -> PgWireResult<QueryOutput>;
/// Executes an already-parsed `Statement`.
async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput>;
/// Returns the `Schema` for `stmt`, if any.
///
/// NOTE(review): presumably `None` means no schema applies to the
/// statement — confirm with implementors.
async fn describe(&self, stmt: &Statement) -> PgWireResult<Option<Schema>>;
}
Expand Down
6 changes: 5 additions & 1 deletion nexus/peer-mysql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ impl MySqlQueryExecutor {

#[async_trait::async_trait]
impl QueryExecutor for MySqlQueryExecutor {
// #[tracing::instrument(skip(self, stmt), fields(stmt = %stmt))]
/// Runs `query` through `self.query` as given and returns the resulting
/// cursor wrapped in a `QueryOutput::Stream`.
async fn execute_raw(&self, query: &str) -> PgWireResult<QueryOutput> {
    // `self.query` takes an owned string, so copy the borrowed text once.
    let owned_query = query.to_owned();
    let rows = self.query(owned_query).await?;
    Ok(QueryOutput::Stream(Box::pin(rows)))
}

async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput> {
// only support SELECT statements
match stmt {
Expand Down
60 changes: 33 additions & 27 deletions nexus/peer-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,34 @@ async fn schema_from_query(client: &Client, query: &str) -> anyhow::Result<Schem
Ok(Arc::new(fields))
}

pub async fn pg_execute_raw(client: &Client, query: &str) -> PgWireResult<QueryOutput> {
// first fetch the schema as this connection will be
// short lived, only then run the query as the query
// could hold the pin on the connection for a long time.
let schema = schema_from_query(client, query).await.map_err(|e| {
tracing::error!("error getting schema: {}", e);
PgWireError::ApiError(format!("error getting schema: {}", e).into())
})?;

tracing::info!("[peer-postgres] rewritten query: {}", query);
// given that there could be a lot of rows returned, we
// need to use a cursor to stream the rows back to the
// client.
let stream = client
.query_raw(query, std::iter::empty::<&str>())
.await
.map_err(|e| {
tracing::error!("error executing query: {}", e);
PgWireError::ApiError(format!("error executing query: {}", e).into())
})?;

// log that raw query execution has completed
tracing::info!("[peer-postgres] raw query execution completed");

let cursor = stream::PgRecordStream::new(stream, schema);
Ok(QueryOutput::Stream(Box::pin(cursor)))
}

pub async fn pg_execute(
client: &Client,
ast: ast::PostgresAst,
Expand All @@ -58,33 +86,7 @@ pub async fn pg_execute(
ast.rewrite_query(&mut query);
let rewritten_query = query.to_string();

// first fetch the schema as this connection will be
// short lived, only then run the query as the query
// could hold the pin on the connection for a long time.
let schema = schema_from_query(client, &rewritten_query)
.await
.map_err(|e| {
tracing::error!("error getting schema: {}", e);
PgWireError::ApiError(format!("error getting schema: {}", e).into())
})?;

tracing::info!("[peer-postgres] rewritten query: {}", rewritten_query);
// given that there could be a lot of rows returned, we
// need to use a cursor to stream the rows back to the
// client.
let stream = client
.query_raw(&rewritten_query, std::iter::empty::<&str>())
.await
.map_err(|e| {
tracing::error!("error executing query: {}", e);
PgWireError::ApiError(format!("error executing query: {}", e).into())
})?;

// log that raw query execution has completed
tracing::info!("[peer-postgres] raw query execution completed");

let cursor = stream::PgRecordStream::new(stream, schema);
Ok(QueryOutput::Stream(Box::pin(cursor)))
pg_execute_raw(client, &rewritten_query).await
}
_ => {
let mut rewritten_stmt = stmt.clone();
Expand Down Expand Up @@ -120,6 +122,10 @@ pub async fn pg_describe(client: &Client, stmt: &Statement) -> PgWireResult<Opti

#[async_trait::async_trait]
impl QueryExecutor for PostgresQueryExecutor {
/// Runs `query` on this executor's `client` by delegating to
/// `pg_execute_raw`. The text is forwarded unchanged.
async fn execute_raw(&self, query: &str) -> PgWireResult<QueryOutput> {
pg_execute_raw(&self.client, query).await
}

#[tracing::instrument(skip(self, stmt), fields(stmt = %stmt))]
async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput> {
pg_execute(
Expand Down
16 changes: 16 additions & 0 deletions nexus/peer-snowflake/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,22 @@ impl SnowflakeQueryExecutor {

#[async_trait::async_trait]
impl QueryExecutor for SnowflakeQueryExecutor {
/// Runs `query` through `process_query` as given and returns the result set
/// wrapped in a `SnowflakeRecordStream`.
///
/// Errors from `process_query` are converted into `PgWireError::ApiError`.
async fn execute_raw(&self, query: &str) -> PgWireResult<QueryOutput> {
    // `query` is already a `&str`; pass it directly instead of borrowing it
    // again as `&&str`.
    let result_set = self
        .process_query(query)
        .await
        .map_err(|err| PgWireError::ApiError(err.into()))?;

    // The stream is given this executor's partition settings, endpoint and
    // auth alongside the result set.
    let cursor = stream::SnowflakeRecordStream::new(
        result_set,
        self.partition_index,
        self.partition_number,
        self.endpoint_url.clone(),
        self.auth.clone(),
    );
    Ok(QueryOutput::Stream(Box::pin(cursor)))
}

#[tracing::instrument(skip(self, stmt), fields(stmt = %stmt))]
async fn execute(&self, stmt: &Statement) -> PgWireResult<QueryOutput> {
match stmt {
Expand Down
43 changes: 26 additions & 17 deletions nexus/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,12 @@ impl NexusBackend {
}

// execute a statement on a peer
async fn execute_statement<'a>(
async fn process_execution<'a>(
&self,
executor: &dyn QueryExecutor,
stmt: &sqlparser::ast::Statement,
result: QueryOutput,
peer_holder: Option<Box<Peer>>,
) -> PgWireResult<Vec<Response<'a>>> {
let res = executor.execute(stmt).await?;
match res {
match result {
QueryOutput::AffectedRows(rows) => {
Ok(vec![Response::Execution(Tag::new("OK").with_rows(rows))])
}
Expand Down Expand Up @@ -413,6 +411,20 @@ impl NexusBackend {
))))
}
}
// Run the query text on the named peer: look the peer up in the catalog,
// obtain an executor for it, execute the text via `execute_raw`, and convert
// the output into responses with `process_execution`.
PeerDDL::ExecutePeer { peer_name, query } => {
// A failed catalog lookup is reported as an API error with debug formatting.
let peer = self.catalog.get_peer(&peer_name).await.map_err(|err| {
PgWireError::ApiError(
format!("unable to get peer config: {:?}", err).into(),
)
})?;
// Likewise for a failure to build the peer's executor.
let executor = self.get_peer_executor(&peer).await.map_err(|err| {
PgWireError::ApiError(
format!("unable to get peer executor: {:?}", err).into(),
)
})?;
// Execution errors are propagated as returned by the executor.
let res = executor.execute_raw(&query).await?;
// The peer is handed over to `process_execution` as the peer holder.
self.process_execution(res, Some(Box::new(peer))).await
}
PeerDDL::DropMirror { .. } => self.handle_drop_mirror(&nexus_stmt).await,
PeerDDL::DropPeer {
if_exists,
Expand Down Expand Up @@ -578,14 +590,8 @@ impl NexusBackend {
}
};

let res = self
.execute_statement(executor.as_ref(), &stmt, peer_holder)
.await;
// log the error if execution failed
if let Err(err) = &res {
tracing::error!("query execution failed: {:?}", err);
}
res
let res = executor.execute(&stmt).await?;
self.process_execution(res, peer_holder).await
}

NexusStatement::PeerCursor { stmt, cursor } => {
Expand All @@ -606,12 +612,13 @@ impl NexusBackend {
}
};

self.execute_statement(executor.as_ref(), &stmt, None).await
let res = executor.execute(&stmt).await?;
self.process_execution(res, None).await
}

NexusStatement::Rollback { stmt } => {
self.execute_statement(self.catalog.as_ref(), &stmt, None)
.await
let res = self.catalog.execute(&stmt).await?;
self.process_execution(res, None).await
}

NexusStatement::Empty => Ok(vec![Response::EmptyQuery]),
Expand Down Expand Up @@ -1105,7 +1112,9 @@ pub async fn main() -> anyhow::Result<()> {
let catalog_config = get_catalog_config(&args).await?;

if args.migrations_disabled && args.migrations_only {
return Err(anyhow::anyhow!("Invalid configuration, migrations cannot be enabled and disabled at the same time"));
return Err(anyhow::anyhow!(
"Invalid configuration, migrations cannot be enabled and disabled at the same time"
));
}

if !args.migrations_disabled {
Expand Down

0 comments on commit 3d378ae

Please sign in to comment.