Skip to content

Commit

Permalink
Add subscribe request (#159)
Browse files Browse the repository at this point in the history
feat(ee): add subscribe request

Add the ability to construct and exec subscribe request with the following response parsing.

feat: add emit messages / status effects

feat: add event listeners

Listeners implemented in form of a subscription object which allows polling on updates.

test(contract-test): completed contract testing for subscribe

Completed set of contract tests for subscription event engine.

refactor(clippy): apply clippy suggestions

fix: fix formatting warning

---------

Co-authored-by: Xavrax <[email protected]>
  • Loading branch information
parfeon and Xavrax authored Jul 27, 2023
1 parent 3de77c6 commit d578742
Show file tree
Hide file tree
Showing 65 changed files with 5,966 additions and 1,163 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ jobs:
token: ${{ secrets.GH_TOKEN }}
features-path: tests/features
- name: Run acceptance tests
env:
RUST_LOG: debug
run: |
cargo test --features contract_test --test contract_test
- name: Expose acceptance tests reports
Expand Down
16 changes: 7 additions & 9 deletions .github/workflows/run-validations.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,25 @@ jobs:

- name: Run cargo check tool to check if the code are valid
run: |
cargo check --workspace --all-targets --features="full"
cargo check --workspace --all-targets --features="full"
- name: Run cargo check tool to check if the raw domain code are valid
run: |
cargo check --workspace --no-default-features --features="pubnub_only"
cargo check --workspace --no-default-features --features="pubnub_only"
- name: Run cargo check tool to check if the `no_std` code are valid
run: |
cargo check --workspace --all-targets --no-default-features --features="full_no_std"
cargo check --workspace --all-targets --no-default-features --features="full_no_std"
- name: Run cargo clippy tool to check if all the best code practices are followed
run: |
cargo clippy --workspace --all-targets --features="full" -- -D warnings
cargo clippy --workspace --all-targets --features="full" -- -D warnings
- name: Run cargo clippy tool to check if all the best code practices are followed for raw domain code
run: |
cargo clippy --workspace --no-default-features --features="pubnub_only" -- -D warnings
cargo clippy --workspace --no-default-features --features="pubnub_only" -- -D warnings
- name: Run cargo clippy tool to check if all the best code practices are followed for `no_std` code
run: |
cargo clippy --workspace --all-targets --no-default-features --features="full_no_std" -- -D warnings
cargo clippy --workspace --all-targets --no-default-features --features="full_no_std" -- -D warnings
- name: Run cargo fmt tool to check if code are well formatted
run: |
cargo fmt --check --verbose --all
cargo fmt --check --verbose --all
cargo-deny:
name: Check Cargo crate dependencies
Expand Down
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@ target
Cargo.lock
tests/features
tests/reports
tests/logs

# GitHub Actions #
##################
.github/.release

# IDE
.idea

# OS
.DS_Store
35 changes: 24 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ build = "build.rs"
[features]

# Enables all non-conflicting features
full = ["publish", "access", "serde", "reqwest", "aescbc", "parse_token", "blocking", "std"]
full = ["publish", "access", "serde", "reqwest", "aescbc", "parse_token", "blocking", "std", "tokio"]

# Enables all default features
default = ["publish", "serde", "reqwest", "aescbc", "std", "blocking"]
default = ["publish", "subscribe", "serde", "reqwest", "aescbc", "std", "blocking", "tokio"]

# [PubNub features]

Expand All @@ -41,6 +41,9 @@ serde = ["dep:serde", "dep:serde_json", "hashbrown/serde"]
## Enables reqwest implementation for transport layer
reqwest = ["dep:reqwest", "dep:bytes"]

## Enables tokio runtime for subscribe loop
tokio = ["dep:tokio"]

## Enables blocking implementation for transport layer
blocking = ["reqwest?/blocking"]

Expand All @@ -61,23 +64,25 @@ extra_platforms = ["spin/portable_atomic", "dep:portable-atomic"]

# [Internal features] (not intended for use outside of the library)
contract_test = ["parse_token", "publish", "access"]
full_no_std = ["serde", "reqwest", "aescbc", "parse_token", "blocking", "publish", "access", "subscribe"]
full_no_std = ["serde", "reqwest", "aescbc", "parse_token", "blocking", "publish", "access", "subscribe", "tokio"]
full_no_std_platform_independent = ["serde", "aescbc", "parse_token", "blocking", "publish", "access", "subscribe"]
pubnub_only = ["aescbc", "parse_token", "blocking", "publish", "access", "subscribe"]
mock_getrandom = ["getrandom/custom"]
event_engine = []
# TODO: temporary treated as internal until we officially release it
subscribe = ["event_engine"]
futures = ["dep:futures"]
futures_tokio = ["dep:tokio"]
subscribe = ["event_engine", "futures", "futures_tokio", "dep:async-channel"]

[dependencies]
async-trait = "0.1"
log = "0.4"
hashbrown = "0.13"
hashbrown = "0.14.0"
spin = "0.9"
phantom-type = { version = "0.4.2", default-features = false }
percent-encoding = { version = "2.1", default-features = false }
base64 = { version = "0.21", features = ["alloc"], default-features = false }
derive_builder = {version = "0.12", default-features = false }
derive_builder = { version = "0.12", default-features = false }
uuid = { version = "1.3", features = ["v4"], default-features = false }
snafu = { version = "0.7", features = ["rust_1_46"], default-features = false }
rand = { version = "0.8.5", default-features = false }
Expand All @@ -93,7 +98,7 @@ serde_json = { version = "1.0", optional = true, features = ["alloc"] ,default-f

# reqwest
reqwest = { version = "0.11", optional = true }
bytes = {version = "1.4", default-features = false, optional = true }
bytes = { version = "1.4", default-features = false, optional = true }

# crypto
aes = { version = "0.8.2", optional = true }
Expand All @@ -103,6 +108,11 @@ getrandom = { version = "0.2", optional = true }
# parse_token
ciborium = { version = "0.2.1", default-features = false, optional = true }

# subscribe
tokio = { version = "1", optional = true, features = ["rt-multi-thread", "macros", "time"] }
futures = { version = "0.3.28", optional = true }
async-channel = { version = "1.8", optional = true }

# extra_platforms
portable-atomic = { version = "1.3", optional = true, default-features = false, features = ["require-cas", "critical-section"] }

Expand All @@ -111,14 +121,13 @@ getrandom = { version = "0.2", features = ["js"] }

[dev-dependencies]
async-trait = "0.1"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "time"] }
wiremock = "0.5"
env_logger = "0.10"
cucumber = { version = "0.19", features = ["output-junit"] }
futures = "0.3"
cucumber = { version = "0.20.0", features = ["output-junit"] }
reqwest = { version = "0.11", features = ["json"] }
test-case = "3.0"
hashbrown = { version = "0.13", features = ["serde"] }
hashbrown = { version = "0.14.0", features = ["serde"] }
getrandom = { version = "0.2", features = ["custom"] }

