Skip to content

Commit

Permalink
docs(inline): change client to pubnub in inline docs
Browse files Browse the repository at this point in the history
refactor(examples): refactor `subscribe` example

Separate `subscribe` example into two to show separately `subscribe` feature and `presence state`
maintenance with subscribe.
  • Loading branch information
parfeon committed Jan 31, 2024
1 parent c3b921f commit f98ed52
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 118 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ required-features = ["default"]

[[example]]
name = "subscribe"
required-features = ["default", "subscribe"]

[[example]]
name = "subscribe_with_presence_state"
required-features = ["default", "subscribe", "presence"]

[[example]]
Expand Down
63 changes: 23 additions & 40 deletions examples/subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::collections::HashMap;

use futures::StreamExt;
use serde::Deserialize;
use std::env;
Expand All @@ -26,7 +24,7 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
let publish_key = env::var("SDK_PUB_KEY")?;
let subscribe_key = env::var("SDK_SUB_KEY")?;

let client = PubNubClientBuilder::with_reqwest_transport()
let pubnub = PubNubClientBuilder::with_reqwest_transport()
.with_keyset(Keyset {
subscribe_key,
publish_key: Some(publish_key),
Expand All @@ -40,22 +38,9 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {

println!("running!");

client
.set_presence_state(HashMap::<String, String>::from([
(
"is_doing".to_string(),
"Nothing... Just hanging around...".to_string(),
),
("flag".to_string(), "false".to_string()),
]))
.channels(["my_channel".into(), "other_channel".into()].to_vec())
.user_id("user_id")
.execute()
.await?;

tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

let subscription = client.subscription(SubscriptionParams {
let subscription = pubnub.subscription(SubscriptionParams {
channels: Some(&["my_channel", "other_channel"]),
channel_groups: None,
options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]),
Expand All @@ -65,11 +50,13 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {

// Attach connection status to the PubNub client instance.
tokio::spawn(
client
pubnub
.status_stream()
.for_each(|status| async move { println!("\nstatus: {:?}", status) }),
);

// Example of the "global" listener for multiplexed subscription object from
// PubNub client.
tokio::spawn(subscription.stream().for_each(|event| async move {
match event {
Update::Message(message) | Update::Signal(message) => {
Expand All @@ -96,48 +83,44 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
}
}));

tokio::spawn(subscription_clone.stream().for_each(|event| async move {
match event {
Update::Message(message) | Update::Signal(message) => {
// Explicitly listen only for real-time `message` updates.
tokio::spawn(
subscription_clone
.messages_stream()
.for_each(|message| async move {
// Deserialize the message payload as you wish
match serde_json::from_slice::<Message>(&message.data) {
Ok(message) => println!("(b) defined message: {:?}", message),
Err(_) => {
println!("(b) other message: {:?}", String::from_utf8(message.data))
}
}
}
Update::Presence(presence) => {
println!("(b) presence: {:?}", presence)
}
Update::AppContext(object) => {
println!("(b) object: {:?}", object)
}
Update::MessageAction(action) => {
println!("(b) message action: {:?}", action)
}
Update::File(file) => {
println!("(b) file: {:?}", file)
}
}
}));
}),
);

// Explicitly listen only for real-time `file` updates.
tokio::spawn(
subscription_clone
.files_stream()
.for_each(|file| async move { println!("(b) file: {:?}", file) }),
);

// Sleep for a minute. Now you can send messages to the channels
// "my_channel" and "other_channel" and see them printed in the console.
// You can use the publish example or [PubNub console](https://www.pubnub.com/docs/console/)
// You can use the publishing example or [PubNub console](https://www.pubnub.com/docs/console/)
// to send messages.
tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;

// You can also cancel the subscription at any time.
// subscription.unsubscribe();

println!("\nDisconnect from the real-time data stream");
client.disconnect();
pubnub.disconnect();

tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

println!("\nReconnect to the real-time data stream");
client.reconnect(None);
pubnub.reconnect(None);

// Let event engine process unsubscribe request
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
Expand All @@ -149,7 +132,7 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
println!(
"\nUnsubscribe from all data streams. To restore requires `subscription.subscribe(None)` call." );
// Clean up before complete work with PubNub client instance.
client.unsubscribe_all();
pubnub.unsubscribe_all();
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

Ok(())
Expand Down
154 changes: 154 additions & 0 deletions examples/subscribe_with_presence_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use std::collections::HashMap;

use futures::StreamExt;
use serde::Deserialize;
use std::env;

use pubnub::subscribe::{SubscriptionOptions, SubscriptionParams};
use pubnub::{
dx::subscribe::Update,
subscribe::{EventEmitter, EventSubscriber},
Keyset, PubNubClientBuilder,
};

#[derive(Debug, Deserialize)]
struct Message {
// Allowing dead code because we don't use these fields
// in this example.
#[allow(dead_code)]
url: String,
#[allow(dead_code)]
description: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn snafu::Error>> {
let publish_key = env::var("SDK_PUB_KEY")?;
let subscribe_key = env::var("SDK_SUB_KEY")?;

let pubnub = PubNubClientBuilder::with_reqwest_transport()
.with_keyset(Keyset {
subscribe_key,
publish_key: Some(publish_key),
secret_key: None,
})
.with_user_id("user_id")
.with_filter_expression("some_filter")
.with_heartbeat_value(100)
.with_heartbeat_interval(5)
.build()?;

println!("running!");

// Setting up state which will be associated with the user id as long as he is
// subscribed and not timeout.
pubnub
.set_presence_state(HashMap::<String, String>::from([
(
"is_doing".to_string(),
"Nothing... Just hanging around...".to_string(),
),
("flag".to_string(), "false".to_string()),
]))
.channels(["my_channel".into(), "other_channel".into()].to_vec())
.user_id("user_id")
.execute()
.await?;

tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

let subscription = pubnub.subscription(SubscriptionParams {
channels: Some(&["my_channel", "other_channel"]),
channel_groups: None,
options: Some(vec![SubscriptionOptions::ReceivePresenceEvents]),
});
subscription.subscribe(None);
let subscription_clone = subscription.clone_empty();

// Attach connection status to the PubNub client instance.
tokio::spawn(
pubnub
.status_stream()
.for_each(|status| async move { println!("\nstatus: {:?}", status) }),
);

tokio::spawn(subscription.stream().for_each(|event| async move {
match event {
Update::Message(message) | Update::Signal(message) => {
// Deserialize the message payload as you wish
match serde_json::from_slice::<Message>(&message.data) {
Ok(message) => println!("(a) defined message: {:?}", message),
Err(_) => {
println!("(a) other message: {:?}", String::from_utf8(message.data))
}
}
}
Update::Presence(presence) => {
println!("(a) presence: {:?}", presence)
}
Update::AppContext(object) => {
println!("(a) object: {:?}", object)
}
Update::MessageAction(action) => {
println!("(a) message action: {:?}", action)
}
Update::File(file) => {
println!("(a) file: {:?}", file)
}
}
}));

// Explicitly listen only for real-time `message` updates.
tokio::spawn(
subscription_clone
.messages_stream()
.for_each(|message| async move {
// Deserialize the message payload as you wish
match serde_json::from_slice::<Message>(&message.data) {
Ok(message) => println!("(b) defined message: {:?}", message),
Err(_) => {
println!("(b) other message: {:?}", String::from_utf8(message.data))
}
}
}),
);

// Explicitly listen only for real-time `file` updates.
tokio::spawn(
subscription_clone
.files_stream()
.for_each(|file| async move { println!("(b) file: {:?}", file) }),
);

// Sleep for a minute. Now you can send messages to the channels
// "my_channel" and "other_channel" and see them printed in the console.
// You can use the publishing example or [PubNub console](https://www.pubnub.com/docs/console/)
// to send messages.
tokio::time::sleep(tokio::time::Duration::from_secs(15)).await;

// You can also cancel the subscription at any time.
// subscription.unsubscribe();

println!("\nDisconnect from the real-time data stream");
pubnub.disconnect();

tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

println!("\nReconnect to the real-time data stream");
pubnub.reconnect(None);

// Let event engine process unsubscribe request
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

// If Subscription or Subscription will go out of scope they will unsubscribe.
// drop(subscription);
// drop(subscription_clone);

println!(
"\nUnsubscribe from all data streams. To restore requires `subscription.subscribe(None)` call." );
// Clean up before complete work with PubNub client instance.
pubnub.unsubscribe_all();
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;

Ok(())
}
Loading

0 comments on commit f98ed52

Please sign in to comment.