errors: pager API errors refactor #1160

Open

wants to merge 14 commits into base: main

141 changes: 73 additions & 68 deletions scylla/src/client/pager.rs

Large diffs are not rendered by default.
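The pager.rs diff is collapsed above, but the shape of the new pager error chain can be inferred from how metadata.rs (further below) imports and matches it: the pager's row stream now fails with NextRowError, which wraps NextPageError, which in turn wraps the new RequestError, and QueryError gains a NextRowError variant. A minimal sketch of that chain with toy definitions — only the variant names that appear in the match added to query_table_partitioners are taken from the diff; everything else here is a placeholder:

// Sketch only, not the real definitions from scylla/src/client/pager.rs.

/// Stand-in for the error of a whole request execution
/// (see the sketch after the session.rs diff for its inferred shape).
#[derive(Debug)]
pub struct RequestError;

/// Fetching the next page of a paged request failed.
#[derive(Debug)]
pub enum NextPageError {
    /// The underlying request to the cluster failed.
    RequestFailure(RequestError),
    // ...presumably further variants (e.g. response deserialization)...
}

/// Producing the next row from a QueryPager row stream failed.
#[derive(Debug)]
pub enum NextRowError {
    /// The page the row would have come from could not be fetched.
    NextPageError(NextPageError),
    // ...presumably further variants (e.g. row typecheck/deserialization)...
}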

78 changes: 38 additions & 40 deletions scylla/src/client/session.rs
@@ -15,7 +15,8 @@ use crate::cluster::node::CloudEndpoint;
use crate::cluster::node::{InternalKnownNode, KnownNode, NodeRef};
use crate::cluster::{Cluster, ClusterNeatDebug, ClusterState};
use crate::errors::{
- BadQuery, NewSessionError, ProtocolError, QueryError, RequestAttemptError, TracingProtocolError,
+ BadQuery, NewSessionError, ProtocolError, QueryError, RequestAttemptError, RequestError,
+ TracingProtocolError,
};
use crate::frame::response::result;
#[cfg(feature = "ssl")]
@@ -30,7 +31,7 @@ use crate::policies::host_filter::HostFilter;
use crate::policies::load_balancing::{self, RoutingInfo};
use crate::policies::retry::{RequestInfo, RetryDecision, RetrySession};
use crate::policies::speculative_execution;
- use crate::prepared_statement::PreparedStatement;
+ use crate::prepared_statement::{PartitionKeyError, PreparedStatement};
use crate::query::Query;
#[allow(deprecated)]
use crate::response::legacy_query_result::LegacyQueryResult;
@@ -1234,6 +1235,7 @@ where
self.metrics.clone(),
)
.await
+ .map_err(QueryError::from)
} else {
// Making QueryPager::new_for_query work with values is too hard (if even possible)
// so instead of sending one prepare to a specific connection on each iterator query,
@@ -1248,6 +1250,7 @@
metrics: self.metrics.clone(),
})
.await
+ .map_err(QueryError::from)
}
}

@@ -1393,7 +1396,8 @@ where
let paging_state_ref = &paging_state;

let (partition_key, token) = prepared
- .extract_partition_key_and_calculate_token(prepared.get_partitioner_name(), values_ref)?
+ .extract_partition_key_and_calculate_token(prepared.get_partitioner_name(), values_ref)
+ .map_err(PartitionKeyError::into_query_error)?
.unzip();

let execution_profile = prepared
@@ -1502,6 +1506,7 @@ where
metrics: self.metrics.clone(),
})
.await
+ .map_err(QueryError::from)
}

async fn do_batch(
@@ -1849,11 +1854,11 @@ where
QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
ResT: AllowedRunRequestResTType,
{
- let history_listener_and_id: Option<(&'a dyn HistoryListener, history::QueryId)> =
+ let history_listener_and_id: Option<(&'a dyn HistoryListener, history::RequestId)> =
statement_config
.history_listener
.as_ref()
- .map(|hl| (&**hl, hl.log_query_start()));
+ .map(|hl| (&**hl, hl.log_request_start()));

let load_balancer = &execution_profile.load_balancing_policy;

@@ -1899,16 +1904,18 @@ where
let request_runner_generator = |is_speculative: bool| {
let history_data: Option<HistoryData> = history_listener_and_id
.as_ref()
- .map(|(history_listener, query_id)| {
+ .map(|(history_listener, request_id)| {
let speculative_id: Option<history::SpeculativeId> =
if is_speculative {
- Some(history_listener.log_new_speculative_fiber(*query_id))
+ Some(
+ history_listener.log_new_speculative_fiber(*request_id),
+ )
} else {
None
};
HistoryData {
listener: *history_listener,
- query_id: *query_id,
+ request_id: *request_id,
speculative_id,
}
});
@@ -1947,9 +1954,9 @@ where
let history_data: Option<HistoryData> =
history_listener_and_id
.as_ref()
- .map(|(history_listener, query_id)| HistoryData {
+ .map(|(history_listener, request_id)| HistoryData {
listener: *history_listener,
- query_id: *query_id,
+ request_id: *request_id,
speculative_id: None,
});
self.run_request_speculative_fiber(
@@ -1966,7 +1973,6 @@
},
)
.await
- .unwrap_or(Err(QueryError::EmptyPlan))
}
}
};
@@ -1977,24 +1983,19 @@
let result = match effective_timeout {
Some(timeout) => tokio::time::timeout(timeout, runner)
.await
- .unwrap_or_else(|e| {
- Err(QueryError::RequestTimeout(format!(
- "Request took longer than {}ms: {}",
- timeout.as_millis(),
- e
- )))
- }),
- None => runner.await,
+ .map(|res| res.map_err(RequestError::from))
+ .unwrap_or_else(|_| Err(RequestError::RequestTimeout(timeout))),
+ None => runner.await.map_err(RequestError::from),
};

- if let Some((history_listener, query_id)) = history_listener_and_id {
+ if let Some((history_listener, request_id)) = history_listener_and_id {
match &result {
- Ok(_) => history_listener.log_query_success(query_id),
- Err(e) => history_listener.log_query_error(query_id, e),
+ Ok(_) => history_listener.log_request_success(request_id),
+ Err(e) => history_listener.log_request_error(request_id, e),
}
}

- result
+ result.map_err(RequestError::into_query_error)
}

/// Executes the closure `run_request_once`, provided the load balancing plan and some information
@@ -2008,12 +2009,12 @@
run_request_once: impl Fn(Arc<Connection>, Consistency, &ExecutionProfileInner) -> QueryFut,
execution_profile: &ExecutionProfileInner,
mut context: ExecuteRequestContext<'a>,
- ) -> Option<Result<RunRequestResult<ResT>, QueryError>>
+ ) -> Result<RunRequestResult<ResT>, RequestError>
where
QueryFut: Future<Output = Result<ResT, RequestAttemptError>>,
ResT: AllowedRunRequestResTType,
{
- let mut last_error: Option<QueryError> = None;
+ let mut last_error: RequestError = RequestError::EmptyPlan;
let mut current_consistency: Consistency = context
.consistency_set_on_statement
.unwrap_or(execution_profile.consistency);
@@ -2030,7 +2031,7 @@
error = %e,
"Choosing connection failed"
);
- last_error = Some(e.into());
+ last_error = e.into();
// Broken connection doesn't count as a failed request, don't log in metrics
continue 'nodes_in_plan;
}
@@ -2063,7 +2064,7 @@
elapsed,
node,
);
- return Some(Ok(RunRequestResult::Completed(response)));
+ return Ok(RunRequestResult::Completed(response));
}
Err(e) => {
trace!(
@@ -2097,12 +2098,9 @@
retry_decision = ?retry_decision
);

- last_error = Some(request_error.into_query_error());
- context.log_attempt_error(
- &attempt_id,
- last_error.as_ref().unwrap(),
- &retry_decision,
- );
+ context.log_attempt_error(&attempt_id, &request_error, &retry_decision);
+
+ last_error = request_error.into();

match retry_decision {
RetryDecision::RetrySameNode(new_cl) => {
@@ -2118,13 +2116,13 @@
RetryDecision::DontRetry => break 'nodes_in_plan,

RetryDecision::IgnoreWriteError => {
- return Some(Ok(RunRequestResult::IgnoredWriteError))
+ return Ok(RunRequestResult::IgnoredWriteError)
}
};
}
}

- last_error.map(Result::Err)
+ Err(last_error)
}

async fn await_schema_agreement_indefinitely(&self) -> Result<Uuid, QueryError> {
@@ -2142,8 +2140,8 @@
self.await_schema_agreement_indefinitely(),
)
.await
- .unwrap_or(Err(QueryError::RequestTimeout(
- "schema agreement not reached in time".to_owned(),
+ .unwrap_or(Err(QueryError::SchemaAgreementTimeout(
+ self.schema_agreement_timeout,
)))
}

@@ -2192,15 +2190,15 @@ struct ExecuteRequestContext<'a> {

struct HistoryData<'a> {
listener: &'a dyn HistoryListener,
- query_id: history::QueryId,
+ request_id: history::RequestId,
speculative_id: Option<history::SpeculativeId>,
}

impl ExecuteRequestContext<'_> {
fn log_attempt_start(&self, node_addr: SocketAddr) -> Option<history::AttemptId> {
self.history_data.as_ref().map(|hd| {
hd.listener
- .log_attempt_start(hd.query_id, hd.speculative_id, node_addr)
+ .log_attempt_start(hd.request_id, hd.speculative_id, node_addr)
})
}

@@ -2221,7 +2219,7 @@ impl ExecuteRequestContext<'_> {
fn log_attempt_error(
&self,
attempt_id_opt: &Option<history::AttemptId>,
- error: &QueryError,
+ error: &RequestAttemptError,
retry_decision: &RetryDecision,
) {
let attempt_id: &history::AttemptId = match attempt_id_opt {
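Read as a whole, the session.rs changes above swap QueryError for a narrower RequestError on the internal execution path (run_request_speculative_fiber now returns Result<_, RequestError> instead of Option<Result<_, QueryError>>), rename the history API from Query* to Request*, and convert back to QueryError only at the public boundary via into_query_error. A rough reconstruction of RequestError, inferred solely from the constructors and conversions visible in this diff — the toy stand-in types and exact variant payloads are assumptions, and the real definition in scylla::errors may carry more:

use std::time::Duration;

// Toy stand-ins; the real types live in scylla::errors.
#[derive(Debug)] pub struct ConnectionPoolError; // "Choosing connection failed"
#[derive(Debug)] pub struct RequestAttemptError; // error of one attempt on one node
#[derive(Debug)] pub struct QueryError;          // the public, user-facing error

/// Error of a whole request (all attempts, all nodes), as used by run_request.
#[derive(Debug)]
pub enum RequestError {
    /// The load balancing policy produced an empty plan; replaces the old
    /// `.unwrap_or(Err(QueryError::EmptyPlan))` fallback.
    EmptyPlan,
    /// The client-side timeout around the runner fired; carries the configured
    /// Duration instead of the old preformatted String message.
    RequestTimeout(Duration),
    /// No working connection could be obtained (`last_error = e.into()`).
    ConnectionPoolError(ConnectionPoolError),
    /// Retries were exhausted; the error of the last attempt
    /// (`last_error = request_error.into()`).
    LastAttemptError(RequestAttemptError),
}

impl From<ConnectionPoolError> for RequestError {
    fn from(e: ConnectionPoolError) -> Self {
        Self::ConnectionPoolError(e)
    }
}

impl From<RequestAttemptError> for RequestError {
    fn from(e: RequestAttemptError) -> Self {
        Self::LastAttemptError(e)
    }
}

impl RequestError {
    /// Boundary conversion used at the end of run_request so the public
    /// session API keeps returning QueryError.
    pub fn into_query_error(self) -> QueryError {
        QueryError // per-variant mapping elided in this sketch
    }
}
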
27 changes: 21 additions & 6 deletions scylla/src/cluster/metadata.rs
@@ -16,7 +16,7 @@
//! - [CollectionType],
//!

- use crate::client::pager::QueryPager;
+ use crate::client::pager::{NextPageError, NextRowError, QueryPager};
use crate::cluster::node::resolve_contact_points;
use crate::deserialize::DeserializeOwnedRow;
use crate::errors::{DbError, NewSessionError, QueryError, RequestAttemptError};
@@ -51,7 +51,7 @@ use uuid::Uuid;
use crate::cluster::node::{InternalKnownNode, NodeAddr, ResolvedContactPoint};
use crate::errors::{
KeyspaceStrategyError, KeyspacesMetadataError, MetadataError, PeersMetadataError,
- ProtocolError, TablesMetadataError, UdtMetadataError, ViewsMetadataError,
+ ProtocolError, RequestError, TablesMetadataError, UdtMetadataError, ViewsMetadataError,
};

/// Allows to read current metadata from the cluster
@@ -836,6 +836,7 @@ async fn query_peers(conn: &Arc<Connection>, connect_port: u16) -> Result<Vec<Pe
Ok::<_, QueryError>(rows_stream)
})
.into_stream()
+ .map(|result| result.map(|stream| stream.map_err(QueryError::from)))
.try_flatten()
.and_then(|row_result| future::ok((NodeInfoSource::Peer, row_result)));

@@ -853,6 +854,7 @@
Ok::<_, QueryError>(rows_stream)
})
.into_stream()
+ .map(|result| result.map(|stream| stream.map_err(QueryError::from)))
.try_flatten()
.and_then(|row_result| future::ok((NodeInfoSource::Local, row_result)));

Expand Down Expand Up @@ -971,7 +973,7 @@ where
let mut query = Query::new(query_str);
query.set_page_size(METADATA_QUERY_PAGE_SIZE);

- conn.query_iter(query).await
+ conn.query_iter(query).await.map_err(QueryError::from)
} else {
let keyspaces = &[keyspaces_to_fetch] as &[&[String]];
let query_str = format!("{query_str} where keyspace_name in ?");
@@ -984,7 +986,9 @@
.await
.map_err(RequestAttemptError::into_query_error)?;
let serialized_values = prepared.serialize_values(&keyspaces)?;
- conn.execute_iter(prepared, serialized_values).await
+ conn.execute_iter(prepared, serialized_values)
+ .await
+ .map_err(QueryError::from)
}
}

@@ -994,7 +998,9 @@
pager.rows_stream::<R>().map_err(convert_typecheck_error)?;
Ok::<_, QueryError>(stream)
};
- fut.into_stream().try_flatten()
+ fut.into_stream()
+ .map(|result| result.map(|stream| stream.map_err(QueryError::from)))
+ .try_flatten()
}

async fn query_keyspaces(
@@ -1748,6 +1754,7 @@ async fn query_table_partitioners(
Ok::<_, QueryError>(stream)
})
.into_stream()
+ .map(|result| result.map(|stream| stream.map_err(QueryError::from)))
.try_flatten();

let result = rows
@@ -1763,7 +1770,15 @@
// that we are only interested in the ones resulting from non-existent table
// system_schema.scylla_tables.
// For more information please refer to https://github.com/scylladb/scylla-rust-driver/pull/349#discussion_r762050262
- Err(QueryError::DbError(DbError::Invalid, _)) => Ok(HashMap::new()),
+ // FIXME 2: The specific error we expect here should appear in QueryError::NextRowError. For now
+ // we match against both variants. This will be fixed once `MetadataError` is further adjusted
+ // in a follow-up PR. The goal is to return MetadataError from all functions related to metadata fetch.
+ Err(QueryError::DbError(DbError::Invalid, _))
+ | Err(QueryError::NextRowError(NextRowError::NextPageError(
+ NextPageError::RequestFailure(RequestError::LastAttemptError(
+ RequestAttemptError::DbError(DbError::Invalid, _),
+ )),
+ ))) => Ok(HashMap::new()),
result => result,
}
}
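One pattern recurs throughout this file: `.map(|result| result.map(|stream| stream.map_err(QueryError::from)))`. It is needed because, after the refactor, the pager's row stream yields Result<Row, NextRowError> while the surrounding metadata code still speaks QueryError, so the per-row error must be converted before try_flatten can merge the outer and inner error types. A self-contained model of the adapter with toy types (the real inner stream comes from QueryPager::rows_stream):

use futures::future::FutureExt;
use futures::stream::{self, StreamExt, TryStreamExt};

#[derive(Debug)] struct Row;
#[derive(Debug)] struct NextRowError;
#[derive(Debug)] struct QueryError;
impl From<NextRowError> for QueryError {
    fn from(_: NextRowError) -> Self { QueryError }
}

async fn demo() -> Result<(), QueryError> {
    // Stands in for the async block that creates a pager and turns it into a
    // typed row stream; the outer Result is "did the request/typecheck succeed".
    let fut = async {
        Ok::<_, QueryError>(stream::iter([Ok::<Row, NextRowError>(Row)]))
    };

    let rows = fut
        .into_stream() // Future -> one-element Stream
        // Inner map_err: per-row NextRowError -> QueryError, so the inner
        // stream's error type matches the outer one and try_flatten applies.
        .map(|result| result.map(|s| s.map_err(QueryError::from)))
        .try_flatten();

    futures::pin_mut!(rows);
    while let Some(_row) = rows.try_next().await? {
        // consume rows
    }
    Ok(())
}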