Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix cursor reset issue and cancelled effects spawning #184

Merged
merged 10 commits into from
Feb 8, 2024
21 changes: 20 additions & 1 deletion .pubnub.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,28 @@
name: rust
version: 0.5.0
version: 0.6.0
schema: 1
scm: github.com/pubnub/rust
files: []
changelog:
- date: 2024-02-07
version: 0.6.0
changes:
- type: feature
text: "Make it possible to create `SubscriptionCursor` from the string slice."
- type: feature
text: "Add `add_subscriptions(..)` and `sub_subscriptions(..)` to `SubscriptionSet` to make it possible in addition to sets manipulation use list of subscriptions."
- type: bug
text: "Fix issue because of which `cursor` is not reset on `Subscription` and `SubscriptionSet` on unsubscribe."
- type: bug
text: "Fix issue because of which cancelled effects still asynchronously spawned for processing."
- type: improvement
text: "Change `client` to `pubnub` in inline docs."
- type: improvement
text: "Add subscription token validation."
- type: improvement
text: "Added a method to validate the provided subscription token to conform to PubNub time token requirements with precision."
- type: improvement
text: "Separate `subscribe` example into two to show separately `subscribe` feature and `presence state` maintenance with subscribe."
- date: 2024-01-25
version: 0.5.0
changes:
Expand Down
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pubnub"
version = "0.5.0"
version = "0.6.0"
edition = "2021"
license-file = "LICENSE"
authors = ["PubNub <[email protected]>"]
Expand Down Expand Up @@ -165,6 +165,10 @@ required-features = ["default"]

[[example]]
name = "subscribe"
required-features = ["default", "subscribe"]

[[example]]
name = "subscribe_with_presence_state"
required-features = ["default", "subscribe", "presence"]

[[example]]
Expand Down
56 changes: 44 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,32 @@ Add `pubnub` to your Rust project in the `Cargo.toml` file:
```toml
# default features
[dependencies]
pubnub = "0.5.0"
pubnub = "0.6.0"

# all features
[dependencies]
pubnub = { version = "0.5.0", features = ["full"] }
pubnub = { version = "0.6.0", features = ["full"] }
```

### Example

Try the following sample code to get up and running quickly!

```rust
use pubnub::{Keyset, PubNubClientBuilder};
use pubnub::dx::subscribe::{SubscribeStreamEvent, Update};
use pubnub::subscribe::Subscriber;
use futures::StreamExt;
use tokio::time::sleep;
use std::time::Duration;
use serde_json;

use pubnub::{
dx::subscribe::Update,
subscribe::EventSubscriber,
Keyset, PubNubClientBuilder,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
use pubnub::subscribe::{EventEmitter, SubscriptionParams};
let publish_key = "my_publish_key";
let publish_key = "my_publish_key";
let subscribe_key = "my_subscribe_key";
let client = PubNubClientBuilder::with_reqwest_transport()
.with_keyset(Keyset {
Expand All @@ -68,6 +71,7 @@ let publish_key = "my_publish_key";
})
.with_user_id("user_id")
.build()?;

println!("PubNub instance created");

let subscription = client.subscription(SubscriptionParams {
Expand All @@ -76,7 +80,13 @@ let publish_key = "my_publish_key";
options: None
});

println!("Subscribed to channel");
let channel_entity = client.channel("my_channel_2");
let channel_entity_subscription = channel_entity.subscription(None);

subscription.subscribe();
channel_entity_subscription.subscribe();

println!("Subscribed to channels");

// Launch a new task to print out each received message
tokio::spawn(client.status_stream().for_each(|status| async move {
Expand Down Expand Up @@ -107,7 +117,21 @@ let publish_key = "my_publish_key";
}
}));

sleep(Duration::from_secs(1)).await;
// Explicitly listen only for real-time `message` updates.
tokio::spawn(
channel_entity_subscription
.messages_stream()
.for_each(|message| async move {
if let Ok(utf8_message) = String::from_utf8(message.data.clone()) {
if let Ok(cleaned) = serde_json::from_str::<String>(&utf8_message) {
println!("message: {}", cleaned);
}
}
}),
);

sleep(Duration::from_secs(2)).await;

// Send a message to the channel
client
.publish_message("hello world!")
Expand All @@ -116,7 +140,15 @@ let publish_key = "my_publish_key";
.execute()
.await?;

sleep(Duration::from_secs(10)).await;
// Send a message to another channel
client
.publish_message("hello world on the other channel!")
.channel("my_channel_2")
.r#type("text-message")
.execute()
.await?;

sleep(Duration::from_secs(15)).await;

Ok(())
}
Expand All @@ -132,11 +164,11 @@ disable them in the `Cargo.toml` file, like so:
```toml
# only blocking and access + default features
[dependencies]
pubnub = { version = "0.5.0", features = ["blocking", "access"] }
pubnub = { version = "0.6.0", features = ["blocking", "access"] }

