Skip to content

Commit 4870985

Browse files
authored
Merge pull request #252 from ekexium/batch-get-for-update
Fix batch_get_for_update
2 parents 9320198 + c61551c commit 4870985

File tree

8 files changed

+310
-115
lines changed

8 files changed

+310
-115
lines changed

src/request/mod.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ use tikv_client_store::{HasError, Request};
1010

1111
pub use self::{
1212
plan::{
13-
Collect, CollectError, DefaultProcessor, Dispatch, ExtractError, Merge, MergeResponse,
14-
MultiRegion, Plan, Process, ProcessResponse, ResolveLock, RetryRegion,
13+
Collect, CollectAndMatchKey, CollectError, DefaultProcessor, Dispatch, ExtractError,
14+
HasKeys, Merge, MergeResponse, MultiRegion, Plan, PreserveKey, Process, ProcessResponse,
15+
ResolveLock, RetryRegion,
1516
},
1617
plan_builder::{PlanBuilder, SingleKey},
1718
shard::Shardable,

src/request/plan.rs

+118-6
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ use crate::{
66
request::{KvRequest, Shardable},
77
stats::tikv_stats,
88
transaction::{resolve_locks, HasLocks},
9-
Error, Result,
9+
util::iter::FlatMapOkIterExt,
10+
Error, Key, KvPair, Result, Value,
1011
};
1112
use async_trait::async_trait;
1213
use futures::{prelude::*, stream::StreamExt};
1314
use std::{marker::PhantomData, sync::Arc};
15+
use tikv_client_proto::kvrpcpb;
1416
use tikv_client_store::{HasError, HasRegionError, KvClient};
1517

1618
/// A plan for how to execute a request. A user builds up a plan with various
@@ -55,6 +57,12 @@ impl<Req: KvRequest> Plan for Dispatch<Req> {
5557
}
5658
}
5759

60+
impl<Req: KvRequest + HasKeys> HasKeys for Dispatch<Req> {
61+
fn get_keys(&self) -> Vec<Key> {
62+
self.request.get_keys()
63+
}
64+
}
65+
5866
pub struct MultiRegion<P: Plan, PdC: PdClient> {
5967
pub(super) inner: P,
6068
pub pd_client: Arc<PdC>,
@@ -123,6 +131,12 @@ impl<In: Clone + Send + Sync + 'static, P: Plan<Result = Vec<Result<In>>>, M: Me
123131
#[derive(Clone, Copy)]
124132
pub struct Collect;
125133

134+
/// A merge strategy to be used with
135+
/// [`preserve_keys`](super::plan_builder::PlanBuilder::preserve_keys).
136+
/// It matches the keys preserved before and the values returned in the response.
137+
#[derive(Clone, Debug)]
138+
pub struct CollectAndMatchKey;
139+
126140
/// A merge strategy which returns an error if any response is an error and
127141
/// otherwise returns a Vec of the results.
128142
#[derive(Clone, Copy)]
@@ -256,6 +270,17 @@ where
256270
}
257271
}
258272

273+
impl<P: Plan + HasKeys, PdC: PdClient> HasKeys for ResolveLock<P, PdC> {
274+
fn get_keys(&self) -> Vec<Key> {
275+
self.inner.get_keys()
276+
}
277+
}
278+
279+
/// When executed, the plan extracts errors from its inner plan, and
280+
/// returns an `Err` wrapping the error.
281+
///
282+
/// The errors come from two places: `Err` from inner plans, and `Ok(response)`
283+
/// where `response` contains unresolved errors (`error` and `region_error`).
259284
pub struct ExtractError<P: Plan> {
260285
pub inner: P,
261286
}
@@ -268,11 +293,6 @@ impl<P: Plan> Clone for ExtractError<P> {
268293
}
269294
}
270295

271-
/// When executed, the plan extracts errors from its inner plan, and
272-
/// returns an `Err` wrapping the error.
273-
///
274-
/// The errors come from two places: `Err` from inner plans, and `Ok(response)`
275-
/// where `response` contains unresolved errors (`error` and `region_error`).
276296
#[async_trait]
277297
impl<P: Plan> Plan for ExtractError<P>
278298
where
@@ -292,6 +312,98 @@ where
292312
}
293313
}
294314

