Skip to content

Commit e57f340

Browse files
committed
Emit warnings when locks are held for too long
- Add rw_read and rw_write macros which wrap access to locks - They emit warnings if lock acquisition takes over 100ms - They emit warnings if locks are held for over 100ms - Use rw_read and rw_write macros in most places RwLocks are accessed. Using these macros I was able to immediately identify the cause of a deadlock during boot.
1 parent 989a019 commit e57f340

File tree

11 files changed

+178
-67
lines changed

11 files changed

+178
-67
lines changed

ethportal-peertest/src/main.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use trin_core::portalnet::{
1111
types::{PortalnetConfig, ProtocolKind},
1212
Enr, U256,
1313
};
14+
use trin_core::rw_read;
1415
use trin_core::utils::setup_overlay_db;
1516

1617
#[tokio::main]
@@ -27,9 +28,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2728
let discovery = Arc::new(RwLock::new(Discovery::new(portal_config).unwrap()));
2829
discovery.write().await.start().await.unwrap();
2930

30-
let db = Arc::new(setup_overlay_db(
31-
discovery.read().await.local_enr().node_id(),
32-
));
31+
let db = Arc::new(setup_overlay_db(rw_read!(discovery).local_enr().node_id()));
3332

3433
let overlay = Arc::new(
3534
OverlayProtocol::new(

src/main.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use tokio::sync::RwLock;
88
use trin_core::jsonrpc::handlers::JsonRpcHandler;
99
use trin_core::jsonrpc::types::PortalJsonRpcRequest;
1010
use trin_core::portalnet::events::PortalnetEvents;
11+
use trin_core::rw_read;
12+
use trin_core::rw_write;
1113
use trin_core::{
1214
cli::{TrinConfig, HISTORY_NETWORK, STATE_NETWORK},
1315
jsonrpc::service::launch_jsonrpc_server,
@@ -52,12 +54,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5254
let discovery = Arc::new(RwLock::new(
5355
Discovery::new(portalnet_config.clone()).unwrap(),
5456
));
55-
discovery.write().await.start().await.unwrap();
57+
rw_write!(discovery).start().await.unwrap();
5658

5759
// Setup Overlay database
58-
let db = Arc::new(setup_overlay_db(
59-
discovery.read().await.local_enr().node_id(),
60-
));
60+
let db = Arc::new(setup_overlay_db(rw_read!(discovery).local_enr().node_id()));
6161

6262
debug!("Selected networks to spawn: {:?}", trin_config.networks);
6363
// Initialize state sub-network service and event handlers, if selected

trin-core/src/jsonrpc/handlers.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use tokio::sync::RwLock;
99
use crate::jsonrpc::endpoints::{Discv5Endpoint, HistoryEndpoint, StateEndpoint, TrinEndpoint};
1010
use crate::jsonrpc::types::{HistoryJsonRpcRequest, PortalJsonRpcRequest, StateJsonRpcRequest};
1111
use crate::portalnet::discovery::Discovery;
12+
use crate::rw_read;
13+
use crate::rw_write;
1214

1315
type Responder<T, E> = mpsc::UnboundedSender<Result<T, E>>;
1416

@@ -25,9 +27,9 @@ impl JsonRpcHandler {
2527
while let Some(request) = self.portal_jsonrpc_rx.recv().await {
2628
let response: Value = match request.endpoint {
2729
TrinEndpoint::Discv5Endpoint(endpoint) => match endpoint {
28-
Discv5Endpoint::NodeInfo => self.discovery.read().await.node_info(),
30+
Discv5Endpoint::NodeInfo => rw_read!(self.discovery).node_info(),
2931
Discv5Endpoint::RoutingTableInfo => {
30-
self.discovery.write().await.routing_table_info()
32+
rw_write!(self.discovery).routing_table_info()
3133
}
3234
},
3335
TrinEndpoint::HistoryEndpoint(endpoint) => {

trin-core/src/lib.rs

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,137 @@ pub mod jsonrpc;
66
pub mod portalnet;
77
pub mod socket;
88
pub mod utils;
9+
10+
/// Threshold, in milliseconds, after which `rw_read!` / `rw_write!` warn that a lock is
/// taking a long time to acquire.
pub const ACQUIRE_TIMEOUT_MS: u64 = 100;
11+
/// Threshold, in milliseconds, after which a `TimedRwReadGuard` warns that the lock it
/// wraps has been held for a long time.
pub const HOLD_TIMEOUT_MS: u64 = 100;
12+
13+
pub struct TimedRwReadGuard<'a, T: ?Sized> {
14+
pub inner: tokio::sync::RwLockReadGuard<'a, T>,
15+
pub acquisition_line: u32,
16+
pub acquisition_file: &'static str,
17+
pub acquisition_time: std::time::Instant,
18+
pub sleep_task: tokio::task::JoinHandle<()>,
19+
}
20+
21+
/// Sleeps for [`HOLD_TIMEOUT_MS`] milliseconds, then warns that the lock acquired at
/// `file:line` has still not been released.
///
/// Meant to run as a spawned task that gets aborted when the guard is dropped, so the
/// warning only fires when the guard outlives the threshold.
async fn sleep_then_log(file: &'static str, line: u32) {
    tokio::time::sleep(std::time::Duration::from_millis(HOLD_TIMEOUT_MS)).await;
    // `u64` implements `Display`, so it is formatted directly; no intermediate `String`.
    log::warn!(
        "[{}:{}] lock held for over {}ms, not yet released",
        file,
        line,
        HOLD_TIMEOUT_MS
    );
}
30+
31+
impl<'a, T: ?Sized> TimedRwReadGuard<'a, T> {
32+
pub async fn new(
33+
inner: tokio::sync::RwLockReadGuard<'a, T>,
34+
acquisition_line: u32,
35+
acquisition_file: &'static str,
36+
) -> TimedRwReadGuard<'a, T> {
37+
let now = std::time::Instant::now();
38+
let move_line = acquisition_line;
39+
let move_file = acquisition_file;
40+
let handle = tokio::spawn(async move {
41+
sleep_then_log(move_file, move_line).await;
42+
});
43+
44+
TimedRwReadGuard {
45+
inner,
46+
acquisition_line,
47+
acquisition_file,
48+
acquisition_time: now,
49+
sleep_task: handle,
50+
}
51+
}
52+
}
53+
54+
impl<'a, T> std::ops::Deref for TimedRwReadGuard<'a, T> {
55+
type Target = tokio::sync::RwLockReadGuard<'a, T>;
56+
57+
fn deref(&self) -> &Self::Target {
58+
&self.inner
59+
}
60+
}
61+
62+
impl<'a, T: ?Sized> Drop for TimedRwReadGuard<'a, T> {
63+
fn drop(&mut self) {
64+
self.sleep_task.abort();
65+
let held_for = self.acquisition_time.elapsed().as_millis();
66+
if held_for > HOLD_TIMEOUT_MS.into() {
67+
log::warn!(
68+
"[{}:{}] lock held for too long: {}ms",
69+
self.acquisition_file,
70+
self.acquisition_line,
71+
held_for,
72+
)
73+
}
74+
}
75+
}
76+
77+
/// Acquires a read lock on a `tokio::sync::RwLock`, warning when acquisition is slow.
///
/// Must be used inside an async context. If the lock is not acquired within
/// `ACQUIRE_TIMEOUT_MS` milliseconds a warning is logged once, and a second warning with
/// the total wait time is logged when the lock is finally obtained. The result is a
/// `TimedRwReadGuard`, which additionally warns when the lock is held for too long.
///
/// The calling crate needs `tokio` and `log` in scope, since the expansion refers to them
/// by path.
#[macro_export]
macro_rules! rw_read {
    ($lock_name:expr) => {{
        let acquire_timeout = std::time::Duration::from_millis($crate::ACQUIRE_TIMEOUT_MS);
        let sleep = tokio::time::sleep(acquire_timeout);
        tokio::pin!(sleep);

        let mut did_log: bool = false;
        let now = std::time::Instant::now();

        // Create the acquisition future exactly once and keep polling that same future.
        // Re-creating it on every loop iteration would cancel the pending request each
        // time the timeout branch fires, and a cancelled `read()` loses its place in the
        // lock's fair wait queue. It also keeps the macro argument from being evaluated
        // more than once.
        let acquire = $lock_name.read();
        tokio::pin!(acquire);

        loop {
            tokio::select! {
                // Disabled after the first warning so the elapsed timer is not polled again.
                _ = &mut sleep, if !did_log => {
                    log::warn!(
                        "[{}:{}] waiting more than {}ms to acquire lock, still waiting",
                        std::file!(),
                        std::line!(),
                        $crate::ACQUIRE_TIMEOUT_MS,
                    );
                    did_log = true;
                }
                guard = &mut acquire => {
                    if did_log {
                        let wait_time = now.elapsed().as_millis();
                        log::warn!(
                            "[{}:{}] waited {}ms to acquire lock",
                            std::file!(),
                            std::line!(),
                            wait_time,
                        );
                    }

                    let wrapped = $crate::TimedRwReadGuard::new(
                        guard, std::line!(), std::file!(),
                    ).await;
                    break wrapped;
                }
            }
        }
    }};
}
118+
119+
/// Acquires a write lock on a `tokio::sync::RwLock`, warning when acquisition is slow.
///
/// Must be used inside an async context. If the lock is not acquired within
/// `ACQUIRE_TIMEOUT_MS` milliseconds a warning is logged once, and the macro then keeps
/// waiting. The result is the plain guard returned by `write()`; unlike `rw_read!`, no
/// hold-time tracking is attached.
///
/// The calling crate needs `tokio` and `log` in scope, since the expansion refers to them
/// by path.
#[macro_export]
macro_rules! rw_write {
    ($lock_name:expr) => {{
        let acquire_timeout = std::time::Duration::from_millis($crate::ACQUIRE_TIMEOUT_MS);
        let sleep = tokio::time::sleep(acquire_timeout);
        tokio::pin!(sleep);

        // Create the acquisition future exactly once and keep polling that same future.
        // Re-creating it on every loop iteration would cancel the pending request when
        // the timeout branch fires, and a cancelled `write()` loses its place in the
        // lock's fair wait queue, so a slow writer would be pushed behind later requests.
        // It also keeps the macro argument from being evaluated more than once.
        let acquire = $lock_name.write();
        tokio::pin!(acquire);

        loop {
            tokio::select! {
                // Once the timer has fired this branch is disabled, so the warning is
                // emitted at most once.
                _ = &mut sleep, if !sleep.is_elapsed() => {
                    log::warn!(
                        "[{}:{}] took more than {} ms to acquire lock",
                        std::file!(),
                        std::line!(),
                        $crate::ACQUIRE_TIMEOUT_MS,
                    );
                }
                guard = &mut acquire => {
                    break guard;
                }
            }
        }
    }};
}

trin-core/src/portalnet/events.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use super::{
1010
utp::{UtpListener, UTP_PROTOCOL},
1111
};
1212
use crate::cli::{HISTORY_NETWORK, STATE_NETWORK};
13+
use crate::rw_write;
1314
use std::collections::HashMap;
1415
use std::convert::TryInto;
1516

@@ -27,9 +28,7 @@ impl PortalnetEvents {
2728
history_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
2829
state_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
2930
) -> Self {
30-
let protocol_receiver = discovery
31-
.write()
32-
.await
31+
let protocol_receiver = rw_write!(discovery)
3332
.discv5
3433
.event_stream()
3534
.await

trin-core/src/portalnet/overlay.rs

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::rw_read;
12
use crate::utils::xor_two_values;
23

34
use super::{
@@ -9,6 +10,7 @@ use super::{
910
Enr, U256,
1011
};
1112
use crate::portalnet::types::CustomPayload;
13+
use crate::rw_write;
1214
use discv5::{
1315
enr::NodeId,
1416
kbucket::{Filter, KBucketsTable},
@@ -112,7 +114,7 @@ impl OverlayProtocol {
112114
data_radius: U256,
113115
) -> Self {
114116
let kbuckets = Arc::new(RwLock::new(KBucketsTable::new(
115-
discovery.read().await.local_enr().node_id().into(),
117+
rw_read!(discovery).local_enr().node_id().into(),
116118
config.bucket_pending_timeout,
117119
config.max_incoming_per_bucket,
118120
config.table_filter,
@@ -140,7 +142,7 @@ impl OverlayProtocol {
140142
let response = match request {
141143
Request::Ping(Ping { .. }) => {
142144
debug!("Got overlay ping request {:?}", request);
143-
let enr_seq = self.discovery.read().await.local_enr().seq();
145+
let enr_seq = rw_read!(self.discovery).local_enr().seq();
144146
let payload = CustomPayload::new(self.data_radius().await, None);
145147
Response::Pong(Pong {
146148
enr_seq,
@@ -181,12 +183,12 @@ impl OverlayProtocol {
181183

182184
/// Returns the local ENR of the node.
183185
pub async fn local_enr(&self) -> Enr {
184-
self.discovery.read().await.discv5.local_enr()
186+
rw_read!(self.discovery).discv5.local_enr()
185187
}
186188

187189
// Returns the data radius of the node.
188190
pub async fn data_radius(&self) -> U256 {
189-
self.data_radius.read().await.clone()
191+
rw_read!(self.data_radius).clone()
190192
}
191193

192194
/// Returns a vector of the ENRs of the closest nodes by the given log2 distances.
@@ -203,7 +205,7 @@ impl OverlayProtocol {
203205
}
204206

205207
if !log2_distances.is_empty() {
206-
let mut kbuckets = self.kbuckets.write().await;
208+
let mut kbuckets = rw_write!(self.kbuckets);
207209
for node in kbuckets
208210
.nodes_by_distances(&log2_distances, FIND_NODES_MAX_NODES)
209211
.into_iter()
@@ -246,19 +248,15 @@ impl OverlayProtocol {
246248

247249
/// Returns a vector of all ENR node IDs of nodes currently contained in the routing table.
248250
pub async fn table_entries_id(&self) -> Vec<NodeId> {
249-
self.kbuckets
250-
.write()
251-
.await
251+
rw_write!(self.kbuckets)
252252
.iter()
253253
.map(|entry| *entry.node.key.preimage())
254254
.collect()
255255
}
256256

257257
/// Returns a vector of all the ENRs of nodes currently contained in the routing table.
258258
pub async fn table_entries_enr(&self) -> Vec<Enr> {
259-
self.kbuckets
260-
.write()
261-
.await
259+
rw_write!(self.kbuckets)
262260
.iter()
263261
.map(|entry| entry.node.value.enr().clone())
264262
.collect()
@@ -271,16 +269,14 @@ impl OverlayProtocol {
271269
protocol: ProtocolKind,
272270
payload: Option<Vec<u8>>,
273271
) -> Result<Vec<u8>, SendPingError> {
274-
let enr_seq = self.discovery.read().await.local_enr().seq();
272+
let enr_seq = rw_read!(self.discovery).local_enr().seq();
273+
275274
let payload = CustomPayload::new(data_radius, payload);
276275
let msg = Ping {
277276
enr_seq,
278277
payload: Some(payload),
279278
};
280-
Ok(self
281-
.discovery
282-
.read()
283-
.await
279+
Ok(rw_read!(self.discovery)
284280
.send_talkreq(
285281
enr,
286282
protocol.to_string(),
@@ -296,9 +292,7 @@ impl OverlayProtocol {
296292
protocol: ProtocolKind,
297293
) -> Result<Vec<u8>, RequestError> {
298294
let msg = FindNodes { distances };
299-
self.discovery
300-
.read()
301-
.await
295+
rw_read!(self.discovery)
302296
.send_talkreq(
303297
enr,
304298
protocol.to_string(),
@@ -314,9 +308,7 @@ impl OverlayProtocol {
314308
protocol: ProtocolKind,
315309
) -> Result<Vec<u8>, RequestError> {
316310
let msg = FindContent { content_key };
317-
self.discovery
318-
.read()
319-
.await
311+
rw_read!(self.discovery)
320312
.send_talkreq(
321313
enr,
322314
protocol.to_string(),

trin-core/src/portalnet/utp.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use std::sync::Arc;
1212
use std::time::{Duration, SystemTime, UNIX_EPOCH};
1313
use tokio::sync::RwLock;
1414

15+
use crate::rw_read;
16+
1517
pub const UTP_PROTOCOL: &str = "utp";
1618
pub const HEADER_SIZE: usize = 20;
1719
pub const MAX_DISCV5_PACKET_SIZE: usize = 1280;
@@ -433,7 +435,7 @@ impl UtpListener {
433435
}
434436
}
435437
Type::StSyn => {
436-
if let Some(enr) = self.discovery.read().await.discv5.find_enr(&node_id) {
438+
if let Some(enr) = rw_read!(self.discovery).discv5.find_enr(&node_id) {
437439
// If neither of those cases happened, handle this as a new request
438440
let mut conn = UtpStream::init(Arc::clone(&self.discovery), enr);
439441
conn.handle_packet(packet).await;
@@ -466,7 +468,7 @@ impl UtpListener {
466468

467469
// I am honestly not sure if I should init this with Enr or NodeId since we could use both
468470
async fn connect(&mut self, connection_id: u16, node_id: NodeId) {
469-
if let Some(enr) = self.discovery.read().await.discv5.find_enr(&node_id) {
471+
if let Some(enr) = rw_read!(self.discovery).discv5.find_enr(&node_id) {
470472
let mut conn = UtpStream::init(Arc::clone(&self.discovery), enr);
471473
conn.make_connection(connection_id).await;
472474
self.utp_connections.insert(
@@ -585,10 +587,7 @@ impl UtpStream {
585587
// It doesn't waste CPU cycles while waiting.
586588
tokio::time::sleep(Duration::from_millis(100)).await;
587589
}
588-
let talk_request_result = self
589-
.discovery
590-
.read()
591-
.await
590+
let talk_request_result = rw_read!(self.discovery)
592591
.send_talkreq(self.enr.clone(), UTP_PROTOCOL.to_string(), packet.0.clone())
593592
.await;
594593
debug!("uTP TalkRequest result: {:?}", talk_request_result);

trin-state/src/events.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use log::{debug, error, warn};
44
use std::sync::Arc;
55
use tokio::sync::{mpsc::UnboundedReceiver, RwLock};
66
use trin_core::portalnet::types::Message;
7+
use trin_core::rw_write;
78

89
pub struct StateEvents {
910
pub network: Arc<RwLock<StateNetwork>>,
@@ -15,10 +16,7 @@ impl StateEvents {
1516
while let Some(talk_request) = self.event_rx.recv().await {
1617
debug!("Got state request {:?}", talk_request);
1718

18-
let reply = match self
19-
.network
20-
.write()
21-
.await
19+
let reply = match rw_write!(self.network)
2220
.overlay
2321
.process_one_request(&talk_request)
2422
.await

0 commit comments

Comments
 (0)