Skip to content

Commit 8517939

Browse files
authored
Add support for transaction_v1_broadcast (#1724)
* Add support for `transaction_v1_broadcast` * PR link * Tweaks * Change error type for stop
1 parent f06c591 commit 8517939

File tree

5 files changed

+196
-37
lines changed

5 files changed

+196
-37
lines changed

lib/src/json_rpc/methods.rs

+3
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,9 @@ define_methods! {
498498
sudo_unstable_p2pDiscover(multiaddr: Cow<'a, str>) -> (),
499499
sudo_unstable_version() -> Cow<'a, str>,
500500

501+
transaction_v1_broadcast(transaction: HexString) -> Cow<'a, str>,
502+
transaction_v1_stop(#[rename = "operationId"] operation_id: Cow<'a, str>) -> (),
503+
501504
transactionWatch_unstable_submitAndWatch(transaction: HexString) -> Cow<'a, str>,
502505
transactionWatch_unstable_unwatch(subscription: Cow<'a, str>) -> (),
503506

lib/src/json_rpc/service/client_main_task.rs

+7
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,7 @@ impl ClientMainTask {
469469
| methods::MethodCall::chain_subscribeNewHeads { .. }
470470
| methods::MethodCall::state_subscribeRuntimeVersion { .. }
471471
| methods::MethodCall::state_subscribeStorage { .. }
472+
| methods::MethodCall::transaction_v1_broadcast { .. }
472473
| methods::MethodCall::transactionWatch_unstable_submitAndWatch { .. }
473474
| methods::MethodCall::sudo_network_unstable_watch { .. }
474475
| methods::MethodCall::chainHead_unstable_follow { .. } => {
@@ -539,6 +540,9 @@ impl ClientMainTask {
539540
methods::MethodCall::author_unwatchExtrinsic { subscription, .. }
540541
| methods::MethodCall::state_unsubscribeRuntimeVersion { subscription, .. }
541542
| methods::MethodCall::state_unsubscribeStorage { subscription, .. }
543+
| methods::MethodCall::transaction_v1_stop {
544+
operation_id: subscription,
545+
}
542546
| methods::MethodCall::transactionWatch_unstable_unwatch { subscription, .. }
543547
| methods::MethodCall::sudo_network_unstable_unwatch { subscription, .. }
544548
| methods::MethodCall::chainHead_unstable_unfollow {
@@ -562,6 +566,9 @@ impl ClientMainTask {
562566
methods::MethodCall::state_unsubscribeStorage { .. } => {
563567
methods::Response::state_unsubscribeStorage(true)
564568
}
569+
methods::MethodCall::transaction_v1_stop { .. } => {
570+
methods::Response::transaction_v1_stop(())
571+
}
565572
methods::MethodCall::transactionWatch_unstable_unwatch {
566573
..
567574
} => methods::Response::transactionWatch_unstable_unwatch(()),

light-base/src/json_rpc_service/background.rs

+146-26
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,13 @@ struct Background<TPlat: PlatformRef> {
123123
/// List of all active `state_subscribeRuntimeVersion` subscriptions, indexed by the
124124
/// subscription ID.
125125
runtime_version_subscriptions: hashbrown::HashSet<String, fnv::FnvBuildHasher>,
126-
/// List of all active `author_submitAndWatchExtrinsic` and
126+
/// List of all active `author_submitAndWatchExtrinsic`, `transaction_v1_broadcast`, and
127127
/// `transactionWatch_unstable_submitAndWatch` subscriptions, indexed by the subscription ID.
128+
/// When it comes to `author_submitAndWatchExtrinsic` and
129+
/// `transactionWatch_unstable_submitAndWatch`, transactions are removed from this list when
130+
/// they are dropped from the transactions service. When it comes
131+
/// to `transaction_v1_broadcast`, transactions are left forever until the API user
132+
/// unsubscribes.
128133
transactions_subscriptions: hashbrown::HashMap<String, TransactionWatch, fnv::FnvBuildHasher>,
129134

130135
/// List of all active `state_subscribeStorage` subscriptions, indexed by the subscription ID.
@@ -419,8 +424,14 @@ struct TransactionWatch {
419424
enum TransactionWatchTy {
420425
/// `author_submitAndWatchExtrinsic`.
421426
Legacy,
427+
/// `transaction_v1_broadcast`.
428+
NewApi {
429+
/// A copy of the body of the transaction is kept, as it might be necessary to re-insert
430+
/// it in the transactions service later, for example if it reports having crashed.
431+
transaction_bytes: Vec<u8>,
432+
},
422433
/// `transactionWatch_unstable_submitAndWatch`.
423-
NewApi,
434+
NewApiWatch,
424435
}
425436

426437
/// See [`Background::state_get_keys_paged_cache`].
@@ -771,6 +782,8 @@ pub(super) async fn run<TPlat: PlatformRef>(
771782
| methods::MethodCall::rpc_methods { .. }
772783
| methods::MethodCall::sudo_unstable_p2pDiscover { .. }
773784
| methods::MethodCall::sudo_unstable_version { .. }
785+
| methods::MethodCall::transaction_v1_broadcast { .. }
786+
| methods::MethodCall::transaction_v1_stop { .. }
774787
| methods::MethodCall::transactionWatch_unstable_submitAndWatch { .. }
775788
| methods::MethodCall::transactionWatch_unstable_unwatch { .. }
776789
| methods::MethodCall::sudo_network_unstable_watch { .. }
@@ -837,7 +850,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
837850

838851
let mut transaction_updates = Box::pin(
839852
me.transactions_service
840-
.submit_and_watch_transaction(transaction.0, 16)
853+
.submit_and_watch_transaction(transaction.0, 16, true)
841854
.await,
842855
);
843856

@@ -2504,43 +2517,62 @@ pub(super) async fn run<TPlat: PlatformRef>(
25042517
.await;
25052518
}
25062519

2507-
methods::MethodCall::transactionWatch_unstable_submitAndWatch {
2508-
transaction: methods::HexString(transaction),
2509-
} => {
2520+
request_parsed @ (methods::MethodCall::transaction_v1_broadcast { .. }
2521+
| methods::MethodCall::transactionWatch_unstable_submitAndWatch { .. }) => {
2522+
let (transaction, watched) = match request_parsed {
2523+
methods::MethodCall::transaction_v1_broadcast {
2524+
transaction: methods::HexString(transaction),
2525+
} => (transaction, false),
2526+
methods::MethodCall::transactionWatch_unstable_submitAndWatch {
2527+
transaction: methods::HexString(transaction),
2528+
} => (transaction, true),
2529+
_ => unreachable!()
2530+
};
2531+
25102532
let subscription_id = {
25112533
let mut subscription_id = [0u8; 32];
25122534
me.randomness.fill_bytes(&mut subscription_id);
25132535
bs58::encode(subscription_id).into_string()
25142536
};
25152537

2516-
let mut transaction_updates = Box::pin(
2517-
me.transactions_service
2518-
.submit_and_watch_transaction(transaction, 16)
2519-
.await,
2520-
);
2521-
25222538
let _prev_value = me.transactions_subscriptions.insert(
25232539
subscription_id.clone(),
25242540
TransactionWatch {
25252541
included_block: None,
25262542
num_broadcasted_peers: 0,
2527-
ty: TransactionWatchTy::NewApi,
2543+
ty: if watched { TransactionWatchTy::NewApiWatch } else {
2544+
TransactionWatchTy::NewApi { transaction_bytes: transaction.clone() }
2545+
},
25282546
},
25292547
);
25302548
debug_assert!(_prev_value.is_none());
25312549

2550+
let mut transaction_updates = Box::pin(
2551+
me.transactions_service
2552+
.submit_and_watch_transaction(transaction, 16, watched)
2553+
.await,
2554+
);
2555+
25322556
let _ = me
25332557
.responses_tx
2534-
.send(
2558+
.send(if watched {
25352559
methods::Response::transactionWatch_unstable_submitAndWatch(
25362560
Cow::Borrowed(&subscription_id),
25372561
)
2562+
} else {
2563+
methods::Response::transaction_v1_broadcast(
2564+
Cow::Borrowed(&subscription_id),
2565+
)
2566+
}
25382567
.to_json_response(request_id_json),
25392568
)
25402569
.await;
25412570

25422571
// A task is started that will yield when the transactions service
25432572
// generates a notification.
2573+
// Note that we do that even for `transaction_v1_broadcast`, as it is
2574+
// important to pull notifications from the channel in order to not
2575+
// clog it.
25442576
me.background_tasks.push(Box::pin(async move {
25452577
let Some(status) = transaction_updates.as_mut().next().await else {
25462578
unreachable!()
@@ -2553,11 +2585,41 @@ pub(super) async fn run<TPlat: PlatformRef>(
25532585
}));
25542586
}
25552587

2588+
methods::MethodCall::transaction_v1_stop { operation_id } => {
2589+
let exists = me
2590+
.transactions_subscriptions
2591+
.get(&*operation_id)
2592+
.map_or(false, |sub| {
2593+
matches!(sub.ty, TransactionWatchTy::NewApi { .. })
2594+
});
2595+
if exists {
2596+
me.transactions_subscriptions.remove(&*operation_id);
2597+
let _ = me
2598+
.responses_tx
2599+
.send(
2600+
methods::Response::transaction_v1_stop(())
2601+
.to_json_response(request_id_json),
2602+
)
2603+
.await;
2604+
} else {
2605+
let _ = me
2606+
.responses_tx
2607+
.send(parse::build_error_response(
2608+
request_id_json,
2609+
json_rpc::parse::ErrorResponse::InvalidParams,
2610+
None,
2611+
))
2612+
.await;
2613+
}
2614+
}
2615+
25562616
methods::MethodCall::transactionWatch_unstable_unwatch { subscription } => {
25572617
let exists = me
25582618
.transactions_subscriptions
25592619
.get(&*subscription)
2560-
.map_or(false, |sub| matches!(sub.ty, TransactionWatchTy::NewApi));
2620+
.map_or(false, |sub| {
2621+
matches!(sub.ty, TransactionWatchTy::NewApiWatch)
2622+
});
25612623
if exists {
25622624
me.transactions_subscriptions.remove(&*subscription);
25632625
}
@@ -4785,7 +4847,55 @@ pub(super) async fn run<TPlat: PlatformRef>(
47854847
continue;
47864848
};
47874849

4788-
match (drop_reason, transaction_watch.ty) {
4850+
match (drop_reason, &transaction_watch.ty) {
4851+
(
4852+
transactions_service::DropReason::GapInChain
4853+
| transactions_service::DropReason::Crashed,
4854+
TransactionWatchTy::NewApi { transaction_bytes },
4855+
) => {
4856+
// In case of `transaction_v1_broadcast`, we re-submit the transaction
4857+
// if it was dropped for a temporary reasons.
4858+
let mut new_watcher = Box::pin(
4859+
me.transactions_service
4860+
.submit_and_watch_transaction(transaction_bytes.clone(), 16, false)
4861+
.await,
4862+
);
4863+
4864+
let _prev_value = me
4865+
.transactions_subscriptions
4866+
.insert(subscription_id.clone(), transaction_watch);
4867+
debug_assert!(_prev_value.is_none());
4868+
4869+
// Push a new background task that waits for the next notification.
4870+
me.background_tasks.push(Box::pin(async move {
4871+
let Some(status) = new_watcher.as_mut().next().await else {
4872+
unreachable!()
4873+
};
4874+
Event::TransactionEvent {
4875+
subscription_id,
4876+
event: status,
4877+
watcher: new_watcher,
4878+
}
4879+
}));
4880+
}
4881+
4882+
(
4883+
transactions_service::DropReason::Finalized { .. }
4884+
| transactions_service::DropReason::Invalid(_)
4885+
| transactions_service::DropReason::MaxPendingTransactionsReached
4886+
| transactions_service::DropReason::ValidateError(_),
4887+
TransactionWatchTy::NewApi { .. },
4888+
) => {
4889+
// In case of `transaction_v1_broadcast`, the transaction is re-inserted
4890+
// in the list, but no new notification-generating task is pushed, making
4891+
// the transaction effectively dead and waiting for `transaction_v1_stop`
4892+
// to be called to remove it.
4893+
let _prev_value = me
4894+
.transactions_subscriptions
4895+
.insert(subscription_id.clone(), transaction_watch);
4896+
debug_assert!(_prev_value.is_none());
4897+
}
4898+
47894899
(transactions_service::DropReason::GapInChain, TransactionWatchTy::Legacy)
47904900
| (
47914901
transactions_service::DropReason::MaxPendingTransactionsReached,
@@ -4808,7 +4918,10 @@ pub(super) async fn run<TPlat: PlatformRef>(
48084918
)
48094919
.await;
48104920
}
4811-
(transactions_service::DropReason::GapInChain, TransactionWatchTy::NewApi) => {
4921+
(
4922+
transactions_service::DropReason::GapInChain,
4923+
TransactionWatchTy::NewApiWatch,
4924+
) => {
48124925
let _ = me
48134926
.responses_tx
48144927
.send(
@@ -4825,7 +4938,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
48254938
}
48264939
(
48274940
transactions_service::DropReason::MaxPendingTransactionsReached,
4828-
TransactionWatchTy::NewApi,
4941+
TransactionWatchTy::NewApiWatch,
48294942
) => {
48304943
let _ = me
48314944
.responses_tx
@@ -4843,7 +4956,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
48434956
}
48444957
(
48454958
transactions_service::DropReason::Invalid(error),
4846-
TransactionWatchTy::NewApi,
4959+
TransactionWatchTy::NewApiWatch,
48474960
) => {
48484961
let _ = me
48494962
.responses_tx
@@ -4860,7 +4973,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
48604973
}
48614974
(
48624975
transactions_service::DropReason::ValidateError(error),
4863-
TransactionWatchTy::NewApi,
4976+
TransactionWatchTy::NewApiWatch,
48644977
) => {
48654978
let _ = me
48664979
.responses_tx
@@ -4875,7 +4988,10 @@ pub(super) async fn run<TPlat: PlatformRef>(
48754988
)
48764989
.await;
48774990
}
4878-
(transactions_service::DropReason::Crashed, TransactionWatchTy::NewApi) => {
4991+
(
4992+
transactions_service::DropReason::Crashed,
4993+
TransactionWatchTy::NewApiWatch,
4994+
) => {
48794995
let _ = me
48804996
.responses_tx
48814997
.send(
@@ -4909,7 +5025,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
49095025
}
49105026
(
49115027
transactions_service::DropReason::Finalized { block_hash, index },
4912-
TransactionWatchTy::NewApi,
5028+
TransactionWatchTy::NewApiWatch,
49135029
) => {
49145030
let _ = me
49155031
.responses_tx
@@ -4946,6 +5062,10 @@ pub(super) async fn run<TPlat: PlatformRef>(
49465062
};
49475063

49485064
match (event, &transaction_watch.ty) {
5065+
(_, TransactionWatchTy::NewApi { .. }) => {
5066+
// Events are ignored when it comes to `transaction_v1_broadcast`.
5067+
}
5068+
49495069
(
49505070
transactions_service::TransactionStatus::Broadcast(peers),
49515071
TransactionWatchTy::Legacy,
@@ -4965,7 +5085,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
49655085
}
49665086
(
49675087
transactions_service::TransactionStatus::Broadcast(peers),
4968-
TransactionWatchTy::NewApi,
5088+
TransactionWatchTy::NewApiWatch,
49695089
) => {
49705090
transaction_watch.num_broadcasted_peers += peers.len();
49715091
let _ = me
@@ -4993,7 +5113,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
49935113
}
49945114
(
49955115
transactions_service::TransactionStatus::Validated,
4996-
TransactionWatchTy::NewApi,
5116+
TransactionWatchTy::NewApiWatch,
49975117
) => {
49985118
let _ = me
49995119
.responses_tx
@@ -5052,7 +5172,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
50525172
transactions_service::TransactionStatus::IncludedBlockUpdate {
50535173
block_hash: Some((block_hash, index)),
50545174
},
5055-
TransactionWatchTy::NewApi,
5175+
TransactionWatchTy::NewApiWatch,
50565176
) => {
50575177
transaction_watch.included_block = Some(block_hash);
50585178
let _ = me
@@ -5076,7 +5196,7 @@ pub(super) async fn run<TPlat: PlatformRef>(
50765196
transactions_service::TransactionStatus::IncludedBlockUpdate {
50775197
block_hash: None,
50785198
},
5079-
TransactionWatchTy::NewApi,
5199+
TransactionWatchTy::NewApiWatch,
50805200
) => {
50815201
let _ = me
50825202
.responses_tx

0 commit comments

Comments
 (0)