Skip to content

Commit 140d2d1

Browse files
committed
Emit warnings when locks are held for too long
- Add read_with_warn() and write_with_warn() methods to RwLock - They emit warnings if lock acquisition takes over 100ms - They emit warnings if locks are held for over 100ms - Use read_with_warn() and write_with_warn() in most places RwLocks are accessed. Using these methods I was able to immediately identify the cause of a deadlock during boot.
1 parent 07d5813 commit 140d2d1

File tree

12 files changed

+218
-43
lines changed

12 files changed

+218
-43
lines changed

ethportal-peertest/src/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use ethportal_peertest::events::PortalnetEvents;
99
use ethportal_peertest::jsonrpc::{
1010
test_jsonrpc_endpoints_over_http, test_jsonrpc_endpoints_over_ipc,
1111
};
12+
use trin_core::locks::RwLoggingExt;
1213
use trin_core::portalnet::{
1314
discovery::Discovery,
1415
overlay::{OverlayConfig, OverlayProtocol},
@@ -30,10 +31,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
3031
};
3132

3233
let discovery = Arc::new(RwLock::new(Discovery::new(portal_config).unwrap()));
33-
discovery.write().await.start().await.unwrap();
34+
discovery.write_with_warn().await.start().await.unwrap();
3435

3536
let db = Arc::new(setup_overlay_db(
36-
discovery.read().await.local_enr().node_id(),
37+
discovery.read_with_warn().await.local_enr().node_id(),
3738
));
3839

