Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

7311: add GetReceiptsFromPeerTask #7638

Merged
merged 185 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
185 commits
Select commit Hold shift + click to select a range
4b80016
7311: Add PeerTask system for use in future PRs
Matilda-Clerke Sep 17, 2024
a8d5a9f
7311: Clean up some warnings
Matilda-Clerke Sep 17, 2024
4c64dbe
7311: Add feature toggle for enabling use of the peertask system wher…
Matilda-Clerke Sep 17, 2024
7a94fe2
7311: Remove log used for testing, apply spotless
Matilda-Clerke Sep 17, 2024
ace5dd1
7311: Add private constructor to PeerTaskFeatureToggle to prevent ins…
Matilda-Clerke Sep 17, 2024
52d440a
7311: Switch to logging a warning instead of throwing an exception wh…
Matilda-Clerke Sep 17, 2024
f392a0f
7311: Update javadoc to match previous commit
Matilda-Clerke Sep 17, 2024
2fb2690
7311: spotless
Matilda-Clerke Sep 17, 2024
f14aaeb
7311: Fix broken BesuCommandTest
Matilda-Clerke Sep 18, 2024
06553be
Merge branch 'refs/heads/7311-add-cli-feature-toggle-for-peertask-sys…
Matilda-Clerke Sep 18, 2024
1ba5184
7311: add class
Matilda-Clerke Sep 18, 2024
f2500dd
7311: Move PeerTaskFeatureToggle to more appropriate location
Matilda-Clerke Sep 18, 2024
0992c35
Merge branch 'refs/heads/7311-add-cli-feature-toggle-for-peertask-sys…
Matilda-Clerke Sep 18, 2024
33b810b
7311: add X prefix to peertask-system-enabled
Matilda-Clerke Sep 18, 2024
a378d22
Merge branch 'refs/heads/7311-add-cli-feature-toggle-for-peertask-sys…
Matilda-Clerke Sep 18, 2024
15b6bdf
7311: Move --Xpeertask-system-enabled out of BesuCommand and make hidden
Matilda-Clerke Sep 18, 2024
a9f6714
Merge branch 'refs/heads/7311-add-cli-feature-toggle-for-peertask-sys…
Matilda-Clerke Sep 18, 2024
e3fbc6c
7311: spotless
Matilda-Clerke Sep 18, 2024
c03bffb
Merge branch 'refs/heads/7311-add-cli-feature-toggle-for-peertask-sys…
Matilda-Clerke Sep 18, 2024
5e8b750
Merge branch 'main' into 7311-add-cli-feature-toggle-for-peertask-system
macfarla Sep 18, 2024
e4be5c0
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 18, 2024
bc11e0c
Merge branch 'main' into 7311-add-cli-feature-toggle-for-peertask-system
Matilda-Clerke Sep 18, 2024
b2a45c2
7311: Add GetReceiptsFromPeerTask
Matilda-Clerke Sep 19, 2024
513b74f
7311: Move isPeerTaskSystemEnabled to SynchronizerOptions
Matilda-Clerke Sep 19, 2024
6b86919
Merge remote-tracking branch 'origin/7311-add-cli-feature-toggle-for-…
Matilda-Clerke Sep 19, 2024
645e0e3
Merge branch 'refs/heads/7311-add-cli-feature-toggle-for-peertask-sys…
Matilda-Clerke Sep 19, 2024
4522414
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Sep 19, 2024
28fc2cd
Merge remote-tracking branch 'origin/7311-add-GetReceiptsFromPeerTask…
Matilda-Clerke Sep 19, 2024
2364ed5
7311: Fix javadoc issue
Matilda-Clerke Sep 19, 2024
ced19cd
Merge branch 'refs/heads/7311-add-cli-feature-toggle-for-peertask-sys…
Matilda-Clerke Sep 19, 2024
03f6495
7311: Fix javadoc issue
Matilda-Clerke Sep 19, 2024
5859444
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 19, 2024
9a4f3dd
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Sep 19, 2024
97e5918
7311: Move PeerTaskFeatureToggleTestHelper to TestUtil and fix Runner…
Matilda-Clerke Sep 20, 2024
e0f736d
7311: spotless
Matilda-Clerke Sep 20, 2024
df7f62d
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Sep 20, 2024
c335cbe
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 20, 2024
98aefcd
Merge branch 'main' into 7311-add-cli-feature-toggle-for-peertask-system
Matilda-Clerke Sep 20, 2024
6e734f9
7311: Remove PeerTaskFeatureToggle in favor of including isPeerTaskSy…
Matilda-Clerke Sep 20, 2024
42ca85b
Merge branch 'main' into 7311-add-cli-feature-toggle-for-peertask-system
Matilda-Clerke Sep 20, 2024
76724ed
Merge branch 'refs/heads/7311-add-cli-feature-toggle-for-peertask-sys…
Matilda-Clerke Sep 20, 2024
fc9b3f2
7311: Adjust to the removal of PeerTaskFeatureToggle and use Synchron…
Matilda-Clerke Sep 20, 2024
08c66fd
7311: Reduce timeout in PeerTaskRequestSender to 5s
Matilda-Clerke Sep 20, 2024
049cae2
7311: Refactor PeerManager to be an interface
Matilda-Clerke Sep 20, 2024
b6ec075
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Sep 20, 2024
5afba63
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 20, 2024
3c1178c
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Sep 20, 2024
5c3a61a
7311: Fix up compile errors after merge
Matilda-Clerke Sep 20, 2024
8448898
7311: Fix MetricsAcceptanceTest
Matilda-Clerke Sep 20, 2024
ab21100
7311: Fix MetricsAcceptanceTest
Matilda-Clerke Sep 20, 2024
2ac52f0
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Sep 20, 2024
f2ac53e
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 23, 2024
3de578d
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Sep 23, 2024
e901fdf
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 24, 2024
24e73a8
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Sep 24, 2024
f077206
7311: Fix DownloadReceiptsStep when using peer task system
Matilda-Clerke Sep 25, 2024
fa12495
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Sep 25, 2024
6e349e1
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 25, 2024
ad86ae6
7311: Rename PeerManager to PeerSelector
Matilda-Clerke Sep 25, 2024
38f04ab
7311: Reword PeerSelector javadoc to avoid implementation details
Matilda-Clerke Sep 25, 2024
6de3fb3
7311: Use ConcurrentHashMap in DefaultPeerSelector
Matilda-Clerke Sep 25, 2024
da9cd43
7311: Reword trace log in DefaultPeerSelector
Matilda-Clerke Sep 25, 2024
ce7d245
7311: Remove unused imports
Matilda-Clerke Sep 25, 2024
c9eb22e
7311: Use a 1 second delay between retries in PeerTaskExecutor to mat…
Matilda-Clerke Sep 25, 2024
e2fda73
7311: Add testGetPeerButNoPeerMatchesFilter to DefaultPeerSelectorTest
Matilda-Clerke Sep 25, 2024
608fece
7311: Add testGetPeerButNoPeerMatchesFilter to DefaultPeerSelectorTest
Matilda-Clerke Sep 25, 2024
0e76000
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Sep 25, 2024
2d07800
7311: spotless
Matilda-Clerke Sep 25, 2024
b910b4d
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Sep 25, 2024
ad26297
7311: Fix MetricsAcceptanceTest
Matilda-Clerke Sep 20, 2024
96c8030
7311: Fix MetricsAcceptanceTest
Matilda-Clerke Sep 20, 2024
b0f2ed0
7311: Modify PeerTaskExecutor metric to include response time from peer
Matilda-Clerke Sep 26, 2024
598b519
7311: Use SubProtocol instead of subprotocol name string in PeerTask
Matilda-Clerke Sep 26, 2024
bc25b16
7311: rename timing context to ignored to prevent intellij warnings
Matilda-Clerke Sep 26, 2024
e31bb70
7311: Use constants for number of retries
Matilda-Clerke Sep 26, 2024
41923d3
7311: Convert PeerTaskExecutorResult to a record
Matilda-Clerke Sep 26, 2024
720f94e
7311: Rename PeerTaskBehavior to PeerTaskRetryBehavior
Matilda-Clerke Sep 29, 2024
7d845b3
7311: Move peer selection logic to PeerSelector
Matilda-Clerke Sep 30, 2024
50c26f1
7311: spotless
Matilda-Clerke Sep 30, 2024
b7c0c95
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 30, 2024
a81855d
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Sep 30, 2024
64adedc
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Sep 30, 2024
2c1446e
7311: Fix up everything broken after merge
Matilda-Clerke Oct 1, 2024
3c0c47b
7311: Attempt to improve performance of peer task system in pipeline
Matilda-Clerke Oct 2, 2024
d0bd5ed
7311: fix compile check
Matilda-Clerke Oct 2, 2024
1c25ac5
7311: Fix broken workflow
Matilda-Clerke Oct 2, 2024
2e6dfd9
7311: Reduce logging in JsonRpcExecutor to trace level
Matilda-Clerke Oct 2, 2024
aca8058
7311: More changes in DownloadReceiptsStep
Matilda-Clerke Oct 2, 2024
4d59b10
7311: Rework DownloadReceiptsStep
Matilda-Clerke Oct 3, 2024
8718102
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 3, 2024
e63f473
7311: Make changes as discussed in walkthrough meeting
Matilda-Clerke Oct 3, 2024
fae39a8
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 3, 2024
d1847f2
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 4, 2024
07852dc
7311: Update after merge and make discussed changes from walkthrough …
Matilda-Clerke Oct 4, 2024
c477d70
7311: Change to regular HashMap
Matilda-Clerke Oct 4, 2024
6c57a7c
7311: Remove runtime exception again
Matilda-Clerke Oct 4, 2024
6d2cb95
7311: Rename getPeerTaskBehavior to getPeerTaskRetryBehavior
Matilda-Clerke Oct 6, 2024
d84520a
7311: Rename getPeerTaskBehavior to getPeerTaskRetryBehavior
Matilda-Clerke Oct 6, 2024
77ed748
Merge remote-tracking branch 'origin/7311-add-peertask-foundation-cod…
Matilda-Clerke Oct 6, 2024
0896e31
7311: Rework PeerTaskExecutor retry system to be 0-based
Matilda-Clerke Oct 6, 2024
5f924c4
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 6, 2024
b13ac92
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 6, 2024
5006b39
7311: Fix up compile errors after merge
Matilda-Clerke Oct 6, 2024
07f3a7e
7311: Fix broken DownloadReceiptsStepTest test
Matilda-Clerke Oct 6, 2024
493ac91
7311: Move GetReceipts to services worker for parallelism
Matilda-Clerke Oct 7, 2024
1a30174
7311: Refactor peer task system usage in DownloadReceiptsStep to bett…
Matilda-Clerke Oct 7, 2024
84af9f9
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Oct 7, 2024
2865625
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 7, 2024
2f86ed9
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Oct 7, 2024
82cedb0
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 7, 2024
bdd96ba
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 8, 2024
c047f42
7311: Remove unused async methods in PeerTaskExecutor
Matilda-Clerke Oct 8, 2024
5aa6b0b
7311: Return Optional<EthPeer> in PeerSelector.getPeer and utilise ex…
Matilda-Clerke Oct 8, 2024
4fd4724
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 8, 2024
d6120b0
7311: Update after merge
Matilda-Clerke Oct 8, 2024
8becdb3
7311: Redo getPeer again to include hasAvailableRequestCapacity check
Matilda-Clerke Oct 8, 2024
4ad85e8
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 8, 2024
86a1f0b
7311: Add protocol spec supplier to GetReceiptsFromPeerTask
Matilda-Clerke Oct 8, 2024
8186a77
7311: Rework getPeer again to use LEAST_TO_MOST_BUSY comparator
Matilda-Clerke Oct 8, 2024
37b0ec2
7311: Import PeerNotConnected class instead of using fully qualified …
Matilda-Clerke Oct 8, 2024
545fd5c
7311: Change to specifying retry counts in PeerTask instead of behavi…
Matilda-Clerke Oct 9, 2024
7bd048b
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 9, 2024
e9d08f3
7311: clean up after merge
Matilda-Clerke Oct 9, 2024
20478d3
7311: clean up after merge
Matilda-Clerke Oct 9, 2024
4f544f4
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 9, 2024
66a9de2
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 9, 2024
3ddfe71
7311: Fix up javadoc
Matilda-Clerke Oct 10, 2024
1c268b7
7311: Add additional metrics to PeerTaskExecutor
Matilda-Clerke Oct 10, 2024
b06f38b
7311: Add Predicate to PeerTask to check for partial success
Matilda-Clerke Oct 10, 2024
09ee1c8
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 10, 2024
3c12d3d
7311: Fix incorrect name on isPartialSuccessTest
Matilda-Clerke Oct 10, 2024
b1c47ae
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 10, 2024
4664db9
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 10, 2024
3b8b7d5
7311: Implement isPartialSuccess and add unit tests
Matilda-Clerke Oct 10, 2024
d66dd3a
7311: Add partialSuccessCounter and inflightRequestGauge in PeerTaskE…
Matilda-Clerke Oct 11, 2024
fa22e93
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 11, 2024
cff0099
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 11, 2024
a3f5d4a
7311: Also filter by whether a peer is fully validated
Matilda-Clerke Oct 11, 2024
382f7a5
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 11, 2024
714db0a
7311: Remove unneeded throws in RunnerTest
Matilda-Clerke Oct 11, 2024
3a68980
7311: Fix up inflight requests gauge in PeerTaskExecutor
Matilda-Clerke Oct 11, 2024
3ce476d
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 11, 2024
c422bc5
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 11, 2024
74aa7a0
7311: Update plugin api hash
Matilda-Clerke Oct 11, 2024
56c1f9d
7311: Update plugin api hash
Matilda-Clerke Oct 11, 2024
fe50d95
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 11, 2024
e733452
7311: Add javadoc to LabelledGauge.isLabelsObserved
Matilda-Clerke Oct 13, 2024
b3a252b
7311: Update plugin-api hash
Matilda-Clerke Oct 13, 2024
4f9cf52
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 13, 2024
af93824
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 13, 2024
3c96eba
7311: Update changelog
Matilda-Clerke Oct 13, 2024
e664a51
7311: Handle headers with no receipts as a special case in DownloadRe…
Matilda-Clerke Oct 15, 2024
7daf30f
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 15, 2024
c4e685d
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 15, 2024
6800cdd
7311: Complete merge
Matilda-Clerke Oct 15, 2024
44fd3a8
7311: Use taskName instead of className for labelNames
Matilda-Clerke Oct 16, 2024
ac1c4ed
7311: Use snake_case for metric names
Matilda-Clerke Oct 16, 2024
7503535
7311: Use _total metric name suffix
Matilda-Clerke Oct 16, 2024
a59bd30
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 16, 2024
ed25941
7311: rework partial success handling
Matilda-Clerke Oct 17, 2024
4f4b091
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 17, 2024
c396fb5
7311: Update GetReceiptsFromPeerTask with partialSuccess changes
Matilda-Clerke Oct 17, 2024
5a79636
7311: Add default implementation to LabelledGauge.isLabelsObserved
Matilda-Clerke Oct 17, 2024
0668083
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 17, 2024
b075a91
7311: Fix broken unit test
Matilda-Clerke Oct 17, 2024
74995bb
Merge branch 'main' into 7311-add-peertask-foundation-code
Matilda-Clerke Oct 17, 2024
17edf67
Merge branch 'refs/heads/7311-add-peertask-foundation-code' into 7311…
Matilda-Clerke Oct 17, 2024
e2fea48
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Oct 18, 2024
875375e
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Oct 20, 2024
095e31d
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Oct 23, 2024
c2205c4
7311: Rename parseResponse to processResponse
Matilda-Clerke Oct 23, 2024
1b9922f
7311: Wrap peer task system usage in ethScheduler call to match other…
Matilda-Clerke Oct 24, 2024
d827e68
7311: apply spotless
Matilda-Clerke Oct 27, 2024
26476ae
7311: Move check for empty trie hash into GetReceiptsFromPeerTask and…
Matilda-Clerke Oct 27, 2024
a357ecf
7311: spotless
Matilda-Clerke Oct 27, 2024
e7a13a4
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Oct 27, 2024
3d20f56
7311: Fix compile issue after merge
Matilda-Clerke Oct 27, 2024
f1db42e
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Oct 28, 2024
6ee320b
7311: Remove BodyValidator and update code and test to match
Matilda-Clerke Oct 28, 2024
5fcad25
7311: spotless
Matilda-Clerke Oct 28, 2024
1923996
7311: Fix up pre-fill and add test to test failure scenario
Matilda-Clerke Oct 29, 2024
84b422c
7311: Use ProtocolSchedule.anyMatch to find if any ProtocolSpecs are …
Matilda-Clerke Oct 29, 2024
8e6e2b0
7311: Only attempt to remove headers on successful requests
Matilda-Clerke Oct 29, 2024
24d8f99
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Oct 29, 2024
8e72f70
Merge branch 'main' into 7311-add-GetReceiptsFromPeerTask
Matilda-Clerke Oct 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.MonitoredExecutors;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskRequestSender;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.CheckpointBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.ClassicForkPeerValidator;
Expand Down Expand Up @@ -653,6 +655,8 @@ public BesuController build() {
}

