Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM]Support mixed mode write batch and introduce enable-unips feature #290

Open
wants to merge 9 commits into
base: raftstore-proxy
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions engine_tiflash/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,8 @@ pub use crate::table_properties::*;
mod mixed_engine;
mod ps_engine;
mod rocks_engine;
#[cfg(feature = "enable-pagestorage")]
pub use crate::ps_engine::ps_write_batch::*;
#[cfg(not(feature = "enable-pagestorage"))]
pub use crate::rocks_engine::write_batch::*;
pub use crate::{ps_engine::PSLogEngine, rocks_engine::db_vector};

pub use crate::{mixed_engine::write_batch::*, ps_engine::PSLogEngine, rocks_engine::db_vector};

pub mod mvcc_properties;
pub use crate::mvcc_properties::*;
Expand Down
6 changes: 5 additions & 1 deletion engine_tiflash/src/mixed_engine/elementary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use engine_rocks::RocksEngineIterator;
use engine_traits::{IterOptions, ReadOptions, Result};

use super::MixedDbVector;
use super::{write_batch::MixedWriteBatch, MixedDbVector};
pub trait ElementaryEngine: std::fmt::Debug {
fn put(&self, key: &[u8], value: &[u8]) -> Result<()>;

Expand Down Expand Up @@ -33,4 +33,8 @@ pub trait ElementaryEngine: std::fmt::Debug {
) -> Result<()>;

fn iterator_opt(&self, cf: &str, opts: IterOptions) -> Result<RocksEngineIterator>;

fn write_batch(&self) -> MixedWriteBatch;

fn write_batch_with_cap(&self, cap: usize) -> MixedWriteBatch;
}
1 change: 1 addition & 0 deletions engine_tiflash/src/mixed_engine/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

pub mod elementary;
pub mod write_batch;

use std::{
fmt::{self, Debug, Formatter},
Expand Down
159 changes: 159 additions & 0 deletions engine_tiflash/src/mixed_engine/write_batch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

#![allow(unused_variables)]
use std::sync::Arc;

use engine_traits::{self, Mutable, Result, WriteBatchExt, WriteOptions};
use proxy_ffi::interfaces_ffi::RawCppPtr;
use rocksdb::{WriteBatch as RawWriteBatch, DB};
use tikv_util::Either;

use crate::{engine::RocksEngine, ps_engine::add_prefix, r2e, PageStorageExt};

pub const WRITE_BATCH_MAX_BATCH: usize = 16;
pub const WRITE_BATCH_LIMIT: usize = 16;

pub struct MixedWriteBatch {
pub inner:
Either<crate::rocks_engine::RocksWriteBatchVec, crate::ps_engine::PSRocksWriteBatchVec>,
}

impl WriteBatchExt for RocksEngine {
type WriteBatch = MixedWriteBatch;

const WRITE_BATCH_MAX_KEYS: usize = 256;

fn write_batch(&self) -> MixedWriteBatch {
self.element_engine.as_ref().unwrap().write_batch()
}

fn write_batch_with_cap(&self, cap: usize) -> MixedWriteBatch {
self.element_engine
.as_ref()
.unwrap()
.write_batch_with_cap(cap)
}
}

impl engine_traits::WriteBatch for MixedWriteBatch {
fn write_opt(&mut self, opts: &WriteOptions) -> Result<u64> {
// write into ps
match self.inner.as_mut() {
Either::Left(x) => x.write_opt(opts),
Either::Right(x) => x.write_opt(opts),
}
}

fn data_size(&self) -> usize {
match self.inner.as_ref() {
Either::Left(x) => x.data_size(),
Either::Right(x) => x.data_size(),
}
}

fn count(&self) -> usize {
match self.inner.as_ref() {
Either::Left(x) => x.count(),
Either::Right(x) => x.count(),
}
}

fn is_empty(&self) -> bool {
match self.inner.as_ref() {
Either::Left(x) => x.is_empty(),
Either::Right(x) => x.is_empty(),
}
}

fn should_write_to_engine(&self) -> bool {
// Disable TiKV's logic, and using Proxy's instead.
false
}

fn clear(&mut self) {
match self.inner.as_mut() {
Either::Left(x) => x.clear(),
Either::Right(x) => x.clear(),
}
}

fn set_save_point(&mut self) {
self.wbs[self.index].set_save_point();
self.save_points.push(self.index);
}

fn pop_save_point(&mut self) -> Result<()> {
if let Some(x) = self.save_points.pop() {
return self.wbs[x].pop_save_point().map_err(r2e);
}
Err(r2e("no save point"))
}

fn rollback_to_save_point(&mut self) -> Result<()> {
if let Some(x) = self.save_points.pop() {
for i in x + 1..=self.index {
self.wbs[i].clear();
}
self.index = x;
return self.wbs[x].rollback_to_save_point().map_err(r2e);
}
Err(r2e("no save point"))
}

fn merge(&mut self, other: Self) -> Result<()> {
match self.inner.as_mut() {
Either::Left(x) => x.merge(other.left().unwrap()),
Either::Right(x) => x.merge(other.right().unwrap()),
}
}
}

impl Mutable for MixedWriteBatch {
fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
if !self.do_write(engine_traits::CF_DEFAULT, key) {
return Ok(());
}
match self.inner.as_mut() {
Either::Left(x) => x.put(key, value),
Either::Right(x) => x.put(key, value),
}
}

fn put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> {
if !self.do_write(cf, key) {
return Ok(());
}
match self.inner.as_mut() {
Either::Left(x) => x.put_cf(cf, key, value),
Either::Right(x) => x.put_cf(cf, key, value),
}
}

fn delete(&mut self, key: &[u8]) -> Result<()> {
if !self.do_write(engine_traits::CF_DEFAULT, key) {
return Ok(());
}
match self.inner.as_mut() {
Either::Left(x) => x.delete(key),
Either::Right(x) => x.delete(key),
}
}

fn delete_cf(&mut self, cf: &str, key: &[u8]) -> Result<()> {
if !self.do_write(cf, key) {
return Ok(());
}
match self.inner.as_mut() {
Either::Left(x) => x.delete_cf(cf, key),
Either::Right(x) => x.delete_cf(cf, key),
}
}

fn delete_range(&mut self, begin_key: &[u8], end_key: &[u8]) -> Result<()> {
Ok(())
}

fn delete_range_cf(&mut self, cf: &str, begin_key: &[u8], end_key: &[u8]) -> Result<()> {
Ok(())
}
}
4 changes: 2 additions & 2 deletions engine_tiflash/src/proxy_utils/engine_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ impl PageStorageExt {
pub fn read_page(&self, page_id: &[u8]) -> Option<Vec<u8>> {
// TODO maybe we can steal memory from C++ here to reduce redundant copy?
let value = self.helper().read_page(page_id.into());
return if value.view.len == 0 {
if value.view.len == 0 {
None
} else {
Some(value.view.to_slice().to_vec())
};
}
}

#[allow(clippy::type_complexity)]
Expand Down
12 changes: 7 additions & 5 deletions engine_tiflash/src/proxy_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ pub use engine_ext::*;
mod proxy_ext;
pub use proxy_ext::*;

use crate::util::get_cf_handle;
use crate::{
mixed_engine::write_batch::MixedWriteBatch as RocksWriteBatchVec, util::get_cf_handle,
};

pub fn do_write(cf: &str, key: &[u8]) -> bool {
fail::fail_point!("before_tiflash_do_write", |_| true);
Expand All @@ -22,7 +24,7 @@ pub fn do_write(cf: &str, key: &[u8]) -> bool {
}
}

pub fn cf_to_name(batch: &crate::RocksWriteBatchVec, cf: u32) -> &'static str {
pub fn cf_to_name(batch: &RocksWriteBatchVec, cf: u32) -> &'static str {
// d 0 w 2 l 1
let handle_default = get_cf_handle(batch.db.as_ref(), engine_traits::CF_DEFAULT).unwrap();
let d = handle_default.id();
Expand All @@ -42,7 +44,7 @@ pub fn cf_to_name(batch: &crate::RocksWriteBatchVec, cf: u32) -> &'static str {
}