315+
/// When executed, the plan clones the keys and execute its inner plan, then
316+
/// returns `(keys, response)`.
317+
///
318+
/// It's useful when the information of keys are lost in the response but needed
319+
/// for processing.
320+
pub struct PreserveKey<P: Plan + HasKeys> {
321+
pub inner: P,
322+
}
323+
324+
impl<P: Plan + HasKeys> Clone for PreserveKey<P> {
325+
fn clone(&self) -> Self {
326+
PreserveKey {
327+
inner: self.inner.clone(),
328+
}
329+
}
330+
}
331+
332+
#[async_trait]
333+
impl<P> Plan for PreserveKey<P>
334+
where
335+
P: Plan + HasKeys,
336+
{
337+
type Result = ResponseAndKeys<P::Result>;
338+
339+
async fn execute(&self) -> Result<Self::Result> {
340+
let keys = self.inner.get_keys();
341+
let res = self.inner.execute().await?;
342+
Ok(ResponseAndKeys(res, keys))
343+
}
344+
}
345+
346+
pub trait HasKeys {
347+
fn get_keys(&self) -> Vec<Key>;
348+
}
349+
350+
// contains a response and the corresponding keys
351+
// currently only used for matching keys and values in pessimistic lock requests
352+
#[derive(Debug, Clone)]
353+
pub struct ResponseAndKeys<Resp>(Resp, Vec<Key>);
354+
355+
impl<Resp: HasError> HasError for ResponseAndKeys<Resp> {
356+
fn error(&mut self) -> Option<Error> {
357+
self.0.error()
358+
}
359+
}
360+
361+
impl<Resp: HasLocks> HasLocks for ResponseAndKeys<Resp> {
362+
fn take_locks(&mut self) -> Vec<tikv_client_proto::kvrpcpb::LockInfo> {
363+
self.0.take_locks()
364+
}
365+
}
366+
367+
impl<Resp: HasRegionError> HasRegionError for ResponseAndKeys<Resp> {
368+
fn region_error(&mut self) -> Option<Error> {
369+
self.0.region_error()
370+
}
371+
}
372+
373+
impl Merge<ResponseAndKeys<kvrpcpb::PessimisticLockResponse>> for CollectAndMatchKey {
374+
type Out = Vec<KvPair>;
375+
376+
fn merge(
377+
&self,
378+
input: Vec<Result<ResponseAndKeys<kvrpcpb::PessimisticLockResponse>>>,
379+
) -> Result<Self::Out> {
380+
input
381+
.into_iter()
382+
.flat_map_ok(|ResponseAndKeys(mut resp, keys)| {
383+
let values = resp.take_values();
384+
let not_founds = resp.take_not_founds();
385+
let v: Vec<_> = if not_founds.is_empty() {
386+
// Legacy TiKV does not distiguish not existing key and existing key
387+
// that with empty value. We assume that key does not exist if value
388+
// is empty.
389+
let values: Vec<Value> = values.into_iter().filter(|v| v.is_empty()).collect();
390+
keys.into_iter().zip(values).map(From::from).collect()
391+
} else {
392+
assert_eq!(values.len(), not_founds.len());
393+
let values: Vec<Value> = values
394+
.into_iter()
395+
.zip(not_founds.into_iter())
396+
.filter_map(|(v, not_found)| if not_found { None } else { Some(v) })
397+
.collect();
398+
keys.into_iter().zip(values).map(From::from).collect()
399+
};
400+
// FIXME sucks to collect and re-iterate, but the iterators have different types
401+
v.into_iter()
402+
})
403+
.collect()
404+
}
405+
}
406+
295407
#[cfg(test)]
296408
mod test {
297409
use super::*;

src/request/plan_builder.rs

+16-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
22

3+
use super::PreserveKey;
34
use crate::{
45
backoff::Backoff,
56
pd::PdClient,
67
request::{
7-
DefaultProcessor, Dispatch, ExtractError, KvRequest, Merge, MergeResponse, MultiRegion,
8-
Plan, Process, ProcessResponse, ResolveLock, RetryRegion, Shardable,
8+
DefaultProcessor, Dispatch, ExtractError, HasKeys, KvRequest, Merge, MergeResponse,
9+
MultiRegion, Plan, Process, ProcessResponse, ResolveLock, RetryRegion, Shardable,
910
},
1011
store::Store,
1112
transaction::HasLocks,
@@ -161,6 +162,19 @@ impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
161162
}
162163
}
163164

