Skip to content

Commit

Permalink
TcpService new architecture
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed Jan 16, 2024
1 parent 8d92766 commit 96aef88
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 38 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ my-http-server = { tag = "0.7.0", git = "https://github.com/MyJetTools/my-http-s
"static-files",
] }

my-no-sql-sdk = { tag = "0.3.0", git = "https://github.com/MyJetTools/my-no-sql-sdk.git", features = [
my-no-sql-sdk = { branch = "main", git = "https://github.com/MyJetTools/my-no-sql-sdk.git", features = [
"master-node",
"tcp-contracts",
] }

my-no-sql-server-core = { tag = "0.3.1", git = "https://github.com/MyJetTools/my-no-sql-server-core.git", features = [
my-no-sql-server-core = { branch = "main", git = "https://github.com/MyJetTools/my-no-sql-server-core.git", features = [
"master-node",
] }

Expand All @@ -33,7 +33,7 @@ rust-extensions = { tag = "0.1.4", git = "https://github.com/MyJetTools/rust-ext
my-logger = { tag = "1.1.0", git = "https://github.com/MyJetTools/my-logger.git" }
my-json = { tag = "0.2.2", git = "https://github.com/MyJetTools/my-json.git" }

my-tcp-sockets = { tag = "0.1.9", git = "https://github.com/MyJetTools/my-tcp-sockets.git" }
my-tcp-sockets = { branch = "main", git = "https://github.com/MyJetTools/my-tcp-sockets.git" }

hyper-tls = "*"
tonic = { version = "*", features = ["tls", "tls-roots", "prost"] }
Expand Down
2 changes: 2 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use background::{
metrics_updater::MetricsUpdater, persist::PersistTimer, sync::SyncEventLoop, BackupTimer,
};

use my_no_sql_sdk::tcp_contracts::MyNoSqlTcpSerializerFactory;
use my_tcp_sockets::TcpServer;
use rust_extensions::MyTimer;
use std::{net::SocketAddr, sync::Arc, time::Duration};
Expand Down Expand Up @@ -103,6 +104,7 @@ async fn main() {

tcp_server
.start(
Arc::new(MyNoSqlTcpSerializerFactory),
Arc::new(TcpServerEvents::new(app.clone())),
app.states.clone(),
my_logger::LOGGER.clone(),
Expand Down
82 changes: 47 additions & 35 deletions src/tcp/tcp_server_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;

use my_logger::LogEventCtx;
use my_no_sql_sdk::tcp_contracts::{MyNoSqlReaderTcpSerializer, MyNoSqlTcpContract};
use my_tcp_sockets::{tcp_connection::TcpSocketConnection, ConnectionEvent, SocketEventCallback};
use my_tcp_sockets::{tcp_connection::TcpSocketConnection, SocketEventCallback};

use crate::{app::AppContext, data_readers::tcp_connection::ReaderName};

Expand All @@ -17,13 +17,50 @@ impl TcpServerEvents {
pub fn new(app: Arc<AppContext>) -> Self {
Self { app }
}
}

#[async_trait::async_trait]
impl SocketEventCallback<MyNoSqlTcpContract, MyNoSqlReaderTcpSerializer, ()> for TcpServerEvents {
async fn connected(
&self,
_connection: Arc<TcpSocketConnection<MyNoSqlTcpContract, MyNoSqlReaderTcpSerializer, ()>>,
) {
//println!("New connection");
self.app.metrics.mark_new_tcp_connection();
}

async fn disconnected(
&self,
connection: Arc<TcpSocketConnection<MyNoSqlTcpContract, MyNoSqlReaderTcpSerializer, ()>>,
) {
let name =
if let Some(data_reader) = self.app.data_readers.get_tcp(connection.as_ref()).await {
data_reader.get_name().to_string()
} else {
"".to_string()
};

my_logger::LOGGER.write_info(
"TcpConnection",
"Disconnected",
LogEventCtx::new()
.add("id", connection.id.to_string())
.add("Name", name),
);
if let Some(data_reader) = self.app.data_readers.remove_tcp(connection.as_ref()).await {
self.app
.metrics
.remove_pending_to_sync(&data_reader.connection);
}
self.app.metrics.mark_new_tcp_disconnection();
}

pub async fn handle_incoming_packet(
async fn payload(
&self,
tcp_contract: MyNoSqlTcpContract,
connection: Arc<MyNoSqlTcpConnection>,
connection: &Arc<TcpSocketConnection<MyNoSqlTcpContract, MyNoSqlReaderTcpSerializer, ()>>,
contract: MyNoSqlTcpContract,
) {
match tcp_contract {
match contract {
MyNoSqlTcpContract::Ping => {
connection.send(&MyNoSqlTcpContract::Pong).await;
}
Expand All @@ -38,7 +75,7 @@ impl TcpServerEvents {

self.app
.data_readers
.add_tcp(connection, ReaderName::AsReader(name), false)
.add_tcp(connection.clone(), ReaderName::AsReader(name), false)
.await;
}

Expand All @@ -53,7 +90,7 @@ impl TcpServerEvents {
};
self.app
.data_readers
.add_tcp(connection, name, compress)
.add_tcp(connection.clone(), name, compress)
.await;
}

Expand Down Expand Up @@ -236,48 +273,23 @@ impl TcpServerEvents {
_ => {}
}
}
}

#[async_trait::async_trait]
impl SocketEventCallback<MyNoSqlTcpContract, MyNoSqlReaderTcpSerializer, ()> for TcpServerEvents {
/*
async fn handle(
&self,
connection_event: ConnectionEvent<MyNoSqlTcpContract, MyNoSqlReaderTcpSerializer, ()>,
) {
match connection_event {
ConnectionEvent::Connected(_connection) => {
println!("New connection");
self.app.metrics.mark_new_tcp_connection();
}
ConnectionEvent::Disconnected(connection) => {
let name = if let Some(data_reader) =
self.app.data_readers.get_tcp(connection.as_ref()).await
{
data_reader.get_name().to_string()
} else {
"".to_string()
};
my_logger::LOGGER.write_info(
"TcpConnection",
"Disconnected",
LogEventCtx::new()
.add("id", connection.id.to_string())
.add("Name", name),
);
if let Some(data_reader) =
self.app.data_readers.remove_tcp(connection.as_ref()).await
{
self.app
.metrics
.remove_pending_to_sync(&data_reader.connection);
}
self.app.metrics.mark_new_tcp_disconnection();
}
ConnectionEvent::Payload {
connection,
payload,
} => self.handle_incoming_packet(payload, connection).await,
}
}
*/
}

0 comments on commit 96aef88

Please sign in to comment.