Skip to content

Commit

Permalink
cargo fix
Browse files Browse the repository at this point in the history
  • Loading branch information
LeonHartley committed Mar 24, 2024
1 parent 3f211f9 commit 879e19e
Show file tree
Hide file tree
Showing 47 changed files with 92 additions and 153 deletions.
2 changes: 1 addition & 1 deletion coerce/benches/actor_messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use coerce::actor::context::ActorContext;
use coerce::actor::message::{Handler, Message};
use coerce::actor::scheduler::ActorType::Anonymous;
use coerce::actor::system::ActorSystem;
use coerce::actor::{Actor, IntoActorId, LocalActorRef, ToActorId};
use coerce::actor::{Actor, IntoActorId, LocalActorRef};
use tokio::runtime::Runtime;

struct BenchmarkActor;
Expand Down
3 changes: 1 addition & 2 deletions coerce/src/actor/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ use crate::actor::{
};
use futures::{Stream, StreamExt};
use std::any::Any;
use std::collections::HashMap;

use tokio::sync::oneshot::Sender;
use valuable::{Fields, NamedField, NamedValues, StructDef, Structable, Valuable, Value, Visit};
use valuable::NamedField;

use crate::actor::supervised::{ChildRef, Supervised};
use crate::actor::watch::watchers::Watchers;
Expand Down
2 changes: 1 addition & 1 deletion coerce/src/actor/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl ActorLoop {

{
#[cfg(feature = "actor-tracing-info")]
let span = tracing::info_span!("actor.start", ctx = log.as_value(),);
let _span = tracing::info_span!("actor.start", ctx = log.as_value(),);

#[cfg(feature = "actor-tracing-debug")]
let span = tracing::debug_span!("actor.start", ctx = log.as_value(),);
Expand Down
25 changes: 4 additions & 21 deletions coerce/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,34 +138,17 @@
//! ```
//!
use crate::actor::context::{ActorContext, ActorStatus};
use crate::actor::describe::Describe;
use crate::actor::lifecycle::{Status, Stop};
use crate::actor::message::{
ActorMessage, Exec, Handler, Message, MessageHandler, MessageUnwrapErr, MessageWrapErr,
};
use crate::actor::metrics::ActorMetrics;
use crate::actor::message::{Handler, Message};
use crate::actor::scheduler::ActorType::{Anonymous, Tracked};
use crate::actor::supervised::Terminated;
use crate::actor::system::ActorSystem;
use std::any::Any;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};

use std::fmt::Debug;
use std::hash::Hasher;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;

use uuid::Uuid;

#[cfg(feature = "remote")]
use crate::actor::message::Envelope;

#[cfg(feature = "remote")]
use crate::remote::{system::NodeId, RemoteActorRef};

pub use refs::*;

pub mod blocking;
Expand Down
4 changes: 2 additions & 2 deletions coerce/src/actor/refs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::future::Future;
use std::hash::{Hash, Hasher};
use std::ops::Deref;
use std::sync::Arc;
use tokio::sync::mpsc::error::SendError;

use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;

Expand Down Expand Up @@ -83,7 +83,7 @@ impl<A: Actor> Debug for ActorRef<A> {

impl<A: Actor> Display for ActorRef<A> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
(self as (&dyn Debug)).fmt(f)
(self as &dyn Debug).fmt(f)
}
}

Expand Down
2 changes: 1 addition & 1 deletion coerce/src/actor/watch/watchers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::actor::watch::ActorTerminated;
use crate::actor::{ActorId, BoxedActorRef, Receiver};
use crate::actor::{ActorId, Receiver};
use std::collections::HashMap;

#[derive(Default)]
Expand Down
3 changes: 1 addition & 2 deletions coerce/src/persistent/batch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::actor::context::ActorContext;
use crate::actor::message::Message;
use crate::actor::Actor;
use crate::persistent::storage::JournalEntry;

