Skip to content

Commit ea4a547

Browse files
committed
Convert to extension traits
1 parent f29d85c commit ea4a547

File tree

12 files changed

+238
-169
lines changed

12 files changed

+238
-169
lines changed

ethportal-peertest/src/main.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ use log::info;
44
use std::collections::HashMap;
55
use std::sync::Arc;
66
use tokio::sync::RwLock;
7+
use trin_core::locks::RwLoggingExt;
78
use trin_core::portalnet::utp::UtpListener;
89
use trin_core::portalnet::{
910
discovery::Discovery,
1011
overlay::{OverlayConfig, OverlayProtocol},
1112
types::{PortalnetConfig, ProtocolKind},
1213
Enr, U256,
1314
};
14-
use trin_core::rw_read;
1515
use trin_core::utils::setup_overlay_db;
1616

1717
#[tokio::main]
@@ -28,7 +28,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2828
let discovery = Arc::new(RwLock::new(Discovery::new(portal_config).unwrap()));
2929
discovery.write().await.start().await.unwrap();
3030

31-
let db = Arc::new(setup_overlay_db(rw_read!(discovery).local_enr().node_id()));
31+
let db = Arc::new(setup_overlay_db(
32+
discovery.read_with_warn().await.local_enr().node_id(),
33+
));
3234

