Skip to content

Commit

Permalink
Merge branch 'branch-3.2' into mergify/bp/branch-3.2/pr-50697
Browse files Browse the repository at this point in the history
Signed-off-by: 絵空事スピリット <[email protected]>
  • Loading branch information
EsoragotoSpirit authored Oct 8, 2024
2 parents f846e4b + 7bc5628 commit c6f1d83
Show file tree
Hide file tree
Showing 718 changed files with 23,021 additions and 7,899 deletions.
12 changes: 9 additions & 3 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ option(USE_AVX2 "Build with AVX2 instruction" ON)

option(USE_AVX512 "Build with AVX512f/AVX512BW instruction" OFF)

option(WITH_RELATIVE_SRC_PATH "Build source file with relative path" ON)

# Check gcc
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
if (CMAKE_CXX_COMPILER_VERSION VERSION_LESS "4.8.2")
Expand Down Expand Up @@ -606,7 +608,9 @@ set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall -Wno-sign-compare -Wno-unknown-p
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-strict-aliasing -fno-omit-frame-pointer")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -std=gnu++20 -D__STDC_FORMAT_MACROS")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-deprecated -Wno-vla -Wno-comment")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} ${FILE_PREFIX_MAP_FLAG}")
if("${WITH_RELATIVE_SRC_PATH}" STREQUAL "ON")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} ${FILE_PREFIX_MAP_FLAG}")
endif()
# disable link delete(void*, long)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -fno-sized-deallocation")

Expand Down Expand Up @@ -754,8 +758,10 @@ endif()

# Add flags that are common across build types
SET(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} ${CMAKE_CXX_FLAGS}")
# apply -ffile-prefix-map option to .c source files as well
SET(CMAKE_C_FLAGS "${FILE_PREFIX_MAP_FLAG} ${CMAKE_C_FLAGS}")
if("${WITH_RELATIVE_SRC_PATH}" STREQUAL "ON")
# apply -ffile-prefix-map option to .c source files as well
SET(CMAKE_C_FLAGS "${FILE_PREFIX_MAP_FLAG} ${CMAKE_C_FLAGS}")
endif()

message(STATUS "Compiler Flags: ${CMAKE_CXX_FLAGS}")

Expand Down
22 changes: 16 additions & 6 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ class AgentServer::Impl {
std::unique_ptr<ThreadPool> _thread_pool_move_dir;
std::unique_ptr<ThreadPool> _thread_pool_update_tablet_meta_info;
std::unique_ptr<ThreadPool> _thread_pool_drop_auto_increment_map;
std::unique_ptr<ThreadPool> _thread_pool_replication;
std::unique_ptr<ThreadPool> _thread_pool_remote_snapshot;
std::unique_ptr<ThreadPool> _thread_pool_replicate_snapshot;

std::unique_ptr<PushTaskWorkerPool> _push_workers;
std::unique_ptr<PublishVersionTaskWorkerPool> _publish_version_workers;
Expand Down Expand Up @@ -246,8 +247,12 @@ void AgentServer::Impl::init_or_die() {
MIN_CLONE_TASK_THREADS_IN_POOL),
DEFAULT_DYNAMIC_THREAD_POOL_QUEUE_SIZE, _thread_pool_clone);

BUILD_DYNAMIC_TASK_THREAD_POOL("replication", 0, calc_max_replication_threads(config::replication_threads),
std::numeric_limits<int>::max(), _thread_pool_replication);
BUILD_DYNAMIC_TASK_THREAD_POOL("remote_snapshot", 0, calc_max_replication_threads(config::replication_threads),
std::numeric_limits<int>::max(), _thread_pool_remote_snapshot);

BUILD_DYNAMIC_TASK_THREAD_POOL("replicate_snapshot", 0,
calc_max_replication_threads(config::replication_threads),
std::numeric_limits<int>::max(), _thread_pool_replicate_snapshot);

