Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 67b73c4

Browse files
tomusdrwbkchr
authored andcommitted
Fix asynchronous transaction rejections. (#3817)
* Fix handling transaction pool errors. * Add test. * Review suggestions.
1 parent 1ae7a90 commit 67b73c4

File tree

4 files changed

+73
-41
lines changed

4 files changed

+73
-41
lines changed

core/rpc/api/src/subscriptions.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ impl Subscriptions {
6969
}
7070
}
7171

72+
/// Borrows the internal task executor.
73+
///
74+
/// This can be used to spawn additional tasks on the underyling event loop.
75+
pub fn executor(&self) -> &TaskExecutor {
76+
&self.executor
77+
}
78+
7279
/// Creates new subscription for given subscriber.
7380
///
7481
/// Second parameter is a function that converts Subscriber sink into a future.

core/rpc/src/author/mod.rs

Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,9 @@ use log::warn;
2626
use client::{self, Client};
2727
use rpc::futures::{
2828
Sink, Future,
29-
stream::Stream as _,
3029
future::result,
3130
};
32-
use futures03::{StreamExt as _, compat::Compat};
31+
use futures03::{StreamExt as _, compat::Compat, future::ready};
3332
use api::Subscriptions;
3433
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
3534
use codec::{Encode, Decode};
@@ -162,42 +161,44 @@ impl<B, E, P, RA> AuthorApi<ExHash<P>, BlockHash<P>> for Author<B, E, P, RA> whe
162161
let best_block_hash = self.client.info().chain.best_hash;
163162
let dxt = <<P as PoolChainApi>::Block as traits::Block>::Extrinsic::decode(&mut &xt[..])
164163
.map_err(error::Error::from)?;
165-
Ok(self.pool
166-
.submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt)
167-
.boxed()
168-
.compat()
169-
.map_err(|e| e.into_pool_error()
170-
.map(error::Error::from)
171-
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
172-
))
164+
Ok(
165+
self.pool
166+
.submit_and_watch(&generic::BlockId::hash(best_block_hash), dxt)
167+
.map_err(|e| e.into_pool_error()
168+
.map(error::Error::from)
169+
.unwrap_or_else(|e| error::Error::Verification(Box::new(e)).into())
170+
)
171+
)
173172
};
174173

175-
let future_watcher = match submit() {
176-
Ok(future_watcher) => future_watcher,
177-
Err(err) => {
178-
// reject the subscriber (ignore errors - we don't care if subscriber is no longer there).
179-
let _ = subscriber.reject(err.into());
180-
return;
181-
},
182-
};
183-
184-
// make 'future' watcher be a future with output = stream of watcher events
185-
let future_watcher = future_watcher
186-
.map_err(|err| { warn!("Failed to submit extrinsic: {}", err); })
187-
.map(|watcher| Compat::new(watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))));
188-
189-
// convert a 'future' watcher into the stream with single element = stream of watcher events
190-
let watcher_stream = future_watcher.into_stream();
191-
192-
// and now flatten the 'watcher_stream' so that we'll have the stream with watcher events
193-
let watcher_stream = watcher_stream.flatten();
174+
let subscriptions = self.subscriptions.clone();
175+
let future = ready(submit())
176+
.and_then(|res| res)
177+
// convert the watcher into a `Stream`
178+
.map(|res| res.map(|watcher| watcher.into_stream().map(|v| Ok::<_, ()>(Ok(v)))))
179+
// now handle the import result,
180+
// start a new subscrition
181+
.map(move |result| match result {
182+
Ok(watcher) => {
183+
subscriptions.add(subscriber, move |sink| {
184+
sink
185+
.sink_map_err(|_| unimplemented!())
186+
.send_all(Compat::new(watcher))
187+
.map(|_| ())
188+
});
189+
},
190+
Err(err) => {
191+
warn!("Failed to submit extrinsic: {}", err);
192+
// reject the subscriber (ignore errors - we don't care if subscriber is no longer there).
193+
let _ = subscriber.reject(err.into());
194+
},
195+
});
194196

