Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mmap2 #2407

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
16 changes: 16 additions & 0 deletions src/parser/type/serialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ inline std::string ReadBuf<std::string>(const char *buf) {
return str;
}

// Empty-tuple specialization: there are no fields to decode, so `buf` is
// never dereferenced and an empty tuple is returned.
template <>
inline std::tuple<> ReadBuf<std::tuple<>>(const char *buf) {
    return std::tuple<>{};
}

// Empty-tuple specialization of the advancing reader: nothing is consumed,
// so `buf` is left exactly where it was and an empty tuple is returned.
template <>
inline std::tuple<> ReadBufAdv<std::tuple<>>(const char *&buf) {
    return std::tuple<>{};
}

template <>
inline std::string ReadBufAdv<std::string>(const char *&buf) {
int32_t size = ReadBufAdv<int32_t>(buf);
Expand Down Expand Up @@ -93,6 +103,12 @@ inline void WriteBufAdv<std::string>(char *&buf, const std::string &value) {
buf += len;
}

// Empty-tuple specialization: an empty tuple has no fields, so no bytes are
// written to `buf`.
template <>
inline void WriteBuf<std::tuple<>>(char *const buf, const std::tuple<> &) {
    // Intentionally empty.
}

// Empty-tuple specialization of the advancing writer: no bytes are written
// and `buf` is not advanced.
template <>
inline void WriteBufAdv<std::tuple<>>(char *&buf, const std::tuple<> &) {
    // Intentionally empty.
}

template<typename T>
inline void WriteBufVecAdv(char *&buf, const T *data, size_t size) {
static_assert(std::is_standard_layout_v<T>, "T must be POD");
Expand Down
9 changes: 8 additions & 1 deletion src/storage/buffer/buffer_obj.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,16 @@ void BufferObj::UpdateFileWorkerInfo(UniquePtr<FileWorker> new_file_worker) {
}
}

BufferHandle BufferObj::Load() {
BufferHandle BufferObj::Load(bool no_mmap) {
buffer_mgr_->AddRequestCount();
std::unique_lock<std::mutex> locker(w_locker_);
if (type_ == BufferType::kMmap && no_mmap) {
if (rc_ > 0) {
String error_message = fmt::format("Buffer {} is mmaped, but has {} references", GetFilename(), rc_);
UnrecoverableError(error_message);
}
type_ = BufferType::kPersistent;
}
if (type_ == BufferType::kMmap) {
switch (status_) {
case BufferStatus::kLoaded: {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/buffer_obj.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public:

public:
// Called by ObjectHandle on the first load for that ObjectHandle.
BufferHandle Load();
BufferHandle Load(bool no_mmap = false);

// called by BufferMgr in GC process.
bool Free();
Expand Down
7 changes: 6 additions & 1 deletion src/storage/buffer/file_worker/file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
module;

#include <tuple>
#include <cstring>
#include<cerrno>

module file_worker;

Expand Down Expand Up @@ -223,7 +225,10 @@ void FileWorker::Mmap() {
auto [defer_fn, read_path] = GetFilePathInner(false);
bool use_object_cache = persistence_manager_ != nullptr;
SizeT file_size = VirtualStore::GetFileSize(read_path);
VirtualStore::MmapFile(read_path, mmap_addr_, file_size);
int ret = VirtualStore::MmapFile(read_path, mmap_addr_, file_size);
if (ret < 0) {
UnrecoverableError(fmt::format("Mmap file {} failed. {}", read_path, strerror(errno)));
}
if (use_object_cache) {
const void *ptr = mmap_addr_ + obj_addr_.part_offset_;
this->ReadFromMmapImpl(ptr, obj_addr_.part_size_);
Expand Down
4 changes: 2 additions & 2 deletions src/storage/buffer/file_worker/file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ protected:

virtual void ReadFromFileImpl(SizeT file_size) = 0;

Pair<Optional<DeferFn<std::function<void()>>>, String> GetFilePathInner(bool spill);

private:
String ChooseFileDir(bool spill) const;

Pair<Optional<DeferFn<std::function<void()>>>, String> GetFilePathInner(bool spill);

public:
const SharedPtr<String> data_dir_{};
const SharedPtr<String> temp_dir_{};
Expand Down
36 changes: 30 additions & 6 deletions src/storage/buffer/file_worker/hnsw_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ HnswFileWorker::HnswFileWorker(SharedPtr<String> data_dir,
SharedPtr<String> file_name,
SharedPtr<IndexBase> index_base,
SharedPtr<ColumnDef> column_def,
PersistenceManager* persistence_manager,
PersistenceManager *persistence_manager,
SizeT index_size)
: IndexFileWorker(std::move(data_dir),
std::move(temp_dir),
Expand Down Expand Up @@ -82,7 +82,7 @@ void HnswFileWorker::AllocateInMemory() {
}

void HnswFileWorker::FreeInMemory() {
if (!data_) {
if (!data_ && !hnsw_mem_) {
String error_message = "FreeInMemory: Data is not allocated.";
UnrecoverableError(error_message);
}
Expand All @@ -99,6 +99,10 @@ void HnswFileWorker::FreeInMemory() {
*p);
delete p;
data_ = nullptr;

auto [defer_fn, read_path] = GetFilePathInner(false);
VirtualStore::MunmapFile(read_path);
hnsw_mem_ = nullptr;
}

bool HnswFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const FileWorkerSaveCtx &ctx) {
Expand All @@ -113,19 +117,34 @@ bool HnswFileWorker::WriteToFileImpl(bool to_spill, bool &prepare_success, const
if constexpr (std::is_same_v<T, std::nullptr_t>) {
UnrecoverableError("Invalid index type.");
} else {
index->Save(*file_handle_);
using IndexT = std::decay_t<decltype(*index)>;
if constexpr (IndexT::kOwnMem) {
index->SaveToPtr(*file_handle_);
} else {
UnrecoverableError("Invalid index type.");
}
}
},
*hnsw_index);
prepare_success = true;
return true;
}

void HnswFileWorker::ReadFromFileImpl(SizeT file_size) {
void HnswFileWorker::ReadFromFileImpl(SizeT fsize) {
if (data_ != nullptr) {
UnrecoverableError("Data is already allocated.");
}
data_ = static_cast<void *>(new AbstractHnsw(HnswIndexInMem::InitAbstractIndex(index_base_.get(), column_def_.get())));

auto [defer_fn, read_path] = GetFilePathInner(false);
SizeT file_size = VirtualStore::GetFileSize(read_path);
u8 *mmap_addr = nullptr;
int ret = VirtualStore::MmapFile(read_path, mmap_addr, file_size);
hnsw_mem_ = mmap_addr;
if (ret < 0) {
UnrecoverableError(fmt::format("Mmap file {} failed. {}", read_path, strerror(errno)));
}

data_ = static_cast<void *>(new AbstractHnsw(HnswIndexInMem::InitAbstractIndex(index_base_.get(), column_def_.get(), false /*own_mem*/)));
auto *hnsw_index = reinterpret_cast<AbstractHnsw *>(data_);
std::visit(
[&](auto &&index) {
Expand All @@ -134,7 +153,12 @@ void HnswFileWorker::ReadFromFileImpl(SizeT file_size) {
UnrecoverableError("Invalid index type.");
} else {
using IndexT = std::decay_t<decltype(*index)>;
index = IndexT::Load(*file_handle_).release();
if constexpr (!IndexT::kOwnMem) {
const auto *mmap_addr = static_cast<const char *>(hnsw_mem_);
index = IndexT::LoadFromPtr(mmap_addr, file_size).release();
} else {
UnrecoverableError("Invalid index type.");
}
}
},
*hnsw_index);
Expand Down
1 change: 1 addition & 0 deletions src/storage/buffer/file_worker/hnsw_file_worker.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ protected:

private:
SizeT index_size_{};
void *hnsw_mem_{};
};

} // namespace infinity
2 changes: 1 addition & 1 deletion src/storage/io/virtual_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,9 @@ i32 VirtualStore::MmapFile(const String &file_path, u8 *&data_ptr, SizeT &data_l
return -1;
i32 f = open(file_path.c_str(), O_RDONLY);
void *tmpd = mmap(NULL, len_f, PROT_READ, MAP_SHARED, f, 0);
close(f);
if (tmpd == MAP_FAILED)
return -1;
close(f);
i32 rc = madvise(tmpd,
len_f,
MADV_NORMAL
Expand Down
137 changes: 68 additions & 69 deletions src/storage/knn_index/knn_hnsw/abstract_hnsw.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,32 +77,16 @@ HnswIndexInMem::HnswIndexInMem(RowID begin_row_id,
using T = std::decay_t<decltype(index)>;
if constexpr (!std::is_same_v<T, std::nullptr_t>) {
using IndexT = std::decay_t<decltype(*index)>;
hnsw_ = IndexT::Make(chunk_size, max_chunk_num, dim, M, ef_construction).release();
if constexpr (IndexT::kOwnMem) {
index = IndexT::Make(chunk_size, max_chunk_num, dim, M, ef_construction).release();
} else {
UnrecoverableError("HnswIndexInMem::HnswIndexInMem: index does not own memory");
}
}
},
hnsw_);
}

// Picks the AbstractHnsw alternative that matches the element type of the
// embedding column and delegates construction to the typed
// InitAbstractIndex<T> overload.
//
// index_base is treated as an IndexHnsw, and the column's type info is
// treated as an EmbeddingInfo. Element types other than float, u8 and i8
// yield the nullptr alternative.
AbstractHnsw HnswIndexInMem::InitAbstractIndex(const IndexBase *index_base, const ColumnDef *column_def) {
    const auto *hnsw_def = static_cast<const IndexHnsw *>(index_base);
    const auto *embedding = static_cast<const EmbeddingInfo *>(column_def->type()->type_info().get());

    switch (embedding->Type()) {
        case EmbeddingDataType::kElemFloat:
            return InitAbstractIndex<float>(hnsw_def);
        case EmbeddingDataType::kElemUInt8:
            return InitAbstractIndex<u8>(hnsw_def);
        case EmbeddingDataType::kElemInt8:
            return InitAbstractIndex<i8>(hnsw_def);
        default:
            // Unsupported element type: hand back the empty (nullptr) alternative.
            return nullptr;
    }
}

HnswIndexInMem::~HnswIndexInMem() {
SizeT mem_usage = 0;
std::visit(
Expand Down Expand Up @@ -146,27 +130,33 @@ void HnswIndexInMem::InsertVecs(SizeT block_offset,
std::visit(
[&](auto &&index) {
using T = std::decay_t<decltype(index)>;
if constexpr (!std::is_same_v<T, std::nullptr_t>) {
if constexpr (std::is_same_v<T, std::nullptr_t>) {
return;
} else {
using IndexT = std::decay_t<decltype(*index)>;
using DataType = typename IndexT::DataType;
SizeT mem_usage{};
switch (const auto &column_data_type = block_column_entry->column_type(); column_data_type->type()) {
case LogicalType::kEmbedding: {
MemIndexInserterIter<DataType> iter(block_offset, block_column_entry, buffer_manager, row_offset, row_count);
InsertVecs(index, std::move(iter), config, mem_usage);
break;
}
case LogicalType::kMultiVector: {
MemIndexInserterIter<MultiVectorRef<DataType>> iter(block_offset, block_column_entry, buffer_manager, row_offset, row_count);
InsertVecs(index, std::move(iter), config, mem_usage);
break;
}
default: {
UnrecoverableError(fmt::format("Unsupported column type for HNSW index: {}", column_data_type->ToString()));
break;
if constexpr (IndexT::kOwnMem) {
using DataType = typename IndexT::DataType;
SizeT mem_usage{};
switch (const auto &column_data_type = block_column_entry->column_type(); column_data_type->type()) {
case LogicalType::kEmbedding: {
MemIndexInserterIter<DataType> iter(block_offset, block_column_entry, buffer_manager, row_offset, row_count);
InsertVecs(index, std::move(iter), config, mem_usage);
break;
}
case LogicalType::kMultiVector: {
MemIndexInserterIter<MultiVectorRef<DataType>> iter(block_offset, block_column_entry, buffer_manager, row_offset, row_count);
InsertVecs(index, std::move(iter), config, mem_usage);
break;
}
default: {
UnrecoverableError(fmt::format("Unsupported column type for HNSW index: {}", column_data_type->ToString()));
break;
}
}
this->IncreaseMemoryUsageBase(mem_usage);
} else {
UnrecoverableError("HnswIndexInMem::InsertVecs: index does not own memory");
}
this->IncreaseMemoryUsageBase(mem_usage);
}
},
hnsw_);
Expand All @@ -183,38 +173,42 @@ void HnswIndexInMem::InsertVecs(const SegmentEntry *segment_entry,
using T = std::decay_t<decltype(index)>;
if constexpr (!std::is_same_v<T, std::nullptr_t>) {
using IndexT = std::decay_t<decltype(*index)>;
using DataType = typename IndexT::DataType;
if constexpr (!IndexT::kOwnMem) {
UnrecoverableError("HnswIndexInMem::InsertVecs: index does not own memory");
} else {
using DataType = typename IndexT::DataType;

SizeT mem_usage{};
switch (const auto &column_data_type = segment_entry->GetTableEntry()->GetColumnDefByID(column_id)->type();
column_data_type->type()) {
case LogicalType::kEmbedding: {
if (check_ts) {
OneColumnIterator<DataType> iter(segment_entry, buffer_mgr, column_id, begin_ts);
InsertVecs(index, std::move(iter), config, mem_usage);
} else {
OneColumnIterator<DataType, false> iter(segment_entry, buffer_mgr, column_id, begin_ts);
InsertVecs(index, std::move(iter), config, mem_usage);
SizeT mem_usage{};
switch (const auto &column_data_type = segment_entry->GetTableEntry()->GetColumnDefByID(column_id)->type();
column_data_type->type()) {
case LogicalType::kEmbedding: {
if (check_ts) {
OneColumnIterator<DataType> iter(segment_entry, buffer_mgr, column_id, begin_ts);
InsertVecs(index, std::move(iter), config, mem_usage);
} else {
OneColumnIterator<DataType, false> iter(segment_entry, buffer_mgr, column_id, begin_ts);
InsertVecs(index, std::move(iter), config, mem_usage);
}
break;
}
break;
}
case LogicalType::kMultiVector: {
const auto ele_size = column_data_type->type_info()->Size();
if (check_ts) {
OneColumnIterator<MultiVectorRef<DataType>> iter(segment_entry, buffer_mgr, column_id, begin_ts, ele_size);
InsertVecs(index, std::move(iter), config, mem_usage);
} else {
OneColumnIterator<MultiVectorRef<DataType>, false> iter(segment_entry, buffer_mgr, column_id, begin_ts, ele_size);
InsertVecs(index, std::move(iter), config, mem_usage);
case LogicalType::kMultiVector: {
const auto ele_size = column_data_type->type_info()->Size();
if (check_ts) {
OneColumnIterator<MultiVectorRef<DataType>> iter(segment_entry, buffer_mgr, column_id, begin_ts, ele_size);
InsertVecs(index, std::move(iter), config, mem_usage);
} else {
OneColumnIterator<MultiVectorRef<DataType>, false> iter(segment_entry, buffer_mgr, column_id, begin_ts, ele_size);
InsertVecs(index, std::move(iter), config, mem_usage);
}
break;
}
default: {
UnrecoverableError(fmt::format("Unsupported column type for HNSW index: {}", column_data_type->ToString()));
break;
}
break;
}
default: {
UnrecoverableError(fmt::format("Unsupported column type for HNSW index: {}", column_data_type->ToString()));
break;
}
this->IncreaseMemoryUsageBase(mem_usage);
}
this->IncreaseMemoryUsageBase(mem_usage);
}
},
hnsw_);
Expand All @@ -231,9 +225,14 @@ SharedPtr<ChunkIndexEntry> HnswIndexInMem::Dump(SegmentIndexEntry *segment_index
if constexpr (std::is_same_v<T, std::nullptr_t>) {
return;
} else {
row_count = index->GetVecNum();
index_size = index->GetSizeInBytes();
dump_size = index->mem_usage();
using IndexT = typename std::remove_pointer_t<T>;
if constexpr (IndexT::kOwnMem) {
row_count = index->GetVecNum();
index_size = index->GetSizeInBytes();
dump_size = index->mem_usage();
} else {
UnrecoverableError("HnswIndexInMem::Dump: index does not own memory");
}
}
},
hnsw_);
Expand Down
Loading
Loading