Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian committed Nov 28, 2023
1 parent c203d36 commit c88274e
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 75 deletions.
3 changes: 0 additions & 3 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1110,9 +1110,6 @@ DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
// Max size(bytes) of group commit queues, used for mem back pressure, defult 64M.
DEFINE_Int32(group_commit_max_queue_size, "67108864");

// Max size(bytes) of wal disk using, used for disk space back pressure, default 64M.
DEFINE_Int32(wal_max_disk_size, "67108864");

// Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
DEFINE_Int32(ingest_binlog_work_pool_size, "-1");

Expand Down
3 changes: 0 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1188,9 +1188,6 @@ DECLARE_String(default_tzfiles_path);
// Max size(bytes) of group commit queues, used for mem back pressure.
DECLARE_Int32(group_commit_max_queue_size);

// Max size(bytes) of wal disk using, used for disk space back pressure.
DECLARE_Int32(wal_max_disk_size);

// Ingest binlog work pool size
DECLARE_Int32(ingest_binlog_work_pool_size);

Expand Down
27 changes: 27 additions & 0 deletions be/src/olap/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <memory>
#include <utility>

#include "common/status.h"
#include "io/fs/local_file_system.h"
#include "olap/wal_writer.h"
#include "runtime/client_cache.h"
Expand Down Expand Up @@ -75,11 +76,37 @@ Status WalManager::init() {
}
RETURN_IF_ERROR(scan_wals(wal_dir));
}
size_t available_bytes;
size_t disk_capacity_bytes;
// Get the root path available space.
RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info("./", &disk_capacity_bytes,
&available_bytes));
RETURN_IF_ERROR(init_wal_limit(available_bytes));
return Thread::create(
"WalMgr", "replay_wal", [this]() { static_cast<void>(this->replay()); },
&_replay_thread);
}

Status WalManager::init_wal_limit(size_t available_bytes) {
if (available_bytes >= DYNAMIC_WAL_SPACE_MAX_LIMIT) {
// If available bytes >= 1G, max wal disk limit is 5% of available bytes.
wal_limit = available_bytes / 20;
} else if (available_bytes < DYNAMIC_WAL_SPACE_MAX_LIMIT &&
available_bytes >= DYNAMIC_WAL_SPACE_MIN_LIMIT) {
// If 1G > available bytes >= 128M, max wal disk limit is 64M.
wal_limit = WAL_DEFAULT_LIMIT;
} else if (available_bytes < DYNAMIC_WAL_SPACE_MIN_LIMIT && available_bytes > 0) {
// If available bytes < 128M, max wal disk limit is 10% of available bytes.
wal_limit = available_bytes / 10;
LOG(WARNING) << "Currently the disk available bytes is " << available_bytes
<< ", which is lower than 128M. We recommend expanding your disk space. ";
} else {
wal_limit = 0;
return Status::InternalError("Disk space is 0. Please check the disk status! ");
}
return Status::OK();
}

void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status) {
std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
LOG(INFO) << "add wal queue "
Expand Down
11 changes: 11 additions & 0 deletions be/src/olap/wal_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <gen_cpp/PaloInternalService_types.h>

#include <condition_variable>
#include <cstddef>
#include <memory>

#include "common/config.h"
Expand Down Expand Up @@ -70,6 +73,8 @@ class WalManager {
void add_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index);
void erase_wal_column_index(int64_t wal_id);
Status get_wal_column_index(int64_t wal_id, std::vector<size_t>& column_index);
Status init_wal_limit(size_t available_bytes);
inline static size_t wal_limit;

private:
ExecEnv* _exec_env = nullptr;
Expand All @@ -87,5 +92,11 @@ class WalManager {
std::atomic<bool> _stop;
std::unordered_map<int64_t, std::vector<size_t>&> _wal_column_id_map;
std::shared_ptr<std::condition_variable> _cv;
// 1G, Used for dynamic wal space limit.
static constexpr size_t DYNAMIC_WAL_SPACE_MAX_LIMIT = 1024 * 1024 * 1024;
// 128M, Used for dynamic wal space limit.
static constexpr size_t DYNAMIC_WAL_SPACE_MIN_LIMIT = 128 * 1024 * 1024;
// 64M
static constexpr size_t WAL_DEFAULT_LIMIT = 64 * 1024 * 1024;
};
} // namespace doris
6 changes: 3 additions & 3 deletions be/src/olap/wal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "io/fs/local_file_system.h"
#include "io/fs/path.h"
#include "olap/storage_engine.h"
#include "olap/wal_manager.h"
#include "util/crc32c.h"

