Skip to content

Commit 0314d63

Browse files
authored
Merge pull request #253 from nrc/unlock-delete
Fix two bugs with insert and delete
2 parents 4870985 + e8a9175 commit 0314d63

File tree

5 files changed

+132
-38
lines changed

5 files changed

+132
-38
lines changed

rust-toolchain

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
nightly-2021-03-15

src/transaction/buffer.rs

+35-23
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
22

33
use crate::{BoundRange, Key, KvPair, Result, Value};
4+
use derive_new::new;
45
use std::{
5-
collections::{BTreeMap, HashMap},
6+
collections::{btree_map::Entry, BTreeMap, HashMap},
67
future::Future,
78
};
89
use tikv_client_proto::kvrpcpb;
910
use tokio::sync::{Mutex, MutexGuard};
1011

11-
#[derive(Default)]
12+
#[derive(new)]
1213
struct InnerBuffer {
14+
#[new(default)]
1315
primary_key: Option<Key>,
16+
#[new(default)]
1417
entry_map: BTreeMap<Key, BufferEntry>,
18+
is_pessimistic: bool,
1519
}
1620

1721
impl InnerBuffer {
@@ -29,16 +33,22 @@ impl InnerBuffer {
2933
}
3034

3135
/// A caching layer which buffers reads and writes in a transaction.
32-
#[derive(Default)]
3336
pub struct Buffer {
3437
mutations: Mutex<InnerBuffer>,
3538
}
3639

3740
impl Buffer {
41+
pub fn new(is_pessimistic: bool) -> Buffer {
42+
Buffer {
43+
mutations: Mutex::new(InnerBuffer::new(is_pessimistic)),
44+
}
45+
}
46+
3847
/// Get the primary key of the buffer.
3948
pub async fn get_primary_key(&self) -> Option<Key> {
4049
self.mutations.lock().await.primary_key.clone()
4150
}
51+
4252
/// Get the primary key of the buffer, if not exists, use `key` as the primary key.
4353
pub async fn get_primary_key_or(&self, key: &Key) -> Key {
4454
self.mutations.lock().await.get_primary_key_or(key).clone()
@@ -203,28 +213,30 @@ impl Buffer {
203213

204214
/// Mark a value as Insert mutation into the buffer (does not write through).
205215
pub async fn insert(&self, key: Key, value: Value) {
206-
self.mutations
207-
.lock()
208-
.await
209-
.insert(key, BufferEntry::Insert(value));
216+
let mut mutations = self.mutations.lock().await;
217+
let mut entry = mutations.entry_map.entry(key.clone());
218+
match entry {
219+
Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => {
220+
o.insert(BufferEntry::Put(value));
221+
}
222+
_ => mutations.insert(key, BufferEntry::Insert(value)),
223+
}
210224
}
211225

212226
/// Mark a value as deleted.
213227
pub async fn delete(&self, key: Key) {
214228
let mut mutations = self.mutations.lock().await;
215-
let value = mutations
216-
.entry_map
217-
.entry(key.clone())
218-
.or_insert(BufferEntry::Del);
219-
220-
let new_value: BufferEntry;
221-
if let BufferEntry::Insert(_) = value {
222-
new_value = BufferEntry::CheckNotExist
223-
} else {
224-
new_value = BufferEntry::Del
229+
let is_pessimistic = mutations.is_pessimistic;
230+
let mut entry = mutations.entry_map.entry(key.clone());
231+
232+
match entry {
233+
Entry::Occupied(ref mut o)
234+
if matches!(o.get(), BufferEntry::Insert(_)) && !is_pessimistic =>
235+
{
236+
o.insert(BufferEntry::CheckNotExist);
237+
}
238+
_ => mutations.insert(key, BufferEntry::Del),
225239
}
226-
227-
mutations.insert(key, new_value);
228240
}
229241

230242
/// Converts the buffered mutations to the proto buffer version
@@ -378,7 +390,7 @@ mod tests {
378390
#[tokio::test]
379391
#[allow(unreachable_code)]
380392
async fn set_and_get_from_buffer() {
381-
let buffer = Buffer::default();
393+
let buffer = Buffer::new(false);
382394
buffer
383395
.put(b"key1".to_vec().into(), b"value1".to_vec())
384396
.await;
@@ -411,7 +423,7 @@ mod tests {
411423
#[tokio::test]
412424
#[allow(unreachable_code)]
413425
async fn insert_and_get_from_buffer() {
414-
let buffer = Buffer::default();
426+
let buffer = Buffer::new(false);
415427
buffer
416428
.insert(b"key1".to_vec().into(), b"value1".to_vec())
417429
.await;
@@ -453,13 +465,13 @@ mod tests {
453465
let v2: Value = b"value2".to_vec();
454466
let v2_ = v2.clone();
455467

456-
let buffer = Buffer::default();
468+
let buffer = Buffer::new(false);
457469
let r1 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(Ok(Some(v1_)))));
458470
let r2 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(panic!())));
459471
assert_eq!(r1.unwrap().unwrap(), v1);
460472
assert_eq!(r2.unwrap().unwrap(), v1);
461473

462-
let buffer = Buffer::default();
474+
let buffer = Buffer::new(false);
463475
let r1 = block_on(
464476
buffer.batch_get_or_else(vec![k1.clone(), k2.clone()].into_iter(), move |_| {
465477
ready(Ok(vec![(k1_, v1__).into(), (k2_, v2_).into()]))

src/transaction/lowering.rs

+36-8
Original file line numberDiff line numberDiff line change
@@ -106,22 +106,50 @@ pub fn new_pessimistic_rollback_request(
106106
)
107107
}
108108

109+
pub trait PessimisticLock: Clone {
110+
fn key(self) -> Key;
111+
112+
fn assertion(&self) -> kvrpcpb::Assertion;
113+
}
114+
115+
impl PessimisticLock for Key {
116+
fn key(self) -> Key {
117+
self
118+
}
119+
120+
fn assertion(&self) -> kvrpcpb::Assertion {
121+
kvrpcpb::Assertion::None
122+
}
123+
}
124+
125+
impl PessimisticLock for (Key, kvrpcpb::Assertion) {
126+
fn key(self) -> Key {
127+
self.0
128+
}
129+
130+
fn assertion(&self) -> kvrpcpb::Assertion {
131+
self.1
132+
}
133+
}
134+
109135
pub fn new_pessimistic_lock_request(
110-
keys: impl Iterator<Item = Key>,
136+
locks: impl Iterator<Item = impl PessimisticLock>,
111137
primary_lock: Key,
112138
start_version: Timestamp,
113139
lock_ttl: u64,
114140
for_update_ts: Timestamp,
115141
need_value: bool,
116142
) -> kvrpcpb::PessimisticLockRequest {
117143
requests::new_pessimistic_lock_request(
118-
keys.map(|key| {
119-
let mut mutation = kvrpcpb::Mutation::default();
120-
mutation.set_op(kvrpcpb::Op::PessimisticLock);
121-
mutation.set_key(key.into());
122-
mutation
123-
})
124-
.collect(),
144+
locks
145+
.map(|pl| {
146+
let mut mutation = kvrpcpb::Mutation::default();
147+
mutation.set_op(kvrpcpb::Op::PessimisticLock);
148+
mutation.set_assertion(pl.assertion());
149+
mutation.set_key(pl.key().into());
150+
mutation
151+
})
152+
.collect(),
125153
primary_lock.into(),
126154
start_version.version(),
127155
lock_ttl,

src/transaction/transaction.rs

+17-7
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ impl<PdC: PdClient> Transaction<PdC> {
6666
Transaction {
6767
status: Arc::new(RwLock::new(status)),
6868
timestamp,
69-
buffer: Default::default(),
69+
buffer: Buffer::new(options.is_pessimistic()),
7070
rpc,
7171
options,
7272
is_heartbeat_started: false,
@@ -401,8 +401,11 @@ impl<PdC: PdClient> Transaction<PdC> {
401401
return Err(Error::DuplicateKeyInsertion);
402402
}
403403
if self.is_pessimistic() {
404-
self.pessimistic_lock(iter::once(key.clone()), false)
405-
.await?;
404+
self.pessimistic_lock(
405+
iter::once((key.clone(), kvrpcpb::Assertion::NotExist)),
406+
false,
407+
)
408+
.await?;
406409
}
407410
self.buffer.insert(key, value.into()).await;
408411
Ok(())
@@ -630,20 +633,20 @@ impl<PdC: PdClient> Transaction<PdC> {
630633
/// Only valid for pessimistic transactions, panics if called on an optimistic transaction.
631634
async fn pessimistic_lock(
632635
&mut self,
633-
keys: impl IntoIterator<Item = Key>,
636+
keys: impl IntoIterator<Item = impl PessimisticLock>,
634637
need_value: bool,
635638
) -> Result<Vec<KvPair>> {
636639
assert!(
637640
matches!(self.options.kind, TransactionKind::Pessimistic(_)),
638641
"`pessimistic_lock` is only valid to use with pessimistic transactions"
639642
);
640643

641-
let keys: Vec<Key> = keys.into_iter().collect();
644+
let keys: Vec<_> = keys.into_iter().collect();
642645
if keys.is_empty() {
643646
return Ok(vec![]);
644647
}
645648

646-
let first_key = keys[0].clone();
649+
let first_key = keys[0].clone().key();
647650
let primary_lock = self.buffer.get_primary_key_or(&first_key).await;
648651
let for_update_ts = self.rpc.clone().get_timestamp().await?;
649652
self.options.push_for_update_ts(for_update_ts.clone());
@@ -667,7 +670,7 @@ impl<PdC: PdClient> Transaction<PdC> {
667670
self.start_auto_heartbeat().await;
668671

669672
for key in keys {
670-
self.buffer.lock(key).await;
673+
self.buffer.lock(key.key()).await;
671674
}
672675

673676
pairs
@@ -891,6 +894,13 @@ impl TransactionOptions {
891894
self.auto_heartbeat = false;
892895
self
893896
}
897+
898+
pub fn is_pessimistic(&self) -> bool {
899+
match self.kind {
900+
TransactionKind::Pessimistic(_) => true,
901+
TransactionKind::Optimistic => false,
902+
}
903+
}
894904
}
895905

896906
/// The default TTL of a lock in milliseconds.

tests/integration_tests.rs

+43
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,49 @@ async fn pessimistic_rollback() -> Result<()> {
596596
Ok(())
597597
}
598598

599+
#[tokio::test]
600+
#[serial]
601+
async fn pessimistic_delete() -> Result<()> {
602+
clear_tikv().await;
603+
let client =
604+
TransactionClient::new_with_config(vec!["127.0.0.1:2379"], Default::default()).await?;
605+
606+
// The transaction will lock the keys and must release the locks on commit, even when values are
607+
// not written to the DB.
608+
let mut txn = client.begin_pessimistic().await?;
609+
txn.put(vec![1], vec![42]).await?;
610+
txn.delete(vec![1]).await?;
611+
txn.insert(vec![2], vec![42]).await?;
612+
txn.delete(vec![2]).await?;
613+
txn.put(vec![3], vec![42]).await?;
614+
txn.commit().await?;
615+
616+
// Check that the keys are not locked.
617+
let mut txn2 = client.begin_optimistic().await?;
618+
txn2.put(vec![1], vec![42]).await?;
619+
txn2.put(vec![2], vec![42]).await?;
620+
txn2.put(vec![3], vec![42]).await?;
621+
txn2.commit().await?;
622+
623+
// As before, but rollback instead of commit.
624+
let mut txn = client.begin_pessimistic().await?;
625+
txn.put(vec![1], vec![42]).await?;
626+
txn.delete(vec![1]).await?;
627+
txn.delete(vec![2]).await?;
628+
txn.insert(vec![2], vec![42]).await?;
629+
txn.delete(vec![2]).await?;
630+
txn.put(vec![3], vec![42]).await?;
631+
txn.rollback().await?;
632+
633+
let mut txn2 = client.begin_optimistic().await?;
634+
txn2.put(vec![1], vec![42]).await?;
635+
txn2.put(vec![2], vec![42]).await?;
636+
txn2.put(vec![3], vec![42]).await?;
637+
txn2.commit().await?;
638+
639+
Ok(())
640+
}
641+
599642
#[tokio::test]
600643
#[serial]
601644
async fn lock_keys() -> Result<()> {

0 commit comments

Comments
 (0)