#[cfg(any(test, feature = "testexport"))]
pub fn check_double_write(batch: &crate::RocksWriteBatchVec) {
pub fn check_double_write(batch: &RocksWriteBatchVec) {
// It will fire if we write by both observer(compat_old_proxy is not enabled)
// and TiKV's WriteBatch.
fail::fail_point!("before_tiflash_check_double_write", |_| {});
Expand All @@ -61,9 +63,9 @@ pub fn check_double_write(batch: &crate::RocksWriteBatchVec) {
}
}
#[cfg(not(any(test, feature = "testexport")))]
pub fn check_double_write(_: &crate::RocksWriteBatchVec) {}
pub fn check_double_write(_: &RocksWriteBatchVec) {}

pub fn log_check_double_write(batch: &crate::RocksWriteBatchVec) -> bool {
pub fn log_check_double_write(batch: &RocksWriteBatchVec) -> bool {
check_double_write(batch);
// TODO(tiflash) re-support this tracker.
let mut e = true;
Expand Down
32 changes: 30 additions & 2 deletions engine_tiflash/src/ps_engine/engine.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.

use std::sync::Arc;

use engine_rocks::RocksEngineIterator;
use engine_traits::{IterOptions, Iterable, ReadOptions, Result};

use super::{PSEngineWriteBatch, PSRocksWriteBatchVec};
use crate::{
mixed_engine::{elementary::ElementaryEngine, MixedDbVector},
PageStorageExt,
MixedWriteBatch, PageStorageExt,
};

use crate::mixed_engine::write_batch::WRITE_BATCH_LIMIT;
use crate::mixed_engine::write_batch::WRITE_BATCH_MAX_BATCH;
use tikv_util::Either;
#[derive(Clone, Debug)]
pub struct PSElementEngine {
pub ps_ext: PageStorageExt,
Expand Down Expand Up @@ -90,6 +95,29 @@ impl ElementaryEngine for PSElementEngine {
panic!("iterator_opt should not be called in PS engine");
r
}

fn write_batch(&self) -> MixedWriteBatch {
MixedWriteBatch {
inner: Either::Right(PSRocksWriteBatchVec::new(
Arc::clone(self.as_inner()),
self.ps_ext.clone(),
self.ps_ext.as_ref().unwrap().create_write_batch(),
WRITE_BATCH_LIMIT,
1,
self.support_multi_batch_write(),
)),
}
}

fn write_batch_with_cap(&self, cap: usize) -> MixedWriteBatch {
MixedWriteBatch {
inner: Either::Right(PSRocksWriteBatchVec::with_unit_capacity(
self,
self.ps_ext.as_ref().unwrap().create_write_batch(),
cap,
)),
}
}
}

// Some data may be migrated from kv engine to raft engine in the future,
Expand Down
2 changes: 0 additions & 2 deletions engine_tiflash/src/ps_engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

mod engine;
mod ps_log_engine;
#[cfg(feature = "enable-pagestorage")]
pub(crate) mod ps_write_batch;

pub use engine::*;
pub use ps_log_engine::*;
#[cfg(feature = "enable-pagestorage")]
pub use ps_write_batch::*;
Loading