195-
self.subscriptions.add(subscriber, move |sink| {
196-
sink
197-
.sink_map_err(|e| warn!("Error sending notifications: {:?}", e))
198-
.send_all(watcher_stream)
199-
.map(|_| ())
200-
});
197+
let res = self.subscriptions.executor()
198+
.execute(Box::new(Compat::new(future.map(|_| Ok(())))));
199+
if res.is_err() {
200+
warn!("Error spawning subscription RPC task.");
201+
}
201202
}
202203

203204
fn unwatch_extrinsic(&self, _metadata: Option<Self::Metadata>, id: SubscriptionId) -> Result<bool> {

core/rpc/src/author/tests.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,19 @@ use super::*;
1919
use std::sync::Arc;
2020
use assert_matches::assert_matches;
2121
use codec::Encode;
22-
use transaction_pool::{
23-
txpool::Pool,
24-
FullChainApi,
25-
};
2622
use primitives::{
2723
H256, blake2_256, hexdisplay::HexDisplay, testing::{ED25519, SR25519, KeyStore}, ed25519,
2824
crypto::Pair,
2925
};
26+
use rpc::futures::Stream as _;
3027
use test_client::{
3128
self, AccountKeyring, runtime::{Extrinsic, Transfer, SessionKeys}, DefaultTestClientBuilderExt,
3229
TestClientBuilderExt,
3330
};
31+
use transaction_pool::{
32+
txpool::Pool,
33+
FullChainApi,
34+
};
3435
use tokio::runtime;
3536

3637
fn uxt(sender: AccountKeyring, nonce: u64) -> Extrinsic {
@@ -102,7 +103,7 @@ fn should_watch_extrinsic() {
102103
subscriptions: Subscriptions::new(Arc::new(runtime.executor())),
103104
keystore: keystore.clone(),
104105
};
105-
let (subscriber, id_rx, data) = ::jsonrpc_pubsub::typed::Subscriber::new_test("test");
106+
let (subscriber, id_rx, data) = jsonrpc_pubsub::typed::Subscriber::new_test("test");
106107

107108
// when
108109
p.watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 0).encode().into());
@@ -132,6 +133,29 @@ fn should_watch_extrinsic() {
132133
);
133134
}
134135

136+
#[test]
137+
fn should_return_watch_validation_error() {
138+
//given
139+
let mut runtime = runtime::Runtime::new().unwrap();
140+
let client = Arc::new(test_client::new());
141+
let pool = Arc::new(Pool::new(Default::default(), FullChainApi::new(client.clone())));
142+
let keystore = KeyStore::new();
143+
let p = Author {
144+
client,
145+
pool: pool.clone(),
146+
subscriptions: Subscriptions::new(Arc::new(runtime.executor())),
147+
keystore: keystore.clone(),
148+
};
149+
let (subscriber, id_rx, _data) = jsonrpc_pubsub::typed::Subscriber::new_test("test");
150+
151+
// when
152+
p.watch_extrinsic(Default::default(), subscriber, uxt(AccountKeyring::Alice, 179).encode().into());
153+
154+
// then
155+
let res = runtime.block_on(id_rx).unwrap();
156+
assert!(res.is_err(), "Expected the transaction to be rejected as invalid.");
157+
}
158+
135159
#[test]
136160
fn should_return_pending_extrinsics() {
137161
let runtime = runtime::Runtime::new().unwrap();

core/transaction-pool/graph/src/pool.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ pub trait ChainApi: Send + Sync {
6666
/// Error type.
6767
type Error: From<error::Error> + error::IntoPoolError;
6868
/// Validate transaction future.
69-
type ValidationFuture: Future<Output=Result<TransactionValidity, Self::Error>> + Send;
69+
type ValidationFuture: Future<Output=Result<TransactionValidity, Self::Error>> + Send + Unpin;
7070

7171
/// Verify extrinsic at given block.
7272
fn validate_transaction(

0 commit comments

Comments
 (0)