Skip to content

wip: guarantee subscribe #317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/event_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ jobs:
- uses: actions/checkout@v3
- uses: WalletConnect/actions/github/paths-filter/@2.2.1
id: filter
with:
path-app: . # run CI when tests are changed
outputs:
infra: ${{ steps.filter.outputs.infra }}
app: ${{ steps.filter.outputs.app }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/sub-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,4 @@ jobs:
uses: WalletConnect/actions-rs/[email protected]
with:
command: test
args: --test integration
args: --test integration -- --test-threads=1
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ test-all:

test-integration:
@echo '==> Testing integration'
RUST_BACKTRACE=1 ANSI_LOGS=true cargo test --test integration -- {{test}}
RUST_BACKTRACE=1 ANSI_LOGS=true cargo test --test integration -- {{test}} --test-threads=1

# Clean build artifacts
clean:
Expand Down
52 changes: 52 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub struct Metrics {
relay_outgoing_message_failures: Counter<u64>,
relay_outgoing_message_latency: Histogram<u64>,
relay_outgoing_message_publish_latency: Histogram<u64>,
relay_subscribes: Counter<u64>,
relay_subscribe_failures: Counter<u64>,
relay_subscribe_latency: Histogram<u64>,
relay_subscribe_request_latency: Histogram<u64>,
postgres_queries: Counter<u64>,
postgres_query_latency: Histogram<u64>,
keys_server_requests: Counter<u64>,
Expand Down Expand Up @@ -112,6 +116,26 @@ impl Metrics {
.with_description("The latency publishing relay messages")
.init();

let relay_subscribes: Counter<u64> = meter
.u64_counter("relay_subscribes")
.with_description("The number of subscribes to relay topics (not including retries)")
.init();

let relay_subscribe_failures: Counter<u64> = meter
.u64_counter("relay_subscribe_failures")
.with_description("The number of failures to subscribe to relay topics")
.init();

let relay_subscribe_latency: Histogram<u64> = meter
.u64_histogram("relay_subscribe_latency")
.with_description("The latency subscribing to relay topics w/ built-in retry")
.init();

let relay_subscribe_request_latency: Histogram<u64> = meter
.u64_histogram("relay_subscribe_request_latency")
.with_description("The latency subscribing to relay topics")
.init();

let postgres_queries: Counter<u64> = meter
.u64_counter("postgres_queries")
.with_description("The number of Postgres queries executed")
Expand Down Expand Up @@ -194,6 +218,10 @@ impl Metrics {
relay_outgoing_message_failures,
relay_outgoing_message_latency,
relay_outgoing_message_publish_latency,
relay_subscribes,
relay_subscribe_failures,
relay_subscribe_latency,
relay_subscribe_request_latency,
postgres_queries,
postgres_query_latency,
keys_server_requests,
Expand Down Expand Up @@ -286,6 +314,30 @@ impl Metrics {
);
}

/// Records the outcome of one complete subscribe operation.
///
/// Adds one to the `relay_subscribes` counter and records the milliseconds elapsed since
/// `start` in the `relay_subscribe_latency` histogram. Both data points carry a `success`
/// attribute holding the stringified `success` flag.
pub fn relay_subscribe(&self, success: bool, start: Instant) {
    // Read the clock before any metric bookkeeping so that work is not part of the sample.
    let latency_ms = start.elapsed().as_millis() as u64;

    let outcome = [KeyValue::new("success", success.to_string())];
    let cx = Context::current();

    self.relay_subscribes.add(&cx, 1, &outcome);
    self.relay_subscribe_latency.record(&cx, latency_ms, &outcome);
}

/// Counts one failed subscribe attempt.
///
/// Adds one to the `relay_subscribe_failures` counter, tagged with whether the caller
/// considers the failure permanent.
pub fn relay_subscribe_failure(&self, is_permenant: bool) {
    // NOTE(review): the attribute key is misspelled ("permenant"); it is emitted as-is
    // here because renaming it would change the exported metric label.
    let permanence = KeyValue::new("is_permenant", is_permenant.to_string());
    self.relay_subscribe_failures
        .add(&Context::current(), 1, &[permanence]);
}

/// Records the latency of a single subscribe request.
///
/// The milliseconds elapsed since `start` are recorded in the
/// `relay_subscribe_request_latency` histogram with no attributes.
pub fn relay_subscribe_request(&self, start: Instant) {
    let latency_ms = start.elapsed().as_millis() as u64;
    self.relay_subscribe_request_latency
        .record(&Context::current(), latency_ms, &[]);
}

pub fn postgres_query(&self, query_name: &'static str, start: Instant) {
let elapsed = start.elapsed();

Expand Down
22 changes: 9 additions & 13 deletions src/model/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use {
ed25519_dalek::SigningKey,
relay_rpc::domain::{ProjectId, Topic},
serde::{Deserialize, Serialize},
sqlx::{FromRow, PgPool, Postgres},
sqlx::{FromRow, PgExecutor, PgPool, Postgres, Transaction},
std::{collections::HashSet, time::Instant},
tracing::instrument,
uuid::Uuid,
Expand All @@ -28,13 +28,13 @@ pub struct ProjectWithPublicKeys {
pub topic: String,
}

pub async fn upsert_project(
pub async fn upsert_project<'e>(
project_id: ProjectId,
app_domain: &str,
topic: Topic,
authentication_key: &SigningKey,
subscribe_key: &StaticSecret,
postgres: &PgPool,
postgres: impl sqlx::PgExecutor<'e>,
metrics: Option<&Metrics>,
) -> Result<ProjectWithPublicKeys, sqlx::error::Error> {
let authentication_public_key = encode_authentication_public_key(authentication_key);
Expand All @@ -58,15 +58,15 @@ pub async fn upsert_project(
// TODO test idempotency
#[allow(clippy::too_many_arguments)]
#[instrument(skip(authentication_private_key, subscribe_private_key, postgres, metrics))]
async fn upsert_project_impl(
async fn upsert_project_impl<'e>(
project_id: ProjectId,
app_domain: &str,
topic: Topic,
authentication_public_key: String,
authentication_private_key: String,
subscribe_public_key: String,
subscribe_private_key: String,
postgres: &PgPool,
postgres: impl sqlx::PgExecutor<'e>,
metrics: Option<&Metrics>,
) -> Result<ProjectWithPublicKeys, sqlx::error::Error> {
let query = "
Expand Down Expand Up @@ -324,18 +324,16 @@ pub struct SubscribeResponse {
}

// TODO test idempotency
#[instrument(skip(postgres, metrics))]
#[instrument(skip(txn, metrics))]
pub async fn upsert_subscriber(
project: Uuid,
account: AccountId,
scope: HashSet<Uuid>,
notify_key: &[u8; 32],
notify_topic: Topic,
postgres: &PgPool,
txn: &mut Transaction<'_, Postgres>,
metrics: Option<&Metrics>,
) -> Result<SubscribeResponse, sqlx::error::Error> {
let mut txn = postgres.begin().await?;

// `xmax = 0`: https://stackoverflow.com/a/39204667

let query = "
Expand Down Expand Up @@ -369,9 +367,7 @@ pub async fn upsert_subscriber(
metrics.postgres_query("upsert_subscriber", start);
}

update_subscriber_scope(subscriber.id, scope, &mut txn, metrics).await?;

txn.commit().await?;
update_subscriber_scope(subscriber.id, scope, txn, metrics).await?;

Ok(subscriber)
}
Expand Down Expand Up @@ -415,7 +411,7 @@ pub async fn update_subscriber(
async fn update_subscriber_scope(
subscriber: Uuid,
scope: HashSet<Uuid>,
txn: &mut sqlx::Transaction<'_, Postgres>,
txn: &mut Transaction<'_, Postgres>,
metrics: Option<&Metrics>,
) -> Result<(), sqlx::error::Error> {
let query = "
Expand Down
57 changes: 56 additions & 1 deletion src/publish_relay_message.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use {
crate::metrics::Metrics,
relay_client::{error::Error, http::Client},
relay_rpc::rpc::{msg_id::MsgId, Publish},
relay_rpc::{
domain::Topic,
rpc::{msg_id::MsgId, Publish},
},
std::time::{Duration, Instant},
tokio::time::sleep,
tracing::{error, instrument, warn},
Expand Down Expand Up @@ -79,3 +82,55 @@ pub async fn publish_relay_message(
}
Ok(())
}

#[instrument(skip_all)]
pub async fn subscribe_relay_topic(
relay_ws_client: &relay_client::websocket::Client,
topic: &Topic,
metrics: Option<&Metrics>,
) -> Result<(), Error> {
let start = Instant::now();

let client_publish_call = || async {
let start = Instant::now();
let result = relay_ws_client.subscribe_blocking(topic.clone()).await;
if let Some(metrics) = metrics {
metrics.relay_subscribe_request(start);
}
result
};

let mut tries = 0;
while let Err(e) = client_publish_call().await {
tries += 1;
let is_permenant = tries >= 10;
if let Some(metrics) = metrics {
metrics.relay_subscribe_failure(is_permenant);
}

if is_permenant {
error!("Permenant error subscribing to topic {topic}, took {tries} tries: {e:?}");

if let Some(metrics) = metrics {
// TODO make DRY with end-of-function call
metrics.relay_subscribe(false, start);
}
return Err(e);
}

let retry_in = Duration::from_secs(1);
warn!(
"Temporary error subscribing to topic {topic}, retrying attempt {tries} in {retry_in:?}: {e:?}"
);
sleep(retry_in).await;
}

if let Some(metrics) = metrics {
metrics.relay_subscribe(true, start);
}

// Sleep to account for some replication lag. Without this, the subscription may not be active on all nodes
sleep(Duration::from_millis(250)).await;

Ok(())
}
33 changes: 29 additions & 4 deletions src/services/public_http_server/handlers/subscribe_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use {
crate::{
error::NotifyServerError,
model::helpers::upsert_project,
publish_relay_message::{publish_relay_message, subscribe_relay_topic},
rate_limit::{self, Clock, RateLimitError},
registry::{extractor::AuthedProjectId, storage::redis::Redis},
spec::{NOTIFY_NOOP_TAG, NOTIFY_NOOP_TTL},
state::AppState,
utils::topic_from_key,
},
Expand All @@ -17,10 +19,10 @@ use {
hyper::StatusCode,
once_cell::sync::Lazy,
regex::Regex,
relay_rpc::domain::ProjectId,
relay_rpc::{domain::ProjectId, rpc::Publish},
serde::{Deserialize, Serialize},
serde_json::json,
std::sync::Arc,
std::sync::{Arc, OnceLock},
tracing::{info, instrument},
x25519_dalek::{PublicKey, StaticSecret},
};
Expand Down Expand Up @@ -79,13 +81,14 @@ pub async fn handler(

let authentication_key = ed25519_dalek::SigningKey::generate(&mut OsRng);

let mut txn = state.postgres.begin().await?;
let project = upsert_project(
project_id,
&app_domain,
topic.clone(),
&authentication_key,
&subscribe_key,
&state.postgres,
&mut *txn,
state.metrics.as_ref(),
)
.await
Expand All @@ -101,9 +104,31 @@ pub async fn handler(
// Don't call subscribe if we are already subscribed in a previous request
if project.topic == topic.as_ref() {
info!("Subscribing to project topic: {topic}");
state.relay_ws_client.subscribe(topic).await?;
subscribe_relay_topic(&state.relay_ws_client, &topic, state.metrics.as_ref()).await?;

// Send noop to extend ttl of relay's mapping
info!("Timing: Publishing noop to notify_topic");
publish_relay_message(
&state.relay_http_client,
&Publish {
topic,
message: {
// Extremely minor performance optimization with OnceLock to avoid allocating the same empty string every time
static LOCK: OnceLock<Arc<str>> = OnceLock::new();
LOCK.get_or_init(|| "".into()).clone()
},
tag: NOTIFY_NOOP_TAG,
ttl_secs: NOTIFY_NOOP_TTL.as_secs() as u32,
prompt: false,
},
state.metrics.as_ref(),
)
.await?;
info!("Timing: Finished publishing noop to notify_topic");
}

txn.commit().await?;

Ok(Json(SubscribeTopicResponseBody {
authentication_key: project.authentication_public_key,
subscribe_key: project.subscribe_public_key,
Expand Down
Loading