Skip to content

Commit

Permalink
add DictSinkOperatorX
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryange committed Jan 6, 2025
1 parent ef9d243 commit 68ae141
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 17 deletions.
104 changes: 104 additions & 0 deletions be/src/pipeline/exec/dict_sink_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "dict_sink_operator.h"

#include "common/status.h"
#include "vec/functions/dictionary_factory.h"
#include "vec/functions/hash_map_dictionary.h"
#include "vec/functions/ip_address_dictionary.h"

namespace doris::pipeline {
#include "common/compile_check_begin.h"
Status DictSinkLocalState::load_dict(RuntimeState* state) {
if (_dict_input_block.empty()) {
return Status::OK();
}

auto& p = _parent->cast<DictSinkOperatorX>();

auto input_block = _dict_input_block.to_block();
auto key_data = input_block.get_by_position(p.key_column_id);

vectorized::ColumnsWithTypeAndName attribute_data;
std::string dict_name = p.dictionary_name;
for (int64_t i = 0; i < p.attribute_column_ids.size(); i++) {
auto att_col_id = p.attribute_column_ids[i];
auto att_name = p.attribute_name[i];
auto att_data = input_block.get_by_position(att_col_id);
att_data.name = att_name;
attribute_data.push_back(att_data);
}

vectorized::DictionaryPtr dict = nullptr;

switch (p.layout_type) {
case TDictLayoutType::type::IP_TRIE: {
dict = create_ip_trie_dict_from_column(dict_name, key_data, attribute_data);
break;
}
case TDictLayoutType::type::HASH_MAP: {
dict = create_hash_map_dict_from_column(dict_name, key_data, attribute_data);
break;
}
default:
return Status::InternalError("Unknown layout type");
}
if (dict == nullptr) {
return Status::InternalError("Failed to create dictionary");
}
ExecEnv::GetInstance()->dict_factory()->register_dict(dict);
return Status::OK();
}

DictSinkOperatorX::DictSinkOperatorX(int operator_id, const TDictSink& dict_sink)
: DataSinkOperatorX(operator_id, 0),
dictionary_id(dict_sink.dictionary_id),
version_id(dict_sink.version_id),
dictionary_name(dict_sink.dictionary_name),
key_column_id(dict_sink.key_column_id),
attribute_column_ids(dict_sink.attribute_column_ids),
attribute_name(dict_sink.attribute_name),
layout_type(dict_sink.layout_type) {}

Status DictSinkOperatorX::open(RuntimeState* state) {
if (attribute_column_ids.size() != attribute_name.size()) {
return Status::InternalError("attribute_column_ids.size() != attribute_name.size()");
}
return Status::OK();
}

Status DictSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

if (local_state._dict_input_block.empty()) {
local_state._dict_input_block = in_block->clone_empty();
}

RETURN_IF_ERROR(local_state._dict_input_block.merge(std::move(*in_block)));

if (eos) {
RETURN_IF_ERROR(local_state.load_dict(state));
}

return Status::OK();
}

} // namespace doris::pipeline
#include "common/compile_check_end.h"
62 changes: 62 additions & 0 deletions be/src/pipeline/exec/dict_sink_operator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <gen_cpp/DataSinks_types.h>

#include <cstdint>

#include "operator.h"

namespace doris::pipeline {
#include "common/compile_check_begin.h"
class DictSinkLocalState final : public PipelineXSinkLocalState<BasicSharedState> {
ENABLE_FACTORY_CREATOR(DictSinkLocalState);
using Base = PipelineXSinkLocalState<BasicSharedState>;

public:
DictSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) {}

private:
Status load_dict(RuntimeState* state);
friend class DictSinkOperatorX;

vectorized::MutableBlock _dict_input_block;
};

class DictSinkOperatorX final : public DataSinkOperatorX<DictSinkLocalState> {
public:
using Base = DataSinkOperatorX<DictSinkLocalState>;
DictSinkOperatorX(int operator_id, const TDictSink& dict_sink);
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;

private:
friend class DictSinkLocalState;
[[maybe_unused]] const int64_t dictionary_id;
[[maybe_unused]] const int64_t version_id;
const std::string dictionary_name;
const int64_t key_column_id;
const std::vector<int64_t> attribute_column_ids;
const std::vector<std::string> attribute_name;
const TDictLayoutType::type layout_type;
};

} // namespace doris::pipeline
#include "common/compile_check_end.h"
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "pipeline/exec/cache_sink_operator.h"
#include "pipeline/exec/cache_source_operator.h"
#include "pipeline/exec/datagen_operator.h"
#include "pipeline/exec/dict_sink_operator.h"
#include "pipeline/exec/distinct_streaming_aggregation_operator.h"
#include "pipeline/exec/empty_set_operator.h"
#include "pipeline/exec/es_scan_operator.h"
Expand Down Expand Up @@ -693,6 +694,7 @@ DECLARE_OPERATOR(SetSinkLocalState<false>)
DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)
DECLARE_OPERATOR(CacheSinkLocalState)
DECLARE_OPERATOR(DictSinkLocalState)

#undef DECLARE_OPERATOR

Expand Down
9 changes: 9 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "pipeline/exec/cache_sink_operator.h"
#include "pipeline/exec/cache_source_operator.h"
#include "pipeline/exec/datagen_operator.h"
#include "pipeline/exec/dict_sink_operator.h"
#include "pipeline/exec/distinct_streaming_aggregation_operator.h"
#include "pipeline/exec/empty_set_operator.h"
#include "pipeline/exec/es_scan_operator.h"
Expand Down Expand Up @@ -1026,6 +1027,14 @@ Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
thrift_sink.result_sink));
break;
}
case TDataSinkType::DICT_SINK: {
if (!thrift_sink.__isset.dict_sink) {
return Status::InternalError("Missing dict sink.");
}

_sink.reset(new DictSinkOperatorX(next_sink_operator_id(), thrift_sink.dict_sink));
break;
}
case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK:
case TDataSinkType::OLAP_TABLE_SINK: {
if (state->query_options().enable_memtable_on_sink_node &&
Expand Down
17 changes: 0 additions & 17 deletions be/src/vec/functions/dictionary_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,21 +286,4 @@ void DictionaryFactory::register_dict_when_load_be() {
// register_dict(dict);
}
}

void test_sink(const TDictSink& dict_sink, Block* block) {
auto key_data = block->get_by_position(dict_sink.key_column_id);

ColumnsWithTypeAndName attribute_data;
for (int64_t i = 0; i < dict_sink.attribute_column_ids.size(); i++) {
auto att_col_id = dict_sink.attribute_column_ids[i];
auto att_name = dict_sink.attribute_name[i];
auto att_data = block->get_by_position(att_col_id);
att_data.name = att_name;
attribute_data.push_back(att_data);
}

auto dict =
create_ip_trie_dict_from_column(dict_sink.dictionary_name, key_data, attribute_data);
}

} // namespace doris::vectorized
2 changes: 2 additions & 0 deletions gensrc/thrift/DataSinks.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ enum TDataSinkType {
GROUP_COMMIT_BLOCK_SINK,
HIVE_TABLE_SINK,
ICEBERG_TABLE_SINK,
DICT_SINK,
}

enum TResultSinkType {
Expand Down Expand Up @@ -445,4 +446,5 @@ struct TDataSink {
12: optional TMultiCastDataStreamSink multi_cast_stream_sink
13: optional THiveTableSink hive_table_sink
14: optional TIcebergTableSink iceberg_table_sink
15: optional TDictSink dict_sink
}

0 comments on commit 68ae141

Please sign in to comment.