
Commit

Merge branch 'near:master' into master
Longarithm authored Oct 18, 2024
2 parents 2951142 + 7ce9e95 commit 7ef7096
Showing 63 changed files with 1,715 additions and 1,402 deletions.
9 changes: 0 additions & 9 deletions .github/workflows/ci.yml
@@ -257,15 +257,6 @@ jobs:
           tool: just
       - run: just check-protocol-schema
 
-  lychee_checks:
-    name: "Lychee Lints"
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v4
-      - uses: lycheeverse/lychee-action@2ac9f030ccdea0033e2510a23a67da2a2da98492
-        with:
-          fail: true
-
   check_fmt:
     name: "Cargo Fmt"
     runs-on: ubuntu-latest
22 changes: 22 additions & 0 deletions .github/workflows/lychee_lints.yml
@@ -0,0 +1,22 @@
+name: Lychee Lints
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref }}
+  cancel-in-progress: true
+
+on:
+  schedule:
+    # At 17:15 on every day-of-month.
+    - cron: '15 17 */1 * *'
+
+
+jobs:
+  lychee_checks:
+    name: "Lychee Lints"
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - uses: lycheeverse/lychee-action@2ac9f030ccdea0033e2510a23a67da2a2da98492
+        with:
+          fail: true
+

1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions chain/chain-primitives/src/error.rs
@@ -235,6 +235,9 @@ pub enum Error {
     /// Resharding error.
     #[error("Resharding Error: {0}")]
     ReshardingError(String),
+    /// EpochSyncProof validation error.
+    #[error("EpochSyncProof Validation Error: {0}")]
+    InvalidEpochSyncProof(String),
     /// Anything else
     #[error("Other Error: {0}")]
     Other(String),
@@ -300,6 +303,7 @@ impl Error {
             | Error::MaliciousChallenge
             | Error::IncorrectNumberOfChunkHeaders
             | Error::InvalidEpochHash
+            | Error::InvalidEpochSyncProof(_)
             | Error::InvalidNextBPHash
             | Error::NotEnoughApprovals
             | Error::InvalidFinalityInfo
@@ -377,6 +381,7 @@ impl Error {
             Error::MaliciousChallenge => "malicious_challenge",
             Error::IncorrectNumberOfChunkHeaders => "incorrect_number_of_chunk_headers",
             Error::InvalidEpochHash => "invalid_epoch_hash",
+            Error::InvalidEpochSyncProof(_) => "invalid_epoch_sync_proof",
             Error::InvalidNextBPHash => "invalid_next_bp_hash",
             Error::NotEnoughApprovals => "not_enough_approvals",
             Error::InvalidFinalityInfo => "invalid_finality_info",
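For context, this is how the new variant behaves once wired into the thiserror-derived enum. A minimal sketch, not from the commit: the enum is cut down to the single new variant, and validate_proof is a hypothetical caller.

use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error {
    /// EpochSyncProof validation error.
    #[error("EpochSyncProof Validation Error: {0}")]
    InvalidEpochSyncProof(String),
}

// Hypothetical caller: reject a proof with a descriptive message.
fn validate_proof(proof_is_valid: bool) -> Result<(), Error> {
    if proof_is_valid {
        Ok(())
    } else {
        Err(Error::InvalidEpochSyncProof("final block header mismatch".to_string()))
    }
}

fn main() {
    let err = validate_proof(false).unwrap_err();
    // Display comes from the #[error(...)] format string.
    assert_eq!(err.to_string(), "EpochSyncProof Validation Error: final block header mismatch");
}

The other two hunks above add the variant to the or-pattern that classifies errors as peer-provided bad data and to the string mapping used for error metrics labels.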
2 changes: 1 addition & 1 deletion chain/chain/Cargo.toml
@@ -30,6 +30,7 @@ strum.workspace = true
 tempfile.workspace = true
 thiserror.workspace = true
 time.workspace = true
+tokio.workspace = true
 tracing.workspace = true
 
 near-async.workspace = true
@@ -112,7 +113,6 @@ nightly_protocol = [
     "node-runtime/nightly_protocol",
 ]
 sandbox = ["near-o11y/sandbox", "near-primitives/sandbox"]
-testloop = []
 protocol_schema = [
     "near-schema-checker-lib/protocol_schema",
     "near-crypto/protocol_schema",
108 changes: 28 additions & 80 deletions chain/chain/src/block_processing_utils.rs
@@ -9,8 +9,7 @@ use near_primitives::hash::CryptoHash;
 use near_primitives::sharding::{ReceiptProof, ShardChunkHeader, StateSyncInfo};
 use near_primitives::types::ShardId;
 use std::collections::HashMap;
-use std::sync::{Arc, Condvar, Mutex};
-use std::time::Duration;
+use std::sync::Arc;
 
 /// Max number of blocks that can be in the pool at once.
 /// This number will likely never be hit unless there are many forks in the chain.
@@ -25,7 +24,7 @@ pub(crate) struct BlockPreprocessInfo {
     pub(crate) challenged_blocks: Vec<CryptoHash>,
     pub(crate) provenance: Provenance,
     /// Used to get notified when the applying chunks of a block finishes.
-    pub(crate) apply_chunks_done_tracker: ApplyChunksDoneTracker,
+    pub(crate) apply_chunks_done_waiter: ApplyChunksDoneWaiter,
     /// This is used to calculate block processing time metric
     pub(crate) block_start_processing_time: Instant,
 }
@@ -127,7 +126,7 @@ impl BlocksInProcessing {
     /// Returns true if new blocks are done applying chunks
     pub(crate) fn wait_for_all_blocks(&self) -> bool {
         for (_, (_, block_preprocess_info)) in self.preprocessed_blocks.iter() {
-            let _ = block_preprocess_info.apply_chunks_done_tracker.wait_until_done();
+            let _ = block_preprocess_info.apply_chunks_done_waiter.wait();
         }
         !self.preprocessed_blocks.is_empty()
     }
@@ -142,83 +141,32 @@ impl BlocksInProcessing {
             .get(block_hash)
             .ok_or(BlockNotInPoolError)?
             .1
-            .apply_chunks_done_tracker
-            .wait_until_done();
+            .apply_chunks_done_waiter
+            .wait();
         Ok(())
     }
 }
 
-/// This is used to for the thread that applies chunks to notify other waiter threads.
-/// The thread applying the chunks should call `set_done` to send the notification.
-/// The waiter threads should call `wait_until_done` to wait (blocked) for the notification.
+/// The waiter's wait() will block until the corresponding ApplyChunksStillApplying is dropped.
 #[derive(Clone)]
-pub struct ApplyChunksDoneTracker(Arc<(Mutex<bool>, Condvar)>);
-
-impl ApplyChunksDoneTracker {
-    pub fn new() -> Self {
-        Self(Arc::new((Mutex::new(false), Condvar::new())))
-    }
+pub struct ApplyChunksDoneWaiter(Arc<tokio::sync::Mutex<()>>);
+pub struct ApplyChunksStillApplying {
+    // We're using tokio's mutex guard, because the std one is not Send.
+    _guard: tokio::sync::OwnedMutexGuard<()>,
+}
 
-    /// Notifies all threads waiting on `wait_until_done` that apply chunks is done.
-    /// This should be called only once.
-    /// Returns an error if it is called more than once or the mutex used internally is poisoned.
-    pub fn set_done(&mut self) -> Result<(), &'static str> {
-        let (lock, cvar) = &*self.0;
-        match lock.lock() {
-            Ok(mut guard) => {
-                if *guard {
-                    Err("Apply chunks done marker is already set to true.")
-                } else {
-                    *guard = true;
-                    cvar.notify_all();
-                    Ok(())
-                }
-            }
-            Err(_poisoned) => Err("Mutex is poisoned."),
-        }
+impl ApplyChunksDoneWaiter {
+    pub fn new() -> (Self, ApplyChunksStillApplying) {
+        let lock = Arc::new(tokio::sync::Mutex::new(()));
+        // Use try_lock_owned() rather than blocking_lock_owned(), because otherwise
+        // this causes a panic if we do this on a tokio runtime.
+        let guard = lock.clone().try_lock_owned().expect("should succeed on a fresh mutex");
+        (ApplyChunksDoneWaiter(lock), ApplyChunksStillApplying { _guard: guard })
     }
 
-    /// Blocks the current thread until the `set_done` is called after applying the chunks.
-    /// to indicate that apply chunks is done.
-    pub fn wait_until_done(&self) {
-        #[cfg(feature = "testloop")]
-        let mut testloop_total_wait_time = Duration::from_millis(0);
-
-        let (lock, cvar) = &*self.0;
-        match lock.lock() {
-            Ok(mut guard) => loop {
-                let done = *guard;
-                if done {
-                    break;
-                }
-                const WAIT_TIMEOUT: Duration = Duration::from_millis(100);
-                match cvar.wait_timeout(guard, WAIT_TIMEOUT) {
-                    Ok(result) => {
-                        guard = result.0;
-
-                        // Panics during testing (eg. due to assertion failures) cause the waiter
-                        // threads to miss the notification (see issue #11447). Thus, for testing only,
-                        // we limit the total wait time for waiting for the notification.
-                        #[cfg(feature = "testloop")]
-                        if result.1.timed_out() {
-                            const TESTLOOP_MAX_WAIT_TIME: Duration = Duration::from_millis(5000);
-                            testloop_total_wait_time += WAIT_TIMEOUT;
-                            if testloop_total_wait_time >= TESTLOOP_MAX_WAIT_TIME {
-                                break;
-                            }
-                        }
-                    }
-                    Err(_poisoned) => {
-                        tracing::error!("Mutex is poisoned.");
-                        break;
-                    }
-                }
-            },
-            Err(_poisoned) => {
-                tracing::error!("Mutex is poisoned.");
-                ()
-            }
-        }
+    pub fn wait(&self) {
+        // This would only go through if the guard has been dropped.
+        drop(self.0.blocking_lock());
     }
 }
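The new design visible in the added lines replaces the flag-and-condvar tracker with a lock-guard handoff: new() locks a fresh tokio mutex and stores the owned guard in ApplyChunksStillApplying, which the chunk-applying thread holds; every wait() call then blocks on blocking_lock() until that guard is dropped. Because a guard is released even when its holder unwinds from a panic, waiters can no longer miss the notification the way the old condvar code could (issue #11447), which is also why the testloop timeout workaround and its feature flag are deleted. Below is a minimal standalone sketch of the same pattern; the names, timing, and main() driver are illustrative only, and it assumes the tokio crate with the "sync" feature enabled.

use std::sync::Arc;
use std::thread;
use std::time::Duration;

#[derive(Clone)]
struct DoneWaiter(Arc<tokio::sync::Mutex<()>>);

struct StillApplying {
    // An owned guard is 'static and Send, so it can live in a struct
    // and be dropped from whichever thread finishes the work.
    _guard: tokio::sync::OwnedMutexGuard<()>,
}

fn new_pair() -> (DoneWaiter, StillApplying) {
    let lock = Arc::new(tokio::sync::Mutex::new(()));
    // The mutex is freshly created, so try_lock_owned() cannot fail here.
    let guard = lock.clone().try_lock_owned().expect("fresh mutex");
    (DoneWaiter(lock), StillApplying { _guard: guard })
}

impl DoneWaiter {
    fn wait(&self) {
        // Blocks until StillApplying drops its guard, then releases at once.
        drop(self.0.blocking_lock());
    }
}

fn main() {
    let (waiter, still_applying) = new_pair();
    let handles: Vec<_> = (0..3)
        .map(|i| {
            let waiter = waiter.clone();
            thread::spawn(move || {
                waiter.wait();
                println!("waiter {i} woke up");
            })
        })
        .collect();
    thread::sleep(Duration::from_millis(100)); // simulate applying chunks
    drop(still_applying); // wakes all waiters, even if this thread had panicked
    for handle in handles {
        handle.join().unwrap();
    }
}

The tokio mutex is used here purely for its owned, Send guard; no async runtime is involved. blocking_lock() is safe from ordinary threads but panics inside a tokio runtime, which is what the try_lock_owned() comment in the diff is guarding against.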

@@ -228,16 +176,16 @@ mod tests {
     use std::sync::Arc;
     use std::time::Duration;
 
-    use super::ApplyChunksDoneTracker;
+    use super::ApplyChunksDoneWaiter;
 
     #[test]
     fn test_apply_chunks_with_multiple_waiters() {
         let shared_value: Arc<AtomicBool> = Arc::new(AtomicBool::new(false));
 
-        let mut tracker = ApplyChunksDoneTracker::new();
-        let waiter1 = tracker.clone();
-        let waiter2 = tracker.clone();
-        let waiter3 = tracker.clone();
+        let (waiter, still_applying) = ApplyChunksDoneWaiter::new();
+        let waiter1 = waiter.clone();
+        let waiter2 = waiter.clone();
+        let waiter3 = waiter;
 
         let (results_sender, results_receiver) = std::sync::mpsc::channel();

@@ -246,7 +194,7 @@
             let current_sender = results_sender.clone();
             let current_shared_value = shared_value.clone();
             std::thread::spawn(move || {
-                waiter.wait_until_done();
+                waiter.wait();
                 let read_value = current_shared_value.load(Ordering::Relaxed);
                 current_sender.send(read_value).unwrap();
             });
@@ -255,7 +203,7 @@
         // Wait 300ms then set the shared_value to true, and notify the waiters.
         std::thread::sleep(Duration::from_millis(300));
         shared_value.store(true, Ordering::Relaxed);
-        tracker.set_done().unwrap();
+        drop(still_applying);
 
         // Check values that waiters read
         for _ in 0..3 {
[Diffs for the remaining changed files are not shown.]
