Skip to content

Commit

Permalink
Support select vector index
Browse files Browse the repository at this point in the history
Signed-off-by: yulongzhao <[email protected]>
  • Loading branch information
yulongfufu committed Aug 19, 2024
1 parent 2e68c7e commit 8e76d2f
Show file tree
Hide file tree
Showing 60 changed files with 2,164 additions and 31 deletions.
9 changes: 9 additions & 0 deletions be/src/column/chunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,15 @@ void Chunk::append_column(ColumnPtr column, const FieldPtr& field) {
check_or_die();
}

void Chunk::append_vector_column(ColumnPtr column, const FieldPtr& field, SlotId slot_id) {
DCHECK(!_cid_to_index.contains(field->id()));
_cid_to_index[field->id()] = _columns.size();
_slot_id_to_index[slot_id] = _columns.size();
_columns.emplace_back(std::move(column));
_schema->append(field);
check_or_die();
}

void Chunk::append_column(ColumnPtr column, SlotId slot_id) {
_slot_id_to_index[slot_id] = _columns.size();
_columns.emplace_back(std::move(column));
Expand Down
2 changes: 2 additions & 0 deletions be/src/column/chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ class Chunk {
// schema must exist and will be updated.
void append_column(ColumnPtr column, const FieldPtr& field);

void append_vector_column(ColumnPtr column, const FieldPtr& field, SlotId slot_id);

void append_column(ColumnPtr column, SlotId slot_id);
void insert_column(size_t idx, ColumnPtr column, const FieldPtr& field);

Expand Down
6 changes: 5 additions & 1 deletion be/src/column/schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ Schema::Schema(Schema* schema, const std::vector<ColumnId>& cids)
auto ori_sort_idxes = schema->sort_key_idxes();
std::unordered_set<ColumnId> scids(ori_sort_idxes.begin(), ori_sort_idxes.end());
for (int i = 0; i < cids.size(); i++) {
DCHECK_LT(cids[i], schema->_fields.size());
if (cids[i] >= schema->_fields.size()) {
_fields.resize(_fields.size() - 1);
continue;
}
// DCHECK_LT(cids[i], schema->_fields.size());
_fields[i] = schema->_fields[cids[i]];
if (scids.find(cids[i]) != scids.end()) {
_sort_key_idxes.emplace_back(i);
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,9 @@ CONF_Int64(query_cache_capacity, "536870912");
// ranges in [1,16], default value is 4.
CONF_mInt32(query_cache_num_lanes_per_driver, "4");

// Used by vector query cache, 500MB in default
CONF_Int64(vector_query_cache_capacity, "536870912");

// Used to limit buffer size of tablet send channel.
CONF_mInt64(send_channel_buffer_limit, "67108864");

Expand Down Expand Up @@ -1359,6 +1362,9 @@ CONF_mInt64(arrow_io_coalesce_read_max_buffer_size, "8388608");
CONF_mInt64(arrow_io_coalesce_read_max_distance_size, "1048576");
CONF_mInt64(arrow_read_batch_size, "4096");

// default not to build the empty index
CONF_mInt32(config_tenann_default_build_threshold, "0");

// Set to true to enable socket_keepalive option in brpc
CONF_mBool(brpc_socket_keepalive, "false");
CONF_mBool(apply_del_vec_after_all_index_filter, "true");
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,10 @@ void OlapScanNode::_init_counter(RuntimeState* state) {
_bf_filtered_counter = ADD_CHILD_COUNTER(_scan_profile, "BloomFilterFilterRows", TUnit::UNIT, "SegmentInit");
_gin_filtered_counter = ADD_CHILD_COUNTER(_runtime_profile, "GinFilterRows", TUnit::UNIT, "SegmentInit");
_gin_filtered_timer = ADD_CHILD_TIMER(_runtime_profile, "GinFilter", "SegmentInit");
_get_row_ranges_by_vector_index_timer = ADD_CHILD_TIMER(_scan_profile, "GetVectorRowRangesTime", "SegmentInit");
_vector_search_timer = ADD_CHILD_TIMER(_scan_profile, "VectorSearchTime", "SegmentInit");
_process_vector_distance_and_id_timer =
ADD_CHILD_TIMER(_scan_profile, "ProcessVectorDistanceAndIdTime", "SegmentInit");
_seg_zm_filtered_counter = ADD_CHILD_COUNTER(_scan_profile, "SegmentZoneMapFilterRows", TUnit::UNIT, "SegmentInit");
_seg_rt_filtered_counter =
ADD_CHILD_COUNTER(_scan_profile, "SegmentRuntimeZoneMapFilterRows", TUnit::UNIT, "SegmentInit");
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,9 @@ class OlapScanNode final : public starrocks::ScanNode {
RuntimeProfile::Counter* _bi_filter_timer = nullptr;
RuntimeProfile::Counter* _gin_filtered_counter = nullptr;
RuntimeProfile::Counter* _gin_filtered_timer = nullptr;
RuntimeProfile::Counter* _get_row_ranges_by_vector_index_timer = nullptr;
RuntimeProfile::Counter* _vector_search_timer = nullptr;
RuntimeProfile::Counter* _process_vector_distance_and_id_timer = nullptr;
RuntimeProfile::Counter* _pushdown_predicates_counter = nullptr;
RuntimeProfile::Counter* _rowsets_read_count = nullptr;
RuntimeProfile::Counter* _segments_read_count = nullptr;
Expand Down
60 changes: 54 additions & 6 deletions be/src/exec/pipeline/scan/olap_chunk_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ void OlapChunkSource::close(RuntimeState* state) {
Status OlapChunkSource::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ChunkSource::prepare(state));
_runtime_state = state;
const TQueryOptions& query_options = _runtime_state->query_options();
_use_vector_index = query_options.enable_use_ann;
if (_use_vector_index) {
_use_ivfpq = query_options.vector_search_options.use_ivfpq;
_vector_distance_column_name = query_options.vector_search_options.vector_distance_column_name;
}
const TOlapScanNode& thrift_olap_scan_node = _scan_node->thrift_olap_scan_node();
const TupleDescriptor* tuple_desc = state->desc_tbl().get_tuple_descriptor(thrift_olap_scan_node.tuple_id);
_slots = &tuple_desc->slots();
Expand Down Expand Up @@ -129,6 +135,11 @@ void OlapChunkSource::_init_counter(RuntimeState* state) {
const std::string segment_init_name = "SegmentInit";
_seg_init_timer = ADD_CHILD_TIMER(_runtime_profile, segment_init_name, IO_TASK_EXEC_TIMER_NAME);
_bi_filter_timer = ADD_CHILD_TIMER(_runtime_profile, "BitmapIndexFilter", segment_init_name);
_get_row_ranges_by_vector_index_timer =
ADD_CHILD_TIMER(_runtime_profile, "GetVectorRowRangesTime", segment_init_name);
_vector_search_timer = ADD_CHILD_TIMER(_runtime_profile, "VectorSearchTime", segment_init_name);
_process_vector_distance_and_id_timer =
ADD_CHILD_TIMER(_runtime_profile, "ProcessVectorDistanceAndIdTime", segment_init_name);
_bi_filtered_counter = ADD_CHILD_COUNTER(_runtime_profile, "BitmapIndexFilterRows", TUnit::UNIT, segment_init_name);
_bf_filtered_counter = ADD_CHILD_COUNTER(_runtime_profile, "BloomFilterFilterRows", TUnit::UNIT, segment_init_name);
_gin_filtered_counter = ADD_CHILD_COUNTER(_runtime_profile, "GinFilterRows", TUnit::UNIT, segment_init_name);
Expand Down Expand Up @@ -214,6 +225,21 @@ Status OlapChunkSource::_init_reader_params(const std::vector<std::unique_ptr<Ol
if (thrift_olap_scan_node.__isset.enable_gin_filter) {
_params.enable_gin_filter = thrift_olap_scan_node.enable_gin_filter;
}
_params.use_vector_index = _use_vector_index;
if (_use_vector_index) {
const TVectorSearchOptions& vector_options = _runtime_state->query_options().vector_search_options;
_params.vector_distance_column_name = _vector_distance_column_name;
_params.k = vector_options.vector_limit_k;
for (const std::string& str : vector_options.query_vector) {
_params.query_vector.push_back(std::stof(str));
}
_params.query_params = vector_options.query_params;
_params.vector_range = vector_options.vector_range;
_params.result_order = vector_options.result_order;
_params.use_ivfpq = _use_ivfpq;
_params.pq_refine_factor = vector_options.pq_refine_factor;
_params.k_factor = vector_options.k_factor;
}
if (thrift_olap_scan_node.__isset.sorted_by_keys_per_tablet) {
_params.sorted_by_keys_per_tablet = thrift_olap_scan_node.sorted_by_keys_per_tablet;
}
Expand Down Expand Up @@ -272,7 +298,16 @@ Status OlapChunkSource::_init_reader_params(const std::vector<std::unique_ptr<Ol
Status OlapChunkSource::_init_scanner_columns(std::vector<uint32_t>& scanner_columns) {
for (auto slot : *_slots) {
DCHECK(slot->is_materialized());
int32_t index = _tablet_schema->field_index(slot->col_name());
int32_t index;
if (_use_vector_index && !_use_ivfpq) {
index = _tablet_schema->field_index(slot->col_name(), _vector_distance_column_name);
if (slot->col_name() == _vector_distance_column_name) {
_params.vector_column_id = index;
_params.vector_slot_id = slot->id();
}
} else {
index = _tablet->field_index_with_max_version(slot->col_name());
}
if (index < 0) {
std::stringstream ss;
ss << "invalid field name: " << slot->col_name();
Expand All @@ -296,7 +331,12 @@ Status OlapChunkSource::_init_scanner_columns(std::vector<uint32_t>& scanner_col

Status OlapChunkSource::_init_unused_output_columns(const std::vector<std::string>& unused_output_columns) {
for (const auto& col_name : unused_output_columns) {
int32_t index = _tablet_schema->field_index(col_name);
int32_t index;
if (_use_vector_index && !_use_ivfpq) {
index = _tablet_schema->field_index(col_name, _vector_distance_column_name);
} else {
index = _tablet_schema->field_index(col_name);
}
if (index < 0) {
std::stringstream ss;
ss << "invalid field name: " << col_name;
Expand Down Expand Up @@ -451,7 +491,9 @@ Status OlapChunkSource::_init_olap_reader(RuntimeState* runtime_state) {
_prj_iter = _reader;
} else {
starrocks::Schema output_schema = ChunkHelper::convert_schema(_tablet_schema, scanner_columns);
_prj_iter = new_projection_iterator(output_schema, _reader);
_prj_iter = new_projection_iterator(output_schema, _reader, _use_vector_index && !_use_ivfpq,
_params.vector_column_id, _params.vector_slot_id,
_params.vector_distance_column_name);
}

if (!_scan_ctx->not_push_down_conjuncts().empty() || !_non_pushdown_pred_tree.empty()) {
Expand Down Expand Up @@ -505,7 +547,12 @@ Status OlapChunkSource::_init_global_dicts(TabletReaderParams* params) {
auto iter = global_dict_map.find(slot->id());
if (iter != global_dict_map.end()) {
auto& dict_map = iter->second.first;
int32_t index = _tablet_schema->field_index(slot->col_name());
int32_t index;
if (_use_vector_index && !_use_ivfpq) {
index = _tablet_schema->field_index(slot->col_name(), _vector_distance_column_name);
} else {
index = _tablet_schema->field_index(slot->col_name());
}
DCHECK(index >= 0);
global_dict->emplace(index, const_cast<GlobalDictMap*>(&dict_map));
}
Expand Down Expand Up @@ -625,8 +672,9 @@ void OlapChunkSource::_update_counter() {

COUNTER_UPDATE(_bi_filtered_counter, _reader->stats().rows_bitmap_index_filtered);
COUNTER_UPDATE(_bi_filter_timer, _reader->stats().bitmap_index_filter_timer);
COUNTER_UPDATE(_gin_filtered_counter, _reader->stats().rows_gin_filtered);
COUNTER_UPDATE(_gin_filtered_timer, _reader->stats().gin_index_filter_ns);
COUNTER_UPDATE(_get_row_ranges_by_vector_index_timer, _reader->stats().get_row_ranges_by_vector_index_timer);
COUNTER_UPDATE(_vector_search_timer, _reader->stats().vector_search_timer);
COUNTER_UPDATE(_process_vector_distance_and_id_timer, _reader->stats().process_vector_distance_and_id_timer);
COUNTER_UPDATE(_block_seek_counter, _reader->stats().block_seek_num);

COUNTER_UPDATE(_rowsets_read_count, _reader->stats().rowsets_read_count);
Expand Down
15 changes: 14 additions & 1 deletion be/src/exec/pipeline/scan/olap_chunk_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class OlapChunkSource final : public ChunkSource {
OlapScanNode* _scan_node;
OlapScanContext* _scan_ctx;

const int64_t _limit; // -1: no limit
int64_t _limit; // -1: no limit
TInternalScanRange* _scan_range;

PredicateTree _non_pushdown_pred_tree;
Expand Down Expand Up @@ -106,6 +106,16 @@ class OlapChunkSource final : public ChunkSource {

std::vector<ColumnAccessPathPtr> _column_access_paths;

bool _use_vector_index = false;

bool _use_ivfpq = false;

std::string _vector_distance_column_name;

double _vector_range;

int result_order;

// The following are profile meatures
int64_t _num_rows_read = 0;

Expand Down Expand Up @@ -150,6 +160,9 @@ class OlapChunkSource final : public ChunkSource {
RuntimeProfile::Counter* _bi_filter_timer = nullptr;
RuntimeProfile::Counter* _gin_filtered_counter = nullptr;
RuntimeProfile::Counter* _gin_filtered_timer = nullptr;
RuntimeProfile::Counter* _get_row_ranges_by_vector_index_timer = nullptr;
RuntimeProfile::Counter* _vector_search_timer = nullptr;
RuntimeProfile::Counter* _process_vector_distance_and_id_timer = nullptr;
RuntimeProfile::Counter* _pushdown_predicates_counter = nullptr;
RuntimeProfile::Counter* _non_pushdown_predicates_counter = nullptr;
RuntimeProfile::Counter* _rowsets_read_count = nullptr;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exprs/column_ref.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ StatusOr<ColumnPtr> ColumnRef::evaluate_checked(ExprContext* context, Chunk* ptr
ColumnPtr& ColumnRef::get_column(Expr* expr, Chunk* chunk) {
auto* ref = (ColumnRef*)expr;
ColumnPtr& column = (chunk)->get_column_by_slot_id(ref->slot_id());
return column;
return column;
}

} // namespace starrocks
Loading

0 comments on commit 8e76d2f

Please sign in to comment.