From 45d72ace192a6dda42ca5eb2fad1d492dd55d197 Mon Sep 17 00:00:00 2001 From: Seaven Date: Fri, 20 Sep 2024 15:18:01 +0800 Subject: [PATCH] [Enhancement][FlatJson] Improve flat json performace and extract strategy (#50696) Signed-off-by: Seaven --- be/src/column/column_access_path.cpp | 26 +- be/src/column/json_column.cpp | 22 ++ be/src/column/json_column.h | 2 + be/src/common/config.h | 3 + be/src/connector/lake_connector.cpp | 8 +- be/src/exec/olap_scan_prepare.cpp | 5 + be/src/exprs/json_functions.cpp | 3 + be/src/storage/chunk_helper.cpp | 30 +- be/src/storage/chunk_helper.h | 3 + be/src/storage/meta_reader.cpp | 42 ++- be/src/storage/olap_meta_reader.cpp | 5 - be/src/storage/rowset/array_column_writer.cpp | 2 + be/src/storage/rowset/bloom_filter.h | 4 +- be/src/storage/rowset/column_reader.cpp | 283 ++++++++++-------- be/src/storage/rowset/column_reader.h | 5 + .../storage/rowset/json_column_compactor.cpp | 2 + .../storage/rowset/json_column_iterator.cpp | 7 +- be/src/storage/rowset/json_column_writer.cpp | 4 + be/src/storage/rowset/json_column_writer.h | 2 + be/src/storage/rowset/map_column_writer.cpp | 4 + .../storage/rowset/struct_column_writer.cpp | 2 + be/src/types/logical_type.h | 14 +- be/src/util/json_flattener.cpp | 252 +++++++++++----- be/src/util/json_flattener.h | 33 +- be/test/exprs/flat_json_functions_test.cpp | 4 +- .../rowset/flat_json_column_compact_test.cpp | 107 +++---- .../rowset/flat_json_column_rw_test.cpp | 67 +++-- be/test/util/json_flattener_test.cpp | 99 +++++- .../com/starrocks/catalog/FunctionSet.java | 6 + gensrc/proto/segment.proto | 2 + 30 files changed, 690 insertions(+), 358 deletions(-) diff --git a/be/src/column/column_access_path.cpp b/be/src/column/column_access_path.cpp index 05acb7fc9a8ae..d64df12402257 100644 --- a/be/src/column/column_access_path.cpp +++ b/be/src/column/column_access_path.cpp @@ -15,6 +15,7 @@ #include "column/column_access_path.h" #include +#include #include #include @@ -31,6 +32,7 @@ #include "runtime/runtime_state.h" #include "runtime/types.h" #include "types/logical_type.h" +#include "util/json_flattener.h" namespace starrocks { @@ -209,31 +211,13 @@ StatusOr> ColumnAccessPath::create(const TAcce return std::move(p); } -std::pair _split_path(const std::string& path) { - size_t pos = 0; - if (path.starts_with("\"")) { - pos = path.find('\"', 1); - DCHECK(pos != std::string::npos); - } - pos = path.find('.', pos); - std::string key; - std::string next; - if (pos == std::string::npos) { - key = path; - } else { - key = path.substr(0, pos); - next = path.substr(pos + 1); - } - - return {key, next}; -} - ColumnAccessPath* insert_json_path_impl(const std::string& path, ColumnAccessPath* root) { if (path.empty()) { return root; } - auto [key, next] = _split_path(path); + auto [key_view, next] = JsonFlatPath::split_path(path); + auto key = std::string(key_view); auto child = root->get_child(key); if (child == nullptr) { auto n = ColumnAccessPath::create(TAccessPathType::FIELD, key, 0, root->absolute_path()); @@ -241,7 +225,7 @@ ColumnAccessPath* insert_json_path_impl(const std::string& path, ColumnAccessPat root->children().emplace_back(std::move(n.value())); child = root->children().back().get(); } - return insert_json_path_impl(next, child); + return insert_json_path_impl(std::string(next), child); } void ColumnAccessPath::insert_json_path(ColumnAccessPath* root, LogicalType type, const std::string& path) { diff --git a/be/src/column/json_column.cpp b/be/src/column/json_column.cpp index f8841a1eec71e..107b95fd89cb4 100644 --- a/be/src/column/json_column.cpp +++ b/be/src/column/json_column.cpp @@ -469,6 +469,28 @@ bool JsonColumn::has_flat_column(const std::string& path) const { return false; } +bool JsonColumn::is_equallity_schema(const Column* other) const { + if (!other->is_json()) { + return false; + } + auto* other_json = down_cast(other); + if (this->is_flat_json() && other_json->is_flat_json()) { + if (this->_flat_column_paths.size() != other_json->_flat_column_paths.size()) { + return false; + } + for (size_t i = 0; i < this->_flat_column_paths.size(); i++) { + if (this->_flat_column_paths[i] != other_json->_flat_column_paths[i]) { + return false; + } + if (this->_flat_column_types[i] != other_json->_flat_column_types[i]) { + return false; + } + } + return _flat_columns.size() == other_json->_flat_columns.size(); + } + return !this->is_flat_json() && !other_json->is_flat_json(); +} + std::string JsonColumn::debug_flat_paths() const { if (_flat_column_paths.empty()) { return "[]"; diff --git a/be/src/column/json_column.h b/be/src/column/json_column.h index 935bb84333ed8..0ff977681fe94 100644 --- a/be/src/column/json_column.h +++ b/be/src/column/json_column.h @@ -136,6 +136,8 @@ class JsonColumn final : public ColumnFactory, JsonColum void set_flat_columns(const std::vector& paths, const std::vector& types, const Columns& flat_columns); + bool is_equallity_schema(const Column* other) const; + std::string debug_flat_paths() const; private: diff --git a/be/src/common/config.h b/be/src/common/config.h index 07b3da86f2bd5..6ce587c8042db 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1355,6 +1355,9 @@ CONF_mDouble(json_flat_sparsity_factor, "0.9"); // the maximum number of extracted JSON sub-field CONF_mInt32(json_flat_column_max, "100"); +// for whitelist on flat json remain data, max set 1kb +CONF_mInt32(json_flat_remain_filter_max_bytes, "1024"); + // Allowable intervals for continuous generation of pk dumps // Disable when pk_dump_interval_seconds <= 0 CONF_mInt64(pk_dump_interval_seconds, "3600"); // 1 hour diff --git a/be/src/connector/lake_connector.cpp b/be/src/connector/lake_connector.cpp index 6234460278563..7270ecd76c926 100644 --- a/be/src/connector/lake_connector.cpp +++ b/be/src/connector/lake_connector.cpp @@ -495,6 +495,9 @@ Status LakeDataSource::build_scan_range(RuntimeState* state) { } void LakeDataSource::init_counter(RuntimeState* state) { + _access_path_hits_counter = ADD_COUNTER(_runtime_profile, "AccessPathHits", TUnit::UNIT); + _access_path_unhits_counter = ADD_COUNTER(_runtime_profile, "AccessPathUnhits", TUnit::UNIT); + _bytes_read_counter = ADD_COUNTER(_runtime_profile, "BytesRead", TUnit::BYTES); _rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead", TUnit::UNIT); @@ -590,9 +593,6 @@ void LakeDataSource::init_counter(RuntimeState* state) { _prefetch_hit_counter = ADD_CHILD_COUNTER(_runtime_profile, "PrefetchHitCount", TUnit::UNIT, io_statistics_name); _prefetch_wait_finish_timer = ADD_CHILD_TIMER(_runtime_profile, "PrefetchWaitFinishTime", io_statistics_name); _prefetch_pending_timer = ADD_CHILD_TIMER(_runtime_profile, "PrefetchPendingTime", io_statistics_name); - - _access_path_hits_counter = ADD_COUNTER(_runtime_profile, "AccessPathHits", TUnit::UNIT); - _access_path_unhits_counter = ADD_COUNTER(_runtime_profile, "AccessPathUnhits", TUnit::UNIT); } void LakeDataSource::update_realtime_counter(Chunk* chunk) { @@ -718,7 +718,7 @@ void LakeDataSource::update_counter() { _runtime_state->update_num_datacache_count(1); } - if (_reader->stats().flat_json_hits.size() > 0) { + if (_reader->stats().flat_json_hits.size() > 0 || _reader->stats().merge_json_hits.size() > 0) { std::string access_path_hits = "AccessPathHits"; int64_t total = 0; for (auto& [k, v] : _reader->stats().flat_json_hits) { diff --git a/be/src/exec/olap_scan_prepare.cpp b/be/src/exec/olap_scan_prepare.cpp index 826647e4d1fce..cf29a94c01e78 100644 --- a/be/src/exec/olap_scan_prepare.cpp +++ b/be/src/exec/olap_scan_prepare.cpp @@ -351,10 +351,13 @@ static bool is_not_in(const auto* pred) { } }; +// clang-format off template template requires(!lt_is_date) Status ChunkPredicateBuilder::normalize_in_or_equal_predicate( const SlotDescriptor& slot, ColumnValueRange* range) { + // clang-format on + Status status; for (size_t i = 0; i < _exprs.size(); i++) { @@ -412,11 +415,13 @@ requires(!lt_is_date) Status ChunkPredicateBuilder::normalize return Status::OK(); } +// clang-format off // explicit specialization for DATE. template template requires lt_is_date Status ChunkPredicateBuilder::normalize_in_or_equal_predicate( const SlotDescriptor& slot, ColumnValueRange* range) { + // clang-format on Status status; for (size_t i = 0; i < _exprs.size(); i++) { diff --git a/be/src/exprs/json_functions.cpp b/be/src/exprs/json_functions.cpp index e4158bc445703..ae080fda30bfc 100644 --- a/be/src/exprs/json_functions.cpp +++ b/be/src/exprs/json_functions.cpp @@ -525,6 +525,9 @@ static StatusOr _extract_with_hyper(NativeJsonState* state, const std state->real_path.paths.emplace_back(p); continue; } + if (p.key.find('.') != std::string::npos) { + in_flat = false; + } if (in_flat) { flat_path += "." + p.key; if (p.array_selector->type != NONE) { diff --git a/be/src/storage/chunk_helper.cpp b/be/src/storage/chunk_helper.cpp index e948e9ca94afd..be635e55aa3d2 100644 --- a/be/src/storage/chunk_helper.cpp +++ b/be/src/storage/chunk_helper.cpp @@ -18,6 +18,7 @@ #include "column/chunk.h" #include "column/column_helper.h" #include "column/column_pool.h" +#include "column/json_column.h" #include "column/map_column.h" #include "column/schema.h" #include "column/struct_column.h" @@ -546,6 +547,32 @@ void ChunkAccumulator::finalize() { _accumulate_count = 0; } +bool ChunkPipelineAccumulator::_check_json_schema_equallity(const Chunk* one, const Chunk* two) { + if (one->num_columns() != two->num_columns()) { + return false; + } + + for (size_t i = 0; i < one->num_columns(); i++) { + auto& c1 = one->get_column_by_index(i); + auto& c2 = two->get_column_by_index(i); + const auto* a1 = ColumnHelper::get_data_column(c1.get()); + const auto* a2 = ColumnHelper::get_data_column(c2.get()); + + if (a1->is_json() && a2->is_json()) { + auto json1 = down_cast(a1); + if (!json1->is_equallity_schema(a2)) { + return false; + } + } else if (a1->is_json() || a2->is_json()) { + // never hit + DCHECK_EQ(a1->is_json(), a2->is_json()); + return false; + } + } + + return true; +} + void ChunkPipelineAccumulator::push(const ChunkPtr& chunk) { chunk->check_or_die(); DCHECK(_out_chunk == nullptr); @@ -553,7 +580,8 @@ void ChunkPipelineAccumulator::push(const ChunkPtr& chunk) { _in_chunk = chunk; _mem_usage = chunk->bytes_usage(); } else if (_in_chunk->num_rows() + chunk->num_rows() > _max_size || - _in_chunk->owner_info() != chunk->owner_info() || _in_chunk->owner_info().is_last_chunk()) { + _in_chunk->owner_info() != chunk->owner_info() || _in_chunk->owner_info().is_last_chunk() || + !_check_json_schema_equallity(chunk.get(), _in_chunk.get())) { _out_chunk = std::move(_in_chunk); _in_chunk = chunk; _mem_usage = chunk->bytes_usage(); diff --git a/be/src/storage/chunk_helper.h b/be/src/storage/chunk_helper.h index 0525b68352bcb..94c0a27234cfc 100644 --- a/be/src/storage/chunk_helper.h +++ b/be/src/storage/chunk_helper.h @@ -130,6 +130,9 @@ class ChunkPipelineAccumulator { bool need_input() const; bool is_finished() const; +private: + static bool _check_json_schema_equallity(const Chunk* one, const Chunk* two); + private: static constexpr double LOW_WATERMARK_ROWS_RATE = 0.75; // 0.75 * chunk_size #ifdef BE_TEST diff --git a/be/src/storage/meta_reader.cpp b/be/src/storage/meta_reader.cpp index a59d6031be41b..d1e006c7adeb3 100644 --- a/be/src/storage/meta_reader.cpp +++ b/be/src/storage/meta_reader.cpp @@ -14,6 +14,7 @@ #include "storage/meta_reader.h" +#include #include #include @@ -237,12 +238,43 @@ Status SegmentMetaCollecter::_collect(const std::string& name, ColumnId cid, Col return Status::NotSupported("Not Support Collect Meta: " + name); } +std::string append_read_name(const ColumnReader* col_reader) { + std::stringstream stream; + if (col_reader->column_type() == LogicalType::TYPE_JSON) { + for (const auto& sub_reader : *col_reader->sub_readers()) { + stream << fmt::format("{}({}), ", sub_reader->name(), type_to_string(sub_reader->column_type())); + } + return stream.str().substr(0, stream.view().size() - 2); + } + if (col_reader->column_type() == LogicalType::TYPE_ARRAY) { + auto child = append_read_name((*col_reader->sub_readers())[0].get()); + if (!child.empty()) { + stream << "[" << child << "]"; + } + } else if (col_reader->column_type() == LogicalType::TYPE_MAP) { + auto child = append_read_name((*col_reader->sub_readers())[1].get()); + if (!child.empty()) { + stream << "{" << child << "}"; + } + } else if (col_reader->column_type() == LogicalType::TYPE_STRUCT) { + for (const auto& sub_reader : *col_reader->sub_readers()) { + auto child = append_read_name(sub_reader.get()); + if (!child.empty()) { + stream << sub_reader->name() << "(" << child << "), "; + } + } + return stream.str().substr(0, stream.view().size() - 2); + } + return stream.str(); +} + Status SegmentMetaCollecter::_collect_flat_json(ColumnId cid, Column* column) { const ColumnReader* col_reader = _segment->column(cid); if (col_reader == nullptr) { return Status::NotFound("don't found column"); } - if (col_reader->column_type() != TYPE_JSON) { + + if (!is_semi_type(col_reader->column_type())) { return Status::InternalError("column type mismatch"); } @@ -253,11 +285,11 @@ Status SegmentMetaCollecter::_collect_flat_json(ColumnId cid, Column* column) { ArrayColumn* array_column = down_cast(column); size_t size = array_column->offsets_column()->get_data().back(); - for (const auto& sub_reader : *col_reader->sub_readers()) { - std::string str = fmt::format("{}({})", sub_reader->name(), type_to_string(sub_reader->column_type())); - array_column->elements_column()->append_datum(Slice(str)); + auto res = append_read_name(col_reader); + if (!res.empty()) { + array_column->elements_column()->append_datum(Slice(res)); + array_column->offsets_column()->append(size + 1); } - array_column->offsets_column()->append(size + col_reader->sub_readers()->size()); return Status::OK(); } diff --git a/be/src/storage/olap_meta_reader.cpp b/be/src/storage/olap_meta_reader.cpp index f492033466081..1b554d73e25d2 100644 --- a/be/src/storage/olap_meta_reader.cpp +++ b/be/src/storage/olap_meta_reader.cpp @@ -111,11 +111,6 @@ Status OlapMetaReader::_init_seg_meta_collecters(const OlapMetaReaderParams& par Status OlapMetaReader::_get_segments(const TabletSharedPtr& tablet, const Version& version, std::vector* segments) { - if (tablet->updates() != nullptr) { - LOG(INFO) << "Skipped Update tablet"; - return Status::OK(); - } - Status acquire_rowset_st; { std::shared_lock l(tablet->get_header_lock()); diff --git a/be/src/storage/rowset/array_column_writer.cpp b/be/src/storage/rowset/array_column_writer.cpp index 7dbe367a5b9a0..6fae8c973da00 100644 --- a/be/src/storage/rowset/array_column_writer.cpp +++ b/be/src/storage/rowset/array_column_writer.cpp @@ -72,6 +72,8 @@ StatusOr> create_array_column_writer(const ColumnW element_options.need_zone_map = false; element_options.need_bloom_filter = element_column.is_bf_column(); element_options.need_bitmap_index = element_column.has_bitmap_index(); + element_options.need_flat = opts.need_flat; + element_options.is_compaction = opts.is_compaction; if (element_column.type() == LogicalType::TYPE_ARRAY) { if (element_options.need_bloom_filter) { return Status::NotSupported("Do not support bloom filter for array type"); diff --git a/be/src/storage/rowset/bloom_filter.h b/be/src/storage/rowset/bloom_filter.h index 43c54599d71ae..5ecb004c69b4d 100644 --- a/be/src/storage/rowset/bloom_filter.h +++ b/be/src/storage/rowset/bloom_filter.h @@ -172,13 +172,15 @@ class BloomFilter { virtual void add_hash(uint64_t hash) = 0; virtual bool test_hash(uint64_t hash) const = 0; + static uint32_t estimate_bytes(uint64_t n, double fpp) { return _optimal_bit_num(n, fpp) / 8 + 1; } + private: // Compute the optimal bit number according to the following rule: // m = -n * ln(fpp) / (ln(2) ^ 2) // n: expected distinct record number // fpp: false positive probablity // the result will be power of 2 - uint32_t _optimal_bit_num(uint64_t n, double fpp); + static uint32_t _optimal_bit_num(uint64_t n, double fpp); protected: // bloom filter data diff --git a/be/src/storage/rowset/column_reader.cpp b/be/src/storage/rowset/column_reader.cpp index e7db6b181f582..da883d0864653 100644 --- a/be/src/storage/rowset/column_reader.cpp +++ b/be/src/storage/rowset/column_reader.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include "column/column.h" #include "column/column_access_path.h" @@ -46,7 +47,9 @@ #include "column/datum_convert.h" #include "common/compiler_util.h" #include "common/logging.h" +#include "common/status.h" #include "common/statusor.h" +#include "gen_cpp/segment.pb.h" #include "runtime/types.h" #include "storage/column_predicate.h" #include "storage/index/index_descriptor.h" @@ -135,6 +138,14 @@ Status ColumnReader::_init(ColumnMetaPB* meta, const TabletColumn* column) { CHECK_EQ(kJsonMetaDefaultFormatVersion, json_meta.format_version()) << "Only format_version=1 is supported"; _is_flat_json = json_meta.is_flat(); _has_remain = json_meta.has_remain(); + + if (json_meta.has_remain_filter()) { + DCHECK(_has_remain); + DCHECK(!json_meta.remain_filter().empty()); + RETURN_IF_ERROR(BloomFilter::create(BLOCK_BLOOM_FILTER, &_remain_filter)); + RETURN_IF_ERROR(_remain_filter->init(json_meta.remain_filter().data(), json_meta.remain_filter().size(), + HASH_MURMUR3_X64_64)); + } } if (is_scalar_field_type(delegate_type(_column_type))) { RETURN_IF_ERROR(EncodingInfo::get(delegate_type(_column_type), meta->encoding(), &_encoding_info)); @@ -680,133 +691,7 @@ StatusOr> ColumnReader::_create_merge_struct_ite StatusOr> ColumnReader::new_iterator(ColumnAccessPath* path, const TabletColumn* column) { if (_column_type == LogicalType::TYPE_JSON) { - auto json_iter = std::make_unique(this); - - // access sub columns - std::vector access_paths; - std::vector target_paths; - std::vector target_types; - if (path != nullptr && !path->children().empty()) { - auto field_name = path->absolute_path(); - path->get_all_leafs(&access_paths); - for (auto& p : access_paths) { - // use absolute path, not relative path - // root path is field name, we remove it - target_paths.emplace_back(p->absolute_path().substr(field_name.size() + 1)); - target_types.emplace_back(p->value_type().type); - } - } - - if (!_is_flat_json) { - if (path == nullptr || path->children().empty()) { - return json_iter; - } - // dynamic flattern - // we must dynamic flat json, because we don't know other segment wasn't the paths - return create_json_dynamic_flat_iterator(std::move(json_iter), target_paths, target_types, - path->is_from_compaction()); - } - - std::vector source_paths; - std::vector source_types; - std::unique_ptr null_iter; - std::vector> all_iters; - size_t start = is_nullable() ? 1 : 0; - size_t end = _has_remain ? _sub_readers->size() - 1 : _sub_readers->size(); - if (is_nullable()) { - ASSIGN_OR_RETURN(null_iter, (*_sub_readers)[0]->new_iterator()); - } - - if (path == nullptr || path->children().empty() || path->is_from_compaction()) { - DCHECK(_is_flat_json); - for (size_t i = start; i < end; i++) { - const auto& rd = (*_sub_readers)[i]; - std::string name = rd->name(); - ASSIGN_OR_RETURN(auto iter, rd->new_iterator()); - source_paths.emplace_back(name); - source_types.emplace_back(rd->column_type()); - all_iters.emplace_back(std::move(iter)); - } - - if (_has_remain) { - const auto& rd = (*_sub_readers)[end]; - ASSIGN_OR_RETURN(auto iter, rd->new_iterator()); - all_iters.emplace_back(std::move(iter)); - } - - if (path == nullptr || path->children().empty()) { - // access whole json - return create_json_merge_iterator(this, std::move(null_iter), std::move(all_iters), source_paths, - source_types); - } else { - DCHECK(path->is_from_compaction()); - return create_json_flat_iterator(this, std::move(null_iter), std::move(all_iters), target_paths, - target_types, source_paths, source_types, true); - } - } - - bool need_remain = false; - for (size_t k = 0; k < target_paths.size(); k++) { - auto& target = target_paths[k]; - size_t i = start; - for (; i < end; i++) { - const auto& rd = (*_sub_readers)[i]; - std::string name = rd->name(); - // target: b.b2.b3 - // source: b.b2 - if (target == name || target.starts_with(name + ".")) { - ASSIGN_OR_RETURN(auto iter, rd->new_iterator()); - source_paths.emplace_back(name); - source_types.emplace_back(rd->column_type()); - all_iters.emplace_back(std::move(iter)); - break; - } else if (name.starts_with(target + ".")) { - // target: b.b2 - // source: b.b2.b3 - if (target_types[k] != TYPE_JSON && !is_string_type(target_types[k])) { - // don't need column and remain - break; - } - need_remain = true; - ASSIGN_OR_RETURN(auto iter, rd->new_iterator()); - source_paths.emplace_back(name); - source_types.emplace_back(rd->column_type()); - all_iters.emplace_back(std::move(iter)); - } - } - need_remain |= (i == end); - } - - if (_has_remain && need_remain) { - const auto& rd = (*_sub_readers)[end]; - ASSIGN_OR_RETURN(auto iter, rd->new_iterator()); - all_iters.emplace_back(std::move(iter)); - } - - if (all_iters.empty()) { - DCHECK(!_sub_readers->empty()); - DCHECK(source_paths.empty()); - // has none remain and can't hit any column, we read any one - // why not return null directly, segemnt iterater need ordinal index... - // it's canbe optimized - size_t index = start; - LogicalType type = (*_sub_readers)[start]->column_type(); - for (size_t i = start + 1; i < end; i++) { - const auto& rd = (*_sub_readers)[i]; - if (type < rd->column_type()) { - index = i; - type = rd->column_type(); - } - } - const auto& rd = (*_sub_readers)[index]; - ASSIGN_OR_RETURN(auto iter, rd->new_iterator()); - all_iters.emplace_back(std::move(iter)); - source_paths.emplace_back(rd->name()); - source_types.emplace_back(rd->column_type()); - } - - return create_json_flat_iterator(this, std::move(null_iter), std::move(all_iters), target_paths, target_types, - source_paths, source_types); + return _new_json_iterator(path, column); } else if (is_scalar_field_type(delegate_type(_column_type))) { return std::make_unique(this); } else if (_column_type == LogicalType::TYPE_ARRAY) { @@ -892,6 +777,150 @@ StatusOr> ColumnReader::new_iterator(ColumnAcces } } +StatusOr> ColumnReader::_new_json_iterator(ColumnAccessPath* path, + const TabletColumn* column) { + DCHECK(_column_type == LogicalType::TYPE_JSON); + auto json_iter = std::make_unique(this); + // access sub columns + std::vector target_leafs; + std::vector target_paths; + std::vector target_types; + if (path != nullptr && !path->children().empty()) { + auto field_name = path->absolute_path(); + path->get_all_leafs(&target_leafs); + for (auto& p : target_leafs) { + // use absolute path, not relative path + // root path is field name, we remove it + target_paths.emplace_back(p->absolute_path().substr(field_name.size() + 1)); + target_types.emplace_back(p->value_type().type); + } + } + + if (!_is_flat_json) { + if (path == nullptr || path->children().empty()) { + return json_iter; + } + // dynamic flattern + // we must dynamic flat json, because we don't know other segment wasn't the paths + return create_json_dynamic_flat_iterator(std::move(json_iter), target_paths, target_types, + path->is_from_compaction()); + } + + std::vector source_paths; + std::vector source_types; + std::unique_ptr null_iter; + std::vector> all_iters; + size_t start = is_nullable() ? 1 : 0; + size_t end = _has_remain ? _sub_readers->size() - 1 : _sub_readers->size(); + if (is_nullable()) { + ASSIGN_OR_RETURN(null_iter, (*_sub_readers)[0]->new_iterator()); + } + + if (path == nullptr || path->children().empty() || path->is_from_compaction()) { + DCHECK(_is_flat_json); + for (size_t i = start; i < end; i++) { + const auto& rd = (*_sub_readers)[i]; + std::string name = rd->name(); + ASSIGN_OR_RETURN(auto iter, rd->new_iterator()); + source_paths.emplace_back(name); + source_types.emplace_back(rd->column_type()); + all_iters.emplace_back(std::move(iter)); + } + + if (_has_remain) { + const auto& rd = (*_sub_readers)[end]; + ASSIGN_OR_RETURN(auto iter, rd->new_iterator()); + all_iters.emplace_back(std::move(iter)); + } + + if (path == nullptr || path->children().empty()) { + // access whole json + return create_json_merge_iterator(this, std::move(null_iter), std::move(all_iters), source_paths, + source_types); + } else { + DCHECK(path->is_from_compaction()); + return create_json_flat_iterator(this, std::move(null_iter), std::move(all_iters), target_paths, + target_types, source_paths, source_types, true); + } + } + + bool need_remain = false; + std::set check_paths; + for (size_t k = 0; k < target_paths.size(); k++) { + auto& target = target_paths[k]; + size_t i = start; + for (; i < end; i++) { + const auto& rd = (*_sub_readers)[i]; + std::string name = rd->name(); + if (check_paths.contains(name)) { + continue; + } + // target: b.b2.b3 + // source: b.b2 + if (target == name || target.starts_with(name + ".")) { + ASSIGN_OR_RETURN(auto iter, rd->new_iterator()); + source_paths.emplace_back(name); + source_types.emplace_back(rd->column_type()); + all_iters.emplace_back(std::move(iter)); + check_paths.emplace(name); + break; + } else if (name.starts_with(target + ".")) { + // target: b.b2 + // source: b.b2.b3 + if (target_types[k] != TYPE_JSON && !is_string_type(target_types[k])) { + // don't need column and remain + break; + } + + if (_remain_filter != nullptr && + !_remain_filter->test_bytes(target_leafs[i]->path().data(), target_leafs[i]->path().size())) { + need_remain = false; + } else { + need_remain = true; + } + + ASSIGN_OR_RETURN(auto iter, rd->new_iterator()); + source_paths.emplace_back(name); + source_types.emplace_back(rd->column_type()); + all_iters.emplace_back(std::move(iter)); + check_paths.emplace(name); + } + } + need_remain |= (i == end); + } + + if (_has_remain && need_remain) { + const auto& rd = (*_sub_readers)[end]; + ASSIGN_OR_RETURN(auto iter, rd->new_iterator()); + all_iters.emplace_back(std::move(iter)); + } + + if (all_iters.empty()) { + DCHECK(!_sub_readers->empty()); + DCHECK(source_paths.empty()); + // has none remain and can't hit any column, we read any one + // why not return null directly, segemnt iterater need ordinal index... + // it's canbe optimized + size_t index = start; + LogicalType type = (*_sub_readers)[start]->column_type(); + for (size_t i = start + 1; i < end; i++) { + const auto& rd = (*_sub_readers)[i]; + if (type < rd->column_type()) { + index = i; + type = rd->column_type(); + } + } + const auto& rd = (*_sub_readers)[index]; + ASSIGN_OR_RETURN(auto iter, rd->new_iterator()); + all_iters.emplace_back(std::move(iter)); + source_paths.emplace_back(rd->name()); + source_types.emplace_back(rd->column_type()); + } + + return create_json_flat_iterator(this, std::move(null_iter), std::move(all_iters), target_paths, target_types, + source_paths, source_types); +} + size_t ColumnReader::mem_usage() const { size_t size = sizeof(ColumnReader) + _meta_mem_usage.load(std::memory_order_relaxed); diff --git a/be/src/storage/rowset/column_reader.h b/be/src/storage/rowset/column_reader.h index 48b9a7e5c814c..194a4cc8b9b29 100644 --- a/be/src/storage/rowset/column_reader.h +++ b/be/src/storage/rowset/column_reader.h @@ -68,6 +68,7 @@ class ColumnPredicate; class Column; class ZoneMapDetail; +class BloomFilter; class BitmapIndexIterator; class BitmapIndexReader; class ColumnIterator; @@ -197,6 +198,9 @@ class ColumnReader { bool has_remain_json() const { return _has_remain; } private: + StatusOr> _new_json_iterator(ColumnAccessPath* path = nullptr, + const TabletColumn* column = nullptr); + const std::string& file_name() const { return _segment->file_name(); } template Status bloom_filter(const std::vector& predicates, SparseRange<>* row_ranges, @@ -298,6 +302,7 @@ class ColumnReader { std::string _name; bool _is_flat_json = false; bool _has_remain = false; + std::unique_ptr _remain_filter; // only used for inverted index load OnceFlag _inverted_index_load_once; diff --git a/be/src/storage/rowset/json_column_compactor.cpp b/be/src/storage/rowset/json_column_compactor.cpp index b7abe7eab4c21..feb69727c2903 100644 --- a/be/src/storage/rowset/json_column_compactor.cpp +++ b/be/src/storage/rowset/json_column_compactor.cpp @@ -47,11 +47,13 @@ Status FlatJsonColumnCompactor::_compact_columns(std::vector& json_da for (const auto& js : json_datas) { vc.emplace_back(js.get()); } + deriver.set_generate_filter(true); deriver.derived(vc); _flat_paths = deriver.flat_paths(); _flat_types = deriver.flat_types(); _has_remain = deriver.has_remain_json(); + _remain_filter = deriver.remain_fitler(); VLOG(1) << "FlatJsonColumnCompactor compact_columns, json_datas size: " << json_datas.size() << ", flat json: " << JsonFlatPath::debug_flat_json(_flat_paths, _flat_types, _has_remain); diff --git a/be/src/storage/rowset/json_column_iterator.cpp b/be/src/storage/rowset/json_column_iterator.cpp index 1490cf2a0e6c0..3192947c8e3e5 100644 --- a/be/src/storage/rowset/json_column_iterator.cpp +++ b/be/src/storage/rowset/json_column_iterator.cpp @@ -493,12 +493,7 @@ Status JsonMergeIterator::init(const ColumnIteratorOptions& opts) { _src_column_modules.emplace_back(JsonColumn::create()); } - for (auto& p : _src_paths) { - opts.stats->merge_json_hits[p] += 1; - } - if (has_remain) { - opts.stats->merge_json_hits["remain"] += 1; - } + opts.stats->merge_json_hits["MergeAllSubfield"] += 1; SCOPED_RAW_TIMER(&_opts.stats->json_init_ns); _merger = std::make_unique(_src_paths, _src_types, has_remain); diff --git a/be/src/storage/rowset/json_column_writer.cpp b/be/src/storage/rowset/json_column_writer.cpp index a7ed25184a85c..1793f2a867ddc 100644 --- a/be/src/storage/rowset/json_column_writer.cpp +++ b/be/src/storage/rowset/json_column_writer.cpp @@ -135,6 +135,10 @@ Status FlatJsonColumnWriter::_init_flat_writers() { _json_meta->mutable_json_meta()->set_has_remain(_has_remain); _json_meta->mutable_json_meta()->set_is_flat(true); + if (_remain_filter != nullptr) { + _json_meta->mutable_json_meta()->set_remain_filter(_remain_filter->data(), _remain_filter->size()); + } + // recode null column in 1st if (_json_meta->is_nullable()) { _flat_paths.insert(_flat_paths.begin(), "nulls"); diff --git a/be/src/storage/rowset/json_column_writer.h b/be/src/storage/rowset/json_column_writer.h index 4675296ac5531..d82bd825de8cc 100644 --- a/be/src/storage/rowset/json_column_writer.h +++ b/be/src/storage/rowset/json_column_writer.h @@ -17,6 +17,7 @@ #include "storage/rowset/column_writer.h" namespace starrocks { +class BloomFilter; StatusOr> create_json_column_writer(const ColumnWriterOptions& opts, TypeInfoPtr type_info, WritableFile* wfile, @@ -69,6 +70,7 @@ class FlatJsonColumnWriter : public ColumnWriter { size_t _estimate_size = 0; bool _has_remain; + std::shared_ptr _remain_filter; bool _is_flat = false; }; } // namespace starrocks diff --git a/be/src/storage/rowset/map_column_writer.cpp b/be/src/storage/rowset/map_column_writer.cpp index 5a04e09fd516b..6b61302e738cb 100644 --- a/be/src/storage/rowset/map_column_writer.cpp +++ b/be/src/storage/rowset/map_column_writer.cpp @@ -73,6 +73,8 @@ StatusOr> create_map_column_writer(const ColumnWri key_options.need_zone_map = false; key_options.need_bloom_filter = key_column.is_bf_column(); key_options.need_bitmap_index = key_column.has_bitmap_index(); + key_options.need_flat = opts.need_flat; + key_options.is_compaction = opts.is_compaction; if (key_column.type() == LogicalType::TYPE_ARRAY) { if (key_options.need_bloom_filter) { return Status::NotSupported("Do not support bloom filter for array type"); @@ -93,6 +95,8 @@ StatusOr> create_map_column_writer(const ColumnWri value_options.need_zone_map = false; value_options.need_bloom_filter = value_column.is_bf_column(); value_options.need_bitmap_index = value_column.has_bitmap_index(); + value_options.need_flat = opts.need_flat; + value_options.is_compaction = opts.is_compaction; if (value_column.type() == LogicalType::TYPE_ARRAY) { if (value_options.need_bloom_filter) { return Status::NotSupported("Do not support bloom filter for array type"); diff --git a/be/src/storage/rowset/struct_column_writer.cpp b/be/src/storage/rowset/struct_column_writer.cpp index 9357380d21909..7bdef9e173905 100644 --- a/be/src/storage/rowset/struct_column_writer.cpp +++ b/be/src/storage/rowset/struct_column_writer.cpp @@ -71,6 +71,8 @@ StatusOr> create_struct_column_writer(const Column value_options.need_zone_map = false; value_options.need_bloom_filter = field_column.is_bf_column(); value_options.need_bitmap_index = field_column.has_bitmap_index(); + value_options.need_flat = opts.need_flat; + value_options.is_compaction = opts.is_compaction; ASSIGN_OR_RETURN(auto field_writer, ColumnWriter::create(value_options, &field_column, wfile)); field_writers.emplace_back(std::move(field_writer)); } diff --git a/be/src/types/logical_type.h b/be/src/types/logical_type.h index edc5c6e64d4ab..73c31c2973dad 100644 --- a/be/src/types/logical_type.h +++ b/be/src/types/logical_type.h @@ -162,6 +162,18 @@ inline bool is_scalar_field_type(LogicalType type) { } } +inline bool is_semi_type(LogicalType type) { + switch (type) { + case TYPE_STRUCT: + case TYPE_ARRAY: + case TYPE_MAP: + case TYPE_JSON: + return true; + default: + return false; + } +} + inline bool is_complex_metric_type(LogicalType type) { switch (type) { case TYPE_OBJECT: @@ -258,8 +270,6 @@ constexpr bool support_column_expr_predicate(LogicalType ltype) { case TYPE_DECIMAL32: /* 24 */ case TYPE_DECIMAL64: /* 25 */ case TYPE_DECIMAL128: /* 26 */ - case TYPE_JSON: - case TYPE_MAP: case TYPE_STRUCT: return true; default: diff --git a/be/src/util/json_flattener.cpp b/be/src/util/json_flattener.cpp index f866af518da49..4d8ba0317841a 100644 --- a/be/src/util/json_flattener.cpp +++ b/be/src/util/json_flattener.cpp @@ -17,6 +17,8 @@ #include "util/json_flattener.h" +#include + #include #include #include @@ -35,6 +37,7 @@ #include "column/type_traits.h" #include "column/vectorized_fwd.h" #include "common/compiler_util.h" +#include "common/config.h" #include "common/status.h" #include "common/statusor.h" #include "exprs/cast_expr.h" @@ -42,6 +45,7 @@ #include "exprs/expr_context.h" #include "gutil/casts.h" #include "runtime/types.h" +#include "storage/rowset/bloom_filter.h" #include "storage/rowset/column_reader.h" #include "types/logical_type.h" #include "util/json.h" @@ -199,12 +203,25 @@ inline uint8_t get_compatibility_type(vpack::ValueType type1, uint8_t type2) { } // namespace flat_json -std::pair JsonFlatPath::_split_path(const std::string_view& path) { - size_t pos = 0; - if (path.starts_with("\"")) { - pos = path.find('\"', 1); - DCHECK(pos != std::string::npos); +static const double FILTER_TEST_FPP[]{0.05, 0.1, 0.15, 0.2, 0.25, 0.3}; + +double estimate_filter_fpp(uint64_t element_nums) { + uint32_t bytes[6]; + int min_idx = 7; + double min = config::json_flat_remain_filter_max_bytes + 1; + for (int i = 0; i < 6; i++) { + bytes[i] = BloomFilter::estimate_bytes(element_nums, FILTER_TEST_FPP[i]); + double d = bytes[i] + FILTER_TEST_FPP[i]; + if (d < min) { + min = d; + min_idx = i; + } } + return min_idx < 6 ? FILTER_TEST_FPP[min_idx] : -1; +} + +std::pair JsonFlatPath::split_path(const std::string_view& path) { + size_t pos = 0; pos = path.find('.', pos); std::string_view key; std::string_view next; @@ -222,7 +239,7 @@ JsonFlatPath* JsonFlatPath::normalize_from_path(const std::string_view& path, Js if (path.empty()) { return root; } - auto [key, next] = _split_path(path); + auto [key, next] = split_path(path); auto iter = root->children.find(key); JsonFlatPath* child_path = nullptr; @@ -251,7 +268,7 @@ void JsonFlatPath::set_root(const std::string_view& new_root_path, JsonFlatPath* node->op = OP_ROOT; return; } - auto [key, next] = _split_path(new_root_path); + auto [key, next] = split_path(new_root_path); auto iter = node->children.begin(); for (; iter != node->children.end(); iter++) { @@ -262,7 +279,7 @@ void JsonFlatPath::set_root(const std::string_view& new_root_path, JsonFlatPath* } } -bool check_null_factor(const std::vector& json_datas) { +StatusOr check_null_factor(const std::vector& json_datas) { size_t total_rows = 0; size_t null_count = 0; @@ -281,10 +298,10 @@ bool check_null_factor(const std::vector& json_datas) { if (null_count > total_rows * config::json_flat_null_factor) { VLOG(8) << "flat json, null_count[" << null_count << "], row[" << total_rows << "], null_factor: " << config::json_flat_null_factor; - return false; + return Status::InternalError("json flat null factor too high"); } - return true; + return total_rows - null_count; } JsonPathDeriver::JsonPathDeriver(const std::vector& paths, const std::vector& types, @@ -307,24 +324,48 @@ void JsonPathDeriver::derived(const std::vector& json_datas) { return; } - if (!check_null_factor(json_datas)) { + auto res = check_null_factor(json_datas); + if (!res.ok()) { return; } + _total_rows = res.value(); _path_root = std::make_shared(); - _total_rows = 0; // init path by flat json _derived_on_flat_json(json_datas); // extract common keys, type + size_t mark_row = 0; for (size_t k = 0; k < json_datas.size(); k++) { - _derived(json_datas[k], _total_rows); - _total_rows += json_datas[k]->size(); + _derived(json_datas[k], mark_row); + mark_row += json_datas[k]->size(); } _finalize(); } +JsonFlatPath* JsonPathDeriver::_normalize_exists_path(const std::string_view& path, JsonFlatPath* root, uint64_t hits) { + if (path.empty()) { + return root; + } + + _derived_maps[root].hits += hits; + _derived_maps[root].type = flat_json::JSON_BASE_TYPE_BITS; + + auto [key, next] = JsonFlatPath::split_path(path); + auto iter = root->children.find(key); + JsonFlatPath* child_path = nullptr; + + if (iter == root->children.end()) { + root->children.emplace(key, std::make_unique()); + child_path = root->children[key].get(); + } else { + child_path = iter->second.get(); + } + + return _normalize_exists_path(next, child_path, hits); +} + void JsonPathDeriver::derived(const std::vector& json_readers) { DCHECK(_paths.empty()); DCHECK(_types.empty()); @@ -349,12 +390,15 @@ void JsonPathDeriver::derived(const std::vector& json_reade _has_remain |= reader->has_remain_json(); for (size_t i = start; i < end; i++) { const auto& sub = (*reader->sub_readers())[i]; - auto leaf = JsonFlatPath::normalize_from_path(sub->name(), _path_root.get()); + // compaction only extract common leaf, extract parent node need more compute on remain, it's bad performance + auto leaf = _normalize_exists_path(sub->name(), _path_root.get(), 0); _derived_maps[leaf].type &= flat_json::LOGICAL_TYPE_TO_JSON_BITS.at(sub->column_type()); _derived_maps[leaf].hits += reader->num_rows(); } } - _json_sparsity_factory = 1; // only extract common schema + _derived_maps.erase(_path_root.get()); + + _min_json_sparsity_factory = 1; // only extract common schema _finalize(); } @@ -381,11 +425,12 @@ void JsonPathDeriver::_derived_on_flat_json(const std::vector& js const auto& types = json_col->flat_column_types(); for (size_t i = 0; i < paths.size(); i++) { - auto leaf = JsonFlatPath::normalize_from_path(paths[i], _path_root.get()); + auto leaf = _normalize_exists_path(paths[i], _path_root.get(), hits); _derived_maps[leaf].type &= flat_json::LOGICAL_TYPE_TO_JSON_BITS.at(types[i]); _derived_maps[leaf].hits += hits; } } + _derived_maps.erase(_path_root.get()); } void JsonPathDeriver::_derived(const Column* col, size_t mark_row) { @@ -441,16 +486,19 @@ void JsonPathDeriver::_visit_json_paths(const vpack::Slice& value, JsonFlatPath* root->children.emplace(k, std::make_unique()); } auto child = root->children[k].get(); + auto desc = &_derived_maps[child]; + desc->hits++; + desc->multi_times += (desc->last_row == mark_row); + desc->last_row = mark_row; + if (v.isObject()) { + child->remain = v.isEmptyObject(); + desc->type = flat_json::JSON_BASE_TYPE_BITS; _visit_json_paths(v, child, mark_row); } else { auto desc = &_derived_maps[child]; - desc->hits++; vpack::ValueType json_type = v.type(); desc->type = flat_json::get_compatibility_type(json_type, desc->type); - desc->multi_times += (desc->last_row == mark_row); - desc->last_row = mark_row; - if (json_type == vpack::ValueType::UInt) { desc->max = std::max(desc->max, v.getUIntUnchecked()); } @@ -458,6 +506,56 @@ void JsonPathDeriver::_visit_json_paths(const vpack::Slice& value, JsonFlatPath* } } +// why dfs? because need compute parent isn't extract base on bottom-up, stack is not suitable +uint32_t JsonPathDeriver::_dfs_finalize(JsonFlatPath* node, const std::string& absolute_path, + std::vector>* hit_leaf) { + uint32_t flat_count = 0; + for (auto& [key, child] : node->children) { + if (!key.empty() && key.find('.') == std::string::npos) { + // ignore empty key/quote key, it's can't handle in SQL + // why not support `.` in key? + // FE will add `"`. e.g: `a.b` -> `"a.b"`, in binary `\"a.b\"` + // but BE is hard to handle `"`, because vpackjson don't add escape for `"` and `\` + // input string `a\"b` -> in binary `a\\\"b` -> vpack json binary `a\"b` + // it's take us can't identify `"` and `\` corrently + std::string abs_path = fmt::format("{}.{}", absolute_path, key); + flat_count += _dfs_finalize(child.get(), abs_path, hit_leaf); + } else { + child->remain = true; + } + } + if (flat_count == 0 && !absolute_path.empty()) { + // leaf node or all children is remain + // check sparsity, same key may appear many times in json, so we need avoid duplicate compute hits + auto desc = _derived_maps[node]; + if (desc.multi_times <= 0 && desc.hits >= _total_rows * _min_json_sparsity_factory) { + hit_leaf->emplace_back(node, absolute_path); + node->type = flat_json::JSON_BITS_TO_LOGICAL_TYPE.at(desc.type); + node->remain = false; + return 1; + } else { + node->remain = true; + return 0; + } + } else { + node->remain |= (flat_count != node->children.size()); + return 1; + } +} + +void dfs_add_remain_keys(JsonFlatPath* node, std::set* remain_keys) { + auto iter = node->children.begin(); + while (iter != node->children.end()) { + auto child = iter->second.get(); + dfs_add_remain_keys(child, remain_keys); + if (child->remain) { + node->remain |= true; + remain_keys->insert(iter->first); + } + iter++; + } +} + void JsonPathDeriver::_finalize() { // try downgrade json-uint to bigint int128_t max = RunTimeTypeLimits::max_value(); @@ -467,50 +565,21 @@ void JsonPathDeriver::_finalize() { } } - std::vector update_stack; - std::vector> stack; std::vector> hit_leaf; - - stack.emplace_back(_path_root.get(), ""); - while (!stack.empty()) { - auto [node, path] = stack.back(); - stack.pop_back(); - - if (node->children.empty()) { - // leaf node - // check sparsity, same key may appear many times in json, so we need avoid duplicate compute hits - auto desc = _derived_maps[node]; - if (desc.multi_times <= 0 && desc.hits >= _total_rows * _json_sparsity_factory) { - hit_leaf.emplace_back(node, path); - node->type = flat_json::JSON_BITS_TO_LOGICAL_TYPE.at(desc.type); - node->remain = false; // later update - } else { - node->remain = true; - } - _has_remain |= (desc.multi_times > 0 || desc.hits != _total_rows); - VLOG(8) << "flat json[" << path << "], hit[" << desc.hits << "], row[" << _total_rows << "]"; - } else { - update_stack.push_back(node); - for (auto& [key, child] : node->children) { - if (key.size() > 0) { - // ignore empty key, it's invalid path in SQL - stack.emplace_back(child.get(), path + "." + std::string(key)); - } else { - node->remain = true; - _has_remain |= true; - } - } - } - } + _dfs_finalize(_path_root.get(), "", &hit_leaf); // sort by name, just for stable order - size_t limit = config::json_flat_column_max > 0 ? config::json_flat_column_max : std::numeric_limits::max(); std::sort(hit_leaf.begin(), hit_leaf.end(), [&](const auto& a, const auto& b) { auto desc_a = _derived_maps[a.first]; auto desc_b = _derived_maps[b.first]; return desc_a.hits > desc_b.hits; }); + size_t limit = config::json_flat_column_max > 0 ? config::json_flat_column_max : std::numeric_limits::max(); for (size_t i = limit; i < hit_leaf.size(); i++) { + if (!hit_leaf[i].first->remain && _derived_maps[hit_leaf[i].first].hits >= _total_rows) { + limit++; + continue; + } hit_leaf[i].first->remain = true; } if (hit_leaf.size() > limit) { @@ -524,20 +593,23 @@ void JsonPathDeriver::_finalize() { _types.emplace_back(node->type); } - // remove & update remain json - while (!update_stack.empty()) { - auto* node = update_stack.back(); - update_stack.pop_back(); - - auto iter = node->children.begin(); - while (iter != node->children.end()) { - node->remain |= iter->second->remain; - if (iter->second->remain && iter->second->children.empty()) { - iter = node->children.erase(iter); - } else { - ++iter; - } + std::set remain_keys; + dfs_add_remain_keys(_path_root.get(), &remain_keys); + _has_remain |= _path_root->remain; + if (_has_remain && _generate_filter) { + double fpp = estimate_filter_fpp(remain_keys.size()); + if (fpp < 0) { + return; + } + std::unique_ptr bf; + Status st = BloomFilter::create(BLOCK_BLOOM_FILTER, &bf); + DCHECK(st.ok()); + st = bf->init(remain_keys.size(), fpp, HASH_MURMUR3_X64_64); + DCHECK(st.ok()); + for (const auto& key : remain_keys) { + bf->add_bytes(key.data(), key.size()); } + _remain_filter = std::move(bf); } } @@ -580,7 +652,6 @@ JsonFlattener::JsonFlattener(const std::vector& paths, const std::v _remain = down_cast(_flat_columns.back().get()); } } - void JsonFlattener::flatten(const Column* json_column) { for (auto& col : _flat_columns) { DCHECK_EQ(col->size(), 0); @@ -760,6 +831,38 @@ void JsonMerger::set_exclude_paths(const std::vector& exclude_paths } } +// add all level paths, e.g: a.b.c, level path: a.b, leaf path: c +void JsonMerger::_add_level_paths_impl(const std::string_view& path, JsonFlatPath* root) { + if (path.empty()) { + return; + } + + auto [key, next] = JsonFlatPath::split_path(path); + if (next.empty()) { + // don't add leaf node + return; + } + + auto iter = root->children.find(key); + JsonFlatPath* child_path = nullptr; + + if (iter == root->children.end()) { + root->children.emplace(key, std::make_unique()); + child_path = root->children[key].get(); + child_path->op = JsonFlatPath::OP_NEW_LEVEL; + } else { + child_path = iter->second.get(); + } + _add_level_paths_impl(next, child_path); +} + +void JsonMerger::add_level_paths(const std::vector& level_paths) { + this->_level_paths = level_paths; + for (auto& path : _level_paths) { + _add_level_paths_impl(path, _src_root.get()); + } +} + void JsonMerger::set_root_path(const std::string& base_path) { JsonFlatPath::set_root(base_path, _src_root.get()); } @@ -828,9 +931,9 @@ void JsonMerger::_merge_impl(size_t rows) { template void JsonMerger::_merge_json_with_remain(const JsonFlatPath* root, const vpack::Slice* remain, vpack::Builder* builder, size_t index) { -#ifndef NDEBUG - std::string json = remain->toJson(); -#endif + // #ifndef NDEBUG + // std::string json = remain->toJson(); + // #endif vpack::ObjectIterator it(*remain, false); for (; it.valid(); it.next()) { auto k = it.key().stringView(); @@ -892,7 +995,7 @@ void JsonMerger::_merge_json(const JsonFlatPath* root, vpack::Builder* builder, continue; } - if (child->children.empty()) { + if (child->children.empty() && child->op != JsonFlatPath::OP_NEW_LEVEL) { DCHECK(child->op == JsonFlatPath::OP_INCLUDE || child->op == JsonFlatPath::OP_ROOT); auto col = _src_columns[child->index]; if (!col->is_null(index)) { @@ -1148,6 +1251,7 @@ void HyperJsonTransformer::init_compaction_task(const std::vector& t.emplace_back(_src_types[index]); } mk.merger = std::make_unique(p, t, has_remain); + mk.merger->add_level_paths(_dst_paths); mk.merger->set_exclude_paths(all_flat_paths); if (has_remain) { _merge_tasks[0].src_index.emplace_back(_src_paths.size()); diff --git a/be/src/util/json_flattener.h b/be/src/util/json_flattener.h index c0a419633ddf1..9584a448b6daf 100644 --- a/be/src/util/json_flattener.h +++ b/be/src/util/json_flattener.h @@ -34,6 +34,7 @@ #include "common/status.h" #include "common/statusor.h" #include "exprs/expr.h" +#include "storage/rowset/block_split_bloom_filter.h" #include "storage/rowset/column_reader.h" #include "types/logical_type.h" #include "util/phmap/phmap.h" @@ -42,6 +43,7 @@ namespace starrocks { namespace vpack = arangodb::velocypack; class ColumnReader; +class BloomFilter; #ifndef NDEBUG template @@ -55,9 +57,11 @@ class JsonFlatPath { public: using OP = uint8_t; static const OP OP_INCLUDE = 0; - static const OP OP_EXCLUDE = 1; // for compaction remove extract json - static const OP OP_IGNORE = 2; // for merge and read middle json - static const OP OP_ROOT = 3; // to mark new root + static const OP OP_EXCLUDE = 1; // for compaction remove extract json + static const OP OP_IGNORE = 2; // for merge and read middle json + static const OP OP_ROOT = 3; // to mark new root + static const OP OP_NEW_LEVEL = 4; // for merge flat json use, to mark the path is need + // for express flat path int index = -1; // flat paths array index, only use for leaf, to find column LogicalType type = LogicalType::TYPE_JSON; @@ -94,8 +98,7 @@ class JsonFlatPath { return ss.str(); } -private: - static std::pair _split_path(const std::string_view& path); + static std::pair split_path(const std::string_view& path); }; // to deriver json flanttern path @@ -113,6 +116,10 @@ class JsonPathDeriver { bool has_remain_json() const { return _has_remain; } + void set_generate_filter(bool generate_filter) { _generate_filter = generate_filter; } + + std::shared_ptr& remain_fitler() { return _remain_filter; } + std::shared_ptr& flat_path_root() { return _path_root; } const std::vector& flat_paths() const { return _paths; } @@ -122,7 +129,11 @@ class JsonPathDeriver { private: void _derived(const Column* json_data, size_t mark_row); + JsonFlatPath* _normalize_exists_path(const std::string_view& path, JsonFlatPath* root, uint64_t hits); + void _finalize(); + uint32_t _dfs_finalize(JsonFlatPath* node, const std::string& absolute_path, + std::vector>* hit_leaf); void _derived_on_flat_json(const std::vector& json_datas); @@ -140,7 +151,7 @@ class JsonPathDeriver { uint64_t max = 0; // same key may appear many times in json, so we need avoid duplicate compute hits - uint64_t last_row = -1; + int64_t last_row = -1; uint64_t multi_times = 0; }; @@ -148,10 +159,13 @@ class JsonPathDeriver { std::vector _paths; std::vector _types; - double _json_sparsity_factory = config::json_flat_sparsity_factor; + double _min_json_sparsity_factory = config::json_flat_sparsity_factor; size_t _total_rows; FlatJsonHashMap _derived_maps; std::shared_ptr _path_root; + + bool _generate_filter = false; + std::shared_ptr _remain_filter = nullptr; }; // flattern JsonColumn to flat json A,B,C @@ -201,6 +215,8 @@ class JsonMerger { // for compaction, set exclude paths, to remove the path void set_exclude_paths(const std::vector& exclude_paths); + // for compaction, set level paths, to generate the level in json + void add_level_paths(const std::vector& level_paths); bool has_exclude_paths() const { return !_exclude_paths.empty(); } @@ -217,6 +233,8 @@ class JsonMerger { void _merge_json(const JsonFlatPath* root, vpack::Builder* builder, size_t index); + void _add_level_paths_impl(const std::string_view& path, JsonFlatPath* root); + private: std::vector _src_paths; bool _has_remain = false; @@ -224,6 +242,7 @@ class JsonMerger { std::shared_ptr _src_root; std::vector _src_columns; std::vector _exclude_paths; + std::vector _level_paths; bool _output_nullable = false; ColumnPtr _result; diff --git a/be/test/exprs/flat_json_functions_test.cpp b/be/test/exprs/flat_json_functions_test.cpp index d7d31599094c8..37bd59ee79d68 100644 --- a/be/test/exprs/flat_json_functions_test.cpp +++ b/be/test/exprs/flat_json_functions_test.cpp @@ -615,9 +615,9 @@ INSTANTIATE_TEST_SUITE_P(FlatJsonPathDeriver, FlatJsonDeriverPaths, ::testing::Values( std::make_tuple(R"({ "k1": 1, "k2": 2 })", R"({ "k1": 3, "k2": 4 })", std::vector {"k1", "k2"}, std::vector {TYPE_BIGINT, TYPE_BIGINT}), std::make_tuple(R"({ "k1": "v1" })", R"({ "k1": "v33" })", std::vector {"k1"}, std::vector {TYPE_VARCHAR}), - std::make_tuple(R"({ "k1": {"k2": 1} })", R"({ "k1": 123 })", std::vector {}, std::vector {}), + std::make_tuple(R"({ "k1": {"k2": 1} })", R"({ "k1": 123 })", std::vector {"k1"}, std::vector {TYPE_JSON}), std::make_tuple(R"({ "k1": "v1" })", R"({ "k1": 1.123 })", std::vector {"k1"}, std::vector {TYPE_JSON}), - std::make_tuple(R"({ "k1": {"k2": 1} })", R"({ "k1": 1.123 })", std::vector {}, std::vector {}), + std::make_tuple(R"({ "k1": {"k2": 1} })", R"({ "k1": 1.123 })", std::vector {"k1"}, std::vector {TYPE_JSON}), std::make_tuple(R"({ "k1": [1,2,3] })", R"({ "k1": "v33" })", std::vector {"k1"}, std::vector {TYPE_JSON}), std::make_tuple(R"({ "k1": "v1", "k2": [3,4,5], "k3": 1, "k4": 1.2344 })", diff --git a/be/test/storage/rowset/flat_json_column_compact_test.cpp b/be/test/storage/rowset/flat_json_column_compact_test.cpp index bbf20b6c959ec..39e7bccf75dcc 100644 --- a/be/test/storage/rowset/flat_json_column_compact_test.cpp +++ b/be/test/storage/rowset/flat_json_column_compact_test.cpp @@ -650,6 +650,7 @@ TEST_F(FlatJsonColumnCompactTest, testJsonCompactToFlatJson) { } TEST_F(FlatJsonColumnCompactTest, testNullJsonCompactToFlatJson) { + config::json_flat_null_factor = 0.1; // clang-format off Columns jsons = { normal_json(R"({"a": 1, "b": 21})", true), @@ -785,10 +786,11 @@ TEST_F(FlatJsonColumnCompactTest, testFlatJsonCompactToFlatJson4) { test_json(writer_opts, jsons, read_col); EXPECT_TRUE(_meta->json_meta().is_flat()); EXPECT_TRUE(_meta->json_meta().has_remain()); - EXPECT_EQ(5, _meta->children_columns_size()); + EXPECT_EQ(6, _meta->children_columns_size()); EXPECT_EQ("b1.b2", _meta->children_columns(2).name()); EXPECT_EQ("b1.b3.b4", _meta->children_columns(3).name()); - EXPECT_EQ("remain", _meta->children_columns(4).name()); + EXPECT_EQ("b1.b3.b5", _meta->children_columns(4).name()); + EXPECT_EQ("remain", _meta->children_columns(5).name()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -822,10 +824,11 @@ TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToFlatJson) { test_json(writer_opts, jsons, read_col); EXPECT_TRUE(_meta->json_meta().is_flat()); EXPECT_TRUE(_meta->json_meta().has_remain()); - EXPECT_EQ(5, _meta->children_columns_size()); + EXPECT_EQ(6, _meta->children_columns_size()); EXPECT_EQ("b1.b2", _meta->children_columns(2).name()); EXPECT_EQ("b1.b3.b4", _meta->children_columns(3).name()); - EXPECT_EQ("remain", _meta->children_columns(4).name()); + EXPECT_EQ("b1.b3.b5", _meta->children_columns(4).name()); + EXPECT_EQ("remain", _meta->children_columns(5).name()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -858,8 +861,8 @@ TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToFlatJson) { writer_opts.need_flat = true; test_json(writer_opts, jsons, read_col); EXPECT_TRUE(_meta->json_meta().is_flat()); - EXPECT_TRUE(_meta->json_meta().has_remain()); - EXPECT_EQ(4, _meta->children_columns_size()); + EXPECT_FALSE(_meta->json_meta().has_remain()); + EXPECT_EQ(3, _meta->children_columns_size()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -956,11 +959,12 @@ TEST_F(FlatJsonColumnCompactTest, testNullFlatJsonCompactToFlatJson4) { test_json(writer_opts, jsons, read_col); EXPECT_TRUE(_meta->json_meta().is_flat()); EXPECT_TRUE(_meta->json_meta().has_remain()); - EXPECT_EQ(6, _meta->children_columns_size()); + EXPECT_EQ(7, _meta->children_columns_size()); EXPECT_EQ("nulls", _meta->children_columns(0).name()); EXPECT_EQ("b1.b2", _meta->children_columns(3).name()); EXPECT_EQ("b1.b3.b4", _meta->children_columns(4).name()); - EXPECT_EQ("remain", _meta->children_columns(5).name()); + EXPECT_EQ("b1.b3.b5", _meta->children_columns(5).name()); + EXPECT_EQ("remain", _meta->children_columns(6).name()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -993,11 +997,12 @@ TEST_F(FlatJsonColumnCompactTest, testNullHyperJsonCompactToFlatJson) { test_json(writer_opts, jsons, read_col); EXPECT_TRUE(_meta->json_meta().is_flat()); EXPECT_TRUE(_meta->json_meta().has_remain()); - EXPECT_EQ(6, _meta->children_columns_size()); + EXPECT_EQ(7, _meta->children_columns_size()); EXPECT_EQ("nulls", _meta->children_columns(0).name()); EXPECT_EQ("b1.b2", _meta->children_columns(3).name()); EXPECT_EQ("b1.b3.b4", _meta->children_columns(4).name()); - EXPECT_EQ("remain", _meta->children_columns(5).name()); + EXPECT_EQ("b1.b3.b5", _meta->children_columns(5).name()); + EXPECT_EQ("remain", _meta->children_columns(6).name()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -1029,11 +1034,12 @@ TEST_F(FlatJsonColumnCompactTest, testNullHyperJsonCompactToFlatJson2) { test_json(writer_opts, jsons, read_col); EXPECT_TRUE(_meta->json_meta().is_flat()); EXPECT_TRUE(_meta->json_meta().has_remain()); - EXPECT_EQ(5, _meta->children_columns_size()); + EXPECT_EQ(6, _meta->children_columns_size()); EXPECT_EQ("nulls", _meta->children_columns(0).name()); EXPECT_EQ("b1.b2", _meta->children_columns(2).name()); EXPECT_EQ("b1.b3.b4", _meta->children_columns(3).name()); - EXPECT_EQ("remain", _meta->children_columns(4).name()); + EXPECT_EQ("b1.b3.b5", _meta->children_columns(4).name()); + EXPECT_EQ("remain", _meta->children_columns(5).name()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -1063,10 +1069,11 @@ TEST_F(FlatJsonColumnCompactTest, testNullHyperJsonCompactToFlatJson3) { test_json(writer_opts, jsons, read_col); EXPECT_TRUE(_meta->json_meta().is_flat()); EXPECT_TRUE(_meta->json_meta().has_remain()); - EXPECT_EQ(5, _meta->children_columns_size()); + EXPECT_EQ(6, _meta->children_columns_size()); EXPECT_EQ("b1.b2", _meta->children_columns(2).name()); EXPECT_EQ("b1.b3.b4", _meta->children_columns(3).name()); - EXPECT_EQ("remain", _meta->children_columns(4).name()); + EXPECT_EQ("b1.b3.b5", _meta->children_columns(4).name()); + EXPECT_EQ("remain", _meta->children_columns(5).name()); auto* read_json = get_json_column(read_col); EXPECT_FALSE(read_json->is_flat_json()); @@ -1089,24 +1096,12 @@ TEST_F(FlatJsonColumnCompactTest, testNullHyperJsonCompactToFlatJson4) { normal_json(R"({"a": 5, "b": 25, "b1": {"b2": 5, "b3": {"b4": "ab5", "b5": false}}, "e": 35})", true), }; // clang-format on + JsonPathDeriver deriver; + test_compact_path(jsons, &deriver); - ColumnPtr read_col = jsons[0]->clone_empty(); - ColumnWriterOptions writer_opts; - writer_opts.need_flat = true; - test_json(writer_opts, jsons, read_col); - EXPECT_TRUE(_meta->json_meta().is_flat()); - EXPECT_TRUE(_meta->json_meta().has_remain()); - EXPECT_EQ(4, _meta->children_columns_size()); - - auto* read_json = get_json_column(read_col); - EXPECT_FALSE(read_json->is_flat_json()); - EXPECT_EQ(5, read_json->size()); - EXPECT_EQ(0, read_json->get_flat_fields().size()); - EXPECT_EQ(R"({"a": 1, "b1": {"b2": 1, "b3": {"b4": "ab1", "b5": [1, 2, 3]}}, "g": {}})", read_col->debug_item(0)); - EXPECT_EQ(R"({"a": 2, "b1": {"b2": 2, "b3": {"b4": 123, "b5": {}}}, "k": "abc"})", read_col->debug_item(1)); - for (size_t i = 2; i < jsons.size(); i++) { - EXPECT_EQ(jsons[i]->debug_item(0), read_col->debug_item(i)); - } + EXPECT_EQ(2, deriver.flat_paths().size()); + EXPECT_EQ(R"([a(BIGINT), b1.b2(BIGINT)])", + JsonFlatPath::debug_flat_json(deriver.flat_paths(), deriver.flat_types(), deriver.has_remain_json())); } TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToFlatJsonRemain) { @@ -1196,25 +1191,12 @@ TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToFlatJsonRemain3) { }, true), }; // clang-format on + JsonPathDeriver deriver; + test_compact_path(jsons, &deriver); - ColumnPtr read_col = jsons[0]->clone_empty(); - ColumnWriterOptions writer_opts; - writer_opts.need_flat = true; - test_json(writer_opts, jsons, read_col); - - EXPECT_TRUE(_meta->json_meta().is_flat()); - EXPECT_TRUE(_meta->json_meta().has_remain()); - EXPECT_EQ(4, _meta->children_columns_size()); - EXPECT_EQ("nulls", _meta->children_columns(0).name()); - EXPECT_EQ("a", _meta->children_columns(1).name()); - EXPECT_EQ("remain", _meta->children_columns(3).name()); - - auto* read_json = get_json_column(read_col); - EXPECT_FALSE(read_json->is_flat_json()); - EXPECT_EQ(21, read_json->size()); - EXPECT_EQ(R"({"a": 12, "b": "abc2", "c": {"d": 22, "e": 221}})", read_col->debug_item(1)); - EXPECT_EQ(R"({"a": 29, "b": "efg9", "c": "c29"})", read_col->debug_item(11)); - EXPECT_EQ(R"({"a": 33, "b": "xwy3", "c": "d33"})", read_col->debug_item(18)); + EXPECT_EQ(2, deriver.flat_paths().size()); + EXPECT_EQ(R"([a(BIGINT), b(VARCHAR)])", + JsonFlatPath::debug_flat_json(deriver.flat_paths(), deriver.flat_types(), deriver.has_remain_json())); } TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToFlatJsonRemainRead) { @@ -1272,7 +1254,7 @@ TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToFlatJsonRemainRead) { EXPECT_EQ(R"({c: NULL})", read_col->debug_item(18)); } -TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToFlatJsonRemainRead3) { +TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToFlatJsonRemainReadPaths) { config::json_flat_sparsity_factor = 0.6; // clang-format off Columns jsons = { @@ -1305,26 +1287,11 @@ TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactToFlatJsonRemainRead3) { }, true), }; // clang-format on - - ASSIGN_OR_ABORT(auto root, ColumnAccessPath::create(TAccessPathType::FIELD, "root", 0)); - ColumnAccessPath::insert_json_path(root.get(), LogicalType::TYPE_VARCHAR, "c"); - - ColumnPtr read_col = jsons[0]->clone_empty(); - ColumnWriterOptions writer_opts; - writer_opts.need_flat = true; - test_json(writer_opts, jsons, read_col, root.get()); - - EXPECT_EQ(4, _meta->children_columns_size()); - EXPECT_EQ("nulls", _meta->children_columns(0).name()); - EXPECT_EQ("a", _meta->children_columns(1).name()); - EXPECT_EQ("remain", _meta->children_columns(3).name()); - - auto* read_json = get_json_column(read_col); - EXPECT_TRUE(read_json->is_flat_json()); - EXPECT_EQ(21, read_json->size()); - EXPECT_EQ(1, read_json->get_flat_fields().size()); - EXPECT_EQ(R"({c: 'c29'})", read_col->debug_item(11)); - EXPECT_EQ(R"({c: 'd33'})", read_col->debug_item(18)); + JsonPathDeriver deriver; + test_compact_path(jsons, &deriver); + EXPECT_EQ(2, deriver.flat_paths().size()); + EXPECT_EQ(R"([a(BIGINT), b(VARCHAR)])", + JsonFlatPath::debug_flat_json(deriver.flat_paths(), deriver.flat_types(), deriver.has_remain_json())); } TEST_F(FlatJsonColumnCompactTest, testHyperJsonCompactPaths) { diff --git a/be/test/storage/rowset/flat_json_column_rw_test.cpp b/be/test/storage/rowset/flat_json_column_rw_test.cpp index b3038bf25c17b..939b4830f90d0 100644 --- a/be/test/storage/rowset/flat_json_column_rw_test.cpp +++ b/be/test/storage/rowset/flat_json_column_rw_test.cpp @@ -54,7 +54,10 @@ class FlatJsonColumnRWTest : public testing::Test { protected: void SetUp() override { _meta.reset(new ColumnMetaPB()); } - void TearDown() override {} + void TearDown() override { + config::json_flat_sparsity_factor = 0.9; + config::json_flat_null_factor = 0.3; + } std::shared_ptr create_dummy_segment(const std::shared_ptr& fs, const std::string& fname) { return std::make_shared(fs, FileInfo{fname}, 1, _dummy_segment_schema, nullptr); @@ -97,7 +100,6 @@ class FlatJsonColumnRWTest : public testing::Test { // close the file ASSERT_TRUE(wfile->close().ok()); } - LOG(INFO) << "Finish writing"; auto res = ColumnReader::create(_meta.get(), segment.get(), nullptr); ASSERT_TRUE(res.ok()); @@ -451,13 +453,14 @@ TEST_F(FlatJsonColumnRWTest, testMergeRemainFlatJson2) { writer_opts.need_flat = true; test_json(writer_opts, "/test_flat_json_rw2.data", write_col, read_col, nullptr); - EXPECT_EQ(4, writer_opts.meta->children_columns_size()); + EXPECT_EQ(5, writer_opts.meta->children_columns_size()); EXPECT_TRUE(writer_opts.meta->json_meta().is_flat()); EXPECT_TRUE(writer_opts.meta->json_meta().has_remain()); EXPECT_EQ("a", writer_opts.meta->children_columns(0).name()); EXPECT_EQ("b.b1", writer_opts.meta->children_columns(1).name()); EXPECT_EQ("b.b2.b3", writer_opts.meta->children_columns(2).name()); - EXPECT_EQ("remain", writer_opts.meta->children_columns(3).name()); + EXPECT_EQ("b.b4", writer_opts.meta->children_columns(3).name()); + EXPECT_EQ("remain", writer_opts.meta->children_columns(4).name()); auto* read_json = down_cast(read_col.get()); EXPECT_FALSE(read_json->is_flat_json()); @@ -534,13 +537,14 @@ TEST_F(FlatJsonColumnRWTest, testMergeMiddleRemainFlatJson2) { writer_opts.need_flat = true; test_json(writer_opts, "/test_flat_json_rw2.data", write_col, read_col, nullptr); - EXPECT_EQ(4, writer_opts.meta->children_columns_size()); + EXPECT_EQ(5, writer_opts.meta->children_columns_size()); EXPECT_TRUE(writer_opts.meta->json_meta().is_flat()); EXPECT_TRUE(writer_opts.meta->json_meta().has_remain()); EXPECT_EQ("a", writer_opts.meta->children_columns(0).name()); EXPECT_EQ("b.b1", writer_opts.meta->children_columns(1).name()); EXPECT_EQ("b.b2.b3", writer_opts.meta->children_columns(2).name()); - EXPECT_EQ("remain", writer_opts.meta->children_columns(3).name()); + EXPECT_EQ("b.b4", writer_opts.meta->children_columns(3).name()); + EXPECT_EQ("remain", writer_opts.meta->children_columns(4).name()); auto* read_json = down_cast(read_col.get()); EXPECT_FALSE(read_json->is_flat_json()); @@ -578,14 +582,15 @@ TEST_F(FlatJsonColumnRWTest, testMergeMiddleRemainFlatJson3) { writer_opts.need_flat = true; test_json(writer_opts, "/test_flat_json_rw2.data", write_col, read_col, root_path.get()); - EXPECT_EQ(5, writer_opts.meta->children_columns_size()); + EXPECT_EQ(6, writer_opts.meta->children_columns_size()); EXPECT_TRUE(writer_opts.meta->json_meta().is_flat()); EXPECT_TRUE(writer_opts.meta->json_meta().has_remain()); EXPECT_EQ("a", writer_opts.meta->children_columns(0).name()); EXPECT_EQ("b.b1", writer_opts.meta->children_columns(1).name()); EXPECT_EQ("b.b2.b3", writer_opts.meta->children_columns(2).name()); EXPECT_EQ("b.b2.c1.c2", writer_opts.meta->children_columns(3).name()); - EXPECT_EQ("remain", writer_opts.meta->children_columns(4).name()); + EXPECT_EQ("b.b4", writer_opts.meta->children_columns(4).name()); + EXPECT_EQ("remain", writer_opts.meta->children_columns(5).name()); auto* read_json = down_cast(read_col.get()); EXPECT_TRUE(read_json->is_flat_json()); @@ -621,13 +626,14 @@ TEST_F(FlatJsonColumnRWTest, testDeepFlatJson) { writer_opts.need_flat = true; test_json(writer_opts, "/test_flat_json_rw2.data", write_col, read_col, root.get()); - EXPECT_EQ(4, writer_opts.meta->children_columns_size()); + EXPECT_EQ(5, writer_opts.meta->children_columns_size()); EXPECT_TRUE(writer_opts.meta->json_meta().is_flat()); EXPECT_TRUE(writer_opts.meta->json_meta().has_remain()); EXPECT_EQ("a", writer_opts.meta->children_columns(0).name()); EXPECT_EQ("b.b1", writer_opts.meta->children_columns(1).name()); EXPECT_EQ("b.b2.b3", writer_opts.meta->children_columns(2).name()); - EXPECT_EQ("remain", writer_opts.meta->children_columns(3).name()); + EXPECT_EQ("b.b4", writer_opts.meta->children_columns(3).name()); + EXPECT_EQ("remain", writer_opts.meta->children_columns(4).name()); auto* read_json = down_cast(read_col.get()); EXPECT_TRUE(read_json->is_flat_json()); @@ -669,16 +675,18 @@ TEST_F(FlatJsonColumnRWTest, testHyperFlatJson) { writer_opts.need_flat = true; test_json(writer_opts, "/test_flat_json_rw2.data", write_col, read_col, root.get()); - EXPECT_EQ(7, writer_opts.meta->children_columns_size()); + int index = 0; + EXPECT_EQ(8, writer_opts.meta->children_columns_size()); EXPECT_TRUE(writer_opts.meta->json_meta().is_flat()); EXPECT_TRUE(writer_opts.meta->json_meta().has_remain()); - EXPECT_EQ("a", writer_opts.meta->children_columns(0).name()); - EXPECT_EQ("b.b1", writer_opts.meta->children_columns(1).name()); - EXPECT_EQ("b.b2.b3", writer_opts.meta->children_columns(2).name()); - EXPECT_EQ("b.b2.c1.c2", writer_opts.meta->children_columns(3).name()); - EXPECT_EQ("ff.f1", writer_opts.meta->children_columns(4).name()); - EXPECT_EQ("gg", writer_opts.meta->children_columns(5).name()); - EXPECT_EQ("remain", writer_opts.meta->children_columns(6).name()); + EXPECT_EQ("a", writer_opts.meta->children_columns(index++).name()); + EXPECT_EQ("b.b1", writer_opts.meta->children_columns(index++).name()); + EXPECT_EQ("b.b2.b3", writer_opts.meta->children_columns(index++).name()); + EXPECT_EQ("b.b2.c1.c2", writer_opts.meta->children_columns(index++).name()); + EXPECT_EQ("b.b4", writer_opts.meta->children_columns(index++).name()); + EXPECT_EQ("ff.f1", writer_opts.meta->children_columns(index++).name()); + EXPECT_EQ("gg", writer_opts.meta->children_columns(index++).name()); + EXPECT_EQ("remain", writer_opts.meta->children_columns(index++).name()); auto* read_json = down_cast(read_col.get()); EXPECT_TRUE(read_json->is_flat_json()); @@ -1104,14 +1112,15 @@ TEST_F(FlatJsonColumnRWTest, testMergeRemainNullFlatJson2) { writer_opts.need_flat = true; test_json(writer_opts, "/test_flat_json_rw2.data", write_col, read_col, nullptr); - EXPECT_EQ(5, writer_opts.meta->children_columns_size()); + EXPECT_EQ(6, writer_opts.meta->children_columns_size()); EXPECT_TRUE(writer_opts.meta->json_meta().is_flat()); EXPECT_TRUE(writer_opts.meta->json_meta().has_remain()); EXPECT_EQ("nulls", writer_opts.meta->children_columns(0).name()); EXPECT_EQ("a", writer_opts.meta->children_columns(1).name()); EXPECT_EQ("b.b1", writer_opts.meta->children_columns(2).name()); EXPECT_EQ("b.b2.b3", writer_opts.meta->children_columns(3).name()); - EXPECT_EQ("remain", writer_opts.meta->children_columns(4).name()); + EXPECT_EQ("b.b4", writer_opts.meta->children_columns(4).name()); + EXPECT_EQ("remain", writer_opts.meta->children_columns(5).name()); auto* read_json = down_cast(down_cast(read_col.get())->data_column().get()); EXPECT_FALSE(read_json->is_flat_json()); @@ -1181,14 +1190,15 @@ TEST_F(FlatJsonColumnRWTest, testMergeMiddleRemainNullFlatJson2) { writer_opts.need_flat = true; test_json(writer_opts, "/test_flat_json_rw2.data", write_col, read_col, nullptr); - EXPECT_EQ(5, writer_opts.meta->children_columns_size()); + EXPECT_EQ(6, writer_opts.meta->children_columns_size()); EXPECT_TRUE(writer_opts.meta->json_meta().is_flat()); EXPECT_TRUE(writer_opts.meta->json_meta().has_remain()); EXPECT_EQ("nulls", writer_opts.meta->children_columns(0).name()); EXPECT_EQ("a", writer_opts.meta->children_columns(1).name()); EXPECT_EQ("b.b1", writer_opts.meta->children_columns(2).name()); EXPECT_EQ("b.b2.b3", writer_opts.meta->children_columns(3).name()); - EXPECT_EQ("remain", writer_opts.meta->children_columns(4).name()); + EXPECT_EQ("b.b4", writer_opts.meta->children_columns(4).name()); + EXPECT_EQ("remain", writer_opts.meta->children_columns(5).name()); auto* read_json = down_cast(down_cast(read_col.get())->data_column().get()); EXPECT_FALSE(read_json->is_flat_json()); @@ -1221,7 +1231,7 @@ TEST_F(FlatJsonColumnRWTest, testMergeMiddleRemainNullFlatJson3) { writer_opts.need_flat = true; test_json(writer_opts, "/test_flat_json_rw2.data", write_col, read_col, root_path.get()); - EXPECT_EQ(6, writer_opts.meta->children_columns_size()); + EXPECT_EQ(7, writer_opts.meta->children_columns_size()); EXPECT_TRUE(writer_opts.meta->json_meta().is_flat()); EXPECT_TRUE(writer_opts.meta->json_meta().has_remain()); EXPECT_EQ("nulls", writer_opts.meta->children_columns(0).name()); @@ -1229,7 +1239,8 @@ TEST_F(FlatJsonColumnRWTest, testMergeMiddleRemainNullFlatJson3) { EXPECT_EQ("b.b1", writer_opts.meta->children_columns(2).name()); EXPECT_EQ("b.b2.b3", writer_opts.meta->children_columns(3).name()); EXPECT_EQ("b.b2.c1.c2", writer_opts.meta->children_columns(4).name()); - EXPECT_EQ("remain", writer_opts.meta->children_columns(5).name()); + EXPECT_EQ("b.b4", writer_opts.meta->children_columns(5).name()); + EXPECT_EQ("remain", writer_opts.meta->children_columns(6).name()); auto* read_json = down_cast(down_cast(read_col.get())->data_column().get()); EXPECT_TRUE(read_json->is_flat_json()); @@ -1260,14 +1271,15 @@ TEST_F(FlatJsonColumnRWTest, testDeepNullFlatJson) { writer_opts.need_flat = true; test_json(writer_opts, "/test_flat_json_rw2.data", write_col, read_col, root.get()); - EXPECT_EQ(5, writer_opts.meta->children_columns_size()); + EXPECT_EQ(6, writer_opts.meta->children_columns_size()); EXPECT_TRUE(writer_opts.meta->json_meta().is_flat()); EXPECT_TRUE(writer_opts.meta->json_meta().has_remain()); EXPECT_EQ("nulls", writer_opts.meta->children_columns(0).name()); EXPECT_EQ("a", writer_opts.meta->children_columns(1).name()); EXPECT_EQ("b.b1", writer_opts.meta->children_columns(2).name()); EXPECT_EQ("b.b2.b3", writer_opts.meta->children_columns(3).name()); - EXPECT_EQ("remain", writer_opts.meta->children_columns(4).name()); + EXPECT_EQ("b.b4", writer_opts.meta->children_columns(4).name()); + EXPECT_EQ("remain", writer_opts.meta->children_columns(5).name()); auto* read_json = down_cast(down_cast(read_col.get())->data_column().get()); EXPECT_TRUE(read_json->is_flat_json()); @@ -1303,7 +1315,7 @@ TEST_F(FlatJsonColumnRWTest, testHyperNullFlatJson) { writer_opts.need_flat = true; test_json(writer_opts, "/test_flat_json_rw2.data", write_col, read_col, root.get()); - EXPECT_EQ(8, writer_opts.meta->children_columns_size()); + EXPECT_EQ(9, writer_opts.meta->children_columns_size()); EXPECT_TRUE(writer_opts.meta->json_meta().is_flat()); EXPECT_TRUE(writer_opts.meta->json_meta().has_remain()); int index = 0; @@ -1312,6 +1324,7 @@ TEST_F(FlatJsonColumnRWTest, testHyperNullFlatJson) { EXPECT_EQ("b.b1", writer_opts.meta->children_columns(index++).name()); EXPECT_EQ("b.b2.b3", writer_opts.meta->children_columns(index++).name()); EXPECT_EQ("b.b2.c1.c2", writer_opts.meta->children_columns(index++).name()); + EXPECT_EQ("b.b4", writer_opts.meta->children_columns(index++).name()); EXPECT_EQ("ff.f1", writer_opts.meta->children_columns(index++).name()); EXPECT_EQ("gg", writer_opts.meta->children_columns(index++).name()); EXPECT_EQ("remain", writer_opts.meta->children_columns(index++).name()); diff --git a/be/test/util/json_flattener_test.cpp b/be/test/util/json_flattener_test.cpp index 414293e0835e6..8359a86395883 100644 --- a/be/test/util/json_flattener_test.cpp +++ b/be/test/util/json_flattener_test.cpp @@ -69,9 +69,9 @@ TEST_P(JsonPathDeriverTest, json_path_deriver_test) { std::vector path = jf.flat_paths(); std::vector type = jf.flat_types(); - ASSERT_EQ(param_has_remain, jf.has_remain_json()); - ASSERT_EQ(param_flat_path, path); - ASSERT_EQ(param_flat_type, type); + EXPECT_EQ(param_has_remain, jf.has_remain_json()); + EXPECT_EQ(param_flat_path, path); + EXPECT_EQ(param_flat_type, type); } // clang-format off @@ -83,14 +83,15 @@ INSTANTIATE_TEST_SUITE_P(JsonPathDeriverCases, JsonPathDeriverTest, std::make_tuple(R"( {"k1": 1, "k2": 2} )", R"( {"k1": 3, "k3": 4} )", true, std::vector {"k1"}, std::vector {TYPE_BIGINT}), // EMPTY - std::make_tuple(R"( {"k1": 1, "k2": {}} )", R"( {"k1": 3, "k2": {}} )", true, std::vector {"k1"}, std::vector {TYPE_BIGINT}), + std::make_tuple(R"( {"k1": 1, "k2": {}} )", R"( {"k1": 3, "k2": {}} )", false, std::vector {"k1", "k2"}, std::vector {TYPE_BIGINT, TYPE_JSON}), + std::make_tuple(R"( {"k1": 1, "k2": {}} )", R"( {"k1": 3, "k2": {"k3": 123}} )", true, std::vector {"k1", "k2"}, std::vector {TYPE_BIGINT, TYPE_JSON}), std::make_tuple(R"( {} )", R"( {"k1": 3} )", true, std::vector {}, std::vector {}), std::make_tuple(R"( {"": 123} )", R"( {"": 234} )", true, std::vector {}, std::vector {}), // DEEP std::make_tuple(R"( {"k2": {"j1": 1, "j2": 2}} )", R"( {"k2": {"j1": 3, "j2": 4}} )", false, std::vector {"k2.j1", "k2.j2"}, std::vector {TYPE_BIGINT, TYPE_BIGINT}), std::make_tuple(R"( {"k2": {"j1": 1, "j2": 2}} )", R"( {"k2": {"j1": 3, "j3": 4}} )", true, std::vector {"k2.j1"}, std::vector {TYPE_BIGINT}), - std::make_tuple(R"( {"k2": {"j1": 1, "j2": 2}} )", R"( {"k2": {"j1": 3, "j2": {"p1": "abc"}}} )", true, std::vector {"k2.j1"}, std::vector {TYPE_BIGINT}), + std::make_tuple(R"( {"k2": {"j1": 1, "j2": 2}} )", R"( {"k2": {"j1": 3, "j2": {"p1": "abc"}}} )", true, std::vector {"k2.j1", "k2.j2"}, std::vector {TYPE_BIGINT, TYPE_JSON}), std::make_tuple(R"( {"k2": {"j1": 1, "j2": {"p1": [1,2,3,4]}}} )", R"( {"k2": {"j1": 3, "j2": {"p1": "abc"}}} )", false, std::vector {"k2.j1", "k2.j2.p1"}, std::vector {TYPE_BIGINT, TYPE_JSON}) )); // clang-format on @@ -104,7 +105,10 @@ class JsonFlattenerTest : public testing::Test { protected: void SetUp() override {} - void TearDown() override {} + void TearDown() override { + config::json_flat_sparsity_factor = 0.9; + config::json_flat_null_factor = 0.3; + } std::vector test_json(const std::vector& inputs, const std::vector& paths, const std::vector& types, bool has_remain) { @@ -306,6 +310,7 @@ TEST_F(JsonFlattenerTest, testSortHitNums) { { config::json_flat_sparsity_factor = 0.3; JsonPathDeriver jf; + jf.set_generate_filter(true); jf.derived({json_input}); auto& result = jf.flat_paths(); @@ -313,11 +318,15 @@ TEST_F(JsonFlattenerTest, testSortHitNums) { "k13", "k15", "k6", "k7", "k8", "k9", "z16"}; EXPECT_EQ(true, jf.has_remain_json()); EXPECT_EQ(paths, result); + ASSERT_TRUE(nullptr != jf.remain_fitler()); + EXPECT_FALSE(jf.remain_fitler()->test_bytes("b20", 3)); + EXPECT_TRUE(jf.remain_fitler()->test_bytes("k5", 2)); } { config::json_flat_sparsity_factor = 0.4; JsonPathDeriver jf; + jf.set_generate_filter(true); jf.derived({json_input}); auto& result = jf.flat_paths(); @@ -325,30 +334,108 @@ TEST_F(JsonFlattenerTest, testSortHitNums) { "k11", "k13", "k15", "k8", "k9", "z16"}; EXPECT_EQ(true, jf.has_remain_json()); EXPECT_EQ(paths, result); + ASSERT_TRUE(nullptr != jf.remain_fitler()); + EXPECT_FALSE(jf.remain_fitler()->test_bytes("b20", 3)); + EXPECT_TRUE(jf.remain_fitler()->test_bytes("k5", 2)); } { config::json_flat_sparsity_factor = 0.8; JsonPathDeriver jf; + jf.set_generate_filter(true); jf.derived({json_input}); auto& result = jf.flat_paths(); std::vector paths = {"a19", "b20", "c18", "e17", "z16"}; EXPECT_EQ(true, jf.has_remain_json()); EXPECT_EQ(paths, result); + ASSERT_TRUE(nullptr != jf.remain_fitler()); + EXPECT_FALSE(jf.remain_fitler()->test_bytes("b20", 3)); + EXPECT_TRUE(jf.remain_fitler()->test_bytes("k9", 2)); } { config::json_flat_sparsity_factor = 0; JsonPathDeriver jf; + jf.set_generate_filter(true); jf.derived({json_input}); auto& result = jf.flat_paths(); std::vector paths = {"a10", "a19", "b20", "c18", "e17", "g14", "h12", "k1", "k11", "k13", "k15", "k2", "k3", "k4", "k5", "k6", "k7", "k8", "k9", "z16"}; + EXPECT_EQ(false, jf.has_remain_json()); + EXPECT_EQ(paths, result); + ASSERT_FALSE(nullptr != jf.remain_fitler()); + } +} + +TEST_F(JsonFlattenerTest, testRemainFilter) { + // clang-format off + std::vector jsons = { + R"({"K1": 123, "K2": "some", "K3": {"f1": "valu1", "n2": 456}, "K4": [true, "abc", 789], "K5": {"nf": {"s1": "text", "subfield2": 123, "subfield3": ["a", "b", "c"]}}})", + R"({"K1": 456, "K2": "anor", "K3": {"f1": "valu3", "n4": 789}, "K4": [false, "def", 101112], "K6": {"nf": {"s1": 789, "subfield2": "text", "subfield3": [1, 2, 3]}}})", + R"({"K1": 789, "K2": "yete", "K3": {"f1": 1011122, "n6": "nested_value6"}, "K4": [true, "xyz", 131415], "K7": {"nf": {"s1": "text", "subfield2": ["x", "y", "z"], "subfield3": 456}}})", + R"({"K1": 101, "K2": "onee", "K3": {"f1": "valu7", "n8": ["a", "b", "c"]}, "K4": [false, "uvw", 789], "K8": {"nf": {"s1": 101112, "subfield2": "text", "subfield3": 123}}})", + R"({"K1": 131, "K2": "fine", "K3": {"f1": 7892342, "n1": "nested_value10"}, "K4": [true, "pqr", 456], "K9": {"nf": {"s1": ["p", "q", "r"], "subfield2": 789, "subfield3": "text"}}})", + }; + // clang-format on + + ColumnPtr input = JsonColumn::create(); + JsonColumn* json_input = down_cast(input.get()); + for (const auto& json : jsons) { + ASSIGN_OR_ABORT(auto json_value, JsonValue::parse(json)); + json_input->append(&json_value); + } + + { + config::json_flat_sparsity_factor = 0.3; + JsonPathDeriver jf; + jf.set_generate_filter(true); + jf.derived({json_input}); + + auto& result = jf.flat_paths(); + std::vector paths = {"K1", "K2", "K3.f1", "K4"}; EXPECT_EQ(true, jf.has_remain_json()); EXPECT_EQ(paths, result); + ASSERT_TRUE(nullptr != jf.remain_fitler()); + for (const auto& pp : paths) { + EXPECT_FALSE(jf.remain_fitler()->test_bytes(pp.data(), pp.size())); + } + EXPECT_TRUE(jf.remain_fitler()->test_bytes("K5", 2)); + EXPECT_TRUE(jf.remain_fitler()->test_bytes("n2", 2)); + EXPECT_TRUE(jf.remain_fitler()->test_bytes("K3", 2)); + EXPECT_TRUE(jf.remain_fitler()->test_bytes("K9", 2)); + EXPECT_TRUE(jf.remain_fitler()->test_bytes("K6", 2)); + EXPECT_TRUE(jf.remain_fitler()->test_bytes("subfield2", 9)); + } +} + +TEST_F(JsonFlattenerTest, testPointJson) { + // clang-format off + std::vector jsons = { + R"( {"k1.k2.k3.k4": 1, "k2": 2} )", + R"( {"k1.k2.k3.k4": 3, "k2": 4} )" + }; + // clang-format on + + ColumnPtr input = JsonColumn::create(); + JsonColumn* json_input = down_cast(input.get()); + for (const auto& json : jsons) { + ASSIGN_OR_ABORT(auto json_value, JsonValue::parse(json)); + json_input->append(&json_value); } + JsonPathDeriver jf; + jf.derived({json_input}); + + auto& result = jf.flat_paths(); + std::vector paths = {"k2"}; + EXPECT_EQ(true, jf.has_remain_json()); + EXPECT_EQ(paths, result); + + std::vector types = {TYPE_BIGINT}; + auto result_col = test_json(jsons, paths, types, false); + EXPECT_EQ("2", result_col[0]->debug_item(0)); + EXPECT_EQ("4", result_col[0]->debug_item(1)); } } // namespace starrocks diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java b/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java index 8d906347c7608..731cd4c654872 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/FunctionSet.java @@ -1258,6 +1258,12 @@ private void initAggregateBuiltins() { // flat json meta addBuiltin(AggregateFunction.createBuiltin(FLAT_JSON_META, Lists.newArrayList(Type.JSON), Type.ARRAY_VARCHAR, Type.ARRAY_VARCHAR, false, false, false)); + addBuiltin(AggregateFunction.createBuiltin(FLAT_JSON_META, Lists.newArrayList(Type.ANY_STRUCT), + Type.ARRAY_VARCHAR, Type.ARRAY_VARCHAR, false, false, false)); + addBuiltin(AggregateFunction.createBuiltin(FLAT_JSON_META, Lists.newArrayList(Type.ANY_MAP), + Type.ARRAY_VARCHAR, Type.ARRAY_VARCHAR, false, false, false)); + addBuiltin(AggregateFunction.createBuiltin(FLAT_JSON_META, Lists.newArrayList(Type.ANY_ARRAY), + Type.ARRAY_VARCHAR, Type.ARRAY_VARCHAR, false, false, false)); for (Type t : Type.getSupportedTypes()) { // null/char/time is handled through type promotion diff --git a/gensrc/proto/segment.proto b/gensrc/proto/segment.proto index 5cc40145c300f..1fc9439e9073d 100644 --- a/gensrc/proto/segment.proto +++ b/gensrc/proto/segment.proto @@ -165,6 +165,8 @@ message JsonMetaPB { optional bool is_flat = 2; optional bool has_remain = 3; + + optional bytes remain_filter = 4; } message ColumnMetaPB {