Skip to content

Commit

Permalink
Handle more gRPC error conditions (#102)
Browse files Browse the repository at this point in the history
* Handle more gRPC error conditions

This commit improves gRPC error handling when consuming streams from a gRPC endpoint. Unfortunately, Tonic doesn't allow us to test for a connection being reset specifically; it just returns an error. I think this is a feature that could be improved within Tonic. If we could test specifically for connections being reset, then we could offer more targeted logging for those situations.

* Provides a convenience for marshalling declarations (#103)
  • Loading branch information
huntc authored Nov 6, 2023
1 parent d38fce3 commit b0b7184
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 43 deletions.
1 change: 1 addition & 0 deletions akka-persistence-rs-commitlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ async-stream = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
ciborium = { workspace = true, optional = true }
itoa = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
smol_str = { workspace = true }
Expand Down
115 changes: 114 additions & 1 deletion akka-persistence-rs-commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ where
entity_id: EntityId,
seq_nr: u64,
timestamp: DateTime<Utc>,
event: E,
event: &E,
) -> Option<ProducerRecord>;
}

Expand Down Expand Up @@ -417,6 +417,119 @@ where
}
}

#[cfg(feature = "cbor")]
pub mod cbor {
    use super::*;

    /// A commit log marshaller that uses CBOR for event serialization and a
    /// supplied [`SecretStore`] for encryption. Entity identifiers are
    /// required to be numeric (parseable as `u32`) so that they can be packed
    /// into commit log record keys alongside a record type.
    pub struct Marshaller<E, F, SS> {
        /// The entity type this marshaller serves.
        pub entity_type: EntityType,
        /// Secret store path of the key used to encrypt/decrypt event payloads.
        pub events_key_secret_path: Arc<str>,
        /// Maps an event to its record type (occupies the top bits of the
        /// compaction key), or `None` when the event has no record type.
        pub to_record_type: F,
        /// The store from which encryption secrets are resolved.
        pub secret_store: SS,
        // Carries the event type parameter `E` without storing a value of it.
        phantom: PhantomData<E>,
    }

    // Our event keys will occupy the top 12 bits of the key, meaning
    // that we can have 4K types of event. We use 32 of the bottom 52
    // bits as the entity id. We choose 32 bits as this is a common size
    // for identifiers transmitted from IoT sensors. These identifiers
    // are also known as "device addresses" and represent an address
    // which may, in turn, equate to a 64 bit address globally unique
    // to a device. These globally unique addresses are not generally
    // transmitted in order to conserve packet size.
    const EVENT_TYPE_BIT_SHIFT: usize = 52;
    const EVENT_ID_BIT_MASK: u64 = 0xFFFFFFFF;

    #[async_trait]
    impl<E, F, SS> CommitLogMarshaller<E> for Marshaller<E, F, SS>
    where
        for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
        F: Fn(&E) -> Option<u32> + Sync,
        SS: SecretStore,
    {
        fn entity_type(&self) -> EntityType {
            self.entity_type.clone()
        }

        /// Derives a compaction key by packing the event's record type into
        /// the top 12 bits and the numeric entity id into the bottom 32 bits.
        /// Returns `None` when the event yields no record type or the entity
        /// id cannot be parsed as a `u32`.
        fn to_compaction_key(&self, entity_id: &EntityId, event: &E) -> Option<Key> {
            let record_type = (self.to_record_type)(event);
            record_type.and_then(|record_type| {
                let entity_id = entity_id.parse::<u32>().ok()?;
                Some((record_type as u64) << EVENT_TYPE_BIT_SHIFT | entity_id as u64)
            })
        }

        /// Recovers the numeric entity id from the bottom 32 bits of the
        /// consumer record's key. `itoa` formats the integer without going
        /// through `fmt::Display`, avoiding an intermediate heap allocation.
        fn to_entity_id(&self, record: &ConsumerRecord) -> Option<EntityId> {
            let entity_id = (record.key & EVENT_ID_BIT_MASK) as u32;
            let mut buffer = itoa::Buffer::new();
            Some(EntityId::from(buffer.format(entity_id)))
        }

        /// Decrypts and deserializes a consumer record into an event envelope.
        /// Delegates to the encrypted-marshaller helper provided by
        /// `EncryptedCommitLogMarshaller`.
        async fn envelope(
            &self,
            entity_id: EntityId,
            record: ConsumerRecord,
        ) -> Option<EventEnvelope<E>> {
            self.decrypted_envelope(entity_id, record).await
        }

        /// Serializes and encrypts an event into a producer record destined
        /// for the given topic. Delegates to the encrypted-marshaller helper.
        async fn producer_record(
            &self,
            topic: Topic,
            entity_id: EntityId,
            seq_nr: u64,
            timestamp: DateTime<Utc>,
            event: &E,
        ) -> Option<ProducerRecord> {
            self.encrypted_producer_record(topic, entity_id, seq_nr, timestamp, event)
                .await
        }
    }

