Skip to content

Commit

Permalink
[ADD] Support write with replace_if_not_null
Browse files Browse the repository at this point in the history
1. memtable process: add aggregate_merge, support replace_if_not_null
2. segment_rewrite process: add rewrite, support replace_if_not_null
  • Loading branch information
motto1314 committed Sep 9, 2024
1 parent df13ecb commit 32502bd
Show file tree
Hide file tree
Showing 32 changed files with 685 additions and 2 deletions.
2 changes: 2 additions & 0 deletions be/src/column/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ class Column {

virtual void append(const Column& src) { append(src, 0, src.size()); }

virtual void merge(const Column& src) {};

// replicate a column to align with an array's offset, used for captured columns in lambda functions
// for example: column(1,2)->replicate({0,2,5}) = column(1,1,2,2,2)
// FixedLengthColumn, BinaryColumn and ConstColumn override this function for better performance.
Expand Down
41 changes: 41 additions & 0 deletions be/src/column/nullable_column.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,47 @@ void NullableColumn::append(const Column& src, size_t offset, size_t count) {
DCHECK_EQ(_null_column->size(), _data_column->size());
}

void NullableColumn::merge(const starrocks::Column &src) {
DCHECK_EQ(_null_column->size(), _data_column->size());

if (src.is_nullable()) {
const auto& c = down_cast<const NullableColumn&>(src);

DCHECK_EQ(c._null_column->size(), c._data_column->size());

const auto& new_src_data = c._data_column->clone_empty();
const auto& new_src_null = c._null_column->clone_empty();
std::vector<uint32_t> null_idxes;

for (size_t i = 0; i < _null_column->size(); ++i) {
if (_null_column->get_data()[i] == 1 && c._null_column->get_data()[i] == 0) {
null_idxes.emplace_back(i);
new_src_null->append_datum(c._null_column->get(i));
new_src_data->append_datum(c._data_column->get(i));
}
}

_null_column->update_rows(*new_src_null, null_idxes.data());
_data_column->update_rows(*new_src_data, null_idxes.data());
update_has_null();
} else {
const auto& new_src_data = src.clone_empty();
std::vector<uint32_t> null_idxes;

for (size_t i = 0; i < _null_column->size(); ++i) {
if (_null_column->get_data()[i] == 1) {
null_idxes.emplace_back(i);
new_src_data->append_datum(src.get(i));
}
}

_data_column->update_rows(*new_src_data, null_idxes.data());
_has_null = false;
}

DCHECK_EQ(_null_column->size(), _data_column->size());
}

void NullableColumn::append_selective(const Column& src, const uint32_t* indexes, uint32_t from, uint32_t size) {
DCHECK_EQ(_null_column->size(), _data_column->size());
size_t orig_size = _null_column->size();
Expand Down
2 changes: 2 additions & 0 deletions be/src/column/nullable_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ class NullableColumn : public ColumnFactory<Column, NullableColumn> {

void append(const Column& src, size_t offset, size_t count) override;

void merge(const Column& src) override;

void append_selective(const Column& src, const uint32_t* indexes, uint32_t from, uint32_t size) override;

void append_value_multiple_times(const Column& src, uint32_t index, uint32_t size) override;
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ OlapTableSink::OlapTableSink(ObjectPool* pool, const std::vector<TExpr>& texprs,
Status OlapTableSink::init(const TDataSink& t_sink, RuntimeState* state) {
DCHECK(t_sink.__isset.olap_table_sink);
const auto& table_sink = t_sink.olap_table_sink;
_merge_mode = table_sink.merge_mode;
_merge_condition = table_sink.merge_condition;
_encryption_meta = table_sink.encryption_meta;
_partial_update_mode = table_sink.partial_update_mode;
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ class OlapTableSink : public AsyncDataSink {
std::string _merge_condition;
std::string _encryption_meta;
TPartialUpdateMode::type _partial_update_mode;
bool _merge_mode = false;

// this is tuple descriptor of destination OLAP table
TupleDescriptor* _output_tuple_desc = nullptr;
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/tablet_sink_index_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ void NodeChannel::try_open() {
void NodeChannel::_open(int64_t index_id, RefCountClosure<PTabletWriterOpenResult>* open_closure,
std::vector<PTabletWithPartition>& tablets, bool incremental_open) {
PTabletWriterOpenRequest request;
request.set_merge_mode(_parent->_merge_mode);
request.set_merge_condition(_parent->_merge_condition);
request.set_encryption_meta(_parent->_encryption_meta);
if (_parent->_partial_update_mode == TPartialUpdateMode::type::ROW_MODE) {
Expand Down
14 changes: 14 additions & 0 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,20 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_partial_update_mode(TPartialUpdateMode::type::COLUMN_UPSERT_MODE);
}
}
if (!http_req->header(HTTP_MERGE_MODE).empty()) {
if (!http_req->header(HTTP_MERGE_CONDITION).empty() ||
!http_req->header(HTTP_PARTIAL_UPDATE).empty()) {
return Status::InvalidArgument(
"Not supporting merge_mode, when partial_update=true or merge_condition is set");
}
if (boost::iequals(http_req->header(HTTP_MERGE_MODE), "false")) {
request.__set_merge_mode(false);
} else if (boost::iequals(http_req->header(HTTP_MERGE_MODE), "true")) {
request.__set_merge_mode(true);
} else {
return Status::InvalidArgument("Invalid merge mode flag format. Must be bool type");
}
}
if (!http_req->header(HTTP_TRANSMISSION_COMPRESSION_TYPE).empty()) {
request.__set_transmission_compression_type(http_req->header(HTTP_TRANSMISSION_COMPRESSION_TYPE));
}
Expand Down
1 change: 1 addition & 0 deletions be/src/http/http_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ static const std::string HTTP_ENABLE_REPLICATED_STORAGE = "enable_replicated_sto
static const std::string HTTP_MERGE_CONDITION = "merge_condition";
static const std::string HTTP_LOG_REJECTED_RECORD_NUM = "log_rejected_record_num";
static const std::string HTTP_PARTIAL_UPDATE_MODE = "partial_update_mode";
static const std::string HTTP_MERGE_MODE = "merge_mode";

static const std::string HTTP_100_CONTINUE = "100-continue";
static const std::string HTTP_CHANNEL_ID = "channel_id";
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/local_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,7 @@ Status LocalTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& pa
}
options.merge_condition = params.merge_condition();
options.partial_update_mode = params.partial_update_mode();
options.merge_mode = params.merge_mode();
options.immutable_tablet_size = params.immutable_tablet_size();

auto res = AsyncDeltaWriter::open(options, _mem_tracker);
Expand Down
49 changes: 49 additions & 0 deletions be/src/storage/chunk_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,55 @@ void ChunkAggregator::aggregate() {
_has_aggregate = true;
}

void ChunkAggregator::aggregate_merge() {
if (source_exhausted()) {
return;
}

DCHECK(_source_row < _source_size) << "It's impossible";

_selective_index.clear();
_aggregate_loops.clear();

// first key is not equal with last row in previous chunk
bool previous_neq = !_is_eq[_source_row] && (_aggregate_rows > 0);

// same with last row
if (_is_eq[_source_row] == 1) {
_aggregate_loops.emplace_back(0);
}

// 1. Calculate the key rows selective arrays
// 2. Calculate the value rows that can be aggregated for each key row
uint32_t row = _source_row;
for (; row < _source_size; ++row) {
if (_is_eq[row] == 0) {
if (_aggregate_rows >= _max_aggregate_rows) {
break;
}
++_aggregate_rows;
_selective_index.emplace_back(row);
_aggregate_loops.emplace_back(1);
} else {
_aggregate_loops[_aggregate_loops.size() - 1] += 1;
}
}
SCOPED_THREAD_LOCAL_AGG_STATE_ALLOCATOR_SETTER(&kDefaultColumnAggregatorAllocator);
// 3. Copy the selected key rows
// 4. Aggregate the value rows
for (int i = 0; i < _key_fields; ++i) {
_column_aggregator[i]->aggregate_keys(_source_row, _selective_index.size(), _selective_index.data());
}

for (int i = _key_fields; i < _num_fields; ++i) {
_column_aggregator[i]->aggregate_merge_values(_source_row, _aggregate_loops,
_aggregate_loops.data(), previous_neq);
}

_source_row = row;
_has_aggregate = true;
}

bool ChunkAggregator::is_finish() {
return (_aggregate_chunk == nullptr || _aggregate_rows >= _max_aggregate_rows);
}
Expand Down
4 changes: 4 additions & 0 deletions be/src/storage/chunk_aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class ChunkAggregator {

void aggregate();

void aggregate_merge();

bool is_do_aggregate() const { return _do_aggregate; }

bool source_exhausted() const { return _source_row == _source_size; }
Expand Down Expand Up @@ -93,6 +95,8 @@ class ChunkAggregator {
// use for calculate the aggregate range covered by each aggregate key
std::vector<uint32_t> _aggregate_loops;

std::vector<std::vector<uint32_t>> _aggregate_merge_loops;

ChunkPtr _aggregate_chunk;
// the last row of non-key column is in aggregator (not in aggregate chunk) before finalize.
// in vertical compaction, there maybe only non-key column in aggregate chunk,
Expand Down
60 changes: 60 additions & 0 deletions be/src/storage/column_aggregate_func.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,43 @@ class ReplaceNullableColumnAggregator final : public ValueColumnAggregatorBase {
_null_child->aggregate_values(start, nums, aggregate_loops, previous_neq);
}

void aggregate_merge_values(int start, std::vector<uint32_t> aggregate_loops, const uint32* selective_loops, bool previous_neq) override {
auto* data = down_cast<FixedLengthColumn<uint8_t>*>(_null_child->_source_column.get())->get_data().data();

// merge previous & current
if (_aggregate_column->size() > 0 && !previous_neq) {
// check first row null
bool first_is_null = true;
for (int i = start; i < start + aggregate_loops[0]; ++i) {
if (data[i] != 1) {
first_is_null = false;
break;
}
}
if (first_is_null) {
start += aggregate_loops[0];
aggregate_loops.erase(aggregate_loops.begin());
previous_neq = true;
}
}

std::vector<uint32_t> select;
int index = start;
for (int i = 0; i < aggregate_loops.size(); ++i) {
select.emplace_back(1);
for (int j = index; j < index + aggregate_loops[i]; ++j) {
if (data[j] == 1) {
continue;
}
select[i] = j - index + 1;
}
index += aggregate_loops[i];
}

_child->aggregate_merge_values(start, aggregate_loops, select.data(), previous_neq);
_null_child->aggregate_merge_values(start, aggregate_loops, select.data(), previous_neq);
}

void finalize() override {
_child->finalize();
_null_child->finalize();
Expand Down Expand Up @@ -314,6 +351,29 @@ class AggFuncBasedValueAggregator : public ValueColumnAggregatorBase {
}
}

void aggregate_merge_values(int start, std::vector<uint32_t> aggregate_loops, const uint32* selective_loops, bool previous_neq) override {
int nums = aggregate_loops.size();
if (nums <= 0) {
return;
}

// if different with last row in previous chunk
if (previous_neq) {
append_data(_aggregate_column);
reset();
}

for (int i = 0; i < nums; ++i) {
aggregate_batch_impl(start, start + selective_loops[i], _source_column);
start += aggregate_loops[i];
// If there is another loop, append current state to result column
if (i < nums - 1) {
append_data(_aggregate_column);
reset();
}
}
}

void finalize() override {
append_data(_aggregate_column);
_aggregate_column = nullptr;
Expand Down
Loading

0 comments on commit 32502bd

Please sign in to comment.