final EthContext ethContext = new EthContext(ethPeers, ethMessages, snapMessages, scheduler);
final PeerTaskExecutor peerTaskExecutor =
new PeerTaskExecutor(ethPeers, new PeerTaskRequestSender(), metricsSystem);
final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode());
final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint);

Expand Down Expand Up @@ -704,6 +708,7 @@ public BesuController build() {
worldStateStorageCoordinator,
protocolContext,
ethContext,
peerTaskExecutor,
syncState,
ethProtocolManager,
pivotBlockSelector);
Expand Down Expand Up @@ -830,6 +835,7 @@ private TrieLogPruner createTrieLogPruner(
* @param worldStateStorageCoordinator the world state storage
* @param protocolContext the protocol context
* @param ethContext the eth context
* @param peerTaskExecutor the PeerTaskExecutor
* @param syncState the sync state
* @param ethProtocolManager the eth protocol manager
* @param pivotBlockSelector the pivot block selector
Expand All @@ -840,6 +846,7 @@ protected DefaultSynchronizer createSynchronizer(
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SyncState syncState,
final EthProtocolManager ethProtocolManager,
final PivotBlockSelector pivotBlockSelector) {
Expand All @@ -851,6 +858,7 @@ protected DefaultSynchronizer createSynchronizer(
worldStateStorageCoordinator,
ethProtocolManager.getBlockBroadcaster(),
ethContext,
peerTaskExecutor,
syncState,
dataDirectory,
storageProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
Expand Down Expand Up @@ -225,6 +226,7 @@ protected DefaultSynchronizer createSynchronizer(
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final ProtocolContext protocolContext,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SyncState syncState,
final EthProtocolManager ethProtocolManager,
final PivotBlockSelector pivotBlockSelector) {
Expand All @@ -235,6 +237,7 @@ protected DefaultSynchronizer createSynchronizer(
worldStateStorageCoordinator,
protocolContext,
ethContext,
peerTaskExecutor,
syncState,
ethProtocolManager,
pivotBlockSelector);
Expand Down
30 changes: 26 additions & 4 deletions besu/src/test/java/org/hyperledger/besu/RunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,26 +153,48 @@ public void fullSyncFromGenesis() throws Exception {
// set merge flag to false, otherwise this test can fail if a merge test runs first
MergeConfiguration.setMergeEnabled(false);

syncFromGenesis(SyncMode.FULL, getFastSyncGenesis());
syncFromGenesis(SyncMode.FULL, getFastSyncGenesis(), false);
}

@Test
public void fullSyncFromGenesisUsingPeerTaskSystem() throws Exception {
// set merge flag to false, otherwise this test can fail if a merge test runs first
MergeConfiguration.setMergeEnabled(false);

syncFromGenesis(SyncMode.FULL, getFastSyncGenesis(), true);
}

@Test
public void fastSyncFromGenesis() throws Exception {
// set merge flag to false, otherwise this test can fail if a merge test runs first
MergeConfiguration.setMergeEnabled(false);

syncFromGenesis(SyncMode.FAST, getFastSyncGenesis());
syncFromGenesis(SyncMode.FAST, getFastSyncGenesis(), false);
}

@Test
public void fastSyncFromGenesisUsingPeerTaskSystem() throws Exception {
// set merge flag to false, otherwise this test can fail if a merge test runs first
MergeConfiguration.setMergeEnabled(false);

syncFromGenesis(SyncMode.FAST, getFastSyncGenesis(), true);
}

private void syncFromGenesis(final SyncMode mode, final GenesisConfigFile genesisConfig)
private void syncFromGenesis(
final SyncMode mode,
final GenesisConfigFile genesisConfig,
final boolean isPeerTaskSystemEnabled)
throws Exception {
final Path dataDirAhead = Files.createTempDirectory(temp, "db-ahead");
final Path dbAhead = dataDirAhead.resolve("database");
final int blockCount = 500;
final NodeKey aheadDbNodeKey = NodeKeyUtils.createFrom(KeyPairUtil.loadKeyPair(dataDirAhead));
final NodeKey behindDbNodeKey = NodeKeyUtils.generate();
final SynchronizerConfiguration syncConfigAhead =
SynchronizerConfiguration.builder().syncMode(SyncMode.FULL).build();
SynchronizerConfiguration.builder()
.syncMode(SyncMode.FULL)
.isPeerTaskSystemEnabled(isPeerTaskSystemEnabled)
.build();
final ObservableMetricsSystem noOpMetricsSystem = new NoOpMetricsSystem();
final var miningParameters = MiningParameters.newDefault();
final var dataStorageConfiguration = DataStorageConfiguration.DEFAULT_FOREST_CONFIG;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ public JsonRpcResponse execute(
private Optional<RpcErrorType> validateMethodAvailability(final JsonRpcRequest request) {
final String name = request.getMethod();

if (LOG.isDebugEnabled()) {
if (LOG.isTraceEnabled()) {
final JsonArray params = JsonObject.mapFrom(request).getJsonArray("params");
LOG.debug("JSON-RPC request -> {} {}", name, params);
LOG.trace("JSON-RPC request -> {} {}", name, params);
}

final JsonRpcMethod method = rpcMethods.get(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ public void processMessage(final Capability cap, final Message message) {
public void handleNewConnection(final PeerConnection connection) {
ethPeers.registerNewConnection(connection, peerValidators);
final EthPeer peer = ethPeers.peer(connection);

final Capability cap = connection.capability(getSupportedProtocol());
final ForkId latestForkId =
cap.getVersion() >= 64 ? forkIdManager.getForkIdForChainHead() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public void executeServiceTask(final Runnable command) {
servicesExecutor.execute(command);
}

public <T> CompletableFuture<Void> scheduleServiceTask(final Runnable task) {
public CompletableFuture<Void> scheduleServiceTask(final Runnable task) {
return CompletableFuture.runAsync(task, servicesExecutor);
}

Expand All @@ -156,6 +156,19 @@ public <T> CompletableFuture<T> scheduleServiceTask(final EthTask<T> task) {
return serviceFuture;
}

public <T> CompletableFuture<T> scheduleServiceTask(final Supplier<CompletableFuture<T>> future) {
final CompletableFuture<T> promise = new CompletableFuture<>();
final Future<?> workerFuture = servicesExecutor.submit(() -> propagateResult(future, promise));
// If returned promise is cancelled, cancel the worker future
promise.whenComplete(
(r, t) -> {
if (t instanceof CancellationException) {
workerFuture.cancel(false);
}
});
return promise;
}

public CompletableFuture<Void> startPipeline(final Pipeline<?> pipeline) {
final CompletableFuture<Void> pipelineFuture = pipeline.start(servicesExecutor);
pendingFutures.add(pipelineFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ public interface PeerTask<T> {
MessageData getRequestMessage();

/**
* Parses the MessageData response from the EthPeer
* Parses and processes the MessageData response from the EthPeer
*
* @param messageData the response MessageData to be parsed
* @return a T built from the response MessageData
* @throws InvalidPeerTaskResponseException if the response messageData is invalid
*/
T parseResponse(MessageData messageData) throws InvalidPeerTaskResponseException;
T processResponse(MessageData messageData) throws InvalidPeerTaskResponseException;

/**
* Gets the number of times this task may be attempted against other peers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public <T> PeerTaskExecutorResult<T> executeAgainstPeer(
MessageData responseMessageData =
requestSender.sendRequest(peerTask.getSubProtocol(), requestMessageData, peer);

result = peerTask.parseResponse(responseMessageData);
result = peerTask.processResponse(responseMessageData);
} finally {
inflightRequestCountForThisTaskClass.decrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask.task;

import static java.util.Collections.emptyList;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
import org.hyperledger.besu.ethereum.eth.manager.peertask.InvalidPeerTaskResponseException;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTask;
import org.hyperledger.besu.ethereum.eth.messages.GetReceiptsMessage;
import org.hyperledger.besu.ethereum.eth.messages.ReceiptsMessage;
import org.hyperledger.besu.ethereum.mainnet.BodyValidation;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData;
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.SubProtocol;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class GetReceiptsFromPeerTask
implements PeerTask<Map<BlockHeader, List<TransactionReceipt>>> {

private final Collection<BlockHeader> blockHeaders;
private final ProtocolSchedule protocolSchedule;
private final Map<BlockHeader, List<TransactionReceipt>> receiptsByBlockHeader = new HashMap<>();
private final Map<Hash, List<BlockHeader>> headersByReceiptsRoot = new HashMap<>();
private final long requiredBlockchainHeight;

public GetReceiptsFromPeerTask(
final Collection<BlockHeader> blockHeaders, final ProtocolSchedule protocolSchedule) {
this.blockHeaders = new ArrayList<>(blockHeaders);
this.protocolSchedule = protocolSchedule;

// pre-fill any headers with an empty receipts root into the result map
this.blockHeaders.stream()
.filter(header -> header.getReceiptsRoot().equals(Hash.EMPTY_TRIE_HASH))
.forEach(header -> receiptsByBlockHeader.put(header, emptyList()));
this.blockHeaders.removeAll(receiptsByBlockHeader.keySet());

// group headers by their receipts root hash to reduce total number of receipts hashes requested
// for
this.blockHeaders.forEach(
header ->
headersByReceiptsRoot
.computeIfAbsent(header.getReceiptsRoot(), key -> new ArrayList<>())
.add(header));

// calculate the minimum required blockchain height a peer will need to be able to fulfil this
// request
requiredBlockchainHeight =
this.blockHeaders.stream()
.mapToLong(BlockHeader::getNumber)
.max()
.orElse(BlockHeader.GENESIS_BLOCK_NUMBER);
}

@Override
public SubProtocol getSubProtocol() {
return EthProtocol.get();
}

@Override
public MessageData getRequestMessage() {
// Since we have to match up the data by receipt root, we only need to request receipts
// for one of the headers with each unique receipt root.
final List<Hash> blockHashes =
headersByReceiptsRoot.values().stream()
.map(headers -> headers.getFirst().getHash())
.toList();
return GetReceiptsMessage.create(blockHashes);
}

@Override
public Map<BlockHeader, List<TransactionReceipt>> processResponse(final MessageData messageData)
throws InvalidPeerTaskResponseException {
if (messageData == null) {
throw new InvalidPeerTaskResponseException();
}
final ReceiptsMessage receiptsMessage = ReceiptsMessage.readFrom(messageData);
final List<List<TransactionReceipt>> receiptsByBlock = receiptsMessage.receipts();
// take a copy of the pre-filled receiptsByBlockHeader, to ensure idempotency of subsequent
// calls to processResponse
final Map<BlockHeader, List<TransactionReceipt>> receiptsByHeader =
new HashMap<>(receiptsByBlockHeader);
if (!blockHeaders.isEmpty()) {
if (receiptsByBlock.isEmpty() || receiptsByBlock.size() > blockHeaders.size()) {
throw new InvalidPeerTaskResponseException();
}

for (final List<TransactionReceipt> receiptsInBlock : receiptsByBlock) {
final List<BlockHeader> blockHeaders =
headersByReceiptsRoot.get(BodyValidation.receiptsRoot(receiptsInBlock));
if (blockHeaders == null) {
// Contains receipts that we didn't request, so mustn't be the response we're looking for.
throw new InvalidPeerTaskResponseException();
}
blockHeaders.forEach(header -> receiptsByHeader.put(header, receiptsInBlock));
}
}
return receiptsByHeader;
}

@Override
public Predicate<EthPeer> getPeerRequirementFilter() {
return (ethPeer) ->
ethPeer.getProtocolName().equals(getSubProtocol().getName())
&& (protocolSchedule.anyMatch((ps) -> ps.spec().isPoS())
|| ethPeer.chainState().getEstimatedHeight() >= requiredBlockchainHeight);
}

@Override
public boolean isSuccess(final Map<BlockHeader, List<TransactionReceipt>> result) {
return !result.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.core.Synchronizer;
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.sync.checkpointsync.CheckpointDownloaderFactory;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncDownloader;
import org.hyperledger.besu.ethereum.eth.sync.fastsync.FastSyncState;
Expand Down Expand Up @@ -82,6 +83,7 @@ public DefaultSynchronizer(
final WorldStateStorageCoordinator worldStateStorageCoordinator,
final BlockBroadcaster blockBroadcaster,
final EthContext ethContext,
final PeerTaskExecutor peerTaskExecutor,
final SyncState syncState,
final Path dataDirectory,
final StorageProvider storageProvider,
Expand Down Expand Up @@ -147,6 +149,7 @@ public DefaultSynchronizer(
protocolContext,
metricsSystem,
ethContext,
peerTaskExecutor,
worldStateStorageCoordinator,
syncState,
clock,
Expand All @@ -163,6 +166,7 @@ public DefaultSynchronizer(
protocolContext,
metricsSystem,
ethContext,
peerTaskExecutor,
worldStateStorageCoordinator,
syncState,
clock,
Expand All @@ -179,6 +183,7 @@ public DefaultSynchronizer(
protocolContext,
metricsSystem,
ethContext,
peerTaskExecutor,
worldStateStorageCoordinator,
syncState,
clock,
Expand Down
Loading
Loading