use crate::persistent::types::JournalTypes;
use crate::persistent::{PersistentActor, Recover};
use std::sync::Arc;
Expand Down
6 changes: 3 additions & 3 deletions coerce/src/persistent/journal/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ pub mod inmemory {
persistence_id: &str,
sequence_id: i64,
) -> anyhow::Result<Option<JournalEntry>> {
let mut store = self.store.read();
let store = self.store.read();
let journal = store.get(persistence_id);
match journal {
None => Ok(None),
Expand All @@ -193,7 +193,7 @@ pub mod inmemory {
from_sequence: i64,
to_sequence: i64,
) -> anyhow::Result<Option<Vec<JournalEntry>>> {
let mut store = self.store.read();
let store = self.store.read();
let journal = store.get(persistence_id);
match journal {
None => Ok(None),
Expand Down Expand Up @@ -255,7 +255,7 @@ pub mod inmemory {
to_sequence: i64,
) -> anyhow::Result<()> {
let mut store = self.store.write();
let mut journal = store.entry(persistence_id.to_string());
let journal = store.entry(persistence_id.to_string());
if let Entry::Occupied(mut journal) = journal {
let journal = journal.get_mut();

Expand Down
1 change: 0 additions & 1 deletion coerce/src/remote/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::actor::ActorRefErr;
use crate::remote::handler::{ActorHandler, ActorMessageHandler};

use std::collections::HashMap;
use uuid::Uuid;

pub mod clients;
pub mod message;
Expand Down
5 changes: 2 additions & 3 deletions coerce/src/remote/actor/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::remote::system::{NodeId, RemoteActorSystem};
use protobuf::well_known_types::wrappers::UInt64Value;
use protobuf::Message;
use std::collections::HashMap;
use uuid::Uuid;

pub struct RemoteRegistry {
nodes: RemoteNodeStore,
Expand Down Expand Up @@ -238,8 +237,8 @@ impl Handler<Receive<SystemTopic>> for RemoteRegistry {
match event.0.as_ref() {
SystemEvent::Cluster(e) => {
debug!("cluster event - {:?}", e);
let system = self.system.as_ref().unwrap().clone();
let registry_ref = self.actor_ref(ctx);
let _system = self.system.as_ref().unwrap().clone();
let _registry_ref = self.actor_ref(ctx);
//
// // TODO: remove all of this stuff
// tokio::spawn(async move {
Expand Down
2 changes: 0 additions & 2 deletions coerce/src/remote/actor_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use std::marker::PhantomData;

use tokio::sync::oneshot;

use uuid::Uuid;

pub struct RemoteActorRef<A: Actor>
where
A: 'static + Sync + Send,
Expand Down
13 changes: 5 additions & 8 deletions coerce/src/remote/cluster/group/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,22 @@ mod builder;

use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message};
use crate::actor::system::ActorSystem;

use crate::actor::{
Actor, ActorId, ActorRef, ActorRefErr, IntoActor, IntoActorId, LocalActorRef, Receiver,
ToActorId,
};
use crate::remote::cluster::group::builder::NodeGroupBuilder;
use crate::remote::cluster::node::{NodeSelector, RemoteNodeRef};
use crate::remote::stream::pubsub::{PubSub, Receive, Subscription};
use crate::remote::stream::system::{ClusterEvent, ClusterMemberUp, SystemEvent, SystemTopic};
use crate::remote::system::{NodeId, RemoteActorSystem};
use crate::remote::RemoteActorRef;
use crate::singleton::factory::SingletonFactory;
use crate::singleton::manager::lease::{LeaseAck, RequestLease};
use crate::singleton::manager::{Manager, SingletonStarted, State};

use chrono::{DateTime, Utc};
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::mem;

use std::sync::Arc;

pub enum NodeGroupEvent<A: Actor> {
Expand Down Expand Up @@ -250,8 +247,8 @@ impl<A: Actor> Handler<Receive<SystemTopic>> for NodeGroup<A> {
let mut event = None;

if self.selector.includes(node.as_ref()) {
let mut entry = self.nodes.entry(node.id);
if let Entry::Vacant(mut entry) = entry {
let entry = self.nodes.entry(node.id);
if let Entry::Vacant(entry) = entry {
let actor_id = self
.actor_id_provider
.get_actor_id(self.group_name.as_ref(), node.id);
Expand Down
2 changes: 1 addition & 1 deletion coerce/src/remote/cluster/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use hashring::HashRing;
use crate::remote::config::SystemCapabilities;
use crate::remote::net::message::{datetime_to_timestamp, timestamp_to_datetime};
use crate::remote::net::proto::network;
use crate::remote::stream::system::ClusterEvent::NodeAdded;

use chrono::{DateTime, Utc};
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
Expand Down
6 changes: 3 additions & 3 deletions coerce/src/remote/net/client/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::remote::net::proto::network::{self as proto, IdentifyEvent};
use crate::remote::net::{receive_loop, StreamData};

use bytes::Bytes;
use chrono::Utc;

use protobuf::EnumOrUnknown;
use std::time::Duration;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -172,7 +172,7 @@ impl Handler<Connect> for RemoteClient {
#[async_trait]
impl Handler<BeginHandshake> for RemoteClient {
async fn handle(&mut self, message: BeginHandshake, ctx: &mut ActorContext) {
let mut connection = match &mut self.state {
let connection = match &mut self.state {
Some(ClientState::Connected(connection)) => connection,
_ => {
// let actor_ref = self.actor_ref(ctx);
Expand Down Expand Up @@ -311,7 +311,7 @@ impl Handler<Disconnected> for RemoteClient {
}
}

ClientState::Connected(mut state) => ClientState::Idle {
ClientState::Connected(_state) => ClientState::Idle {
connection_attempts: 1,
},

Expand Down
2 changes: 1 addition & 1 deletion coerce/src/remote/net/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use bytes::Bytes;
use chrono::{DateTime, Utc};

use futures::SinkExt;
use std::collections::VecDeque;
use std::fmt::{Display, Formatter};
Expand Down
2 changes: 0 additions & 2 deletions coerce/src/remote/net/client/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use protobuf::Message as ProtoMessage;
use std::time::Instant;
use tokio::sync::oneshot;

use uuid::Uuid;

use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message};
use crate::actor::scheduler::timer::TimerTick;
Expand Down
3 changes: 0 additions & 3 deletions coerce/src/remote/net/client/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,8 @@ use crate::remote::system::{NodeId, RemoteActorSystem};
use chrono::{DateTime, Utc};
use protobuf::Message as ProtoMessage;

use std::str::FromStr;

use crate::remote::config::SystemCapabilities;
use tokio::sync::oneshot::Sender;
use uuid::Uuid;

pub struct ClientMessageReceiver {
actor_ref: LocalActorRef<RemoteClient>,
Expand Down
2 changes: 1 addition & 1 deletion coerce/src/remote/net/client/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::actor::message::{Handler, Message};
use crate::remote::net::client::connect::Disconnected;
use crate::remote::net::client::{ClientState, ConnectionState, RemoteClient, RemoteClientErr};
use crate::remote::net::StreamData;
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use futures::SinkExt;
use tokio::io::WriteHalf;
use tokio::net::TcpStream;
Expand Down
4 changes: 2 additions & 2 deletions coerce/src/remote/net/server/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ use protobuf::{Message as ProtoMessage, MessageField};
use bytes::Bytes;
use std::io::Error;
use std::net::SocketAddr;
use std::str::FromStr;

use std::sync::Arc;
use tokio::io::{ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio::sync::oneshot;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

use valuable::Valuable;

pub mod store;
Expand Down
1 change: 0 additions & 1 deletion coerce/src/remote/net/server/session/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::actor::{Actor, IntoActorId, LocalActorRef};
use crate::remote::net::message::ClientEvent;
use crate::remote::net::server::session::RemoteSession;
use std::collections::HashMap;
use uuid::Uuid;

pub struct RemoteSessionStore {
sessions: HashMap<i64, LocalActorRef<RemoteSession>>,
Expand Down
4 changes: 1 addition & 3 deletions coerce/src/remote/stream/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ use crate::remote::net::proto::network::{
LeaderChangedEvent, MemberUpEvent, NewNodeEvent, NodeRemovedEvent, SystemEvent as SysEvent,
};
use crate::remote::net::StreamData;
use crate::remote::stream::pubsub::{PubSub, Subscription, Topic};
use crate::remote::stream::pubsub::Topic;
use std::sync::Arc;

use crate::actor::context::ActorContext;
use crate::remote::system::NodeId;
use protobuf::{Enum, Error, Message};

use crate::remote::cluster::node::RemoteNodeRef;
use crate::remote::stream::mediator::SubscribeErr;

pub struct SystemTopic;

Expand Down
1 change: 0 additions & 1 deletion coerce/src/remote/system/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::remote::{RemoteActorRef, RemoteMessageHeader};
use protobuf::well_known_types::wrappers::UInt64Value;
use protobuf::{Message as ProtoMessage, MessageField};
use tokio::sync::oneshot;
use uuid::Uuid;

#[derive(Debug, Eq, PartialEq)]
pub enum RemoteActorErr {
Expand Down
2 changes: 1 addition & 1 deletion coerce/src/remote/system/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub mod rpc;

use crate::remote::config::RemoteSystemConfig;
pub use actor::*;
pub use cluster::*;

pub use rpc::*;

#[derive(Clone)]
Expand Down
1 change: 0 additions & 1 deletion coerce/src/remote/system/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use protobuf::Message as ProtoMessage;
use std::error::Error;
use std::fmt::{Display, Formatter};
use tokio::sync::oneshot;
use uuid::Uuid;

use crate::actor::{ActorId, ActorRefErr};
use crate::remote::actor::{RemoteRequest, RemoteResponse};
Expand Down
3 changes: 1 addition & 2 deletions coerce/src/sharding/coordinator/balancing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use futures::future::join_all;
use std::mem;
use std::time::Instant;
use tokio::sync::oneshot;
use uuid::Uuid;

#[derive(Debug)]
pub enum Rebalance {
Expand Down Expand Up @@ -149,7 +148,7 @@ impl ShardCoordinator {
debug!("beginning re-balance of shards hosted on node={}", node_id);

let shards_to_rebalance = {
let mut shard_host_state = match self.hosts.get_mut(&node_id) {
let shard_host_state = match self.hosts.get_mut(&node_id) {
None => return,
Some(shard_host_state) => shard_host_state,
};
Expand Down
3 changes: 1 addition & 2 deletions coerce/src/sharding/coordinator/discovery.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message};
use crate::actor::message::Handler;
use crate::remote::cluster::node::RemoteNode;
use crate::sharding::coordinator::balancing::Rebalance;
use crate::sharding::coordinator::{ShardCoordinator, ShardHostState, ShardHostStatus};
Expand All @@ -9,7 +9,6 @@ use crate::remote::stream::pubsub::Receive;
use crate::remote::stream::system::{ClusterEvent, SystemEvent, SystemTopic};
use crate::remote::system::NodeId;
use std::collections::hash_map::Entry;
use std::sync::Arc;

#[async_trait]
impl Handler<Receive<SystemTopic>> for ShardCoordinator {
Expand Down
1 change: 0 additions & 1 deletion coerce/src/sharding/coordinator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::sharding::host::ShardHost;

use crate::remote::system::NodeId;

use crate::actor::message::Handler;
use crate::remote::cluster::node::NodeStatus::{Healthy, Joining};
use crate::remote::heartbeat::Heartbeat;
use crate::remote::stream::pubsub::{PubSub, Subscription};
Expand Down
Loading

0 comments on commit 879e19e

Please sign in to comment.