Skip to content

Commit

Permalink
fix UpdateActivity replication (fixes #35)
Browse files Browse the repository at this point in the history
  • Loading branch information
popravich committed Jul 5, 2017
1 parent b2d9859 commit 2dfe91a
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 18 deletions.
12 changes: 10 additions & 2 deletions src/chat/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ use serde_json::{Error as JsonError};

use http_pools::{REQUESTS, FAILED_503};
use runtime::Runtime;
use intern::SessionId;
use config::chat::Chat;
use config::SessionPool;
use chat::{Cid, ConnectionSender, CloseReason, CONNECTIONS};
use chat::message::{self, Meta, Args, Kwargs};
use chat::processor::{Action, ProcessorPool, ConnectionMessage};
use chat::backend::CallCodec;
use chat::error::MessageError;
use chat::replication::{RemotePool, RemoteAction};

use metrics::{Counter};

Expand All @@ -29,11 +31,13 @@ lazy_static! {

pub struct Dispatcher {
pub cid: Cid,
pub session_id: SessionId,
pub auth: Arc<String>,
pub runtime: Arc<Runtime>,
pub settings: Arc<Chat>,
pub pool_settings: Arc<SessionPool>,
pub processor: ProcessorPool,
pub remote: RemotePool,
pub handle: Handle, // Does it belong here?
pub channel: ConnectionSender,
}
Expand Down Expand Up @@ -159,10 +163,14 @@ impl Dispatcher {
let seconds = Duration::from_secs(seconds);
let seconds = cmp::max(cmp::min(seconds, max), min);
let timestamp = Instant::now() + seconds;
self.processor.send(Action::UpdateActivity{
conn_id: self.cid,
self.processor.send(Action::UpdateActivity {
session_id: self.session_id.clone(),
timestamp: timestamp,
});
self.remote.send(RemoteAction::UpdateActivity {
session_id: self.session_id.clone(),
duration: seconds,
})
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/chat/listener/pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use config::{SessionPool, ListenSocket};
pub struct SessionPools {
pools: Arc<RwLock<HashMap<SessionPoolName, Worker>>>,
pub processor: Processor,
remote_sender: RemoteSender,
pub remote_sender: RemoteSender,
}

struct Worker {
Expand Down
4 changes: 2 additions & 2 deletions src/chat/processor/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ fn pool_action(pool: &mut Pool, ts: Instant, action: Action) {
Associate { session_id, conn_id, metadata } => {
pool.associate(conn_id, session_id, ts, metadata);
}
UpdateActivity { conn_id, timestamp } => {
pool.update_activity(conn_id, timestamp);
UpdateActivity { session_id, timestamp } => {
pool.update_activity(session_id, timestamp);
}
Disconnect { conn_id } => {
pool.del_connection(conn_id);
Expand Down
6 changes: 3 additions & 3 deletions src/chat/processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub enum Action {
metadata: Arc<Json>
},
UpdateActivity {
conn_id: Cid,
session_id: SessionId,
// We receive duration from client, but we expect request handling
// code to validate and normalize it for us
timestamp: Instant,
Expand Down Expand Up @@ -244,8 +244,8 @@ impl fmt::Debug for Action {
&Associate { ref conn_id, ref session_id, .. } => {
write!(f, "Action::Associate({:?}, {:?})", conn_id, session_id)
}
&UpdateActivity { ref conn_id, .. } => {
write!(f, "Action::UpdateActivity({:?})", conn_id)
&UpdateActivity { ref session_id, .. } => {
write!(f, "Action::UpdateActivity({:?})", session_id)
}
&Disconnect { ref conn_id } => {
write!(f, "Action::Disconnect({:?})", conn_id)
Expand Down
11 changes: 3 additions & 8 deletions src/chat/processor/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,19 +200,14 @@ impl Pool {
}
}

pub fn update_activity(&mut self, conn_id: Cid, activity_ts: Instant)
pub fn update_activity(&mut self, sess_id: SessionId, activity_ts: Instant)
{
let sess_id = if let Some(conn) = self.connections.get(&conn_id) {
&conn.session_id
} else {
return;
};
if let Some(session) = self.sessions.inactive.remove(sess_id) {
if let Some(session) = self.sessions.inactive.remove(&sess_id) {
INACTIVE_SESSIONS.decr(1);
ACTIVE_SESSIONS.incr(1);
self.sessions.active.insert(sess_id.clone(), activity_ts, session);
} else {
self.sessions.active.update_if_smaller(sess_id, activity_ts)
self.sessions.active.update_if_smaller(&sess_id, activity_ts)
}
}

Expand Down
15 changes: 14 additions & 1 deletion src/chat/replication/action.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::sync::Arc;
use std::time::{Instant, Duration};
use serde_json::Value as Json;

use runtime::ServerId;
use intern::{SessionPoolName, Topic, Lattice as Namespace};
use intern::{SessionId, SessionPoolName, Topic, Lattice as Namespace};
use config::Replication;
use chat::Cid;
use chat::processor::{Action, Delta};
Expand Down Expand Up @@ -66,6 +67,12 @@ pub enum RemoteAction {
// TODO: probably use String here
delta: Delta,
},

// NOTE: In remote action we send original duration, not timestamp;

This comment has been minimized.

Copy link
@tailhook

tailhook Jul 5, 2017

Collaborator

Why do we send duration instead of timestamp?

UpdateActivity {
session_id: SessionId,
duration: Duration,
},
}

impl Into<Action> for RemoteAction {
Expand Down Expand Up @@ -108,6 +115,12 @@ impl Into<Action> for RemoteAction {
delta: delta,
}
}
UpdateActivity { session_id, duration } => {
Action::UpdateActivity {
session_id: session_id,
timestamp: Instant::now() + duration,
}
}
}
}
}
6 changes: 5 additions & 1 deletion src/handlers/swindon_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ impl<S: AsyncRead + AsyncWrite + 'static> Codec<S> for WebsockReply {
let processor = self.runtime.session_pools.processor
// TODO(tailhook) this doesn't check that pool is created
.pool(&self.settings.session_pool);
let remote = self.runtime.session_pools.remote_sender
.pool(&self.settings.session_pool);
let h1 = self.handle.clone();
let h2 = self.handle.clone();
let r1 = self.runtime.clone();
Expand All @@ -99,7 +101,7 @@ impl<S: AsyncRead + AsyncWrite + 'static> Codec<S> for WebsockReply {
format!("{}", TangleAuth(&session_id)));
Either::A(
out.send(Packet::Text(
json_encode(&Hello(session_id, data))
json_encode(&Hello(session_id.clone(), data))
.expect("every message can be encoded")))
.map_err(|e| info!("error sending userinfo: {:?}", e))
.and_then(move |out| {
Expand All @@ -121,10 +123,12 @@ impl<S: AsyncRead + AsyncWrite + 'static> Codec<S> for WebsockReply {
websocket::Loop::server(out, inp, rx,
chat::Dispatcher {
cid: cid,
session_id: session_id,
auth: auth,
handle: h1,
pool_settings: pool_settings.clone(),
processor: processor,
remote: remote,
runtime: r1,
settings: s1,
channel: tx,
Expand Down

0 comments on commit 2dfe91a

Please sign in to comment.