// It is the same code to create workers of each type, so we use a macro
// to make code to be more readable.
Expand Down Expand Up @@ -296,7 +301,8 @@ void AgentServer::Impl::stop() {

#ifndef BE_TEST
_thread_pool_clone->shutdown();
_thread_pool_replication->shutdown();
_thread_pool_remote_snapshot->shutdown();
_thread_pool_replicate_snapshot->shutdown();
#define STOP_POOL(type, pool_name) pool_name->stop();
#else
#define STOP_POOL(type, pool_name)
Expand Down Expand Up @@ -583,8 +589,10 @@ void AgentServer::Impl::update_max_thread_by_type(int type, int new_val) {
st = _thread_pool_clone->update_max_threads(new_val);
break;
case TTaskType::REMOTE_SNAPSHOT:
st = _thread_pool_remote_snapshot->update_max_threads(calc_max_replication_threads(new_val));
break;
case TTaskType::REPLICATE_SNAPSHOT:
st = _thread_pool_replication->update_max_threads(calc_max_replication_threads(new_val));
st = _thread_pool_replicate_snapshot->update_max_threads(calc_max_replication_threads(new_val));
break;
default:
break;
Expand Down Expand Up @@ -648,8 +656,10 @@ ThreadPool* AgentServer::Impl::get_thread_pool(int type) const {
ret = _thread_pool_drop_auto_increment_map.get();
break;
case TTaskType::REMOTE_SNAPSHOT:
ret = _thread_pool_remote_snapshot.get();
break;
case TTaskType::REPLICATE_SNAPSHOT:
ret = _thread_pool_replication.get();
ret = _thread_pool_replicate_snapshot.get();
break;
case TTaskType::PUSH:
case TTaskType::REALTIME_PUSH:
Expand Down
6 changes: 5 additions & 1 deletion be/src/agent/publish_version.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,16 @@ void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionReque
tablet_info.tablet_id, partition_version.version, transaction_id);
} else {
const int64_t max_continuous_version =
enable_sync_publish ? tablet->max_continuous_version() : tablet->max_readable_version();
enable_sync_publish ? tablet->max_readable_version() : tablet->max_continuous_version();
if (max_continuous_version > 0) {
auto& pair = tablet_versions.emplace_back();
pair.__set_tablet_id(tablet_info.tablet_id);
pair.__set_version(max_continuous_version);
}

if (enable_sync_publish && tablet_tasks.empty() && max_continuous_version < partition_version.version) {
error_tablet_ids.push_back(tablet_info.tablet_id);
}
}
}
}
Expand Down
19 changes: 19 additions & 0 deletions be/src/block_cache/block_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ Status BlockCache::init(const CacheOptions& options) {
return Status::NotSupported("unsupported block cache engine");
}
RETURN_IF_ERROR(_kv_cache->init(options));
_refresh_quota();
_initialized.store(true, std::memory_order_relaxed);
return Status::OK();
}
Expand Down Expand Up @@ -142,6 +143,18 @@ Status BlockCache::remove(const CacheKey& cache_key, off_t offset, size_t size)
return _kv_cache->remove(block_key);
}

Status BlockCache::update_mem_quota(size_t quota_bytes) {
Status st = _kv_cache->update_mem_quota(quota_bytes);
_refresh_quota();
return st;
}

Status BlockCache::update_disk_spaces(const std::vector<DirSpace>& spaces) {
Status st = _kv_cache->update_disk_spaces(spaces);
_refresh_quota();
return st;
}

void BlockCache::record_read_remote(size_t size, int64_t lateny_us) {
_kv_cache->record_read_remote(size, lateny_us);
}
Expand All @@ -164,4 +177,10 @@ DataCacheEngineType BlockCache::engine_type() {
return _kv_cache->engine_type();
}