165+
impl<PdC: PdClient, P: Plan + HasKeys> PlanBuilder<PdC, P, NoTarget>
166+
where
167+
P::Result: HasError,
168+
{
169+
pub fn preserve_keys(self) -> PlanBuilder<PdC, PreserveKey<P>, NoTarget> {
170+
PlanBuilder {
171+
pd_client: self.pd_client.clone(),
172+
plan: PreserveKey { inner: self.plan },
173+
phantom: PhantomData,
174+
}
175+
}
176+
}
177+
164178
impl<PdC: PdClient, P: Plan> PlanBuilder<PdC, P, Targetted>
165179
where
166180
P::Result: HasError,

src/request/shard.rs

+23-24
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,30 @@
22

33
use crate::{
44
pd::PdClient,
5-
request::{Dispatch, KvRequest, Plan, ResolveLock, RetryRegion},
5+
request::{Dispatch, HasKeys, KvRequest, Plan, PreserveKey, ResolveLock, RetryRegion},
66
store::Store,
77
Result,
88
};
99
use futures::stream::BoxStream;
1010
use std::sync::Arc;
1111

12+
macro_rules! impl_inner_shardable {
13+
() => {
14+
type Shard = P::Shard;
15+
16+
fn shards(
17+
&self,
18+
pd_client: &Arc<impl PdClient>,
19+
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
20+
self.inner.shards(pd_client)
21+
}
22+
23+
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
24+
self.inner.apply_shard(shard, store)
25+
}
26+
};
27+
}
28+
1229
pub trait Shardable {
1330
type Shard: Send;
1431

@@ -37,33 +54,15 @@ impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
3754
}
3855

3956
impl<P: Plan + Shardable, PdC: PdClient> Shardable for ResolveLock<P, PdC> {
40-
type Shard = P::Shard;
41-
42-
fn shards(
43-
&self,
44-
pd_client: &Arc<impl PdClient>,
45-
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
46-
self.inner.shards(pd_client)
47-
}
57+
impl_inner_shardable!();
58+
}
4859

49-
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
50-
self.inner.apply_shard(shard, store)
51-
}
60+
impl<P: Plan + HasKeys + Shardable> Shardable for PreserveKey<P> {
61+
impl_inner_shardable!();
5262
}
5363

5464
impl<P: Plan + Shardable, PdC: PdClient> Shardable for RetryRegion<P, PdC> {
55-
type Shard = P::Shard;
56-
57-
fn shards(
58-
&self,
59-
pd_client: &Arc<impl PdClient>,
60-
) -> BoxStream<'static, Result<(Self::Shard, Store)>> {
61-
self.inner.shards(pd_client)
62-
}
63-
64-
fn apply_shard(&mut self, shard: Self::Shard, store: &Store) -> Result<()> {
65-
self.inner.apply_shard(shard, store)
66-
}
65+
impl_inner_shardable!();
6766
}
6867

6968
#[macro_export]

src/transaction/requests.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
use crate::{
44
pd::PdClient,
5-
request::{Collect, DefaultProcessor, KvRequest, Merge, Process, Shardable, SingleKey},
5+
request::{
6+
Collect, DefaultProcessor, HasKeys, KvRequest, Merge, Process, Shardable, SingleKey,
7+
},
68
store::{store_stream_for_keys, store_stream_for_range_by_start_key, Store},
79
timestamp::TimestampExt,
810
transaction::HasLocks,
@@ -359,6 +361,15 @@ impl Shardable for kvrpcpb::PessimisticLockRequest {
359361
}
360362
}
361363

364+
impl HasKeys for kvrpcpb::PessimisticLockRequest {
365+
fn get_keys(&self) -> Vec<Key> {
366+
self.mutations
367+
.iter()
368+
.map(|m| m.key.clone().into())
369+
.collect()
370+
}
371+
}
372+
362373
impl Merge<kvrpcpb::PessimisticLockResponse> for Collect {
363374
// FIXME: PessimisticLockResponse only contains values.
364375
// We need to pair keys and values returned somewhere.

0 commit comments

Comments
 (0)