Skip to content

Commit

Permalink
[Tunnelbroker] update DDB schema to handle client message ID
Browse files Browse the repository at this point in the history
Summary: Issue and context why we need this: [ENG-5171](https://linear.app/comm/issue/ENG-5171/change-tunnelbroker-ddb-schema).

Test Plan:
1. Send some messages and check if they're properly saved in DB:
{F810225}
2. Run `test_messages_order` to make sure:
- messages can be properly retrieved and deleted
- order is correct

Reviewers: varun, michal, bartek, jon

Reviewed By: michal, bartek

Subscribers: ashoat, tomek, wyilio

Differential Revision: https://phab.comm.dev/D9459
  • Loading branch information
xsanm committed Oct 16, 2023
1 parent 8e93fd2 commit e18a7c4
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 34 deletions.
67 changes: 65 additions & 2 deletions services/commtest/tests/tunnelbroker_integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
mod proto {
tonic::include_proto!("tunnelbroker");
}

use commtest::identity::device::create_device;
use commtest::identity::olm_account_infos::{
MOCK_CLIENT_KEYS_1, MOCK_CLIENT_KEYS_2,
};
use commtest::tunnelbroker::socket::create_socket;
use futures_util::StreamExt;
use futures_util::{SinkExt, StreamExt};
use proto::tunnelbroker_service_client::TunnelbrokerServiceClient;
use proto::MessageToDevice;
use tunnelbroker_messages::RefreshKeyRequest;
use std::time::Duration;
use tokio::time::sleep;
use tokio_tungstenite::tungstenite::Message;

use tunnelbroker_messages::{
MessageToDevice as WebsocketMessageToDevice, RefreshKeyRequest,
};

#[tokio::test]
async fn send_refresh_request() {
Expand Down Expand Up @@ -46,3 +56,56 @@ async fn send_refresh_request() {
serde_json::from_str(&response.to_text().unwrap()).unwrap();
assert_eq!(serialized_response, refresh_request);
}

#[tokio::test]
async fn test_messages_order() {
let sender = create_device(Some(&MOCK_CLIENT_KEYS_1)).await;
let receiver = create_device(Some(&MOCK_CLIENT_KEYS_2)).await;

let messages = vec![
WebsocketMessageToDevice {
device_id: receiver.device_id.clone(),
payload: "first message".to_string(),
},
WebsocketMessageToDevice {
device_id: receiver.device_id.clone(),
payload: "second message".to_string(),
},
WebsocketMessageToDevice {
device_id: receiver.device_id.clone(),
payload: "third message".to_string(),
},
];

let serialized_messages: Vec<_> = messages
.iter()
.map(|message| {
serde_json::to_string(message)
.expect("Failed to serialize message to device")
})
.map(Message::text)
.collect();

let (mut sender_socket, _) = create_socket(&sender).await.split();

for msg in serialized_messages.clone() {
sender_socket
.send(msg)
.await
.expect("Failed to send the message over WebSocket");
}

// Wait a specified duration to ensure that message had time to persist
sleep(Duration::from_millis(100)).await;

let mut receiver_socket = create_socket(&receiver).await;

for msg in messages {
if let Some(Ok(response)) = receiver_socket.next().await {
let received_payload = response.to_text().unwrap();
assert_eq!(msg.payload, received_payload);
} else {
panic!("Unable to receive message");
}
}
}
6 changes: 3 additions & 3 deletions services/terraform/modules/shared/dynamodb.tf
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ resource "aws_dynamodb_table" "blob-service-blobs" {
resource "aws_dynamodb_table" "tunnelbroker-undelivered-messages" {
name = "tunnelbroker-undelivered-messages"
hash_key = "deviceID"
range_key = "createdAt"
range_key = "messageID"
billing_mode = "PAY_PER_REQUEST"

attribute {
Expand All @@ -91,8 +91,8 @@ resource "aws_dynamodb_table" "tunnelbroker-undelivered-messages" {
}

attribute {
name = "createdAt"
type = "N"
name = "messageID"
type = "S"
}
}

Expand Down
11 changes: 7 additions & 4 deletions services/tunnelbroker/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ pub mod dynamodb {
// - (primary key) = (deviceID: Partition Key, createdAt: Sort Key)
// - deviceID: The public key of a device's olm identity key
// - payload: Message to be delivered. See shared/tunnelbroker_messages.
// - createdAt: UNIX timestamp of when the item was inserted.
// Timestamp is needed to order the messages correctly to the device.
// - messageID = [createdAt]#[clientMessageID]
// - createdAd: UNIX timestamp of when the item was inserted.
// Timestamp is needed to order the messages correctly to the device.
// Timestamp format is ISO 8601 to handle lexicographical sorting.
// - clientMessageID: Message ID generated on client using UUID Version 4.
pub mod undelivered_messages {
pub const TABLE_NAME: &str = "tunnelbroker-undelivered-messages";
pub const PARTITION_KEY: &str = "deviceID";
pub const DEVICE_ID: &str = "deviceID";
pub const PAYLOAD: &str = "payload";
pub const CREATED_AT: &str = "createdAt";
pub const SORT_KEY: &str = "createdAt";
pub const MESSAGE_ID: &str = "messageID";
pub const SORT_KEY: &str = "messageID";
}
}
12 changes: 6 additions & 6 deletions services/tunnelbroker/src/database/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::collections::HashMap;
use aws_sdk_dynamodb::types::AttributeValue;

use crate::constants::dynamodb::undelivered_messages::{
CREATED_AT, DEVICE_ID, PAYLOAD,
DEVICE_ID, MESSAGE_ID, PAYLOAD,
};

#[derive(Debug)]
pub struct DeviceMessage {
pub device_id: String,
pub created_at: String,
pub message_id: String,
pub payload: String,
}

Expand All @@ -28,10 +28,10 @@ impl DeviceMessage {
.as_s()
.map_err(|_| MessageErrors::SerializationError)?
.to_string();
let created_at: String = hashmap
.get(CREATED_AT)
let message_id: String = hashmap
.get(MESSAGE_ID)
.ok_or(MessageErrors::SerializationError)?
.as_n()
.as_s()
.map_err(|_| MessageErrors::SerializationError)?
.to_string();
let payload: String = hashmap
Expand All @@ -43,7 +43,7 @@ impl DeviceMessage {

Ok(DeviceMessage {
device_id,
created_at,
message_id,
payload,
})
}
Expand Down
4 changes: 2 additions & 2 deletions services/tunnelbroker/src/database/message_id.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use chrono::{DateTime, Utc};

#[derive(Debug, derive_more::Display, derive_more::Error)]
enum ParseMessageIdError {
pub enum ParseMessageIdError {
InvalidTimestamp(chrono::ParseError),
InvalidFormat,
}
#[derive(Debug)]
struct MessageID {
pub struct MessageID {
timestamp: DateTime<Utc>,
client_message_id: String,
}
Expand Down
28 changes: 13 additions & 15 deletions services/tunnelbroker/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ use aws_sdk_dynamodb::error::SdkError;
use aws_sdk_dynamodb::operation::delete_item::{
DeleteItemError, DeleteItemOutput,
};
use aws_sdk_dynamodb::operation::put_item::{PutItemError, PutItemOutput};
use aws_sdk_dynamodb::operation::put_item::PutItemError;
use aws_sdk_dynamodb::operation::query::QueryError;
use aws_sdk_dynamodb::{types::AttributeValue, Client};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{debug, error};

use crate::constants::dynamodb::undelivered_messages::{
Expand All @@ -18,20 +17,14 @@ use crate::constants::dynamodb::undelivered_messages::{
pub mod message;
pub mod message_id;

use crate::database::message_id::MessageID;
pub use message::*;

#[derive(Clone)]
pub struct DatabaseClient {
client: Arc<Client>,
}

pub fn unix_timestamp() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("System time is misconfigured")
.as_secs()
}

pub fn handle_ddb_error<E>(db_error: SdkError<E>) -> tonic::Status {
match db_error {
SdkError::TimeoutError(_) | SdkError::ServiceError(_) => {
Expand All @@ -57,22 +50,27 @@ impl DatabaseClient {
&self,
device_id: &str,
payload: &str,
) -> Result<PutItemOutput, SdkError<PutItemError>> {
client_message_id: &str,
) -> Result<String, SdkError<PutItemError>> {
let message_id: String =
MessageID::new(client_message_id.to_string()).into();

let device_av = AttributeValue::S(device_id.to_string());
let payload_av = AttributeValue::S(payload.to_string());
let created_av = AttributeValue::N(unix_timestamp().to_string());
let message_id_av = AttributeValue::S(message_id.clone());

let request = self
.client
.put_item()
.table_name(TABLE_NAME)
.item(PARTITION_KEY, device_av)
.item(SORT_KEY, created_av)
.item(SORT_KEY, message_id_av)
.item(PAYLOAD, payload_av);

debug!("Persisting message to device: {}", &device_id);

request.send().await
request.send().await?;
Ok(message_id)
}

pub async fn retrieve_messages(
Expand Down Expand Up @@ -104,7 +102,7 @@ impl DatabaseClient {
pub async fn delete_message(
&self,
device_id: &str,
created_at: &str,
message_id: &str,
) -> Result<DeleteItemOutput, SdkError<DeleteItemError>> {
debug!("Deleting message for device: {}", device_id);

Expand All @@ -115,7 +113,7 @@ impl DatabaseClient {
),
(
SORT_KEY.to_string(),
AttributeValue::N(created_at.to_string()),
AttributeValue::S(message_id.to_string()),
),
]);

Expand Down
2 changes: 1 addition & 1 deletion services/tunnelbroker/src/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl TunnelbrokerService for TunnelbrokerGRPC {

self
.client
.persist_message(&message.device_id, &message.payload)
.persist_message(&message.device_id, &message.payload, "message_id")
.await
.map_err(handle_ddb_error)?;

Expand Down
3 changes: 2 additions & 1 deletion services/tunnelbroker/src/websockets/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> WebsocketSession<S> {
.persist_message(
message_to_device.device_id.as_str(),
message_to_device.payload.as_str(),
"message_id",
)
.await?;

Expand Down Expand Up @@ -222,7 +223,7 @@ impl<S: AsyncRead + AsyncWrite + Unpin> WebsocketSession<S> {
self.send_message_to_device(device_message.payload).await;
if let Err(e) = self
.db_client
.delete_message(&self.device_info.device_id, &device_message.created_at)
.delete_message(&self.device_info.device_id, &device_message.message_id)
.await
{
error!("Failed to delete message: {}:", e);
Expand Down

0 comments on commit e18a7c4

Please sign in to comment.