Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update SecondaryIndex #2391

Merged
merged 2 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/storage/meta/entry/segment_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ void SegmentIndexEntry::MemIndexInsert(SharedPtr<BlockEntry> block_entry,
case IndexType::kSecondary: {
if (memory_secondary_index_.get() == nullptr) {
std::unique_lock<std::shared_mutex> lck(rw_locker_);
memory_secondary_index_ = SecondaryIndexInMem::NewSecondaryIndexInMem(column_def, begin_row_id);
memory_secondary_index_ = SecondaryIndexInMem::NewSecondaryIndexInMem(column_def, this, begin_row_id);
}
BlockColumnEntry *block_column_entry = block_entry->GetColumnBlockEntry(column_idx);
memory_secondary_index_->InsertBlockData(block_offset, block_column_entry, buffer_manager, row_offset, row_count);
Expand Down Expand Up @@ -506,7 +506,7 @@ void SegmentIndexEntry::PopulateEntirely(const SegmentEntry *segment_entry, Txn
break;
}
case IndexType::kSecondary: {
memory_secondary_index_ = SecondaryIndexInMem::NewSecondaryIndexInMem(column_def, base_row_id);
memory_secondary_index_ = SecondaryIndexInMem::NewSecondaryIndexInMem(column_def, this, base_row_id);
u64 column_id = column_def->id();
SizeT column_idx = table_entry->GetColumnIdxByID(column_id);
auto block_entry_iter = BlockEntryIter(segment_entry);
Expand Down Expand Up @@ -989,6 +989,8 @@ BaseMemIndex *SegmentIndexEntry::GetMemIndex() const {
return static_cast<BaseMemIndex *>(memory_ivf_index_.get());
} else if (memory_indexer_.get() != nullptr) {
return static_cast<BaseMemIndex *>(memory_indexer_.get());
} else if (memory_secondary_index_.get() != nullptr) {
return static_cast<BaseMemIndex *>(memory_secondary_index_.get());
} else if (memory_bmp_index_.get() != nullptr) {
return static_cast<BaseMemIndex *>(memory_bmp_index_.get());
}
Expand Down
79 changes: 59 additions & 20 deletions src/storage/secondary_index/secondary_index_in_mem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
module;

#include <bit>
#include <cassert>
#include <vector>

module secondary_index_in_mem;
Expand All @@ -31,33 +32,54 @@ import infinity_exception;
import secondary_index_data;
import chunk_index_entry;
import segment_index_entry;
import table_index_entry;
import buffer_handle;
import logger;
import base_memindex;
import memindex_tracer;

namespace infinity {

// Multiplier applied to the raw (key + u32 offset) size of a row when
// SecondaryIndexInMemT::MemoryCostOfEachRow() estimates per-row memory usage.
// NOTE(review): presumably accounts for multimap node overhead — confirm the chosen value.
constexpr u32 map_memory_bloat_factor = 3;

template <typename RawValueType>
class SecondaryIndexInMemT final : public SecondaryIndexInMem {
using KeyType = ConvertToOrderedType<RawValueType>;
const RowID begin_row_id_;
const u32 max_size_;
mutable std::shared_mutex map_mutex_;
MultiMap<KeyType, u32> in_mem_secondary_index_;

protected:
u32 GetRowCountNoLock() const override { return in_mem_secondary_index_.size(); }
u32 MemoryCostOfEachRow() const override { return map_memory_bloat_factor * (sizeof(KeyType) + sizeof(u32)); }
u32 MemoryCostOfThis() const override { return sizeof(*this); }

public:
explicit SecondaryIndexInMemT(const RowID begin_row_id, const u32 max_size) : begin_row_id_(begin_row_id), max_size_(max_size) {}
u32 GetRowCount() const override { return in_mem_secondary_index_.size(); }
SecondaryIndexInMemT(SegmentIndexEntry *segment_index_entry, const RowID begin_row_id)
: SecondaryIndexInMem(segment_index_entry), begin_row_id_(begin_row_id) {
IncreaseMemoryUsageBase(MemoryCostOfThis());
}
~SecondaryIndexInMemT() override {
DecreaseMemoryUsageBase(MemoryCostOfThis() + GetRowCount() * MemoryCostOfEachRow());
}
u32 GetRowCount() const override {
std::shared_lock lock(map_mutex_);
return in_mem_secondary_index_.size();
}
void InsertBlockData(const SegmentOffset block_offset,
BlockColumnEntry *block_column_entry,
BufferManager *buffer_manager,
const u32 row_offset,
const u32 row_count) override {
MemIndexInserterIter<RawValueType> iter(block_offset, block_column_entry, buffer_manager, row_offset, row_count);
InsertInner(iter);
const auto inserted_rows = InsertInner(iter);
assert(inserted_rows == row_count);
IncreaseMemoryUsageBase(inserted_rows * MemoryCostOfEachRow());
}
SharedPtr<ChunkIndexEntry> Dump(SegmentIndexEntry *segment_index_entry, BufferManager *buffer_mgr) const override {
assert(segment_index_entry == segment_index_entry_);
std::shared_lock lock(map_mutex_);
u32 row_count = GetRowCount();
const u32 row_count = GetRowCountNoLock();
auto new_chunk_index_entry = segment_index_entry->CreateSecondaryIndexChunkIndexEntry(begin_row_id_, row_count, buffer_mgr);
BufferHandle handle = new_chunk_index_entry->GetIndex();
auto data_ptr = static_cast<SecondaryIndexData *>(handle.GetDataMut());
Expand All @@ -70,7 +92,8 @@ class SecondaryIndexInMemT final : public SecondaryIndexInMem {
}

private:
void InsertInner(auto &iter) {
u32 InsertInner(auto &iter) {
u32 inserted_count = 0;
std::unique_lock lock(map_mutex_);
while (true) {
auto opt = iter.Next();
Expand All @@ -87,7 +110,9 @@ class SecondaryIndexInMemT final : public SecondaryIndexInMem {
const KeyType key = ConvertToOrderedKeyValue(*v_ptr);
in_mem_secondary_index_.emplace(key, offset);
}
++inserted_count;
}
return inserted_count;
}

Pair<u32, Bitmask> RangeQueryInner(const u32 segment_row_count, const KeyType b, const KeyType e) const {
Expand All @@ -107,44 +132,58 @@ class SecondaryIndexInMemT final : public SecondaryIndexInMem {
}
};

SharedPtr<SecondaryIndexInMem> SecondaryIndexInMem::NewSecondaryIndexInMem(const SharedPtr<ColumnDef> &column_def, RowID begin_row_id, u32 max_size) {
// Assembles the tracer record for this in-memory secondary index: the index,
// table and database names reached through the owning segment index entry,
// together with the current row count and the memory estimate derived from it.
MemIndexTracerInfo SecondaryIndexInMem::GetInfo() const {
    auto *index_entry = segment_index_entry_->table_index_entry();
    SharedPtr<String> name_of_index = index_entry->GetIndexName();
    auto *owning_table = index_entry->table_index_meta()->GetTableEntry();
    SharedPtr<String> name_of_table = owning_table->GetTableName();
    SharedPtr<String> name_of_db = owning_table->GetDBName();
    // Read the row count once so the reported count and the memory estimate agree.
    const auto rows = GetRowCount();
    const auto estimated_bytes = MemoryCostOfThis() + MemoryCostOfEachRow() * rows;
    return MemIndexTracerInfo(std::move(name_of_index), std::move(name_of_table), std::move(name_of_db), estimated_bytes, rows);
}

// Returns the table index entry reported by the segment index entry this
// in-memory index was constructed with.
TableIndexEntry *SecondaryIndexInMem::table_index_entry() const {
    TableIndexEntry *owner = segment_index_entry_->table_index_entry();
    return owner;
}

SharedPtr<SecondaryIndexInMem> SecondaryIndexInMem::NewSecondaryIndexInMem(const SharedPtr<ColumnDef> &column_def,
SegmentIndexEntry *segment_index_entry,
RowID begin_row_id) {
if (!column_def->type()->CanBuildSecondaryIndex()) {
String error_message = "Column type can't build secondary index";
UnrecoverableError(error_message);
UnrecoverableError("Column type can't build secondary index");
}
switch (column_def->type()->type()) {
case LogicalType::kTinyInt: {
return MakeShared<SecondaryIndexInMemT<TinyIntT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<TinyIntT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kSmallInt: {
return MakeShared<SecondaryIndexInMemT<SmallIntT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<SmallIntT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kInteger: {
return MakeShared<SecondaryIndexInMemT<IntegerT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<IntegerT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kBigInt: {
return MakeShared<SecondaryIndexInMemT<BigIntT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<BigIntT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kFloat: {
return MakeShared<SecondaryIndexInMemT<FloatT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<FloatT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kDouble: {
return MakeShared<SecondaryIndexInMemT<DoubleT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<DoubleT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kDate: {
return MakeShared<SecondaryIndexInMemT<DateT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<DateT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kTime: {
return MakeShared<SecondaryIndexInMemT<TimeT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<TimeT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kDateTime: {
return MakeShared<SecondaryIndexInMemT<DateTimeT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<DateTimeT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kTimestamp: {
return MakeShared<SecondaryIndexInMemT<TimestampT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<TimestampT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kVarchar: {
return MakeShared<SecondaryIndexInMemT<VarcharT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<VarcharT> >(segment_index_entry, begin_row_id);
}
default: {
return nullptr;
Expand Down
32 changes: 28 additions & 4 deletions src/storage/secondary_index/secondary_index_in_mem.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,53 @@ export module secondary_index_in_mem;

import stl;
import roaring_bitmap;
import internal_types;
import column_def;
import table_index_entry;
import base_memindex;
import memindex_tracer;

namespace infinity {

struct RowID;
struct BlockColumnEntry;
class BufferManager;
class ColumnDef;
struct ChunkIndexEntry;
struct SegmentIndexEntry;

export class SecondaryIndexInMem {
// Abstract interface of the in-memory secondary index. It derives from
// BaseMemIndex and overrides GetInfo() / table_index_entry(); the typed
// implementation (SecondaryIndexInMemT<RawValueType>) is obtained through the
// NewSecondaryIndexInMem factory.
export class SecondaryIndexInMem : public BaseMemIndex {
protected:
// Segment index entry this index belongs to; used to reach the table index
// entry in GetInfo() and table_index_entry().
// NOTE(review): raw pointer, presumably non-owning — confirm the entry outlives this object.
SegmentIndexEntry *segment_index_entry_ = nullptr;

// Stores the owning segment index entry; only derived classes may construct.
explicit SecondaryIndexInMem(SegmentIndexEntry *segment_index_entry) : segment_index_entry_(segment_index_entry) {}

// Row count without acquiring the implementation's lock; the implementation's
// Dump() calls this while it already holds the shared lock.
virtual u32 GetRowCountNoLock() const = 0;

// Estimated bytes accounted for each indexed row.
virtual u32 MemoryCostOfEachRow() const = 0;

// Estimated fixed bytes of the index object itself.
virtual u32 MemoryCostOfThis() const = 0;

public:
virtual ~SecondaryIndexInMem() = default;

// Builds the tracer record: index/table/db names, memory estimate and row count.
MemIndexTracerInfo GetInfo() const override;

// Table index entry obtained from segment_index_entry_.
TableIndexEntry *table_index_entry() const override;

// Current number of indexed rows; the implementation takes a shared lock.
virtual u32 GetRowCount() const = 0;

// Inserts row_count rows of the given block column, starting at row_offset,
// into the index.
virtual void InsertBlockData(SegmentOffset block_offset,
BlockColumnEntry *block_column_entry,
BufferManager *buffer_manager,
u32 row_offset,
u32 row_count) = 0;

// Writes the in-memory content into a newly created chunk index entry of
// segment_index_entry and returns it. The implementation asserts that
// segment_index_entry equals the entry passed at construction.
virtual SharedPtr<ChunkIndexEntry> Dump(SegmentIndexEntry *segment_index_entry, BufferManager *buffer_mgr) const = 0;

// Range query over the in-memory index, returning a (u32, Bitmask) pair.
// NOTE(review): the concrete type behind `input` and the meaning of the u32
// are not visible here — confirm against the implementation.
virtual Pair<u32, Bitmask> RangeQuery(const void *input) const = 0;

// NOTE(review): the (column_def, begin_row_id, max_size) declaration below
// appears in this view next to the SegmentIndexEntry-taking one; it looks like
// the pre-change signature — confirm which declaration is current.
static SharedPtr<SecondaryIndexInMem> NewSecondaryIndexInMem(const SharedPtr<ColumnDef> &column_def, RowID begin_row_id, u32 max_size = 5 << 20);
// Factory: creates the typed in-memory index matching column_def's logical
// type, bound to segment_index_entry and starting at begin_row_id.
static SharedPtr<SecondaryIndexInMem> NewSecondaryIndexInMem(const SharedPtr<ColumnDef> &column_def,
SegmentIndexEntry *segment_index_entry,
RowID begin_row_id);
};

} // namespace infinity