diff --git a/proxy_components/engine_store_ffi/Cargo.toml b/proxy_components/engine_store_ffi/Cargo.toml index 21eb18ce61d..c04b4bcd9a4 100644 --- a/proxy_components/engine_store_ffi/Cargo.toml +++ b/proxy_components/engine_store_ffi/Cargo.toml @@ -32,6 +32,9 @@ openssl-vendored = [ "openssl/vendored" ] +jemalloc = ["proxy_ffi/jemalloc"] +external-jemalloc = ["proxy_ffi/external-jemalloc"] + [dependencies] batch-system = { workspace = true, default-features = false } bitflags = "1.0.1" diff --git a/proxy_components/engine_store_ffi/src/core/forward_raft/region.rs b/proxy_components/engine_store_ffi/src/core/forward_raft/region.rs index bdb356c451f..12c61ddfb66 100644 --- a/proxy_components/engine_store_ffi/src/core/forward_raft/region.rs +++ b/proxy_components/engine_store_ffi/src/core/forward_raft/region.rs @@ -13,6 +13,10 @@ impl ProxyForwarder { } pub fn on_region_changed(&self, ob_region: &Region, e: RegionChangeEvent, r: StateRole) { + self.engine_store_server_helper + .maybe_jemalloc_register_alloc(); + self.engine_store_server_helper + .directly_report_jemalloc_alloc(); let region_id = ob_region.get_id(); if e == RegionChangeEvent::Destroy { info!( @@ -104,6 +108,10 @@ impl ProxyForwarder { } pub fn on_role_change(&self, ob_region: &Region, r: &RoleChange) { + self.engine_store_server_helper + .maybe_jemalloc_register_alloc(); + self.engine_store_server_helper + .directly_report_jemalloc_alloc(); let region_id = ob_region.get_id(); let is_replicated = !r.initialized; let is_fap_enabled = if let Some(b) = self.engine.proxy_ext.config_set.as_ref() { diff --git a/proxy_components/mock-engine-store/src/mock_cluster/test_utils.rs b/proxy_components/mock-engine-store/src/mock_cluster/test_utils.rs index 095aeab1bb3..650d9a96d36 100644 --- a/proxy_components/mock-engine-store/src/mock_cluster/test_utils.rs +++ b/proxy_components/mock-engine-store/src/mock_cluster/test_utils.rs @@ -98,7 +98,6 @@ pub fn maybe_collect_states( } pub fn collect_all_states(cluster_ext: &ClusterExt, region_id: u64) -> HashMap { - maybe_collect_states(cluster_ext, region_id, None); let prev_state = maybe_collect_states(cluster_ext, region_id, None); assert_eq!( prev_state.len(), diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs index 68fb195747e..969c28af033 100644 --- a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs +++ b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs @@ -1,6 +1,10 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. -use std::{cell::RefCell, pin::Pin, sync::atomic::Ordering}; +use std::{ + cell::RefCell, + pin::Pin, + sync::{atomic::Ordering, Mutex}, +}; use engine_store_ffi::TiFlashEngine; @@ -18,6 +22,23 @@ use super::{ }; use crate::mock_cluster; +#[derive(Clone)] +pub struct ThreadInfoJealloc { + pub allocated_ptr: u64, + pub deallocated_ptr: u64, +} + +impl ThreadInfoJealloc { + pub fn allocated(&self) -> u64 { + unsafe { *(self.allocated_ptr as *const u64) } + } + pub fn deallocated(&self) -> u64 { + unsafe { *(self.deallocated_ptr as *const u64) } + } + pub fn remaining(&self) -> i64 { + self.allocated() as i64 - self.deallocated() as i64 + } +} pub struct EngineStoreServer { pub id: u64, // TODO engines maybe changed into TabletRegistry? @@ -28,6 +49,7 @@ pub struct EngineStoreServer { pub page_storage: MockPageStorage, // (region_id, peer_id) -> MockRegion pub tmp_fap_regions: HashMap>, + pub thread_info_map: Mutex>, } impl EngineStoreServer { @@ -40,6 +62,7 @@ impl EngineStoreServer { region_states: RefCell::new(Default::default()), page_storage: Default::default(), tmp_fap_regions: Default::default(), + thread_info_map: Default::default(), } } @@ -369,6 +392,8 @@ pub fn gen_engine_store_server_helper( fn_query_fap_snapshot_state: Some(ffi_query_fap_snapshot_state), fn_kvstore_region_exists: Some(ffi_kvstore_region_exists), fn_clear_fap_snapshot: Some(ffi_clear_fap_snapshot), + fn_report_thread_allocate_info: Some(ffi_report_thread_allocate_info), + fn_report_thread_allocate_batch: Some(ffi_report_thread_allocate_batch), ps: PageStorageInterfaces { fn_create_write_batch: Some(ffi_mockps_create_write_batch), fn_wb_put_page: Some(ffi_mockps_wb_put_page), @@ -609,3 +634,49 @@ unsafe extern "C" fn ffi_get_lock_by_key( }, } } + +unsafe extern "C" fn ffi_report_thread_allocate_info( + arg1: *mut interfaces_ffi::EngineStoreServerWrap, + _: u64, + name: interfaces_ffi::BaseBuffView, + t: interfaces_ffi::ReportThreadAllocateInfoType, + value: u64, +) { + let store = into_engine_store_server_wrap(arg1); + let tn = std::str::from_utf8(name.to_slice()).unwrap().to_string(); + match (*store.engine_store_server) + .thread_info_map + .lock() + .expect("poisoned") + .entry(tn) + { + std::collections::hash_map::Entry::Occupied(mut o) => { + if t == interfaces_ffi::ReportThreadAllocateInfoType::AllocPtr { + o.get_mut().allocated_ptr = value; + } else if t == interfaces_ffi::ReportThreadAllocateInfoType::DeallocPtr { + o.get_mut().deallocated_ptr = value; + } + } + std::collections::hash_map::Entry::Vacant(v) => { + if t == interfaces_ffi::ReportThreadAllocateInfoType::AllocPtr { + v.insert(ThreadInfoJealloc { + allocated_ptr: value, + deallocated_ptr: 0, + }); + } else if t == interfaces_ffi::ReportThreadAllocateInfoType::DeallocPtr { + v.insert(ThreadInfoJealloc { + allocated_ptr: 0, + deallocated_ptr: value, + }); + } + } + } +} + +unsafe extern "C" fn ffi_report_thread_allocate_batch( + _: *mut interfaces_ffi::EngineStoreServerWrap, + _: u64, + _name: interfaces_ffi::BaseBuffView, + _data: interfaces_ffi::ReportThreadAllocateInfoBatch, +) { +} diff --git a/proxy_components/proxy_ffi/Cargo.toml b/proxy_components/proxy_ffi/Cargo.toml index 0b49155e49e..f4349a4eceb 100644 --- a/proxy_components/proxy_ffi/Cargo.toml +++ b/proxy_components/proxy_ffi/Cargo.toml @@ -22,12 +22,15 @@ test-engines-rocksdb = [ test-engines-panic = [ "engine_test/test-engines-panic", ] +jemalloc = [] # TODO use encryption/openssl-vendored if later supports openssl-vendored = [ "openssl/vendored" ] +external-jemalloc = [] + [dependencies] encryption = { workspace = true, default-features = false } openssl = { workspace = true } # TODO only for feature diff --git a/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs b/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs index 68149af83e0..dd96e0a66f4 100644 --- a/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs +++ b/proxy_components/proxy_ffi/src/engine_store_helper_impls.rs @@ -1,5 +1,5 @@ // Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0. -use std::pin::Pin; +use std::{cell::RefCell, pin::Pin}; use kvproto::{kvrpcpb, metapb, raft_cmdpb}; @@ -34,6 +34,13 @@ pub fn gen_engine_store_server_helper( unsafe { &(*(engine_store_server_helper as *const EngineStoreServerHelper)) } } +thread_local! { + pub static JEMALLOC_REGISTERED: RefCell = RefCell::new(false); + pub static JEMALLOC_TNAME: RefCell<(String, u64)> = RefCell::new(Default::default()); + pub static JEMALLOC_ALLOCP: RefCell<*mut u64> = RefCell::new(std::ptr::null_mut()); + pub static JEMALLOC_DEALLOCP: RefCell<*mut u64> = RefCell::new(std::ptr::null_mut()); +} + /// # Safety /// The lifetime of `engine_store_server_helper` is definitely longer than /// `ENGINE_STORE_SERVER_HELPER_PTR`. @@ -49,6 +56,85 @@ pub fn set_server_info_resp(res: &kvproto::diagnosticspb::ServerInfoResponse, pt } impl EngineStoreServerHelper { + pub fn maybe_jemalloc_register_alloc(&self) { + JEMALLOC_REGISTERED.with(|b| { + if !*b.borrow() { + unsafe { + let ptr_alloc: u64 = crate::jemalloc_utils::get_allocatep_on_thread_start(); + let ptr_dealloc: u64 = crate::jemalloc_utils::get_deallocatep_on_thread_start(); + let thread_name = std::thread::current().name().unwrap_or("").to_string(); + let thread_id: u64 = std::thread::current().id().as_u64().into(); + (self.fn_report_thread_allocate_info.into_inner())( + self.inner, + thread_id, + BaseBuffView::from(thread_name.as_bytes()), + interfaces_ffi::ReportThreadAllocateInfoType::Reset, + 0, + ); + (self.fn_report_thread_allocate_info.into_inner())( + self.inner, + thread_id, + BaseBuffView::from(thread_name.as_bytes()), + interfaces_ffi::ReportThreadAllocateInfoType::AllocPtr, + ptr_alloc, + ); + (self.fn_report_thread_allocate_info.into_inner())( + self.inner, + thread_id, + BaseBuffView::from(thread_name.as_bytes()), + interfaces_ffi::ReportThreadAllocateInfoType::DeallocPtr, + ptr_dealloc, + ); + + // Some threads are not everlasting, so we don't want TiFlash to directly access + // the pointer. + JEMALLOC_TNAME.with(|p| { + *p.borrow_mut() = (thread_name, thread_id); + }); + if ptr_alloc != 0 { + JEMALLOC_ALLOCP.with(|p| { + *p.borrow_mut() = ptr_alloc as *mut u64; + }); + } + if ptr_dealloc != 0 { + JEMALLOC_DEALLOCP.with(|p| { + *p.borrow_mut() = ptr_dealloc as *mut u64; + }); + } + } + *(b.borrow_mut()) = true; + } + }); + } + + pub fn directly_report_jemalloc_alloc(&self) { + JEMALLOC_TNAME.with(|thread_info| unsafe { + let a = JEMALLOC_ALLOCP.with(|p| { + let p = *p.borrow_mut(); + if p.is_null() { + return 0; + } + *p + }); + let d = JEMALLOC_DEALLOCP.with(|p| { + let p = *p.borrow_mut(); + if p.is_null() { + return 0; + } + *p + }); + (self.fn_report_thread_allocate_batch.into_inner())( + self.inner, + thread_info.borrow().1, + BaseBuffView::from(thread_info.borrow().0.as_bytes()), + interfaces_ffi::ReportThreadAllocateInfoBatch { + alloc: a, + dealloc: d, + }, + ); + }); + } + pub fn gc_raw_cpp_ptr(&self, ptr: *mut ::std::os::raw::c_void, tp: RawCppPtrType) { debug_assert!(self.fn_gc_raw_cpp_ptr.is_some()); unsafe { @@ -82,6 +168,7 @@ impl EngineStoreServerHelper { pub fn handle_compute_store_stats(&self) -> StoreStats { debug_assert!(self.fn_handle_compute_store_stats.is_some()); + self.maybe_jemalloc_register_alloc(); unsafe { (self.fn_handle_compute_store_stats.into_inner())(self.inner) } } @@ -91,16 +178,22 @@ impl EngineStoreServerHelper { header: RaftCmdHeader, ) -> EngineStoreApplyRes { debug_assert!(self.fn_handle_write_raft_cmd.is_some()); + self.maybe_jemalloc_register_alloc(); + self.directly_report_jemalloc_alloc(); unsafe { (self.fn_handle_write_raft_cmd.into_inner())(self.inner, cmds.gen_view(), header) } } pub fn handle_get_engine_store_server_status(&self) -> EngineStoreServerStatus { debug_assert!(self.fn_handle_get_engine_store_server_status.is_some()); + self.maybe_jemalloc_register_alloc(); + self.directly_report_jemalloc_alloc(); unsafe { (self.fn_handle_get_engine_store_server_status.into_inner())(self.inner) } } pub fn handle_set_proxy(&self, proxy: *const RaftStoreProxyFFIHelper) { debug_assert!(self.fn_atomic_update_proxy.is_some()); + self.maybe_jemalloc_register_alloc(); + self.directly_report_jemalloc_alloc(); unsafe { (self.fn_atomic_update_proxy.into_inner())(self.inner, proxy as *mut _) } } @@ -129,7 +222,8 @@ impl EngineStoreServerHelper { header: RaftCmdHeader, ) -> EngineStoreApplyRes { debug_assert!(self.fn_handle_admin_raft_cmd.is_some()); - + self.maybe_jemalloc_register_alloc(); + self.directly_report_jemalloc_alloc(); unsafe { let req = ProtoMsgBaseBuff::new(req); let resp = ProtoMsgBaseBuff::new(resp); @@ -158,6 +252,8 @@ impl EngineStoreServerHelper { term: u64, ) -> bool { debug_assert!(self.fn_try_flush_data.is_some()); + self.maybe_jemalloc_register_alloc(); + self.directly_report_jemalloc_alloc(); // TODO(proactive flush) unsafe { (self.fn_try_flush_data.into_inner())( @@ -187,6 +283,8 @@ impl EngineStoreServerHelper { ) -> RawCppPtr { debug_assert!(self.fn_pre_handle_snapshot.is_some()); + self.maybe_jemalloc_register_alloc(); + self.directly_report_jemalloc_alloc(); let snaps_view = into_sst_views(snaps); unsafe { let region = ProtoMsgBaseBuff::new(region); @@ -203,6 +301,8 @@ impl EngineStoreServerHelper { pub fn apply_pre_handled_snapshot(&self, snap: RawCppPtr) { debug_assert!(self.fn_apply_pre_handled_snapshot.is_some()); + self.maybe_jemalloc_register_alloc(); + self.directly_report_jemalloc_alloc(); unsafe { (self.fn_apply_pre_handled_snapshot.into_inner())(self.inner, snap.ptr, snap.type_) } @@ -210,6 +310,8 @@ impl EngineStoreServerHelper { pub fn abort_pre_handle_snapshot(&self, region_id: u64, peer_id: u64) { debug_assert!(self.fn_abort_pre_handle_snapshot.is_some()); + self.maybe_jemalloc_register_alloc(); + self.directly_report_jemalloc_alloc(); unsafe { (self.fn_abort_pre_handle_snapshot.into_inner())(self.inner, region_id, peer_id) } } @@ -277,6 +379,8 @@ impl EngineStoreServerHelper { ) -> EngineStoreApplyRes { debug_assert!(self.fn_handle_ingest_sst.is_some()); + self.maybe_jemalloc_register_alloc(); + self.directly_report_jemalloc_alloc(); let snaps_view = into_sst_views(snaps); unsafe { (self.fn_handle_ingest_sst.into_inner())( @@ -290,6 +394,8 @@ impl EngineStoreServerHelper { pub fn handle_destroy(&self, region_id: u64) { debug_assert!(self.fn_handle_destroy.is_some()); + self.maybe_jemalloc_register_alloc(); + self.directly_report_jemalloc_alloc(); unsafe { (self.fn_handle_destroy.into_inner())(self.inner, region_id); } diff --git a/proxy_components/proxy_ffi/src/interfaces.rs b/proxy_components/proxy_ffi/src/interfaces.rs index b06d0a5c7ca..ca2e9337c6f 100644 --- a/proxy_components/proxy_ffi/src/interfaces.rs +++ b/proxy_components/proxy_ffi/src/interfaces.rs @@ -558,6 +558,20 @@ pub mod root { unsafe extern "C" fn(arg1: *const root::DB::EngineStoreServerWrap), >, } + #[repr(u64)] + #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] + pub enum ReportThreadAllocateInfoType { + Reset = 0, + Remove = 1, + AllocPtr = 2, + DeallocPtr = 3, + } + #[repr(C)] + #[derive(Debug)] + pub struct ReportThreadAllocateInfoBatch { + pub alloc: u64, + pub dealloc: u64, + } #[repr(C)] #[derive(Debug)] pub struct EngineStoreServerHelper { @@ -750,6 +764,23 @@ pub mod root { region_id: u64, ) -> bool, >, + pub fn_report_thread_allocate_info: ::std::option::Option< + unsafe extern "C" fn( + arg1: *mut root::DB::EngineStoreServerWrap, + thread_id: u64, + name: root::DB::BaseBuffView, + type_: root::DB::ReportThreadAllocateInfoType, + value: u64, + ), + >, + pub fn_report_thread_allocate_batch: ::std::option::Option< + unsafe extern "C" fn( + arg1: *mut root::DB::EngineStoreServerWrap, + thread_id: u64, + name: root::DB::BaseBuffView, + data: root::DB::ReportThreadAllocateInfoBatch, + ), + >, } extern "C" { pub fn ffi_get_server_info_from_proxy( @@ -758,7 +789,7 @@ pub mod root { arg3: root::DB::RawVoidPtr, ) -> u32; } - pub const RAFT_STORE_PROXY_VERSION: u64 = 8589640407431546086; + pub const RAFT_STORE_PROXY_VERSION: u64 = 9679186549381427051; pub const RAFT_STORE_PROXY_MAGIC_NUMBER: u32 = 324508639; } } diff --git a/proxy_components/proxy_ffi/src/jemalloc_utils.rs b/proxy_components/proxy_ffi/src/jemalloc_utils.rs new file mode 100644 index 00000000000..33875908b0c --- /dev/null +++ b/proxy_components/proxy_ffi/src/jemalloc_utils.rs @@ -0,0 +1,92 @@ +// Copyright 2024 TiKV Project Authors. Licensed under Apache-2.0. + +extern "C" { + // External jemalloc + pub fn mallctl( + name: *const ::std::os::raw::c_char, + oldp: *mut ::std::os::raw::c_void, + oldlenp: *mut u64, + newp: *mut ::std::os::raw::c_void, + newlen: u64, + ) -> ::std::os::raw::c_int; + + // Embedded jemalloc + pub fn _rjem_mallctl( + name: *const ::std::os::raw::c_char, + oldp: *mut ::std::os::raw::c_void, + oldlenp: *mut u64, + newp: *mut ::std::os::raw::c_void, + newlen: u64, + ) -> ::std::os::raw::c_int; +} + +#[allow(unused_variables)] +#[allow(unused_mut)] +#[allow(unused_unsafe)] +fn issue_mallctl(command: &str) -> u64 { + type PtrUnderlying = u64; + let mut ptr: PtrUnderlying = 0; + let mut size = std::mem::size_of::() as u64; + let c_str = std::ffi::CString::new(command).unwrap(); + let c_ptr: *const ::std::os::raw::c_char = c_str.as_ptr() as *const ::std::os::raw::c_char; + unsafe { + // See unprefixed_malloc_on_supported_platforms in tikv-jemalloc-sys. + #[cfg(any(test, feature = "testexport"))] + { + #[cfg(any(feature = "jemalloc"))] + { + // See NO_UNPREFIXED_MALLOC + #[cfg(any(target_os = "android", target_os = "dragonfly", target_os = "macos"))] + _rjem_mallctl( + c_ptr, + &mut ptr as *mut _ as *mut ::std::os::raw::c_void, + &mut size as *mut u64, + std::ptr::null_mut(), + 0, + ); + #[cfg(not(any( + target_os = "android", + target_os = "dragonfly", + target_os = "macos" + )))] + mallctl( + c_ptr, + &mut ptr as *mut _ as *mut ::std::os::raw::c_void, + &mut size as *mut u64, + std::ptr::null_mut(), + 0, + ); + } + } + + #[cfg(not(any(test, feature = "testexport")))] + { + // Must linked to tiflash. + #[cfg(feature = "external-jemalloc")] + mallctl( + c_ptr, + &mut ptr as *mut _ as *mut ::std::os::raw::c_void, + &mut size as *mut u64, + std::ptr::null_mut(), + 0, + ); + } + } + ptr +} + +pub fn get_allocatep_on_thread_start() -> u64 { + issue_mallctl("thread.allocatedp") +} + +pub fn get_deallocatep_on_thread_start() -> u64 { + issue_mallctl("thread.deallocatedp") +} + +pub fn get_allocate() -> u64 { + issue_mallctl("thread.allocated") +} + +pub fn get_deallocate() -> u64 { + issue_mallctl("thread.deallocated") +} diff --git a/proxy_components/proxy_ffi/src/lib.rs b/proxy_components/proxy_ffi/src/lib.rs index 59ed9dbdc8e..fd61492e800 100644 --- a/proxy_components/proxy_ffi/src/lib.rs +++ b/proxy_components/proxy_ffi/src/lib.rs @@ -1,5 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. +#![feature(thread_id_value)] + /// All mods end up with `_impls` impl structs defined in interface. /// Other mods which define and impl structs should not end up with name /// `_impls`. @@ -21,6 +23,7 @@ pub mod raftstore_proxy; pub mod raftstore_proxy_helper_impls; pub mod read_index_helper; // FFI releated with reading from SST/RocksDB files. +pub mod jemalloc_utils; pub mod snapshot_reader_impls; pub mod utils; diff --git a/proxy_components/proxy_server/Cargo.toml b/proxy_components/proxy_server/Cargo.toml index d843816c169..4296f870f63 100644 --- a/proxy_components/proxy_server/Cargo.toml +++ b/proxy_components/proxy_server/Cargo.toml @@ -7,7 +7,7 @@ publish = false [features] tcmalloc = ["tikv/tcmalloc"] -jemalloc = ["tikv/jemalloc"] +jemalloc = ["tikv/jemalloc", "engine_store_ffi/jemalloc"] mimalloc = ["tikv/mimalloc"] snmalloc = ["tikv/snmalloc"] portable = ["tikv/portable"] @@ -35,6 +35,7 @@ backup-stream-debug = ["backup-stream/backup-stream-debug"] testexport = ["engine_tiflash/testexport", "engine_store_ffi/testexport", "tikv/testexport"] pprof-fp = ["tikv/pprof-fp"] openssl-vendored = ["tikv/openssl-vendored", "openssl/vendored"] +external-jemalloc = ["engine_store_ffi/external-jemalloc"] [dependencies] api_version = { workspace = true } diff --git a/proxy_components/proxy_server/src/run.rs b/proxy_components/proxy_server/src/run.rs index ab0e34aceae..71ace196ae5 100644 --- a/proxy_components/proxy_server/src/run.rs +++ b/proxy_components/proxy_server/src/run.rs @@ -618,6 +618,11 @@ impl TiKvServer { Arc::clone(&security_mgr), ); + #[cfg(feature = "external-jemalloc")] + info!("linked with external jemalloc"); + #[cfg(not(feature = "external-jemalloc"))] + info!("linked without external jemalloc"); + // Initialize and check config info!("using proxy config"; "config" => ?proxy_config); crate::config::address_proxy_config(&mut config, &proxy_config); diff --git a/proxy_scripts/ci_check.sh b/proxy_scripts/ci_check.sh index a38316b1afd..fb11fd1eafa 100755 --- a/proxy_scripts/ci_check.sh +++ b/proxy_scripts/ci_check.sh @@ -1,4 +1,9 @@ set -uxeo pipefail +cat /etc/issue +cat /proc/version +echo "LD_LIBRARY_PATH=", ${LD_LIBRARY_PATH:-nil} +echo "PATH=", $PATH + if [[ $M == "fmt" ]]; then pwd git rev-parse --show-toplevel @@ -34,11 +39,14 @@ elif [[ $M == "testold" ]]; then # cargo test --package tests --test failpoints cases::test_snap cargo test --package tests --test failpoints cases::test_import_service elif [[ $M == "testnew" ]]; then + chmod +x ./proxy_scripts/make_env.sh + ./proxy_scripts/make_env.sh export ENGINE_LABEL_VALUE=tiflash export RUST_BACKTRACE=full export ENABLE_FEATURES="test-engine-kv-rocksdb test-engine-raft-raft-engine openssl-vendored" cargo check --package proxy_server --features="$ENABLE_FEATURES" # tests based on mock-engine-store, with compat for new proxy + LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib cargo test --package proxy_tests --features="$ENABLE_FEATURES" --test proxy shared::jemalloc --features="jemalloc" cargo test --package proxy_tests --features="$ENABLE_FEATURES" --test proxy shared::write cargo test --package proxy_tests --features="$ENABLE_FEATURES" --test proxy shared::snapshot cargo test --package proxy_tests --features="$ENABLE_FEATURES" --test proxy shared::store @@ -70,3 +78,4 @@ elif [[ $M == "release" ]]; then export ENGINE_LABEL_VALUE=tiflash make release fi + \ No newline at end of file diff --git a/proxy_scripts/make_env.sh b/proxy_scripts/make_env.sh new file mode 100644 index 00000000000..9884b991db0 --- /dev/null +++ b/proxy_scripts/make_env.sh @@ -0,0 +1,6 @@ +wget https://github.com/jemalloc/jemalloc/releases/download/5.2.1/jemalloc-5.2.1.tar.bz2 +tar -xvf jemalloc-5.2.1.tar.bz2 +cd jemalloc-5.2.1 +./configure +make +make install \ No newline at end of file diff --git a/proxy_tests/Cargo.toml b/proxy_tests/Cargo.toml index 35f0bebe765..895832a8809 100644 --- a/proxy_tests/Cargo.toml +++ b/proxy_tests/Cargo.toml @@ -38,7 +38,7 @@ test-engine-raft-rocksdb = [ "engine_test/test-engine-raft-rocksdb" ] -jemalloc = ["tikv/jemalloc"] +jemalloc = ["tikv/jemalloc", "proxy_server/jemalloc", "proxy_ffi/jemalloc"] mimalloc = ["tikv/mimalloc"] snmalloc = ["tikv/snmalloc"] mem-profiling = ["tikv/mem-profiling"] @@ -46,6 +46,7 @@ sse = ["tikv/sse"] portable = ["tikv/portable"] openssl-vendored = ["tikv/openssl-vendored"] enable-pagestorage = [] +external-jemalloc = ["proxy_server/external-jemalloc", "engine_store_ffi/external-jemalloc"] [dependencies] api_version = { workspace = true } diff --git a/proxy_tests/proxy/shared/jemalloc.rs b/proxy_tests/proxy/shared/jemalloc.rs new file mode 100644 index 00000000000..6ff6f0faf7b --- /dev/null +++ b/proxy_tests/proxy/shared/jemalloc.rs @@ -0,0 +1,78 @@ +// Copyright 2024 TiKV Project Authors. Licensed under Apache-2.0. + +use collections::HashMap; +use mock_engine_store::ThreadInfoJealloc; +use more_asserts::assert_gt; +use proxy_ffi::jemalloc_utils::{get_allocatep_on_thread_start, get_deallocatep_on_thread_start}; + +use crate::utils::v1::*; + +#[test] +fn test_alloc_dealloc() { + let dummy: Vec = Vec::with_capacity(100000); + let ptr_alloc = get_allocatep_on_thread_start(); + let actual_alloc: &u64 = unsafe { &*(ptr_alloc as *const u64) }; + let ptr_dealloc = get_deallocatep_on_thread_start(); + let actual_dealloc: &u64 = unsafe { &*(ptr_dealloc as *const u64) }; + assert_gt!(*actual_alloc, 100000 * std::mem::size_of::() as u64); + let dummy2: Vec = Vec::with_capacity(100000); + assert_gt!( + *actual_alloc, + 2 * 100000 * std::mem::size_of::() as u64 + ); + drop(dummy); + assert_gt!(*actual_dealloc, 100000 * std::mem::size_of::() as u64); + drop(dummy2); + assert_gt!( + *actual_dealloc, + 2 * 100000 * std::mem::size_of::() as u64 + ); +} + +fn collect_thread_state( + cluster_ext: &ClusterExt, + store_id: u64, +) -> HashMap { + let mut res: HashMap = Default::default(); + cluster_ext.iter_ffi_helpers(Some(vec![store_id]), &mut |_, ffi: &mut FFIHelperSet| { + res = (*ffi.engine_store_server.thread_info_map.lock().expect("")).clone(); + }); + res +} + +fn gather(m: &HashMap, pattern: &str) -> i64 { + m.iter() + .filter(|(k, _)| k.contains(pattern)) + .fold(0, |acc, e| acc + e.1.remaining()) +} + +#[test] +fn test_ffi() { + let (mut cluster, _pd_client) = new_mock_cluster(0, 1); + + let _ = cluster.run(); + + let prev = collect_thread_state(&cluster.cluster_ext, 1); + let before_raftstore = gather(&prev, "raftstore"); + let before_apply = gather(&prev, "apply"); + + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + cluster.must_put(k.as_bytes(), v.as_bytes()); + } + + for i in 0..10 { + let k = format!("k{}", i); + let v = format!("v{}", i); + check_key(&cluster, k.as_bytes(), v.as_bytes(), Some(true), None, None); + } + + let after = collect_thread_state(&cluster.cluster_ext, 1); + let after_raftstore = gather(&after, "raftstore"); + let after_apply = gather(&after, "apply"); + assert_gt!(after_raftstore, before_raftstore); + assert_gt!(after_apply, before_apply); + + cluster.shutdown(); +} diff --git a/proxy_tests/proxy/shared/mod.rs b/proxy_tests/proxy/shared/mod.rs index 7a20b9d4483..3fbdcfe8053 100644 --- a/proxy_tests/proxy/shared/mod.rs +++ b/proxy_tests/proxy/shared/mod.rs @@ -6,6 +6,7 @@ mod engine; mod fast_add_peer; mod ffi; mod ingest; +mod jemalloc; mod mock; mod normal; mod region; diff --git a/raftstore-proxy/Cargo.toml b/raftstore-proxy/Cargo.toml index 1916b7b6228..4141686abe2 100644 --- a/raftstore-proxy/Cargo.toml +++ b/raftstore-proxy/Cargo.toml @@ -35,6 +35,7 @@ backup-stream-debug = ["proxy_server/backup-stream-debug"] pprof-fp = ["proxy_server/pprof-fp"] openssl-vendored = ["proxy_server/openssl-vendored"] +external-jemalloc = ["proxy_server/external-jemalloc"] [lib] name = "raftstore_proxy" diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version index 91166e0b477..59c3d0bca37 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/@version @@ -1,3 +1,3 @@ #pragma once #include -namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 8589640407431546086ull; } \ No newline at end of file +namespace DB { constexpr uint64_t RAFT_STORE_PROXY_VERSION = 9679186549381427051ull; } \ No newline at end of file diff --git a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h index 17428d52ce3..3dd1feba12a 100644 --- a/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h +++ b/raftstore-proxy/ffi/src/RaftStoreProxyFFI/ProxyFFI.h @@ -311,6 +311,18 @@ struct PageStorageInterfaces { void (*fn_handle_purge_ps)(const EngineStoreServerWrap *); }; +enum class ReportThreadAllocateInfoType : uint64_t { + Reset = 0, + Remove, + AllocPtr, + DeallocPtr, +}; + +struct ReportThreadAllocateInfoBatch { + uint64_t alloc; + uint64_t dealloc; +}; + struct EngineStoreServerHelper { uint32_t magic_number; // use a very special number to check whether this // struct is legal @@ -373,6 +385,13 @@ struct EngineStoreServerHelper { uint64_t, uint64_t); void (*fn_clear_fap_snapshot)(EngineStoreServerWrap *, uint64_t region_id); bool (*fn_kvstore_region_exists)(EngineStoreServerWrap *, uint64_t region_id); + void (*fn_report_thread_allocate_info)(EngineStoreServerWrap *, + uint64_t thread_id, BaseBuffView name, + ReportThreadAllocateInfoType type, + uint64_t value); + void (*fn_report_thread_allocate_batch)(EngineStoreServerWrap *, + uint64_t thread_id, BaseBuffView name, + ReportThreadAllocateInfoBatch data); }; #ifdef __cplusplus