# only parse_token + default features
[dependencies]
pubnub = { version = "0.5.0", features = ["parse_token"] }
pubnub = { version = "0.6.0", features = ["parse_token"] }
```

### Available features
Expand Down Expand Up @@ -175,7 +207,7 @@ you need, for example:

```toml
[dependencies]
pubnub = { version = "0.5.0", default-features = false, features = ["serde", "publish",
pubnub = { version = "0.6.0", default-features = false, features = ["serde", "publish",
"blocking"] }
```

Expand Down
69 changes: 27 additions & 42 deletions examples/subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::collections::HashMap;

use futures::StreamExt;
use serde::Deserialize;
use std::env;
Expand All @@ -26,7 +24,7 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
let publish_key = env::var("SDK_PUB_KEY")?;
let subscribe_key = env::var("SDK_SUB_KEY")?;

let client = PubNubClientBuilder::with_reqwest_transport()
let pubnub = PubNubClientBuilder::with_reqwest_transport()
.with_keyset(Keyset {
subscribe_key,
publish_key: Some(publish_key),
Expand All @@ -40,36 +38,25 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {

println!("running!");

client
.set_presence_state(HashMap::<String, String>::from([
(
"is_doing".to_string(),
"Nothing... Just hanging around...".to_string(),
),
("flag".to_string(), "false".to_string()),
]))
.channels(["my_channel".into(), "other_channel".into()].to_vec())
.user_id("user_id")
.execute()
.await?;

tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

let subscription = client.subscription(SubscriptionParams {
let subscription = pubnub.subscription(SubscriptionParams {
channels: Some(&["my_channel", "other_channel"]),
channel_groups: None,
options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]),
});
subscription.subscribe(None);
subscription.subscribe();
let subscription_clone = subscription.clone_empty();
parfeon marked this conversation as resolved.
Show resolved Hide resolved

// Attach connection status to the PubNub client instance.
tokio::spawn(
client
pubnub
.status_stream()
.for_each(|status| async move { println!("\nstatus: {:?}", status) }),
);

// Example of the "global" listener for multiplexed subscription object from
// PubNub client.
tokio::spawn(subscription.stream().for_each(|event| async move {
match event {
Update::Message(message) | Update::Signal(message) => {
Expand All @@ -96,48 +83,44 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
}
}));

tokio::spawn(subscription_clone.stream().for_each(|event| async move {
match event {
Update::Message(message) | Update::Signal(message) => {
// Explicitly listen only for real-time `message` updates.
tokio::spawn(
subscription_clone
.messages_stream()
.for_each(|message| async move {
// Deserialize the message payload as you wish
match serde_json::from_slice::<Message>(&message.data) {
Ok(message) => println!("(b) defined message: {:?}", message),
Err(_) => {
println!("(b) other message: {:?}", String::from_utf8(message.data))
}
}
}
Update::Presence(presence) => {
println!("(b) presence: {:?}", presence)
}
Update::AppContext(object) => {
println!("(b) object: {:?}", object)
}
Update::MessageAction(action) => {
println!("(b) message action: {:?}", action)
}
Update::File(file) => {
println!("(b) file: {:?}", file)
}
}
}));
}),
);

// Explicitly listen only for real-time `file` updates.
tokio::spawn(
subscription_clone
.files_stream()
.for_each(|file| async move { println!("(b) file: {:?}", file) }),
);

// Sleep for a minute. Now you can send messages to the channels
// "my_channel" and "other_channel" and see them printed in the console.
// You can use the publish example or [PubNub console](https://www.pubnub.com/docs/console/)
// You can use the publishing example or [PubNub console](https://www.pubnub.com/docs/console/)
// to send messages.
tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;

// You can also cancel the subscription at any time.
// subscription.unsubscribe();

println!("\nDisconnect from the real-time data stream");
client.disconnect();
pubnub.disconnect();

tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

println!("\nReconnect to the real-time data stream");
client.reconnect(None);
pubnub.reconnect(None);

// Let event engine process unsubscribe request
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
Expand All @@ -147,9 +130,11 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
// drop(subscription_clone);

println!(
parfeon marked this conversation as resolved.
Show resolved Hide resolved
"\nUnsubscribe from all data streams. To restore requires `subscription.subscribe(None)` call." );
"\nUnsubscribe from all data streams. To restore call `subscription.subscribe()` or \
`subscription.subscribe_with_timetoken(Some(<timetoken>)) call."
);
// Clean up before complete work with PubNub client instance.
client.unsubscribe_all();
pubnub.unsubscribe_all();
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

Ok(())
Expand Down
Loading
Loading