void BlockCache::_refresh_quota() {
auto metrics = _kv_cache->cache_metrics(0);
_mem_quota.store(metrics.mem_quota_bytes, std::memory_order_relaxed);
_disk_quota.store(metrics.disk_quota_bytes, std::memory_order_relaxed);
}

} // namespace starrocks
17 changes: 16 additions & 1 deletion be/src/block_cache/block_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ class BlockCache {
// Remove data from cache. The offset and size must be aligned by block size
Status remove(const CacheKey& cache_key, off_t offset, size_t size);

// Update the datacache memory quota.
Status update_mem_quota(size_t quota_bytes);

// Update the datacache disk space infomation, such as disk quota or disk path.
Status update_disk_spaces(const std::vector<DirSpace>& spaces);

void record_read_remote(size_t size, int64_t lateny_us);

void record_read_cache(size_t size, int64_t lateny_us);
Expand All @@ -70,7 +76,13 @@ class BlockCache {

size_t block_size() const { return _block_size; }

bool is_initialized() { return _initialized.load(std::memory_order_relaxed); }
bool is_initialized() const { return _initialized.load(std::memory_order_relaxed); }

bool has_mem_cache() const { return _mem_quota.load(std::memory_order_relaxed) > 0; }

bool has_disk_cache() const { return _disk_quota.load(std::memory_order_relaxed) > 0; }

bool available() const { return is_initialized() && (has_mem_cache() || has_disk_cache()); }

DataCacheEngineType engine_type();

Expand All @@ -80,10 +92,13 @@ class BlockCache {
#ifndef BE_TEST
BlockCache() = default;
#endif
void _refresh_quota();

size_t _block_size = 0;
std::unique_ptr<KvCache> _kv_cache;
std::atomic<bool> _initialized = false;
std::atomic<size_t> _mem_quota = 0;
std::atomic<size_t> _disk_quota = 0;
};

} // namespace starrocks
118 changes: 103 additions & 15 deletions be/src/block_cache/cache_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,126 @@

#include "block_cache/cache_options.h"

#include <fmt/format.h>

#include <filesystem>

#include "common/logging.h"
#include "fs/fs.h"
#include "gutil/strings/split.h"
#include "util/parse_util.h"

namespace starrocks {

int64_t parse_mem_size(const std::string& mem_size_str, int64_t mem_limit) {
return ParseUtil::parse_mem_spec(mem_size_str, mem_limit);
Status parse_conf_datacache_mem_size(const std::string& conf_mem_size_str, int64_t mem_limit, size_t* mem_size) {
int64_t parsed_mem_size = ParseUtil::parse_mem_spec(conf_mem_size_str, mem_limit);
if (mem_limit > 0 && parsed_mem_size > mem_limit) {
LOG(WARNING) << "the configured datacache memory size exceeds the limit, decreased it to the limit value."
<< "mem_size: " << parsed_mem_size << ", mem_limit: " << mem_limit;
parsed_mem_size = mem_limit;
}

if (parsed_mem_size < 0) {
LOG(ERROR) << "invalid mem size for datacache: " << mem_size;
return Status::InvalidArgument("invalid mem size for datacache");
}
*mem_size = parsed_mem_size;
return Status::OK();
}

int64_t parse_disk_size(const std::string& disk_path, const std::string& disk_size_str, int64_t disk_limit) {
if (disk_limit == -1) {
Status parse_conf_datacache_disk_size(const std::string& disk_path, const std::string& disk_size_str,
int64_t disk_limit, size_t* disk_size) {
if (disk_limit <= 0) {
std::filesystem::path dpath(disk_path);
// The datacache directory may be created automatically later.
if (!std::filesystem::exists(dpath)) {
if (!dpath.has_parent_path()) {
LOG(ERROR) << "invalid disk path for datacache, disk_path: " << disk_path;
return -1;
}
dpath = dpath.parent_path();
}

std::error_code ec;
auto space_info = std::filesystem::space(dpath, ec);
if (ec) {
LOG(ERROR) << "fail to get disk space info, path: " << dpath << ", error: " << ec.message();
return -1;
return Status::InternalError("fail to get disk space info for datacache");
}
disk_limit = space_info.capacity;
}
return ParseUtil::parse_mem_spec(disk_size_str, disk_limit);

int64_t parsed_disk_size = ParseUtil::parse_mem_spec(disk_size_str, disk_limit);
if (parsed_disk_size < 0) {
LOG(ERROR) << "invalid disk size for datacache: " << parsed_disk_size;
return Status::InvalidArgument("invalid disk size for datacache");
}
if (parsed_disk_size > disk_limit) {
LOG(WARNING) << "the configured datacache disk size exceeds the disk limit, decreased it to the limit value."
<< ", path: " << disk_path << ", disk_size: " << parsed_disk_size
<< ", disk_limit: " << disk_limit;
parsed_disk_size = disk_limit;
}
*disk_size = parsed_disk_size;
return Status::OK();
}

Status parse_conf_datacache_disk_paths(const std::string& config_path, std::vector<std::string>* paths,
bool ignore_broken_disk) {
if (config_path.empty()) {
return Status::OK();
}

size_t duplicated_count = 0;
std::vector<std::string> path_vec = strings::Split(config_path, ";", strings::SkipWhitespace());
for (auto& item : path_vec) {
StripWhiteSpace(&item);
item.erase(item.find_last_not_of('/') + 1);
if (item.empty() || item[0] != '/') {
LOG(WARNING) << "invalid datacache path. path: " << item;
continue;
}

Status status = FileSystem::Default()->create_dir_if_missing(item);
if (!status.ok()) {
LOG(WARNING) << "datacache path can not be created. path: " << item;
continue;
}

string canonicalized_path;
status = FileSystem::Default()->canonicalize(item, &canonicalized_path);
if (!status.ok()) {
LOG(WARNING) << "datacache path can not be canonicalized. may be not exist. path: " << item;
continue;
}
if (std::find(paths->begin(), paths->end(), canonicalized_path) != paths->end()) {
LOG(WARNING) << "duplicated datacache disk path: " << item << ", ignore it.";
++duplicated_count;
continue;
}
paths->emplace_back(canonicalized_path);
}
if ((path_vec.size() != (paths->size() + duplicated_count) && ignore_broken_disk)) {
LOG(WARNING) << "fail to parse datacache_disk_path config. value: " << config_path;
return Status::InvalidArgument("fail to parse datacache_disk_path");
}
return Status::OK();
}

Status parse_conf_datacache_disk_spaces(const std::string& config_disk_path, const std::string& config_disk_size,
bool ignore_broken_disk, std::vector<DirSpace>* disk_spaces) {
std::vector<std::string> paths;
RETURN_IF_ERROR(parse_conf_datacache_disk_paths(config_disk_path, &paths, ignore_broken_disk));
for (auto& p : paths) {
size_t disk_size = 0;
RETURN_IF_ERROR(parse_conf_datacache_disk_size(p, config_disk_size, -1, &disk_size));
disk_spaces->push_back({.path = p, .size = static_cast<size_t>(disk_size)});
}
return Status::OK();
}

void clean_residual_datacache(const std::string& disk_path) {
auto st = FileSystem::Default()->iterate_dir2(disk_path, [&](DirEntry entry) {
if (!entry.is_dir.value_or(false) && entry.name.find("blockfile_") == 0) {
auto file = fmt::format("{}/{}", disk_path, entry.name);
auto ret = FileSystem::Default()->delete_file(file);
LOG_IF(WARNING, !ret.ok()) << "fail to delete residual datacache file: " << file
<< ", reason: " << ret.message();
}
return true;
});
LOG_IF(WARNING, !st.ok()) << "fail to clean residual datacache data, reason: " << st.message();
}

} // namespace starrocks
15 changes: 13 additions & 2 deletions be/src/block_cache/cache_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include <string>
#include <vector>

#include "common/status.h"

namespace starrocks {

struct DirSpace {
Expand Down Expand Up @@ -75,8 +77,17 @@ struct ReadCacheOptions {
} stats;
};

int64_t parse_mem_size(const std::string& mem_size_str, int64_t mem_limit = -1);
Status parse_conf_datacache_mem_size(const std::string& conf_mem_size_str, int64_t mem_limit, size_t* mem_size);

Status parse_conf_datacache_disk_size(const std::string& disk_path, const std::string& disk_size_str,
int64_t disk_limit, size_t* disk_size);

Status parse_conf_datacache_disk_paths(const std::string& config_path, std::vector<std::string>* paths,
bool ignore_broken_disk);

Status parse_conf_datacache_disk_spaces(const std::string& config_disk_path, const std::string& config_disk_size,
bool ignore_broken_disk, std::vector<DirSpace>* disk_spaces);

int64_t parse_disk_size(const std::string& disk_path, const std::string& disk_size_str, int64_t disk_limit = -1);
void clean_residual_datacache(const std::string& disk_path);

} // namespace starrocks
8 changes: 8 additions & 0 deletions be/src/block_cache/cachelib_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ Status CacheLibWrapper::remove(const std::string& key) {
return Status::OK();
}

Status CacheLibWrapper::update_mem_quota(size_t quota_bytes) {
return Status::NotSupported("not support updating memory cache quota for cachelib");
}

Status update_disk_spaces(const std::vector<DirSpace>& spaces) {
return Status::NotSupported("not support updating disk cache spaces for cachelib");
}

std::unordered_map<std::string, double> CacheLibWrapper::cache_stats() {
const auto navy_stats = _cache->getNvmCacheStatsMap().toMap();
return navy_stats;
Expand Down
4 changes: 4 additions & 0 deletions be/src/block_cache/cachelib_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ class CacheLibWrapper : public KvCache {

Status remove(const std::string& key) override;

Status update_mem_quota(size_t quota_bytes) override;

Status update_disk_spaces(const std::vector<DirSpace>& spaces) override;

std::unordered_map<std::string, double> cache_stats() override;

const DataCacheMetrics cache_metrics(int level) override;
Expand Down
6 changes: 6 additions & 0 deletions be/src/block_cache/kv_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ class KvCache {
// Remove data from cache. The offset must be aligned by block size
virtual Status remove(const std::string& key) = 0;

// Update the datacache memory quota.
virtual Status update_mem_quota(size_t quota_bytes) = 0;

// Update the datacache disk space infomation, such as disk quota or disk path.
virtual Status update_disk_spaces(const std::vector<DirSpace>& spaces) = 0;

virtual const DataCacheMetrics cache_metrics(int level) = 0;

virtual void record_read_remote(size_t size, int64_t lateny_us) = 0;
Expand Down
Loading

0 comments on commit c6f1d83

Please sign in to comment.