Skip to content

Commit

Permalink
connection: quietly prepare unprepared queries in batch before sending
Browse files Browse the repository at this point in the history
Before sending an unprepared statement in batch, quietly prepare it on one connection to obtain the information about the
bind markers. If the list of values to be bound is empty, we
just send it as an unprepared query.
  • Loading branch information
sylwiaszunejko committed Sep 25, 2023
1 parent e9af9a1 commit 3c58014
Showing 1 changed file with 60 additions and 3 deletions.
63 changes: 60 additions & 3 deletions scylla/src/transport/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub(crate) use ssl_config::SslConfig;

use crate::authentication::AuthenticatorProvider;
use scylla_cql::frame::response::authenticate::Authenticate;
use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::convert::TryFrom;
use std::io::ErrorKind;
use std::net::{IpAddr, SocketAddr};
Expand All @@ -52,7 +52,7 @@ use crate::frame::{
request::{self, batch, execute, query, register, SerializableRequest},
response::{event::Event, result, NonErrorResponse, Response, ResponseOpcode},
server_event_type::EventType,
value::{BatchValues, ValueList},
value::{BatchValues, BatchValuesIterator, ValueList},
FrameParams, SerializedRequest,
};
use crate::query::Query;
Expand Down Expand Up @@ -772,11 +772,13 @@ impl Connection {

pub(crate) async fn batch_with_consistency(
&self,
batch: &Batch,
init_batch: &Batch,
values: impl BatchValues,
consistency: Consistency,
serial_consistency: Option<SerialConsistency>,
) -> Result<QueryResult, QueryError> {
let batch = self.prepare_batch(init_batch, &values).await?;

let batch_frame = batch::Batch {
statements: Cow::Borrowed(&batch.statements),
values,
Expand Down Expand Up @@ -820,6 +822,61 @@ impl Connection {
}
}

async fn prepare_batch<'b>(
&self,
init_batch: &'b Batch,
values: impl BatchValues,
) -> Result<Cow<'b, Batch>, QueryError> {
let mut to_prepare = HashSet::<String>::new();

{
let mut values_iter = values.batch_values_iter();
for stmt in &init_batch.statements {
if let BatchStatement::Query(query) = stmt {
let value = values_iter.next_serialized().transpose()?;
if let Some(v) = value {
if v.len() > 0 {
to_prepare.insert(query.contents.clone());
}
}
} else {
values_iter.skip_next();
}
}
}

let mut prepared_queries = HashMap::<String, PreparedStatement>::new();

for query in &to_prepare {
let prepared = self.prepare(&Query::new(query)).await?;
prepared_queries.insert(query.clone(), prepared);
}

let mut batch: Cow<Batch>;

if to_prepare.is_empty() {
batch = Cow::Borrowed(init_batch);
} else {
batch = Cow::Owned(Default::default());
batch.to_mut().config = init_batch.config.clone();
for stmt in &init_batch.statements {
match stmt {
BatchStatement::Query(query) => {
match prepared_queries.get(&query.contents) {
Some(prepared) => batch.to_mut().append_statement(prepared.clone()),
None => batch.to_mut().append_statement(query.clone()),
}
}
BatchStatement::PreparedStatement(prepared) => {
batch.to_mut().append_statement(prepared.clone());
}
}
}
}

Ok(batch)
}

pub(crate) async fn use_keyspace(
&self,
keyspace_name: &VerifiedKeyspaceName,
Expand Down

0 comments on commit 3c58014

Please sign in to comment.