namespace doris {
Expand Down Expand Up @@ -62,12 +63,11 @@ Status WalWriter::append_blocks(const PBlockArray& blocks) {
if (_is_first_append_blocks) {
_is_first_append_blocks = false;
std::unique_lock l(_mutex);
while (_all_wal_disk_bytes->load(std::memory_order_relaxed) >
config::wal_max_disk_size) {
while (_all_wal_disk_bytes->load(std::memory_order_relaxed) > WalManager::wal_limit) {
LOG(INFO) << "First time to append blocks to wal file " << _file_name
<< ". Currently, all wal disk space usage is "
<< _all_wal_disk_bytes->load(std::memory_order_relaxed)
<< ", larger than the maximum limit " << config::wal_max_disk_size
<< ", larger than the maximum limit " << WalManager::wal_limit
<< ", so we need to wait. When any other load finished, that wal will be "
"removed, the space used by that wal will be free.";
cv->wait_for(l, std::chrono::milliseconds(WalWriter::MAX_WAL_WRITE_WAIT_TIME));
Expand Down
12 changes: 6 additions & 6 deletions be/src/util/debug_points.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
#include "fmt/format.h"

// more usage can see 'util/debug_points_test.cpp'
#define DBUG_EXECUTE_IF(debug_point_name, code) \
if (UNLIKELY(config::enable_debug_points)) { \
auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
if (dp) { \
code; \
} \
#define DBUG_EXECUTE_IF(debug_point_name, code) \
if (UNLIKELY(config::enable_debug_points)) { \
auto dp = DebugPoints::instance() -> get_debug_point(debug_point_name); \
if (dp) { \
code; \
} \
}

namespace doris {
Expand Down
120 changes: 60 additions & 60 deletions be/src/vec/functions/function_bitmap_variadic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,67 +52,67 @@ namespace doris::vectorized {

// currently only bitmap_or and bitmap_or_count will call this function,
// other bitmap functions will use default implementation for nulls
#define BITMAP_OR_NULLABLE(nullable, input_rows_count, res, op) \
const auto& nested_col_ptr = nullable->get_nested_column_ptr(); \
const auto* __restrict null_map_data = nullable->get_null_map_data().data(); \
const auto& mid_data = assert_cast<const ColumnBitmap*>(nested_col_ptr.get())->get_data(); \
for (size_t row = 0; row < input_rows_count; ++row) { \
if (!null_map_data[row]) { \
res[row] op mid_data[row]; \
} \
#define BITMAP_OR_NULLABLE(nullable, input_rows_count, res, op) \
const auto& nested_col_ptr = nullable->get_nested_column_ptr(); \
const auto* __restrict null_map_data = nullable->get_null_map_data().data(); \
const auto& mid_data = assert_cast<const ColumnBitmap*>(nested_col_ptr.get()) -> get_data(); \
for (size_t row = 0; row < input_rows_count; ++row) { \
if (!null_map_data[row]) { \
res[row] op mid_data[row]; \
} \
}

#define BITMAP_FUNCTION_VARIADIC(CLASS, FUNCTION_NAME, OP) \
struct CLASS { \
static constexpr auto name = #FUNCTION_NAME; \
using ResultDataType = DataTypeBitMap; \
static Status vector_vector(ColumnPtr argument_columns[], size_t col_size, \
size_t input_rows_count, std::vector<BitmapValue>& res, \
IColumn* res_nulls) { \
const ColumnUInt8::value_type* null_map_datas[col_size]; \
int nullable_cols_count = 0; \
ColumnUInt8::value_type* __restrict res_nulls_data = nullptr; \
if (res_nulls) { \
res_nulls_data = assert_cast<ColumnUInt8*>(res_nulls)->get_data().data(); \
} \
if (auto* nullable = check_and_get_column<ColumnNullable>(*argument_columns[0])) { \
null_map_datas[nullable_cols_count++] = nullable->get_null_map_data().data(); \
BITMAP_OR_NULLABLE(nullable, input_rows_count, res, =); \
} else { \
const auto& mid_data = \
assert_cast<const ColumnBitmap*>(argument_columns[0].get())->get_data(); \
for (size_t row = 0; row < input_rows_count; ++row) { \
res[row] = mid_data[row]; \
} \
} \
for (size_t col = 1; col < col_size; ++col) { \
if (auto* nullable = \
check_and_get_column<ColumnNullable>(*argument_columns[col])) { \
null_map_datas[nullable_cols_count++] = nullable->get_null_map_data().data(); \
BITMAP_OR_NULLABLE(nullable, input_rows_count, res, OP); \
} else { \
const auto& col_data = \
assert_cast<const ColumnBitmap*>(argument_columns[col].get()) \
->get_data(); \
for (size_t row = 0; row < input_rows_count; ++row) { \
res[row] OP col_data[row]; \
} \
} \
} \
if (res_nulls_data && nullable_cols_count == col_size) { \
const auto* null_map_data = null_map_datas[0]; \
for (size_t row = 0; row < input_rows_count; ++row) { \
res_nulls_data[row] = null_map_data[row]; \
} \
for (int i = 1; i < nullable_cols_count; ++i) { \
const auto* null_map_data = null_map_datas[i]; \
for (size_t row = 0; row < input_rows_count; ++row) { \
res_nulls_data[row] &= null_map_data[row]; \
} \
} \
} \
return Status::OK(); \
} \
#define BITMAP_FUNCTION_VARIADIC(CLASS, FUNCTION_NAME, OP) \
struct CLASS { \
static constexpr auto name = #FUNCTION_NAME; \
using ResultDataType = DataTypeBitMap; \
static Status vector_vector(ColumnPtr argument_columns[], size_t col_size, \
size_t input_rows_count, std::vector<BitmapValue>& res, \
IColumn* res_nulls) { \
const ColumnUInt8::value_type* null_map_datas[col_size]; \
int nullable_cols_count = 0; \
ColumnUInt8::value_type* __restrict res_nulls_data = nullptr; \
if (res_nulls) { \
res_nulls_data = assert_cast<ColumnUInt8*>(res_nulls)->get_data().data(); \
} \
if (auto* nullable = check_and_get_column<ColumnNullable>(*argument_columns[0])) { \
null_map_datas[nullable_cols_count++] = nullable->get_null_map_data().data(); \
BITMAP_OR_NULLABLE(nullable, input_rows_count, res, =); \
} else { \
const auto& mid_data = \
assert_cast<const ColumnBitmap*>(argument_columns[0].get()) -> get_data(); \
for (size_t row = 0; row < input_rows_count; ++row) { \
res[row] = mid_data[row]; \
} \
} \
for (size_t col = 1; col < col_size; ++col) { \
if (auto* nullable = \
check_and_get_column<ColumnNullable>(*argument_columns[col])) { \
null_map_datas[nullable_cols_count++] = nullable->get_null_map_data().data(); \
BITMAP_OR_NULLABLE(nullable, input_rows_count, res, OP); \
} else { \
const auto& col_data = \
assert_cast<const ColumnBitmap*>(argument_columns[col].get()) \
-> get_data(); \
for (size_t row = 0; row < input_rows_count; ++row) { \
res[row] OP col_data[row]; \
} \
} \
} \
if (res_nulls_data && nullable_cols_count == col_size) { \
const auto* null_map_data = null_map_datas[0]; \
for (size_t row = 0; row < input_rows_count; ++row) { \
res_nulls_data[row] = null_map_data[row]; \
} \
for (int i = 1; i < nullable_cols_count; ++i) { \
const auto* null_map_data = null_map_datas[i]; \
for (size_t row = 0; row < input_rows_count; ++row) { \
res_nulls_data[row] &= null_map_data[row]; \
} \
} \
} \
return Status::OK(); \
} \
}

#define BITMAP_FUNCTION_COUNT_VARIADIC(CLASS, FUNCTION_NAME, OP) \
Expand All @@ -137,7 +137,7 @@ namespace doris::vectorized {
} else { \
const auto& col_data = \
assert_cast<const ColumnBitmap*>(argument_columns[col].get()) \
->get_data(); \
-> get_data(); \
for (size_t row = 0; row < input_rows_count; ++row) { \
vals[row] OP col_data[row]; \
} \
Expand Down
Loading

0 comments on commit c88274e

Please sign in to comment.