    #[async_trait]
    impl<E, F, SS> EncryptedCommitLogMarshaller<E> for Marshaller<E, F, SS>
    where
        for<'async_trait> E: DeserializeOwned + Serialize + Send + Sync + 'async_trait,
        F: Fn(&E) -> Option<u32> + Sync,
        SS: SecretStore,
    {
        type SecretStore = SS;

        fn secret_store(&self) -> &Self::SecretStore {
            &self.secret_store
        }

        /// All events share one key path, regardless of entity id.
        fn secret_path(&self, _entity_id: &EntityId) -> Arc<str> {
            self.events_key_secret_path.clone()
        }
    }

    /// Provides a marshaller that conveniently uses CBOR for serialization and
    /// a supplied secret store for encryption. Entity identifiers are also
    /// required to be numeric. These characteristics are reasonable when using
    /// the Streambed commit log at the edge.
    pub fn marshaller<E, F, S, SS>(
        entity_type: EntityType,
        events_key_secret_path: S,
        secret_store: SS,
        to_record_type: F,
    ) -> Marshaller<E, F, SS>
    where
        for<'a> E: DeserializeOwned + Serialize + Send + Sync + 'a,
        F: Fn(&E) -> Option<u32> + Sync,
        SS: SecretStore,
        S: ToString,
    {
        Marshaller {
            entity_type,
            events_key_secret_path: Arc::from(events_key_secret_path.to_string()),
            to_record_type,
            secret_store,
            phantom: PhantomData,
        }
    }
}

