Cleanup filter handling #32

Merged · 2 commits · Jan 26, 2024
101 changes: 71 additions & 30 deletions src/lib.rs
@@ -69,11 +69,8 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
     Ok(()) => (),
     Err(e) => {
         console_log!("could not verify event {}: {}", event.id, e);
-        let relay_msg = RelayMessage::new_ok(
-            event.id,
-            false,
-            "invalid event",
-        );
+        let relay_msg =
+            RelayMessage::new_ok(event.id, false, "invalid event");
         return relay_response(relay_msg);
     }
 }
@@ -221,7 +218,11 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
 match event.verify() {
     Ok(()) => (),
     Err(e) => {
-        console_log!("could not verify event {}: {}", event.id, e);
+        console_log!(
+            "could not verify event {}: {}",
+            event.id,
+            e
+        );
         let relay_msg = RelayMessage::new_ok(
             event.id,
             false,
@@ -342,36 +343,58 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
     filters,
 } => {
     new_subscription_req.swap(true, Ordering::Relaxed);
-    console_log!("got a new client request sub: {}, len: {}", subscription_id, filters.len());
+    console_log!(
+        "got a new client request sub: {}, len: {}",
+        subscription_id,
+        filters.len()
+    );
     // for each filter we handle it every 10 seconds
     // by reading storage and sending any new events
     // one caveat is that this will send events multiple
     // times if they are in multiple filters
     let mut valid = false;
     for filter in filters {
         let valid_nwc = {
-            // has correct kinds
-            let kinds = filter.kinds.as_ref();
-            (
-                kinds
-                    .unwrap_or(&vec![])
-                    .contains(&Kind::WalletConnectResponse)
-                    || kinds.unwrap_or(&vec![])
-                        .contains(&Kind::WalletConnectRequest)
-            ) &&
-            // has authors or pubkeys
-            !filter.authors.as_ref().unwrap_or(&vec![]).is_empty() ||
-            !filter.pubkeys.as_ref().unwrap_or(&vec![]).is_empty()
+            let nwc_kinds = filter
+                .kinds
+                .as_ref()
+                .map(|k| {
+                    k.contains(&Kind::WalletConnectResponse)
+                        || k.contains(
+                            &Kind::WalletConnectRequest,
+                        )
+                })
+                .unwrap_or(false);
+
+            let has_authors = filter
+                .authors
+                .as_ref()
+                .map(|a| !a.is_empty())
+                .unwrap_or(false);
+            let has_pks = filter
+                .pubkeys
+                .as_ref()
+                .map(|a| !a.is_empty())
+                .unwrap_or(false);
+
+            nwc_kinds && (has_authors || has_pks)
         };
 
         if valid_nwc {
-            let mut master_guard = requested_filters.lock().await;
+            let mut master_guard =
+                requested_filters.lock().await;
             let master_filter = master_guard.deref_mut();
             // now add the new filters to the main filter
             // object. This is a bit of a hack but we only
             // check certain sub filters for NWC.
            combine_filters(master_filter, &filter);
-            console_log!("New filter count: {}", master_filter.pubkeys.as_ref().map_or(0, Vec::len));
+            console_log!(
+                "New filter count: {}",
+                master_filter
+                    .pubkeys
+                    .as_ref()
+                    .map_or(0, Vec::len)
+            );
             valid = true;
         }
     }
@@ -389,31 +412,49 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
 let sub_id = subscription_id.clone();
 let server_clone = server.clone();
 let master_clone = requested_filters.clone();
-let new_subscription_req_clone = new_subscription_req.clone();
+let new_subscription_req_clone =
+    new_subscription_req.clone();
 wasm_bindgen_futures::spawn_local(async move {
     let mut sent_events = vec![];
     loop {
         let master = master_clone.lock().await;
-        console_log!("Checking filters: {}", master.pubkeys.as_ref().map_or(0, Vec::len));
+        console_log!(
+            "Checking filters: {}",
+            master.pubkeys.as_ref().map_or(0, Vec::len)
+        );
         match handle_filter(
             &sent_events,
             sub_id.clone(),
             master.clone(),
             &server_clone,
             &db,
-        ).await
+        )
+        .await
         {
             Ok(new_event_ids) => {
                 // add new events to sent events
                 sent_events.extend(new_event_ids);
                 // send EOSE if necessary
-                if new_subscription_req_clone.load(Ordering::Relaxed) || sent_event_count != sent_events.len() {
-                    let relay_msg = RelayMessage::new_eose(sub_id.clone());
+                if new_subscription_req_clone
+                    .load(Ordering::Relaxed)
+                    || sent_event_count
+                        != sent_events.len()
+                {
+                    let relay_msg =
+                        RelayMessage::new_eose(
+                            sub_id.clone(),
+                        );
                     server_clone
-                        .send_with_str(relay_msg.as_json())
-                        .expect("failed to send response");
-                    sent_event_count = sent_events.len();
-                    new_subscription_req_clone.swap(false, Ordering::Relaxed);
+                        .send_with_str(
+                            relay_msg.as_json(),
+                        )
+                        .expect(
+                            "failed to send response",
+                        );
+                    sent_event_count =
+                        sent_events.len();
+                    new_subscription_req_clone
+                        .swap(false, Ordering::Relaxed);
                 }
             }
             Err(e) => console_log!(
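
For reference, the filter-validity check that this diff reshapes reads roughly as the standalone sketch below. NwcFilter and the kind constants are hypothetical stand-ins for the nostr crate's Filter and Kind types; only the boolean logic mirrors the new code in the diff.

// Hypothetical, simplified stand-in for the nostr Filter type used above.
struct NwcFilter {
    kinds: Option<Vec<u16>>,
    authors: Option<Vec<String>>,
    pubkeys: Option<Vec<String>>,
}

// NIP-47 kind numbers: 23194 = wallet connect request, 23195 = wallet connect response.
const WALLET_CONNECT_REQUEST: u16 = 23194;
const WALLET_CONNECT_RESPONSE: u16 = 23195;

// A subscription filter is accepted only if it asks for NWC kinds
// and names at least one author or pubkey.
fn is_valid_nwc(filter: &NwcFilter) -> bool {
    let nwc_kinds = filter
        .kinds
        .as_ref()
        .map(|k| k.contains(&WALLET_CONNECT_REQUEST) || k.contains(&WALLET_CONNECT_RESPONSE))
        .unwrap_or(false);

    let has_authors = filter.authors.as_ref().map(|a| !a.is_empty()).unwrap_or(false);
    let has_pks = filter.pubkeys.as_ref().map(|p| !p.is_empty()).unwrap_or(false);

    nwc_kinds && (has_authors || has_pks)
}

fn main() {
    // A filter asking for an NWC kind and naming a pubkey passes the check.
    let ok = NwcFilter {
        kinds: Some(vec![WALLET_CONNECT_REQUEST]),
        authors: None,
        pubkeys: Some(vec!["hex pubkey".to_string()]),
    };
    assert!(is_valid_nwc(&ok));
}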