[build-dependencies]
Expand Down Expand Up @@ -152,3 +161,7 @@ required-features = ["default", "blocking", "access"]
name = "custom_origin"
required-features = ["default"]

[[example]]
name = "subscribe"
required-features = ["default", "subscribe"]

82 changes: 82 additions & 0 deletions examples/subscribe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use futures::StreamExt;
use pubnub::dx::subscribe::{SubscribeStreamEvent, Update};
use pubnub::{Keyset, PubNubClientBuilder};
use serde::Deserialize;
use std::env;

#[derive(Debug, Deserialize)]
struct Message {
// Allowing dead code because we don't use these fields
// in this example.
#[allow(dead_code)]
url: String,
#[allow(dead_code)]
description: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn snafu::Error>> {
let publish_key = env::var("SDK_PUB_KEY")?;
let subscribe_key = env::var("SDK_SUB_KEY")?;

let client = PubNubClientBuilder::with_reqwest_transport()
.with_keyset(Keyset {
subscribe_key,
publish_key: Some(publish_key),
secret_key: None,
})
.with_user_id("user_id")
.build()?;

println!("running!");

let subscription = client
.subscribe()
.channels(["my_channel".into(), "other_channel".into()].to_vec())
.heartbeat(10)
.filter_expression("some_filter")
.execute()?;

tokio::spawn(subscription.stream().for_each(|event| async move {
match event {
SubscribeStreamEvent::Update(update) => {
println!("\nupdate: {:?}", update);
match update {
Update::Message(message) | Update::Signal(message) => {
// Deserialize the message payload as you wish
match serde_json::from_slice::<Message>(&message.data) {
Ok(message) => println!("defined message: {:?}", message),
Err(_) => {
println!("other message: {:?}", String::from_utf8(message.data))
}
}
}
Update::Presence(presence) => {
println!("presence: {:?}", presence)
}
Update::Object(object) => {
println!("object: {:?}", object)
}
Update::MessageAction(action) => {
println!("message action: {:?}", action)
}
Update::File(file) => {
println!("file: {:?}", file)
}
}
}
SubscribeStreamEvent::Status(status) => println!("\nstatus: {:?}", status),
}
}));

// Sleep for a minute. Now you can send messages to the channels
// "my_channel" and "other_channel" and see them printed in the console.
// You can use the publish example or [PubNub console](https://www.pubnub.com/docs/console/)
// to send messages.
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;

// You can also cancel the subscription at any time.
subscription.unsubscribe().await;

Ok(())
}
25 changes: 7 additions & 18 deletions src/core/cryptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! encryption and decryption of published data.

use crate::core::error::PubNubError;
use crate::lib::alloc::vec::Vec;
use crate::lib::{alloc::vec::Vec, core::fmt::Debug};

/// This trait is used to encrypt and decrypt messages sent to the
/// [`PubNub API`].
Expand All @@ -24,47 +24,36 @@ use crate::lib::alloc::vec::Vec;
/// ```
/// use pubnub::core::{Cryptor, error::PubNubError};
///
/// #[derive(Debug)]
/// struct MyCryptor;
///
/// impl Cryptor for MyCryptor {
/// fn encrypt<'en, T>(&self, source: T) -> Result<Vec<u8>, PubNubError>
/// where
/// T: Into<&'en [u8]>
/// {
/// fn encrypt(&self, source: Vec<u8>) -> Result<Vec<u8>, PubNubError> {
/// // Encrypt provided data here
///
/// Ok(vec![])
/// }
///
/// fn decrypt<'de, T>(&self, source: T) -> Result<Vec<u8>, PubNubError>
/// where
/// T: Into<&'de [u8]>
/// {
/// fn decrypt(&self, source: Vec<u8>) -> Result<Vec<u8>, PubNubError> {
/// // Decrypt provided data here
///
/// Ok(vec![])
/// }
/// }
/// ```
///
/// [`dx`]: ../dx/index.html
/// [`PubNub API`]: https://www.pubnub.com/docs
pub trait Cryptor {
pub trait Cryptor: Debug + Send + Sync {
/// Decrypt provided data.
///
/// # Errors
/// Should return an [`PubNubError::Encryption`] if provided data can't
/// be encrypted or underlying cryptor misconfigured.
fn encrypt<'en, T>(&self, source: T) -> Result<Vec<u8>, PubNubError>
where
T: Into<&'en [u8]>;
fn encrypt(&self, source: Vec<u8>) -> Result<Vec<u8>, PubNubError>;

/// Decrypt provided data.
///
/// # Errors
/// Should return an [`PubNubError::Decryption`] if provided data can't
/// be decrypted or underlying cryptor misconfigured.
fn decrypt<'de, T>(&self, source: T) -> Result<Vec<u8>, PubNubError>
where
T: Into<&'de [u8]>;
fn decrypt(&self, source: Vec<u8>) -> Result<Vec<u8>, PubNubError>;
}
26 changes: 26 additions & 0 deletions src/core/deserialize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//! Deserialization module
//!
//! This module provides a [`Deserialize`] trait for the Pubnub protocol.
//!
//! You can implement this trait for your own types, or use one of the provided
//! features to use a deserialization library.
//!
//! [`Deserialize`]: trait.Deserialize.html

use crate::core::PubNubError;

/// Deserialize values
///
/// This trait provides a [`deserialize`] method for the Pubnub protocol.
///
/// You can implement this trait for your own types, or use the provided
/// implementations for [`Into<Vec<u8>>`].
///
/// [`deserialize`]: #tymethod.deserialize
pub trait Deserialize<'de>: Send + Sync {
/// Type to which binary data should be mapped.
type Type;

/// Deserialize the value
fn deserialize(bytes: &'de [u8]) -> Result<Self::Type, PubNubError>;
}
12 changes: 6 additions & 6 deletions src/core/deserializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! This module contains the `Deserialize` trait which is used to implement
//! deserialization of Rust data structures.

use super::PubNubError;
use crate::core::PubNubError;

/// Trait for deserializing Rust data structures.
///
Expand All @@ -30,8 +30,8 @@ use super::PubNubError;
///
/// struct MyDeserializer;
///
/// impl<'de> Deserializer<'de, PublishResult> for MyDeserializer {
/// fn deserialize(&self, bytes: &'de [u8]) -> Result<PublishResult, PubNubError> {
/// impl Deserializer<PublishResult> for MyDeserializer {
/// fn deserialize(&self, bytes: &[u8]) -> Result<PublishResult, PubNubError> {
/// // ...
/// # unimplemented!()
/// }
Expand All @@ -42,14 +42,14 @@ use super::PubNubError;
/// [`PublishResponseBody`]: ../../dx/publish/result/enum.PublishResponseBody.html
/// [`GrantTokenResponseBody`]: ../../dx/access/result/enum.GrantTokenResponseBody.html
/// [`RevokeTokenResponseBody`]: ../../dx/access/result/enum.RevokeTokenResponseBody.html
pub trait Deserializer<'de, T> {
/// Deserialize a `&[u8]` into a `Result<T, PubNubError>`.
pub trait Deserializer<T>: Send + Sync {
/// Deserialize a `&Vec<u8>` into a `Result<T, PubNubError>`.
///
/// # Errors
///
/// This method should return [`PubNubError::DeserializationError`] if the
/// deserialization fails.
///
/// [`PubNubError::DeserializationError`]: ../enum.PubNubError.html#variant.DeserializationError
fn deserialize(&self, bytes: &'de [u8]) -> Result<T, PubNubError>;
fn deserialize(&self, bytes: &[u8]) -> Result<T, PubNubError>;
}
Loading

0 comments on commit d578742

Please sign in to comment.