#[cfg(test)]
mod tests {
use std::{env, fs, num::NonZeroUsize, time::Duration};
Expand Down
29 changes: 21 additions & 8 deletions akka-projection-rs-grpc/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use akka_projection_rs::SourceProvider;
use async_stream::stream;
use async_trait::async_trait;
use chrono::Timelike;
use log::debug;
use log::warn;
use prost::Message;
use prost_types::Timestamp;
Expand Down Expand Up @@ -179,17 +180,29 @@ where
if let Ok(response) = result {
let mut stream_outs = response.into_inner();
while let Some(stream_out) = stream_outs.next().await {
if let Ok(proto::StreamOut{ message: Some(proto::stream_out::Message::Event(streamed_event)) }) = stream_out {
// Marshall and abort if we can't.
match stream_out {
Ok(proto::StreamOut{ message }) => match message {
Some(proto::stream_out::Message::Event(streamed_event)) => {
// Marshall and abort if we can't.

let Ok(envelope) = streamed_event.try_into() else {
warn!("Cannot marshall envelope. Aborting stream.");
break
};
let Ok(envelope) = streamed_event.try_into() else {
warn!("Cannot marshall envelope. Aborting stream.");
break
};

// All is well, so emit the event.

yield envelope;
}

// All is well, so emit the event.
Some(proto::stream_out::Message::FilteredEvent(_)) | None => ()
}

yield envelope;
Err(e) => {
// Debug level because connection errors are normal.
debug!("Error encountered while consuming events: {e:?}. Aborting.");
break;
}
}
}
}
Expand Down
87 changes: 53 additions & 34 deletions akka-projection-rs-grpc/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,21 +277,29 @@ pub async fn run<E, EC, ECR>(

loop {
tokio::select! {
Some(Ok(proto::ConsumeEventOut {
message: Some(message),
})) = stream_outs.next() => match message {
proto::consume_event_out::Message::Start(proto::ConsumerEventStart { filter }) => {
debug!("Starting the protocol");
let _ = consumer_filters.send(
filter
.into_iter()
.flat_map(|f| f.try_into())
.collect(),
);
break;
stream_out = stream_outs.next() => match stream_out {
Some(Ok(proto::ConsumeEventOut { message })) =>
match message {
Some(proto::consume_event_out::Message::Start(proto::ConsumerEventStart { filter })) => {
debug!("Starting the protocol");
let _ = consumer_filters.send(
filter
.into_iter()
.flat_map(|f| f.try_into())
.collect(),
);
break;
}
_ => {
warn!("Received a message before starting the protocol - ignoring event");
}
},
Some(Err(e)) => {
warn!("Encountered an error while waiting to start the protocol: {e:?}");
continue 'outer;
}
_ => {
warn!("Received a message before starting the protocol - ignoring event");
None => {
continue 'outer;
}
},

Expand Down Expand Up @@ -328,37 +336,48 @@ pub async fn run<E, EC, ECR>(
}
}

Some(Ok(proto::ConsumeEventOut {
message: Some(message),
})) = stream_outs.next() => match message {
proto::consume_event_out::Message::Start(proto::ConsumerEventStart { .. }) => {
warn!("Received a protocol start when already started - ignoring");
}
proto::consume_event_out::Message::Ack(proto::ConsumerEventAck { persistence_id, seq_nr }) => {
if let Ok(persistence_id) = persistence_id.parse() {
if let Some(contexts) = in_flight.get_mut(&persistence_id) {
let seq_nr = seq_nr as u64;
while let Some((envelope, reply_to)) = contexts.pop_front() {
if seq_nr == envelope.seq_nr && reply_to.send(()).is_ok() {
break;
stream_out = stream_outs.next() => match stream_out {
Some(Ok(proto::ConsumeEventOut { message })) => match message {
Some(proto::consume_event_out::Message::Start(proto::ConsumerEventStart { .. })) => {
warn!("Received a protocol start when already started - ignoring");
}
Some(proto::consume_event_out::Message::Ack(proto::ConsumerEventAck { persistence_id, seq_nr })) => {
if let Ok(persistence_id) = persistence_id.parse() {
if let Some(contexts) = in_flight.get_mut(&persistence_id) {
let seq_nr = seq_nr as u64;
while let Some((envelope, reply_to)) = contexts.pop_front() {
if seq_nr == envelope.seq_nr && reply_to.send(()).is_ok() {
break;
}
}
if contexts.is_empty() {
in_flight.remove(&persistence_id);
}
}
if contexts.is_empty() {
in_flight.remove(&persistence_id);
}
} else {
warn!("Received an event but could not parse the persistence id - ignoring event");
}
} else {
warn!("Received an event but could not parse the persistence id - ignoring event");
}
None => {
warn!("Received an empty message while consuming replies - ignoring event");
}
}

Some(Err(e)) => {
// Debug level because connection errors are normal.
debug!("Encountered an error while consuming replies: {e:?}");
continue 'outer;
}

None => {
continue 'outer;
}
},

_ = &mut kill_switch => {
debug!("gRPC producer killed.");
break 'outer
}

else => break
}
}
}
Expand Down

0 comments on commit b0b7184

Please sign in to comment.