3940
let overlay = Arc::new(

src/main.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use tokio::sync::RwLock;
77

88
use trin_core::jsonrpc::handlers::JsonRpcHandler;
99
use trin_core::jsonrpc::types::PortalJsonRpcRequest;
10+
use trin_core::locks::RwLoggingExt;
1011
use trin_core::portalnet::events::PortalnetEvents;
1112
use trin_core::{
1213
cli::{TrinConfig, HISTORY_NETWORK, STATE_NETWORK},
@@ -52,11 +53,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5253
let discovery = Arc::new(RwLock::new(
5354
Discovery::new(portalnet_config.clone()).unwrap(),
5455
));
55-
discovery.write().await.start().await.unwrap();
56+
discovery.write_with_warn().await.start().await.unwrap();
5657

5758
// Setup Overlay database
5859
let db = Arc::new(setup_overlay_db(
59-
discovery.read().await.local_enr().node_id(),
60+
discovery.read_with_warn().await.local_enr().node_id(),
6061
));
6162

6263
debug!("Selected networks to spawn: {:?}", trin_config.networks);

trin-core/src/jsonrpc/handlers.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use tokio::sync::RwLock;
88

99
use crate::jsonrpc::endpoints::{Discv5Endpoint, HistoryEndpoint, StateEndpoint, TrinEndpoint};
1010
use crate::jsonrpc::types::{HistoryJsonRpcRequest, PortalJsonRpcRequest, StateJsonRpcRequest};
11+
use crate::locks::RwLoggingExt;
1112
use crate::portalnet::discovery::Discovery;
1213

1314
type Responder<T, E> = mpsc::UnboundedSender<Result<T, E>>;
@@ -25,9 +26,9 @@ impl JsonRpcHandler {
2526
while let Some(request) = self.portal_jsonrpc_rx.recv().await {
2627
let response: Value = match request.endpoint {
2728
TrinEndpoint::Discv5Endpoint(endpoint) => match endpoint {
28-
Discv5Endpoint::NodeInfo => self.discovery.read().await.node_info(),
29+
Discv5Endpoint::NodeInfo => self.discovery.read_with_warn().await.node_info(),
2930
Discv5Endpoint::RoutingTableInfo => {
30-
self.discovery.write().await.routing_table_info()
31+
self.discovery.write_with_warn().await.routing_table_info()
3132
}
3233
},
3334
TrinEndpoint::HistoryEndpoint(endpoint) => {

trin-core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ extern crate lazy_static;
33

44
pub mod cli;
55
pub mod jsonrpc;
6+
pub mod locks;
67
pub mod portalnet;
78
pub mod socket;
89
pub mod utils;

trin-core/src/locks.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
use futures::future::FutureExt;
2+
use std::future::Future;
3+
use std::marker::Sync;
4+
use std::ops::Deref;
5+
use std::ops::DerefMut;
6+
use std::panic::Location;
7+
use std::pin::Pin;
8+
use std::time::Duration;
9+
use std::time::Instant;
10+
use tokio::sync::RwLock;
11+
use tokio::sync::RwLockReadGuard;
12+
use tokio::sync::RwLockWriteGuard;
13+
use tokio::task::JoinHandle;
14+
15+
/// Time, in milliseconds, that `try_lock` waits for a lock before logging a
/// "still waiting" warning.
const ACQUIRE_TIMEOUT_MS: u64 = 100;
16+
/// Time, in milliseconds, a guard may stay alive before warnings are logged: once by the
/// `sleep_then_log` task while the guard is still held, and again when the guard is dropped.
const HOLD_TIMEOUT_MS: u64 = 100;
17+
18+
/// Tries to look exactly like a T, by implementing Deref and DerefMut, but emits
19+
/// a warning if drop() is not called soon enough.
20+
pub struct TimedGuard<T> {
21+
inner: T,
22+
acquisition_line: u32,
23+
acquisition_file: &'static str,
24+
acquisition_time: Instant,
25+
sleep_task: JoinHandle<()>,
26+
}
27+
28+
impl<T> TimedGuard<T> {
29+
fn new(inner: T, acquisition_line: u32, acquisition_file: &'static str) -> TimedGuard<T> {
30+
let now = Instant::now();
31+
let move_line = acquisition_line;
32+
let move_file = acquisition_file;
33+
let handle = tokio::spawn(async move {
34+
sleep_then_log(move_file, move_line).await;
35+
});
36+
37+
TimedGuard {
38+
inner,
39+
acquisition_line,
40+
acquisition_file,
41+
acquisition_time: now,
42+
sleep_task: handle,
43+
}
44+
}
45+
}
46+
47+
impl<T> Deref for TimedGuard<T> {
48+
type Target = T;
49+
50+
fn deref(&self) -> &Self::Target {
51+
&self.inner
52+
}
53+
}
54+
55+
impl<T> DerefMut for TimedGuard<T> {
56+
fn deref_mut(&mut self) -> &mut Self::Target {
57+
&mut self.inner
58+
}
59+
}
60+
61+
impl<T> Drop for TimedGuard<T> {
62+
fn drop(&mut self) {
63+
self.sleep_task.abort();
64+
let held_for = self.acquisition_time.elapsed().as_millis();
65+
if held_for > HOLD_TIMEOUT_MS.into() {
66+
log::warn!(
67+
"[{}:{}] lock held for too long: {}ms",
68+
self.acquisition_file,
69+
self.acquisition_line,
70+
held_for,
71+
)
72+
}
73+
}
74+
}
75+
76+
/// Sleeps for `HOLD_TIMEOUT_MS` and then logs a warning that the lock acquired at
/// `file:line` has still not been released.
///
/// This is meant to run as a spawned task that is aborted when the guard is dropped, so
/// the warning only appears when the guard outlives the timeout.
async fn sleep_then_log(file: &'static str, line: u32) {
    tokio::time::sleep(Duration::from_millis(HOLD_TIMEOUT_MS)).await;
    // The integer is formatted directly; no intermediate `String` is needed.
    log::warn!(
        "[{}:{}] lock held for over {}ms, not yet released",
        file,
        line,
        HOLD_TIMEOUT_MS
    );
}
85+
86+
async fn try_lock<T, Fut>(fut: Fut, file: &'static str, line: u32) -> TimedGuard<T>
87+
where
88+
Fut: Future<Output = T>,
89+
{
90+
let acquire_timeout = Duration::from_millis(ACQUIRE_TIMEOUT_MS);
91+
let sleep = tokio::time::sleep(acquire_timeout).fuse();
92+
let fused = fut.fuse();
93+
94+
futures::pin_mut!(sleep, fused);
95+
96+
let now = Instant::now();
97+
98+
futures::select! {
99+
_ = sleep => {
100+
log::warn!(
101+
"[{}:{}] waiting more than {}ms to acquire lock, still waiting",
102+
file, line, ACQUIRE_TIMEOUT_MS,
103+
);
104+
},
105+
guard = fused => {
106+
return TimedGuard::new(guard, line, file);
107+
}
108+
}
109+
110+
let guard = fused.await;
111+
let wait_time = now.elapsed().as_millis();
112+
log::warn!("[{}:{}] waited {}ms to acquire lock", file, line, wait_time);
113+
114+
TimedGuard::new(guard, line, file)
115+
}
116+
117+
// this is a workaround:
118+
// - Rust does not support async in traits
119+
// https://rust-lang.github.io/async-book/07_workarounds/05_async_in_traits.html
120+
// - async_trait does not give us enough flexibility to implement #[track_caller]
121+
//
122+
// So we manually desugar the async functions and have them return futures
123+
/// A boxed, pinned, `Send` future yielding `T` that may borrow data for `'a`; used as the
/// return type of the manually desugared async trait methods below.
type Async<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
124+
125+
/// These methods should be used in favor of the stock read() and write() methods.
126+
///
127+
/// These methods emit warnings when the lock takes too long to acquire (meaning it's
128+
/// likely some other user is holding onto the lock for too long).
129+
///
130+
/// They also emit warnings when the returned TimedGuard is kept alive for too long.
131+
/// (The lock is held until the returned TimedGuard is dropped, so it should be dropped
132+
/// as soon as possible!)
133+
pub trait RwLoggingExt<T> {
134+
#[track_caller]
135+
fn read_with_warn(&self) -> Async<TimedGuard<RwLockReadGuard<T>>>;
136+
137+
#[track_caller]
138+
fn write_with_warn(&self) -> Async<TimedGuard<RwLockWriteGuard<T>>>;
139+
}
140+
141+
impl<T: Send + Sync> RwLoggingExt<T> for RwLock<T> {
142+
#[track_caller]
143+
fn read_with_warn(&self) -> Async<TimedGuard<RwLockReadGuard<T>>> {
144+
let loc = Location::caller();
145+
Box::pin(try_lock(self.read(), loc.file(), loc.line()))
146+
}
147+
148+
#[track_caller]
149+
fn write_with_warn(&self) -> Async<TimedGuard<RwLockWriteGuard<T>>> {
150+
let loc = Location::caller();
151+
Box::pin(try_lock(self.write(), loc.file(), loc.line()))
152+
}
153+
}

trin-core/src/portalnet/events.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use super::{
1010
utp::{UtpListener, UTP_PROTOCOL},
1111
};
1212
use crate::cli::{HISTORY_NETWORK, STATE_NETWORK};
13+
use crate::locks::RwLoggingExt;
1314
use std::collections::HashMap;
1415
use std::convert::TryInto;
1516

@@ -28,7 +29,7 @@ impl PortalnetEvents {
2829
state_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
2930
) -> Self {
3031
let protocol_receiver = discovery
31-
.write()
32+
.write_with_warn()
3233
.await
3334
.discv5
3435
.event_stream()

trin-core/src/portalnet/overlay.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::locks::RwLoggingExt;
12
use crate::utils::xor_two_values;
23

34
use super::{
@@ -112,7 +113,12 @@ impl OverlayProtocol {
112113
data_radius: U256,
113114
) -> Self {
114115
let kbuckets = Arc::new(RwLock::new(KBucketsTable::new(
115-
discovery.read().await.local_enr().node_id().into(),
116+
discovery
117+
.read_with_warn()
118+
.await
119+
.local_enr()
120+
.node_id()
121+
.into(),
116122
config.bucket_pending_timeout,
117123
config.max_incoming_per_bucket,
118124
config.table_filter,
@@ -140,7 +146,7 @@ impl OverlayProtocol {
140146
let response = match request {
141147
Request::Ping(Ping { .. }) => {
142148
debug!("Got overlay ping request {:?}", request);
143-
let enr_seq = self.discovery.read().await.local_enr().seq();
149+
let enr_seq = self.discovery.read_with_warn().await.local_enr().seq();
144150
let payload = CustomPayload::new(self.data_radius().await, None);
145151
Response::Pong(Pong {
146152
enr_seq,
@@ -181,12 +187,12 @@ impl OverlayProtocol {
181187

182188
/// Returns the local ENR of the node.
183189
pub async fn local_enr(&self) -> Enr {
184-
self.discovery.read().await.discv5.local_enr()
190+
self.discovery.read_with_warn().await.discv5.local_enr()
185191
}
186192

187193
// Returns the data radius of the node.
188194
pub async fn data_radius(&self) -> U256 {
189-
self.data_radius.read().await.clone()
195+
self.data_radius.read_with_warn().await.clone()
190196
}
191197

192198
/// Returns a vector of the ENRs of the closest nodes by the given log2 distances.
@@ -203,7 +209,7 @@ impl OverlayProtocol {
203209
}
204210

205211
if !log2_distances.is_empty() {
206-
let mut kbuckets = self.kbuckets.write().await;
212+
let mut kbuckets = self.kbuckets.write_with_warn().await;
207213
for node in kbuckets
208214
.nodes_by_distances(&log2_distances, FIND_NODES_MAX_NODES)
209215
.into_iter()
@@ -247,7 +253,7 @@ impl OverlayProtocol {
247253
/// Returns a vector of all ENR node IDs of nodes currently contained in the routing table.
248254
pub async fn table_entries_id(&self) -> Vec<NodeId> {
249255
self.kbuckets
250-
.write()
256+
.write_with_warn()
251257
.await
252258
.iter()
253259
.map(|entry| *entry.node.key.preimage())
@@ -257,7 +263,7 @@ impl OverlayProtocol {
257263
/// Returns a vector of all the ENRs of nodes currently contained in the routing table.
258264
pub async fn table_entries_enr(&self) -> Vec<Enr> {
259265
self.kbuckets
260-
.write()
266+
.write_with_warn()
261267
.await
262268
.iter()
263269
.map(|entry| entry.node.value.enr().clone())
@@ -271,15 +277,16 @@ impl OverlayProtocol {
271277
protocol: ProtocolKind,
272278
payload: Option<Vec<u8>>,
273279
) -> Result<Vec<u8>, SendPingError> {
274-
let enr_seq = self.discovery.read().await.local_enr().seq();
280+
let enr_seq = self.discovery.read_with_warn().await.local_enr().seq();
281+
275282
let payload = CustomPayload::new(data_radius, payload);
276283
let msg = Ping {
277284
enr_seq,
278285
payload: Some(payload),
279286
};
280287
Ok(self
281288
.discovery
282-
.read()
289+
.read_with_warn()
283290
.await
284291
.send_talkreq(
285292
enr,
@@ -297,7 +304,7 @@ impl OverlayProtocol {
297304
) -> Result<Vec<u8>, RequestError> {
298305
let msg = FindNodes { distances };
299306
self.discovery
300-
.read()
307+
.read_with_warn()
301308
.await
302309
.send_talkreq(
303310
enr,
@@ -315,7 +322,7 @@ impl OverlayProtocol {
315322
) -> Result<Vec<u8>, RequestError> {
316323
let msg = FindContent { content_key };
317324
self.discovery
318-
.read()
325+
.read_with_warn()
319326
.await
320327
.send_talkreq(
321328
enr,

trin-core/src/portalnet/utp.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use std::sync::Arc;
1212
use std::time::{Duration, SystemTime, UNIX_EPOCH};
1313
use tokio::sync::RwLock;
1414

15+
use crate::locks::RwLoggingExt;
16+
1517
pub const UTP_PROTOCOL: &str = "utp";
1618
pub const HEADER_SIZE: usize = 20;
1719
pub const MAX_DISCV5_PACKET_SIZE: usize = 1280;
@@ -433,7 +435,13 @@ impl UtpListener {
433435
}
434436
}
435437
Type::StSyn => {
436-
if let Some(enr) = self.discovery.read().await.discv5.find_enr(&node_id) {
438+
if let Some(enr) = self
439+
.discovery
440+
.read_with_warn()
441+
.await
442+
.discv5
443+
.find_enr(&node_id)
444+
{
437445
// If neither of those cases happened handle this is a new request
438446
let mut conn = UtpStream::init(Arc::clone(&self.discovery), enr);
439447
conn.handle_packet(packet).await;
@@ -466,7 +474,13 @@ impl UtpListener {
466474

467475
// I am honestly not sure if I should init this with Enr or NodeId since we could use both
468476
async fn connect(&mut self, connection_id: u16, node_id: NodeId) {
469-
if let Some(enr) = self.discovery.read().await.discv5.find_enr(&node_id) {
477+
if let Some(enr) = self
478+
.discovery
479+
.read_with_warn()
480+
.await
481+
.discv5
482+
.find_enr(&node_id)
483+
{
470484
let mut conn = UtpStream::init(Arc::clone(&self.discovery), enr);
471485
conn.make_connection(connection_id).await;
472486
self.utp_connections.insert(
@@ -587,7 +601,7 @@ impl UtpStream {
587601
}
588602
let talk_request_result = self
589603
.discovery
590-
.read()
604+
.read_with_warn()
591605
.await
592606
.send_talkreq(self.enr.clone(), UTP_PROTOCOL.to_string(), packet.0.clone())
593607
.await;

0 commit comments

Comments
 (0)