3335
let overlay = Arc::new(
3436
OverlayProtocol::new(

src/main.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,8 @@ use tokio::sync::RwLock;
77

88
use trin_core::jsonrpc::handlers::JsonRpcHandler;
99
use trin_core::jsonrpc::types::PortalJsonRpcRequest;
10+
use trin_core::locks::RwLoggingExt;
1011
use trin_core::portalnet::events::PortalnetEvents;
11-
use trin_core::rw_read;
12-
use trin_core::rw_write;
1312
use trin_core::{
1413
cli::{TrinConfig, HISTORY_NETWORK, STATE_NETWORK},
1514
jsonrpc::service::launch_jsonrpc_server,
@@ -54,10 +53,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5453
let discovery = Arc::new(RwLock::new(
5554
Discovery::new(portalnet_config.clone()).unwrap(),
5655
));
57-
rw_write!(discovery).start().await.unwrap();
56+
discovery.write_with_warn().await.start().await.unwrap();
5857

5958
// Setup Overlay database
60-
let db = Arc::new(setup_overlay_db(rw_read!(discovery).local_enr().node_id()));
59+
let db = Arc::new(setup_overlay_db(
60+
discovery.read_with_warn().await.local_enr().node_id(),
61+
));
6162

6263
debug!("Selected networks to spawn: {:?}", trin_config.networks);
6364
// Initialize state sub-network service and event handlers, if selected

trin-core/src/jsonrpc/handlers.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@ use tokio::sync::RwLock;
88

99
use crate::jsonrpc::endpoints::{Discv5Endpoint, HistoryEndpoint, StateEndpoint, TrinEndpoint};
1010
use crate::jsonrpc::types::{HistoryJsonRpcRequest, PortalJsonRpcRequest, StateJsonRpcRequest};
11+
use crate::locks::RwLoggingExt;
1112
use crate::portalnet::discovery::Discovery;
12-
use crate::rw_read;
13-
use crate::rw_write;
1413

1514
type Responder<T, E> = mpsc::UnboundedSender<Result<T, E>>;
1615

@@ -27,9 +26,9 @@ impl JsonRpcHandler {
2726
while let Some(request) = self.portal_jsonrpc_rx.recv().await {
2827
let response: Value = match request.endpoint {
2928
TrinEndpoint::Discv5Endpoint(endpoint) => match endpoint {
30-
Discv5Endpoint::NodeInfo => rw_read!(self.discovery).node_info(),
29+
Discv5Endpoint::NodeInfo => self.discovery.read_with_warn().await.node_info(),
3130
Discv5Endpoint::RoutingTableInfo => {
32-
rw_write!(self.discovery).routing_table_info()
31+
self.discovery.write_with_warn().await.routing_table_info()
3332
}
3433
},
3534
TrinEndpoint::HistoryEndpoint(endpoint) => {

trin-core/src/lib.rs

Lines changed: 1 addition & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -3,135 +3,7 @@ extern crate lazy_static;
33

44
pub mod cli;
55
pub mod jsonrpc;
6+
pub mod locks;
67
pub mod portalnet;
78
pub mod socket;
89
pub mod utils;
9-
10-
pub const ACQUIRE_TIMEOUT_MS: u64 = 100;
11-
pub const HOLD_TIMEOUT_MS: u64 = 100;
12-
13-
pub struct TimedGuard<T> {
14-
pub inner: T,
15-
pub acquisition_line: u32,
16-
pub acquisition_file: &'static str,
17-
pub acquisition_time: std::time::Instant,
18-
pub sleep_task: tokio::task::JoinHandle<()>,
19-
}
20-
21-
impl<T> TimedGuard<T> {
22-
pub async fn new(
23-
inner: T,
24-
acquisition_line: u32,
25-
acquisition_file: &'static str,
26-
) -> TimedGuard<T> {
27-
let now = std::time::Instant::now();
28-
let move_line = acquisition_line;
29-
let move_file = acquisition_file;
30-
let handle = tokio::spawn(async move {
31-
sleep_then_log(move_file, move_line).await;
32-
});
33-
34-
TimedGuard {
35-
inner,
36-
acquisition_line,
37-
acquisition_file,
38-
acquisition_time: now,
39-
sleep_task: handle,
40-
}
41-
}
42-
}
43-
44-
impl<T> std::ops::Deref for TimedGuard<T> {
45-
type Target = T;
46-
47-
fn deref(&self) -> &Self::Target {
48-
&self.inner
49-
}
50-
}
51-
52-
impl<T> std::ops::DerefMut for TimedGuard<T> {
53-
fn deref_mut(&mut self) -> &mut Self::Target {
54-
&mut self.inner
55-
}
56-
}
57-
58-
impl<T> Drop for TimedGuard<T> {
59-
fn drop(&mut self) {
60-
self.sleep_task.abort();
61-
let held_for = self.acquisition_time.elapsed().as_millis();
62-
if held_for > HOLD_TIMEOUT_MS.into() {
63-
log::warn!(
64-
"[{}:{}] lock held for too long: {}ms",
65-
self.acquisition_file,
66-
self.acquisition_line,
67-
held_for,
68-
)
69-
}
70-
}
71-
}
72-
73-
async fn sleep_then_log(file: &'static str, line: u32) {
74-
tokio::time::sleep(std::time::Duration::from_millis(HOLD_TIMEOUT_MS)).await;
75-
log::warn!(
76-
"[{}:{}] lock held for over {}ms, not yet released",
77-
file,
78-
line,
79-
HOLD_TIMEOUT_MS.to_string()
80-
);
81-
}
82-
83-
#[macro_export]
84-
macro_rules! inner_rw {
85-
($meth: ident, $lock_name:expr) => {{
86-
let acquire_timeout = std::time::Duration::from_millis($crate::ACQUIRE_TIMEOUT_MS);
87-
let sleep = tokio::time::sleep(acquire_timeout);
88-
tokio::pin!(sleep);
89-
90-
let mut did_log: bool = false;
91-
let now = std::time::Instant::now();
92-
93-
loop {
94-
tokio::select! {
95-
_ = &mut sleep, if !did_log => {
96-
log::warn!(
97-
"[{}:{}] waiting more than {}ms to acquire lock, still waiting",
98-
std::file!(),
99-
std::line!(),
100-
$crate::ACQUIRE_TIMEOUT_MS,
101-
);
102-
did_log = true;
103-
}
104-
guard = $lock_name.$meth() => {
105-
if did_log {
106-
let wait_time = now.elapsed().as_millis();
107-
log::warn!(
108-
"[{}:{}] waited {}ms to acquire lock",
109-
std::file!(),
110-
std::line!(),
111-
wait_time,
112-
);
113-
}
114-
115-
let wrapped = $crate::TimedGuard::new(
116-
guard, std::line!(), std::file!(),
117-
).await;
118-
break wrapped;
119-
}
120-
}
121-
}
122-
}};
123-
}
124-
125-
#[macro_export]
126-
macro_rules! rw_write {
127-
($lock_name:expr) => {{
128-
$crate::inner_rw!(write, $lock_name)
129-
}}
130-
}
131-
132-
#[macro_export]
133-
macro_rules! rw_read {
134-
($lock_name:expr) => {{
135-
$crate::inner_rw!(read, $lock_name)
136-
}}
137-
}

trin-core/src/locks.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
use futures::future::FutureExt;
2+
use std::future::Future;
3+
use std::marker::Sync;
4+
use std::ops::Deref;
5+
use std::ops::DerefMut;
6+
use std::panic::Location;
7+
use std::pin::Pin;
8+
use std::time::Duration;
9+
use std::time::Instant;
10+
use tokio::sync::RwLock;
11+
use tokio::sync::RwLockReadGuard;
12+
use tokio::sync::RwLockWriteGuard;
13+
use tokio::task::JoinHandle;
14+
15+
const ACQUIRE_TIMEOUT_MS: u64 = 100;
16+
const HOLD_TIMEOUT_MS: u64 = 100;
17+
18+
/// Tries to look exactly like a T, by implementing Deref and DerefMut, but emits
19+
/// a warning if drop() is not called soon enough.
20+
pub struct TimedGuard<T> {
21+
inner: T,
22+
acquisition_line: u32,
23+
acquisition_file: &'static str,
24+
acquisition_time: Instant,
25+
sleep_task: JoinHandle<()>,
26+
}
27+
28+
impl<T> TimedGuard<T> {
29+
fn new(inner: T, acquisition_line: u32, acquisition_file: &'static str) -> TimedGuard<T> {
30+
let now = Instant::now();
31+
let move_line = acquisition_line;
32+
let move_file = acquisition_file;
33+
let handle = tokio::spawn(async move {
34+
sleep_then_log(move_file, move_line).await;
35+
});
36+
37+
TimedGuard {
38+
inner,
39+
acquisition_line,
40+
acquisition_file,
41+
acquisition_time: now,
42+
sleep_task: handle,
43+
}
44+
}
45+
}
46+
47+
impl<T> Deref for TimedGuard<T> {
48+
type Target = T;
49+
50+
fn deref(&self) -> &Self::Target {
51+
&self.inner
52+
}
53+
}
54+
55+
impl<T> DerefMut for TimedGuard<T> {
56+
fn deref_mut(&mut self) -> &mut Self::Target {
57+
&mut self.inner
58+
}
59+
}
60+
61+
impl<T> Drop for TimedGuard<T> {
62+
fn drop(&mut self) {
63+
self.sleep_task.abort();
64+
let held_for = self.acquisition_time.elapsed().as_millis();
65+
if held_for > HOLD_TIMEOUT_MS.into() {
66+
log::warn!(
67+
"[{}:{}] lock held for too long: {}ms",
68+
self.acquisition_file,
69+
self.acquisition_line,
70+
held_for,
71+
)
72+
}
73+
}
74+
}
75+
76+
async fn sleep_then_log(file: &'static str, line: u32) {
77+
tokio::time::sleep(Duration::from_millis(HOLD_TIMEOUT_MS)).await;
78+
log::warn!(
79+
"[{}:{}] lock held for over {}ms, not yet released",
80+
file,
81+
line,
82+
HOLD_TIMEOUT_MS.to_string()
83+
);
84+
}
85+
86+
async fn try_lock<T, Fut>(fut: Fut, file: &'static str, line: u32) -> TimedGuard<T>
87+
where
88+
Fut: Future<Output = T>,
89+
{
90+
let acquire_timeout = Duration::from_millis(ACQUIRE_TIMEOUT_MS);
91+
let sleep = tokio::time::sleep(acquire_timeout).fuse();
92+
let fused = fut.fuse();
93+
94+
futures::pin_mut!(sleep, fused);
95+
96+
let now = Instant::now();
97+
98+
futures::select! {
99+
_ = sleep => {
100+
log::warn!(
101+
"[{}:{}] waiting more than {}ms to acquire lock, still waiting",
102+
file, line, ACQUIRE_TIMEOUT_MS,
103+
);
104+
},
105+
guard = fused => {
106+
return TimedGuard::new(guard, line, file);
107+
}
108+
}
109+
110+
let guard = fused.await;
111+
let wait_time = now.elapsed().as_millis();
112+
log::warn!("[{}:{}] waited {}ms to acquire lock", file, line, wait_time);
113+
114+
TimedGuard::new(guard, line, file)
115+
}
116+
117+
// this is a workaround:
118+
// - Rust does not support async in traits
119+
// https://rust-lang.github.io/async-book/07_workarounds/05_async_in_traits.html
120+
// - async_trait does not give us enough flexibility to implement #[track_caller]
121+
//
122+
// So we manually desugar the async functions and have them return futures
123+
type Async<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
124+
125+
/// These methods should be used in favor of the stock read() and write() methods.
126+
///
127+
/// These methods emit warnings when the lock takes too long to acquire (meaning it's
128+
/// likely some other user is holding onto the lock for too long).
129+
///
130+
/// They also emit warnings when the returned TimedGuard is kept alive for too long.
131+
/// (The lock is held until the returned TimedGuard is dropped, so it should be dropped
132+
/// as soon as possible!)
133+
pub trait RwLoggingExt<T> {
134+
#[track_caller]
135+
fn read_with_warn(&self) -> Async<TimedGuard<RwLockReadGuard<T>>>;
136+
137+
#[track_caller]
138+
fn write_with_warn(&self) -> Async<TimedGuard<RwLockWriteGuard<T>>>;
139+
}
140+
141+
impl<T: Send + Sync> RwLoggingExt<T> for RwLock<T> {
142+
#[track_caller]
143+
fn read_with_warn(&self) -> Async<TimedGuard<RwLockReadGuard<T>>> {
144+
let loc = Location::caller();
145+
Box::pin(try_lock(self.read(), loc.file(), loc.line()))
146+
}
147+
148+
#[track_caller]
149+
fn write_with_warn(&self) -> Async<TimedGuard<RwLockWriteGuard<T>>> {
150+
let loc = Location::caller();
151+
Box::pin(try_lock(self.write(), loc.file(), loc.line()))
152+
}
153+
}

trin-core/src/portalnet/events.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use super::{
1010
utp::{UtpListener, UTP_PROTOCOL},
1111
};
1212
use crate::cli::{HISTORY_NETWORK, STATE_NETWORK};
13-
use crate::rw_write;
13+
use crate::locks::RwLoggingExt;
1414
use std::collections::HashMap;
1515
use std::convert::TryInto;
1616

@@ -28,7 +28,9 @@ impl PortalnetEvents {
2828
history_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
2929
state_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
3030
) -> Self {
31-
let protocol_receiver = rw_write!(discovery)
31+
let protocol_receiver = discovery
32+
.write_with_warn()
33+
.await
3234
.discv5
3335
.event_stream()
3436
.await

0 commit comments

Comments
 (0)