From 6f21dae00ac8227d8b9fac7ed975cf7705ee65ab Mon Sep 17 00:00:00 2001 From: Gopal Srinivasa Date: Wed, 13 Mar 2024 23:56:13 +0530 Subject: [PATCH 1/5] First version of multi filter search. Buggy --- apps/search_disk_index.cpp | 61 ++++- include/pq_flash_index.h | 12 +- include/utils.h | 9 +- src/pq_flash_index.cpp | 448 ++++++++++++++++++++++++------------- src/utils.cpp | 15 ++ 5 files changed, 373 insertions(+), 172 deletions(-) diff --git a/apps/search_disk_index.cpp b/apps/search_disk_index.cpp index 7e2a7ac6d..9b1a5fa2c 100644 --- a/apps/search_disk_index.cpp +++ b/apps/search_disk_index.cpp @@ -4,6 +4,7 @@ #include "common_includes.h" #include +#include "utils.h" #include "index.h" #include "disk_utils.h" #include "math_utils.h" @@ -47,6 +48,44 @@ void print_stats(std::string category, std::vector percentiles, std::vect diskann::cout << std::endl; } +template +void parse_labels_of_query(const std::string &filters_for_query, + std::unique_ptr> &pFlashIndex, + std::vector &label_ids_for_query) +{ + std::vector label_strs_for_query; + diskann::split_string(filters_for_query, MULTIPLE_LABEL_SEPARATOR, label_strs_for_query); + for (auto &label_str_for_query : label_strs_for_query) + { + label_ids_for_query.push_back(pFlashIndex->get_converted_label(label_str_for_query)); + } +} + +template +void populate_label_ids(const std::vector &filters_of_queries, + std::unique_ptr> &pFlashIndex, + std::vector> &label_ids_of_queries, bool apply_one_to_all, uint32_t query_count) +{ + if (apply_one_to_all) + { + std::vector label_ids_of_query; + parse_labels_of_query(filters_of_queries[0], pFlashIndex, label_ids_of_query); + for (auto i = 0; i < query_count; i++) + { + label_ids_of_queries.push_back(label_ids_of_query); + } + } + else + { + for (auto &filters_of_query : filters_of_queries) + { + std::vector label_ids_of_query; + parse_labels_of_query(filters_of_query, pFlashIndex, label_ids_of_query); + label_ids_of_queries.push_back(label_ids_of_query); + } + } +} + template int search_disk_index(diskann::Metric &metric, const std::string &index_path_prefix, const std::string &result_output_prefix, const std::string &query_file, std::string >_file, @@ -173,6 +212,14 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre diskann::cout << "..done" << std::endl; } + std::vector> per_query_label_ids; + if (filtered_search) + { + populate_label_ids(query_filters, _pFlashIndex, per_query_label_ids, (query_filters.size() == 1), query_num ); + } + + + diskann::cout.setf(std::ios_base::fixed, std::ios_base::floatfield); diskann::cout.precision(2); @@ -236,19 +283,10 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre } else { - LabelT label_for_search; - if (query_filters.size() == 1) - { // one label for all queries - label_for_search = _pFlashIndex->get_converted_label(query_filters[0]); - } - else - { // one label for each query - label_for_search = _pFlashIndex->get_converted_label(query_filters[i]); - } _pFlashIndex->cached_beam_search( query + (i * query_aligned_dim), recall_at, L, query_result_ids_64.data() + (i * recall_at), - query_result_dists[test_id].data() + (i * recall_at), optimized_beamwidth, true, label_for_search, - use_reorder_data, stats + i); + query_result_dists[test_id].data() + (i * recall_at), optimized_beamwidth, true, per_query_label_ids[i], + search_io_limit, use_reorder_data, stats + i); } } auto e = std::chrono::high_resolution_clock::now(); @@ -443,7 +481,6 @@ int main(int argc, char **argv) { query_filters = read_file_to_vector_of_strings(query_filters_file); } - try { if (!query_filters.empty() && label_type == "ushort") diff --git a/include/pq_flash_index.h b/include/pq_flash_index.h index ba5258e18..d37988f78 100644 --- a/include/pq_flash_index.h +++ b/include/pq_flash_index.h @@ -2,6 +2,7 @@ // Licensed under the MIT license. #pragma once +#include #include "common_includes.h" #include "aligned_file_reader.h" @@ -35,6 +36,11 @@ template class PQFlashIndex DISKANN_DLLEXPORT int load(uint32_t num_threads, const char *index_prefix); #endif + DISKANN_DLLEXPORT void load_labels(const std::string& disk_index_filepath); + DISKANN_DLLEXPORT void load_label_medoid_map( + const std::string &labels_to_medoids_filepath, std::istream &medoid_stream); + DISKANN_DLLEXPORT void load_dummy_map(const std::string& dummy_map_filepath, std::istream &dummy_map_stream); + #ifdef EXEC_ENV_OLS DISKANN_DLLEXPORT int load_from_separate_paths(diskann::MemoryMappedFiles &files, uint32_t num_threads, const char *index_filepath, const char *pivots_filepath, @@ -77,7 +83,7 @@ template class PQFlashIndex DISKANN_DLLEXPORT void cached_beam_search(const T *query, const uint64_t k_search, const uint64_t l_search, uint64_t *res_ids, float *res_dists, const uint64_t beam_width, - const bool use_filter, const LabelT &filter_label, + const bool use_filter, const std::vector &filter_labels, const uint32_t io_limit, const bool use_reorder_data = false, QueryStats *stats = nullptr); @@ -116,7 +122,9 @@ template class PQFlashIndex private: DISKANN_DLLEXPORT inline bool point_has_label(uint32_t point_id, LabelT label_id); - std::unordered_map load_label_map(std::basic_istream &infile); + DISKANN_DLLEXPORT inline bool point_has_any_label(uint32_t point_id, const std::vector &label_ids); + void load_label_map(std::basic_istream &map_reader, + std::unordered_map &string_to_int_map); DISKANN_DLLEXPORT void parse_label_file(std::basic_istream &infile, size_t &num_pts_labels); DISKANN_DLLEXPORT void get_label_file_metadata(const std::string &fileContent, uint32_t &num_pts, uint32_t &num_total_labels); diff --git a/include/utils.h b/include/utils.h index d3af5c3a9..463331435 100644 --- a/include/utils.h +++ b/include/utils.h @@ -57,6 +57,7 @@ typedef int FileHandle; #define PBSTR "||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||" #define PBWIDTH 60 +#define MULTIPLE_LABEL_SEPARATOR "|" inline bool file_exists_impl(const std::string &name, bool dirCheck = false) { @@ -683,6 +684,9 @@ DISKANN_DLLEXPORT double calculate_range_search_recall(unsigned num_queries, std::vector> &groundtruth, std::vector> &our_results); +DISKANN_DLLEXPORT void split_string(const std::string &string_to_split, const std::string &delimiter, + std::vector &pieces); + template inline void load_bin(const std::string &bin_file, std::unique_ptr &data, size_t &npts, size_t &dim, size_t offset = 0) @@ -1101,11 +1105,6 @@ inline std::vector read_file_to_vector_of_strings(const std::string { break; } - if (line.find(',') != std::string::npos) - { - std::cerr << "Every query must have exactly one filter" << std::endl; - exit(-1); - } if (!line.empty() && (line.back() == '\r' || line.back() == '\n')) { line.erase(line.size() - 1); diff --git a/src/pq_flash_index.cpp b/src/pq_flash_index.cpp index 12b186fa3..4a8a0d4c7 100644 --- a/src/pq_flash_index.cpp +++ b/src/pq_flash_index.cpp @@ -27,7 +27,6 @@ namespace diskann { - template PQFlashIndex::PQFlashIndex(std::shared_ptr &fileReader, diskann::Metric m) : reader(fileReader), metric(m), _thread_data(nullptr) @@ -571,9 +570,8 @@ void PQFlashIndex::generate_random_labels(std::vector &labels } template -std::unordered_map PQFlashIndex::load_label_map(std::basic_istream &map_reader) +void PQFlashIndex::load_label_map(std::basic_istream &map_reader, std::unordered_map& string_to_int_map) { - std::unordered_map string_to_int_mp; std::string line, token; LabelT token_as_num; std::string label_str; @@ -584,9 +582,8 @@ std::unordered_map PQFlashIndex::load_label_map( label_str = token; getline(iss, token, '\t'); token_as_num = (LabelT)std::stoul(token); - string_to_int_mp[label_str] = token_as_num; + string_to_int_map[label_str] = token_as_num; } - return string_to_int_mp; } template @@ -674,6 +671,29 @@ inline bool PQFlashIndex::point_has_label(uint32_t point_id, LabelT l return ret_val; } +template +bool PQFlashIndex::point_has_any_label(uint32_t point_id, const std::vector &label_ids) +{ + uint32_t start_vec = _pts_to_label_offsets[point_id]; + uint32_t num_lbls = _pts_to_labels[start_vec]; + bool ret_val = false; + for (auto &cur_lbl : label_ids) + { + for (uint32_t i = 0; i < num_lbls; i++) + { + if (_pts_to_labels[start_vec + 1 + i] == cur_lbl) + { + ret_val = true; + break; + } + } + if (ret_val == true) + break; + } + return ret_val; +} + + template void PQFlashIndex::parse_label_file(std::basic_istream &infile, size_t &num_points_labels) { @@ -763,80 +783,81 @@ template void PQFlashIndex::set_univers _universal_filter_label = label; } -#ifdef EXEC_ENV_OLS template -int PQFlashIndex::load(MemoryMappedFiles &files, uint32_t num_threads, const char *index_prefix) +void PQFlashIndex::load_label_medoid_map(const std::string& labels_to_medoids_filepath, std::istream& medoid_stream) { -#else -template int PQFlashIndex::load(uint32_t num_threads, const char *index_prefix) -{ -#endif - std::string pq_table_bin = std::string(index_prefix) + "_pq_pivots.bin"; - std::string pq_compressed_vectors = std::string(index_prefix) + "_pq_compressed.bin"; - std::string _disk_index_file = std::string(index_prefix) + "_disk.index"; -#ifdef EXEC_ENV_OLS - return load_from_separate_paths(files, num_threads, _disk_index_file.c_str(), pq_table_bin.c_str(), - pq_compressed_vectors.c_str()); -#else - return load_from_separate_paths(num_threads, _disk_index_file.c_str(), pq_table_bin.c_str(), - pq_compressed_vectors.c_str()); -#endif -} + std::string line, token; -#ifdef EXEC_ENV_OLS -template -int PQFlashIndex::load_from_separate_paths(diskann::MemoryMappedFiles &files, uint32_t num_threads, - const char *index_filepath, const char *pivots_filepath, - const char *compressed_filepath) -{ -#else + _filter_to_medoid_ids.clear(); + try + { + while (std::getline(medoid_stream, line)) + { + std::istringstream iss(line); + uint32_t cnt = 0; + std::vector medoids; + LabelT label; + while (std::getline(iss, token, ',')) + { + if (cnt == 0) + label = (LabelT)std::stoul(token); + else + medoids.push_back((uint32_t)stoul(token)); + cnt++; + } + _filter_to_medoid_ids[label].swap(medoids); + } + } + catch (std::system_error &e) + { + throw FileException(labels_to_medoids_filepath, e, __FUNCSIG__, __FILE__, __LINE__); + } +} template -int PQFlashIndex::load_from_separate_paths(uint32_t num_threads, const char *index_filepath, - const char *pivots_filepath, const char *compressed_filepath) +void PQFlashIndex::load_dummy_map(const std::string &dummy_map_filepath, std::istream &dummy_map_stream) { -#endif - std::string pq_table_bin = pivots_filepath; - std::string pq_compressed_vectors = compressed_filepath; - std::string _disk_index_file = index_filepath; - std::string medoids_file = std::string(_disk_index_file) + "_medoids.bin"; - std::string centroids_file = std::string(_disk_index_file) + "_centroids.bin"; - - std::string labels_file = std ::string(_disk_index_file) + "_labels.txt"; - std::string labels_to_medoids = std ::string(_disk_index_file) + "_labels_to_medoids.txt"; - std::string dummy_map_file = std ::string(_disk_index_file) + "_dummy_map.txt"; - std::string labels_map_file = std ::string(_disk_index_file) + "_labels_map.txt"; - size_t num_pts_in_label_file = 0; + std::string line, token; - size_t pq_file_dim, pq_file_num_centroids; -#ifdef EXEC_ENV_OLS - get_bin_metadata(files, pq_table_bin, pq_file_num_centroids, pq_file_dim, METADATA_SIZE); -#else - get_bin_metadata(pq_table_bin, pq_file_num_centroids, pq_file_dim, METADATA_SIZE); -#endif + try + { + while (std::getline(dummy_map_stream, line)) + { + std::istringstream iss(line); + uint32_t cnt = 0; + uint32_t dummy_id; + uint32_t real_id; + while (std::getline(iss, token, ',')) + { + if (cnt == 0) + dummy_id = (uint32_t)stoul(token); + else + real_id = (uint32_t)stoul(token); + cnt++; + } + _dummy_pts.insert(dummy_id); + _has_dummy_pts.insert(real_id); + _dummy_to_real_map[dummy_id] = real_id; - this->_disk_index_file = _disk_index_file; + if (_real_to_dummy_map.find(real_id) == _real_to_dummy_map.end()) + _real_to_dummy_map[real_id] = std::vector(); - if (pq_file_num_centroids != 256) + _real_to_dummy_map[real_id].emplace_back(dummy_id); + } + } + catch (std::system_error &e) { - diskann::cout << "Error. Number of PQ centroids is not 256. Exiting." << std::endl; - return -1; + throw FileException (dummy_map_filepath, e, __FUNCSIG__, __FILE__, __LINE__); } +} - this->_data_dim = pq_file_dim; - // will change later if we use PQ on disk or if we are using - // inner product without PQ - this->_disk_bytes_per_point = this->_data_dim * sizeof(T); - this->_aligned_dim = ROUND_UP(pq_file_dim, 8); - - size_t npts_u64, nchunks_u64; -#ifdef EXEC_ENV_OLS - diskann::load_bin(files, pq_compressed_vectors, this->data, npts_u64, nchunks_u64); -#else - diskann::load_bin(pq_compressed_vectors, this->data, npts_u64, nchunks_u64); -#endif +template void PQFlashIndex::load_labels(const std::string &disk_index_file) +{ + std::string labels_file = _disk_index_file + "_labels.txt"; + std::string labels_to_medoids = _disk_index_file + "_labels_to_medoids.txt"; + std::string dummy_map_file = _disk_index_file + "_dummy_map.txt"; + std::string labels_map_file = _disk_index_file + "_labels_map.txt"; + size_t num_pts_in_label_file = 0; - this->_num_points = npts_u64; - this->_n_chunks = nchunks_u64; #ifdef EXEC_ENV_OLS if (files.fileExists(labels_file)) { @@ -864,7 +885,7 @@ int PQFlashIndex::load_from_separate_paths(uint32_t num_threads, cons #else std::ifstream map_reader(labels_map_file); #endif - _label_map = load_label_map(map_reader); + load_label_map(map_reader, _label_map); #ifndef EXEC_ENV_OLS map_reader.close(); @@ -882,32 +903,7 @@ int PQFlashIndex::load_from_separate_paths(uint32_t num_threads, cons std::ifstream medoid_stream(labels_to_medoids); assert(medoid_stream.is_open()); #endif - std::string line, token; - - _filter_to_medoid_ids.clear(); - try - { - while (std::getline(medoid_stream, line)) - { - std::istringstream iss(line); - uint32_t cnt = 0; - std::vector medoids; - LabelT label; - while (std::getline(iss, token, ',')) - { - if (cnt == 0) - label = (LabelT)std::stoul(token); - else - medoids.push_back((uint32_t)stoul(token)); - cnt++; - } - _filter_to_medoid_ids[label].swap(medoids); - } - } - catch (std::system_error &e) - { - throw FileException(labels_to_medoids, e, __FUNCSIG__, __FILE__, __LINE__); - } + load_label_medoid_map(labels_to_medoids, medoid_stream); } std::string univ_label_file = std ::string(_disk_index_file) + "_universal_label.txt"; @@ -944,37 +940,87 @@ int PQFlashIndex::load_from_separate_paths(uint32_t num_threads, cons std::ifstream dummy_map_stream(dummy_map_file); assert(dummy_map_stream.is_open()); #endif - std::string line, token; - - while (std::getline(dummy_map_stream, line)) - { - std::istringstream iss(line); - uint32_t cnt = 0; - uint32_t dummy_id; - uint32_t real_id; - while (std::getline(iss, token, ',')) - { - if (cnt == 0) - dummy_id = (uint32_t)stoul(token); - else - real_id = (uint32_t)stoul(token); - cnt++; - } - _dummy_pts.insert(dummy_id); - _has_dummy_pts.insert(real_id); - _dummy_to_real_map[dummy_id] = real_id; - - if (_real_to_dummy_map.find(real_id) == _real_to_dummy_map.end()) - _real_to_dummy_map[real_id] = std::vector(); - - _real_to_dummy_map[real_id].emplace_back(dummy_id); - } + load_dummy_map(dummy_map_file, dummy_map_stream); #ifndef EXEC_ENV_OLS dummy_map_stream.close(); #endif diskann::cout << "Loaded dummy map" << std::endl; } } + else + { + diskann::cout << "Index built without filter support." << std::endl; + } +} + +#ifdef EXEC_ENV_OLS +template +int PQFlashIndex::load(MemoryMappedFiles &files, uint32_t num_threads, const char *index_prefix) +{ +#else +template int PQFlashIndex::load(uint32_t num_threads, const char *index_prefix) +{ +#endif + std::string pq_table_bin = std::string(index_prefix) + "_pq_pivots.bin"; + std::string pq_compressed_vectors = std::string(index_prefix) + "_pq_compressed.bin"; + std::string _disk_index_file = std::string(index_prefix) + "_disk.index"; +#ifdef EXEC_ENV_OLS + return load_from_separate_paths(files, num_threads, _disk_index_file.c_str(), pq_table_bin.c_str(), + pq_compressed_vectors.c_str()); +#else + return load_from_separate_paths(num_threads, _disk_index_file.c_str(), pq_table_bin.c_str(), + pq_compressed_vectors.c_str()); +#endif +} + +#ifdef EXEC_ENV_OLS +template +int PQFlashIndex::load_from_separate_paths(diskann::MemoryMappedFiles &files, uint32_t num_threads, + const char *index_filepath, const char *pivots_filepath, + const char *compressed_filepath) +{ +#else +template +int PQFlashIndex::load_from_separate_paths(uint32_t num_threads, const char *index_filepath, + const char *pivots_filepath, const char *compressed_filepath) +{ +#endif + std::string pq_table_bin = pivots_filepath; + std::string pq_compressed_vectors = compressed_filepath; + std::string _disk_index_file = index_filepath; + std::string medoids_file = std::string(_disk_index_file) + "_medoids.bin"; + std::string centroids_file = std::string(_disk_index_file) + "_centroids.bin"; + + size_t pq_file_dim, pq_file_num_centroids; +#ifdef EXEC_ENV_OLS + get_bin_metadata(files, pq_table_bin, pq_file_num_centroids, pq_file_dim, METADATA_SIZE); +#else + get_bin_metadata(pq_table_bin, pq_file_num_centroids, pq_file_dim, METADATA_SIZE); +#endif + + this->_disk_index_file = _disk_index_file; + + if (pq_file_num_centroids != 256) + { + diskann::cout << "Error. Number of PQ centroids is not 256. Exiting." << std::endl; + return -1; + } + + this->_data_dim = pq_file_dim; + // will change later if we use PQ on disk or if we are using + // inner product without PQ + this->_disk_bytes_per_point = this->_data_dim * sizeof(T); + this->_aligned_dim = ROUND_UP(pq_file_dim, 8); + + size_t npts_u64, nchunks_u64; +#ifdef EXEC_ENV_OLS + diskann::load_bin(files, pq_compressed_vectors, this->data, npts_u64, nchunks_u64); +#else + diskann::load_bin(pq_compressed_vectors, this->data, npts_u64, nchunks_u64); +#endif + + this->_num_points = npts_u64; + this->_n_chunks = nchunks_u64; #ifdef EXEC_ENV_OLS _pq_table.load_pq_centroid_bin(files, pq_table_bin.c_str(), nchunks_u64); @@ -1095,6 +1141,8 @@ int PQFlashIndex::load_from_separate_paths(uint32_t num_threads, cons READ_U64(index_metadata, this->_nvecs_per_sector); } + load_labels(_disk_index_file); + diskann::cout << "Disk-Index File Meta-data: "; diskann::cout << "# nodes per sector: " << _nnodes_per_sector; diskann::cout << ", max node len (bytes): " << _max_node_len; @@ -1196,6 +1244,7 @@ int PQFlashIndex::load_from_separate_paths(uint32_t num_threads, cons diskann::cout << "Setting re-scaling factor of base vectors to " << this->_max_base_norm << std::endl; delete[] norm_val; } + diskann::cout << "done.." << std::endl; return 0; } @@ -1250,7 +1299,9 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t const bool use_filter, const LabelT &filter_label, const bool use_reorder_data, QueryStats *stats) { - cached_beam_search(query1, k_search, l_search, indices, distances, beam_width, use_filter, filter_label, + std::vector filters(1); + filters.push_back(filter_label); + cached_beam_search(query1, k_search, l_search, indices, distances, beam_width, use_filter, filters, std::numeric_limits::max(), use_reorder_data, stats); } @@ -1260,15 +1311,15 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t const uint32_t io_limit, const bool use_reorder_data, QueryStats *stats) { - LabelT dummy_filter = 0; - cached_beam_search(query1, k_search, l_search, indices, distances, beam_width, false, dummy_filter, io_limit, + std::vector dummy_filters(0); + cached_beam_search(query1, k_search, l_search, indices, distances, beam_width, false, dummy_filters, io_limit, use_reorder_data, stats); } template void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t k_search, const uint64_t l_search, uint64_t *indices, float *distances, const uint64_t beam_width, - const bool use_filter, const LabelT &filter_label, + const bool use_filters, const std::vector &filter_labels, const uint32_t io_limit, const bool use_reorder_data, QueryStats *stats) { @@ -1294,6 +1345,8 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t float *query_float = pq_query_scratch->aligned_query_float; float *query_rotated = pq_query_scratch->rotated_query; + uint32_t filter_label_count = (uint32_t)filter_labels.size(); + // normalization step. for cosine, we simply normalize the query // for mips, we normalize the first d-1 dims, and add a 0 for last dim, since an extra coordinate was used to // convert MIPS to L2 search @@ -1355,12 +1408,22 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t tsl::robin_set &visited = query_scratch->visited; NeighborPriorityQueue &retset = query_scratch->retset; - retset.reserve(l_search); std::vector &full_retset = query_scratch->full_retset; + tsl::robin_set full_retset_ids; + if (use_filters) { + uint64_t size_to_reserve = std::max(l_search, (std::min((uint64_t)filter_label_count, this->_max_degree) + 1)); + retset.reserve(size_to_reserve); + full_retset.reserve(4096); + full_retset_ids.reserve(4096); + } else { + retset.reserve(l_search + 1); + } + uint32_t best_medoid = 0; + uint32_t cur_list_size = 0; float best_dist = (std::numeric_limits::max)(); - if (!use_filter) + if (!use_filters) { for (uint64_t cur_m = 0; cur_m < _num_medoids; cur_m++) { @@ -1372,35 +1435,36 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t best_dist = cur_expanded_dist; } } - } - else - { - if (_filter_to_medoid_ids.find(filter_label) != _filter_to_medoid_ids.end()) + compute_dists(&best_medoid, 1, dist_scratch); + retset.insert(Neighbor(best_medoid, dist_scratch[0])); + visited.insert(best_medoid); + cur_list_size = 1; + } else { + std::vector filter_specific_medoids; + filter_specific_medoids.reserve(filter_label_count); + location_t ctr = 0; + for (; ctr < filter_label_count && ctr < this->_max_degree; ctr++) { - const auto &medoid_ids = _filter_to_medoid_ids[filter_label]; - for (uint64_t cur_m = 0; cur_m < medoid_ids.size(); cur_m++) + if (filter_labels[ctr] != -1) { - // for filtered index, we dont store global centroid data as for unfiltered index, so we use PQ distance - // as approximation to decide closest medoid matching the query filter. - compute_dists(&medoid_ids[cur_m], 1, dist_scratch); - float cur_expanded_dist = dist_scratch[0]; - if (cur_expanded_dist < best_dist) + for (auto id : this->_filter_to_medoid_ids[filter_labels[ctr]]) { - best_medoid = medoid_ids[cur_m]; - best_dist = cur_expanded_dist; + filter_specific_medoids.push_back(id); } } } - else + compute_dists(filter_specific_medoids.data(), filter_specific_medoids.size(), dist_scratch); + for (ctr = 0; ctr < filter_specific_medoids.size(); ctr++) { - throw ANNException("Cannot find medoid for specified filter.", -1, __FUNCSIG__, __FILE__, __LINE__); + retset.insert(Neighbor(filter_specific_medoids[ctr], dist_scratch[ctr])); + //retset[ctr].id = filter_specific_medoids[ctr]; + //retset[ctr].distance = dist_scratch[ctr]; + //retset[ctr].expanded = false; + visited.insert(filter_specific_medoids[ctr]); } + cur_list_size = (uint32_t) filter_specific_medoids.size(); } - compute_dists(&best_medoid, 1, dist_scratch); - retset.insert(Neighbor(best_medoid, dist_scratch[0])); - visited.insert(best_medoid); - uint32_t cmps = 0; uint32_t hops = 0; uint32_t num_ios = 0; @@ -1415,7 +1479,13 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t std::vector>> cached_nhoods; cached_nhoods.reserve(2 * beam_width); - while (retset.has_unexpanded_node() && num_ios < io_limit) + //if we are doing multi-filter search we don't want to restrict the number of IOs + //at present. Must revisit this decision later. + uint32_t max_ios_for_query = use_filters || (io_limit == 0) ? std::numeric_limits::max() : io_limit; + const std::vector& label_ids = filter_labels; //avoid renaming. + std::vector lbl_vec; + + while (retset.has_unexpanded_node() && num_ios < max_ios_for_query) { // clear iteration state frontier.clear(); @@ -1425,6 +1495,45 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t sector_scratch_idx = 0; // find new beam uint32_t num_seen = 0; + + + for (const auto &lbl : label_ids) + { // assuming that number of OR labels is + // less than max frontier size allowed + uint32_t lbl_marker = 0; + while (lbl_marker < cur_list_size) + { + lbl_vec.clear(); + lbl_vec.emplace_back(lbl); + + if (!retset[lbl_marker].expanded && point_has_any_label(retset[lbl_marker].id, lbl_vec)) + { + num_seen++; + auto iter = _nhood_cache.find(retset[lbl_marker].id); + if (iter != _nhood_cache.end()) + { + cached_nhoods.push_back(std::make_pair(retset[lbl_marker].id, iter->second)); + if (stats != nullptr) + { + stats->n_cache_hits++; + } + } + else + { + frontier.push_back(retset[lbl_marker].id); + } + retset[lbl_marker].expanded = true; + if (this->_count_visited_nodes) + { + reinterpret_cast &>(this->_node_visit_counter[retset[lbl_marker].id].second) + .fetch_add(1); + } + break; + } + lbl_marker++; + } + } + while (retset.has_unexpanded_node() && frontier.size() < beam_width && num_seen < beam_width) { auto nbr = retset.closest_unexpanded(); @@ -1501,7 +1610,24 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t cur_expanded_dist = _disk_pq_table.l2_distance( // disk_pq does not support OPQ yet query_float, (uint8_t *)node_fp_coords_copy); } - full_retset.push_back(Neighbor((uint32_t)cached_nhood.first, cur_expanded_dist)); + if (use_filters) + { + location_t real_id = cached_nhood.first; + if (_dummy_pts.find(real_id) != _dummy_pts.end()) + { + real_id = _dummy_to_real_map[real_id]; + } + if (full_retset_ids.find(real_id) == full_retset_ids.end()) + { + full_retset.push_back(Neighbor((unsigned)real_id, cur_expanded_dist)); + full_retset_ids.insert(real_id); + } + } + else + { + full_retset.push_back(Neighbor((unsigned)cached_nhood.first, cur_expanded_dist)); + } + uint64_t nnbrs = cached_nhood.second.first; uint32_t *node_nbrs = cached_nhood.second.second; @@ -1521,10 +1647,10 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t uint32_t id = node_nbrs[m]; if (visited.insert(id).second) { - if (!use_filter && _dummy_pts.find(id) != _dummy_pts.end()) + if (!use_filters && _dummy_pts.find(id) != _dummy_pts.end()) continue; - if (use_filter && !(point_has_label(id, filter_label)) && + if (use_filters && !(point_has_any_label(id, label_ids)) && (!_use_universal_label || !point_has_label(id, _universal_filter_label))) continue; cmps++; @@ -1566,7 +1692,25 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t else cur_expanded_dist = _disk_pq_table.l2_distance(query_float, (uint8_t *)data_buf); } - full_retset.push_back(Neighbor(frontier_nhood.first, cur_expanded_dist)); + if (use_filters) + { + location_t real_id = frontier_nhood.first; + if (_dummy_pts.find(real_id) != _dummy_pts.end()) + { + real_id = _dummy_to_real_map[real_id]; + } + + if (full_retset_ids.find(real_id) == full_retset_ids.end()) + { + full_retset.push_back(Neighbor(real_id, cur_expanded_dist)); + full_retset_ids.insert(real_id); + } + } + else + { + full_retset.push_back(Neighbor(frontier_nhood.first, cur_expanded_dist)); + } + uint32_t *node_nbrs = (node_buf + 1); // compute node_nbrs <-> query dist in PQ space cpu_timer.reset(); @@ -1584,10 +1728,10 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t uint32_t id = node_nbrs[m]; if (visited.insert(id).second) { - if (!use_filter && _dummy_pts.find(id) != _dummy_pts.end()) + if (!use_filters && _dummy_pts.find(id) != _dummy_pts.end()) continue; - if (use_filter && !(point_has_label(id, filter_label)) && + if (use_filters && !(point_has_any_label(id, label_ids)) && (!_use_universal_label || !point_has_label(id, _universal_filter_label))) continue; cmps++; @@ -1607,10 +1751,8 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t stats->cpu_us += (float)cpu_timer.elapsed(); } } - hops++; } - // re-sort by distance std::sort(full_retset.begin(), full_retset.end()); diff --git a/src/utils.cpp b/src/utils.cpp index 3773cda22..481a63f64 100644 --- a/src/utils.cpp +++ b/src/utils.cpp @@ -252,6 +252,21 @@ double calculate_range_search_recall(uint32_t num_queries, std::vector &pieces) +{ + size_t start = 0; + size_t end; + while ((end = string_to_split.find(delimiter, start)) != std::string::npos) + { + pieces.push_back(string_to_split.substr(start, end - start)); + start = end + delimiter.length(); + } + if (start != string_to_split.length()) + { + pieces.push_back(string_to_split.substr(start, string_to_split.length() - start)); + } +} + #ifdef EXEC_ENV_OLS void get_bin_metadata(AlignedFileReader &reader, size_t &npts, size_t &ndim, size_t offset) { From 538ce9ee4308c6bf7d257ceaaa689ce7587e78fd Mon Sep 17 00:00:00 2001 From: Gopal Srinivasa Date: Mon, 18 Mar 2024 20:40:38 +0530 Subject: [PATCH 2/5] Multi-filter search working for one filter --- apps/search_disk_index.cpp | 27 ++++++++++++++++++++++++++- include/neighbor.h | 5 +++++ src/pq_flash_index.cpp | 17 +++++++---------- 3 files changed, 38 insertions(+), 11 deletions(-) diff --git a/apps/search_disk_index.cpp b/apps/search_disk_index.cpp index 9b1a5fa2c..fdc0e79cd 100644 --- a/apps/search_disk_index.cpp +++ b/apps/search_disk_index.cpp @@ -308,6 +308,9 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre auto mean_cpuus = diskann::get_mean_stats(stats, query_num, [](const diskann::QueryStats &stats) { return stats.cpu_us; }); + auto mean_hops = diskann::get_mean_stats( + stats, query_num, [](const diskann::QueryStats &stats) { return stats.n_hops; }); + double recall = 0; if (calc_recall_flag) { @@ -321,10 +324,32 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre << std::setw(16) << mean_cpuus; if (calc_recall_flag) { - diskann::cout << std::setw(16) << recall << std::endl; + diskann::cout << std::setw(16) << recall << std::endl ; } else diskann::cout << std::endl; + + //std::stringstream rslts_string; + //for (auto x = 0; x < query_num; x++) + //{ + // rslts_string << "-----------------------------------------" << std::endl; + // rslts_string << "Query: " << x << std::endl; + // rslts_string << "GT: {"; + // for (auto rx = 0; rx < recall_at; rx++) + // { + // rslts_string << "(" << gt_ids[x* gt_dim + rx] << "," << gt_dists[x * gt_dim + rx] << "), "; + // } + // rslts_string << "}" << std::endl; + // rslts_string << "Results: {"; + // for (auto rx = 0; rx < recall_at; rx++) + // { + // rslts_string << "(" << query_result_ids[test_id][x * recall_at + rx] << "," + // << query_result_dists[test_id][x * recall_at + rx] << "), "; + // } + // rslts_string << "}" << std::endl; + // rslts_string << "-----------------------------------------" << std::endl; + //} + //diskann::cout << rslts_string.str() << std::endl; delete[] stats; } diff --git a/include/neighbor.h b/include/neighbor.h index d7c0c25ed..7e6b58a65 100644 --- a/include/neighbor.h +++ b/include/neighbor.h @@ -109,6 +109,11 @@ class NeighborPriorityQueue return _cur < _size; } + void sort() + { + std::sort(_data.begin(), _data.begin() + _size); + } + size_t size() const { return _size; diff --git a/src/pq_flash_index.cpp b/src/pq_flash_index.cpp index 4a8a0d4c7..a86609f1d 100644 --- a/src/pq_flash_index.cpp +++ b/src/pq_flash_index.cpp @@ -675,20 +675,15 @@ template bool PQFlashIndex::point_has_any_label(uint32_t point_id, const std::vector &label_ids) { uint32_t start_vec = _pts_to_label_offsets[point_id]; - uint32_t num_lbls = _pts_to_labels[start_vec]; + uint32_t num_lbls = _pts_to_label_counts[start_vec]; bool ret_val = false; for (auto &cur_lbl : label_ids) { - for (uint32_t i = 0; i < num_lbls; i++) + if (point_has_label(point_id, cur_lbl)) { - if (_pts_to_labels[start_vec + 1 + i] == cur_lbl) - { - ret_val = true; - break; - } - } - if (ret_val == true) + ret_val = true; break; + } } return ret_val; } @@ -1485,6 +1480,8 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t const std::vector& label_ids = filter_labels; //avoid renaming. std::vector lbl_vec; + retset.sort(); + while (retset.has_unexpanded_node() && num_ios < max_ios_for_query) { // clear iteration state @@ -1619,7 +1616,7 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t } if (full_retset_ids.find(real_id) == full_retset_ids.end()) { - full_retset.push_back(Neighbor((unsigned)real_id, cur_expanded_dist)); + full_retset.push_back(Neighbor((uint32_t)real_id, cur_expanded_dist)); full_retset_ids.insert(real_id); } } From c511a546bb15d076c561a6ec2db68deba4629c81 Mon Sep 17 00:00:00 2001 From: gopalrs <33950290+gopalrs@users.noreply.github.com> Date: Thu, 2 May 2024 22:31:37 +0530 Subject: [PATCH 3/5] Update filtered_ssd_index.md --- workflows/filtered_ssd_index.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/workflows/filtered_ssd_index.md b/workflows/filtered_ssd_index.md index 7457d8c9b..764c5db8a 100644 --- a/workflows/filtered_ssd_index.md +++ b/workflows/filtered_ssd_index.md @@ -54,7 +54,7 @@ Searching a filtered index uses the `apps/search_disk_index.cpp`: 9. **-K**: search for *K* neighbors and measure *K*-recall@*K*, meaning the intersection between the retrieved top-*K* nearest neighbors and ground truth *K* nearest neighbors. 10. **--result_path**: Search results will be stored in files with specified prefix, in bin format. 11. **-L (--search_list)**: A list of search_list sizes to perform search with. Larger parameters will result in slower latencies, but higher accuracies. Must be atleast the value of *K* in arg (9). -12. **--filter_label**: The filter to be used when searching an index with filters. For each query, a search is performed with this filter. +12. **--filter_label**: Filters to be used when searching an index with filters. Multiple filters can be specified, separated by '|' and are treated as OR predicates. For each query, a search is performed with these filters. Example with SIFT10K: @@ -100,4 +100,4 @@ Filtered Disk Index 40 4 2707.77 2817.21 4889.00 51.46 267.03 22.00 50 4 2191.56 3509.43 5943.00 60.80 349.10 23.50 100 4 1257.92 6113.45 7321.00 109.08 609.42 23.90 -``` \ No newline at end of file +``` From d473ebdfcbb84bbcc518b06b8889dec28a1297e8 Mon Sep 17 00:00:00 2001 From: Gopal Srinivasa Date: Wed, 10 Jul 2024 00:06:40 +0530 Subject: [PATCH 4/5] Version almost on par with DLVS multi-filter --- apps/search_disk_index.cpp | 127 ++++++++++++++++++++++++++++++------- include/neighbor.h | 5 -- include/pq_flash_index.h | 32 ++++++---- include/scratch.h | 4 +- include/utils.h | 2 +- src/disk_utils.cpp | 4 +- src/pq_flash_index.cpp | 71 +++++++-------------- src/scratch.cpp | 9 ++- 8 files changed, 158 insertions(+), 96 deletions(-) diff --git a/apps/search_disk_index.cpp b/apps/search_disk_index.cpp index fdc0e79cd..b3312de58 100644 --- a/apps/search_disk_index.cpp +++ b/apps/search_disk_index.cpp @@ -29,9 +29,101 @@ #endif #define WARMUP false +#define DISKANN_DEBUG_INDIVIDUAL_RESULTS namespace po = boost::program_options; +#ifdef DISKANN_DEBUG_INDIVIDUAL_RESULTS +void dump_individual_results(uint64_t test_id, uint64_t query_num, uint32_t *gt_ids, float *gt_dists, uint64_t gt_dim, + const std::vector &query_result_ids, + const std::vector &query_result_dists, uint64_t recall_at, + const std::string &result_output_prefix) +{ + uint32_t cumulative_dist_matches = 0; + uint32_t cumulative_id_matches = 0; + std::stringstream results_stream; + std::stringstream per_query_stats_stream; + + per_query_stats_stream << "query_id\tid_matches\tdist_matches\ttotal_matches\trecall" << std::endl; + for (int qid = 0; qid < query_num; qid++) + { + results_stream << qid << "\t"; + uint32_t per_query_dist_matches = 0; + uint32_t per_query_id_matches = 0; + + for (uint64_t i = 0; i < recall_at; i++) + { + auto rindex = qid * recall_at + i; + results_stream << "(" << query_result_ids[rindex] << "," << query_result_dists[rindex] << ","; + + bool id_match = false; + bool dist_match = false; + for (uint64_t j = 0; j < recall_at; j++) + { + auto gindex = qid * gt_dim + j; + if (query_result_ids[rindex] == gt_ids[gindex]) + { + per_query_id_matches++; + id_match = true; + break; + } + else if (query_result_dists[rindex] / gt_dists[gindex] <= 1.0f) + { + per_query_dist_matches++; + dist_match = true; + break; + } + } + std::string code = "X"; + if (id_match) + { + code = "I"; + } + else if (dist_match) + { + code = "D"; + } + results_stream << code << "),"; + } + + results_stream << std::endl; + + cumulative_id_matches += per_query_id_matches; + cumulative_dist_matches += per_query_dist_matches; + per_query_stats_stream << qid << "\t" << per_query_id_matches << "\t" << per_query_dist_matches << "\t" + << per_query_id_matches + per_query_dist_matches << "\t" + << (per_query_id_matches + per_query_dist_matches) * 1.0f / recall_at << std::endl; + } + { + + std::string results_file = result_output_prefix + "_L" + std::to_string(test_id) + "_results.tsv"; + std::ofstream out(results_file); + out << results_stream.str() << std::endl; + } + { + std::string per_query_stats_file = result_output_prefix + "_L" + std::to_string(test_id) + "_query_stats.tsv"; + std::ofstream out(per_query_stats_file); + out << per_query_stats_stream.str() << std::endl; + } +} + +void write_gt_to_tsv(const std::string &cur_result_path, uint64_t query_num, uint32_t *gt_ids, float *gt_dists, + uint64_t gt_dim) +{ + std::ofstream gt_out(cur_result_path + "_gt.tsv"); + for (int i = 0; i < query_num; i++) + { + gt_out << i << "\t"; + for (int j = 0; j < gt_dim; j++) + { + gt_out << "(" << gt_ids[i * gt_dim + j] << "," << gt_dists[i * gt_dim + j] << "),"; + } + gt_out << std::endl; + } +} +#endif + + void print_stats(std::string category, std::vector percentiles, std::vector results) { diskann::cout << std::setw(20) << category << ": " << std::flush; @@ -54,7 +146,7 @@ void parse_labels_of_query(const std::string &filters_for_query, std::vector &label_ids_for_query) { std::vector label_strs_for_query; - diskann::split_string(filters_for_query, MULTIPLE_LABEL_SEPARATOR, label_strs_for_query); + diskann::split_string(filters_for_query, FILTER_OR_SEPARATOR, label_strs_for_query); for (auto &label_str_for_query : label_strs_for_query) { label_ids_for_query.push_back(pFlashIndex->get_converted_label(label_str_for_query)); @@ -70,7 +162,7 @@ void populate_label_ids(const std::vector &filters_of_queries, { std::vector label_ids_of_query; parse_labels_of_query(filters_of_queries[0], pFlashIndex, label_ids_of_query); - for (auto i = 0; i < query_count; i++) + for (uint32_t i = 0; i < query_count; i++) { label_ids_of_queries.push_back(label_ids_of_query); } @@ -318,6 +410,10 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre query_result_ids[test_id].data(), recall_at, recall_at); best_recall = std::max(recall, best_recall); } +#ifdef DISKANN_DEBUG_INDIVIDUAL_RESULTS + dump_individual_results(test_id, query_num, gt_ids, gt_dists, gt_dim, query_result_ids[test_id], + query_result_dists[test_id], recall_at, result_output_prefix); +#endif diskann::cout << std::setw(6) << L << std::setw(12) << optimized_beamwidth << std::setw(16) << qps << std::setw(16) << mean_latency << std::setw(16) << latency_999 << std::setw(16) << mean_ios @@ -327,31 +423,14 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre diskann::cout << std::setw(16) << recall << std::endl ; } else + { diskann::cout << std::endl; - - //std::stringstream rslts_string; - //for (auto x = 0; x < query_num; x++) - //{ - // rslts_string << "-----------------------------------------" << std::endl; - // rslts_string << "Query: " << x << std::endl; - // rslts_string << "GT: {"; - // for (auto rx = 0; rx < recall_at; rx++) - // { - // rslts_string << "(" << gt_ids[x* gt_dim + rx] << "," << gt_dists[x * gt_dim + rx] << "), "; - // } - // rslts_string << "}" << std::endl; - // rslts_string << "Results: {"; - // for (auto rx = 0; rx < recall_at; rx++) - // { - // rslts_string << "(" << query_result_ids[test_id][x * recall_at + rx] << "," - // << query_result_dists[test_id][x * recall_at + rx] << "), "; - // } - // rslts_string << "}" << std::endl; - // rslts_string << "-----------------------------------------" << std::endl; - //} - //diskann::cout << rslts_string.str() << std::endl; + } delete[] stats; } +#ifdef DISKANN_DEBUG_INDIVIDUAL_RESULTS + write_gt_to_tsv(result_output_prefix, query_num, gt_ids, gt_dists, gt_dim); +#endif diskann::cout << "Done searching. Now saving results " << std::endl; uint64_t test_id = 0; diff --git a/include/neighbor.h b/include/neighbor.h index 7e6b58a65..d7c0c25ed 100644 --- a/include/neighbor.h +++ b/include/neighbor.h @@ -109,11 +109,6 @@ class NeighborPriorityQueue return _cur < _size; } - void sort() - { - std::sort(_data.begin(), _data.begin() + _size); - } - size_t size() const { return _size; diff --git a/include/pq_flash_index.h b/include/pq_flash_index.h index d37988f78..5eaf85a06 100644 --- a/include/pq_flash_index.h +++ b/include/pq_flash_index.h @@ -18,6 +18,11 @@ #include "tsl/robin_set.h" #define FULL_PRECISION_REORDER_MULTIPLIER 3 +#define DEFAULT_VISITED_RESERVE_SIZE 4096 +//default max filters per query is set to the same +//as what we expect Bing to provide. If this is overkill, +//it can be set by clients in the load() function +#define DEFAULT_MAX_FILTERS_PER_QUERY 4096 namespace diskann { @@ -30,24 +35,28 @@ template class PQFlashIndex DISKANN_DLLEXPORT ~PQFlashIndex(); #ifdef EXEC_ENV_OLS - DISKANN_DLLEXPORT int load(diskann::MemoryMappedFiles &files, uint32_t num_threads, const char *index_prefix); + DISKANN_DLLEXPORT int load(diskann::MemoryMappedFiles &files, uint32_t num_threads, const char *index_prefix, + uint32_t max_filters_per_query = DEFAULT_MAX_FILTERS_PER_QUERY); #else // load compressed data, and obtains the handle to the disk-resident index - DISKANN_DLLEXPORT int load(uint32_t num_threads, const char *index_prefix); + DISKANN_DLLEXPORT int load(uint32_t num_threads, const char *index_prefix, + uint32_t max_filters_per_query = DEFAULT_MAX_FILTERS_PER_QUERY); #endif - DISKANN_DLLEXPORT void load_labels(const std::string& disk_index_filepath); - DISKANN_DLLEXPORT void load_label_medoid_map( - const std::string &labels_to_medoids_filepath, std::istream &medoid_stream); - DISKANN_DLLEXPORT void load_dummy_map(const std::string& dummy_map_filepath, std::istream &dummy_map_stream); + DISKANN_DLLEXPORT void load_labels(const std::string &disk_index_filepath); + DISKANN_DLLEXPORT void load_label_medoid_map(const std::string &labels_to_medoids_filepath, + std::istream &medoid_stream); + DISKANN_DLLEXPORT void load_dummy_map(const std::string &dummy_map_filepath, std::istream &dummy_map_stream); #ifdef EXEC_ENV_OLS DISKANN_DLLEXPORT int load_from_separate_paths(diskann::MemoryMappedFiles &files, uint32_t num_threads, const char *index_filepath, const char *pivots_filepath, - const char *compressed_filepath); + const char *compressed_filepath, + uint32_t max_filters_per_query); #else DISKANN_DLLEXPORT int load_from_separate_paths(uint32_t num_threads, const char *index_filepath, - const char *pivots_filepath, const char *compressed_filepath); + const char *pivots_filepath, const char *compressed_filepath, + uint32_t max_filters_per_query); #endif DISKANN_DLLEXPORT void load_cache_list(std::vector &node_list); @@ -116,7 +125,8 @@ template class PQFlashIndex protected: DISKANN_DLLEXPORT void use_medoids_data_as_centroids(); - DISKANN_DLLEXPORT void setup_thread_data(uint64_t nthreads, uint64_t visited_reserve = 4096); + DISKANN_DLLEXPORT void setup_thread_data(uint64_t nthreads, uint64_t visited_reserve = DEFAULT_VISITED_RESERVE_SIZE, + uint64_t max_filters_per_query = DEFAULT_MAX_FILTERS_PER_QUERY); DISKANN_DLLEXPORT void set_universal_label(const LabelT &label); @@ -189,7 +199,7 @@ template class PQFlashIndex // chunk_size = chunk size of each dimension chunk // pq_tables = float* [[2^8 * [chunk_size]] * _n_chunks] uint8_t *data = nullptr; - uint64_t _n_chunks; + uint64_t _n_chunks = 0; FixedChunkPQTable _pq_table; // distance comparator @@ -207,7 +217,7 @@ template class PQFlashIndex // we can optionally have multiple starting points uint32_t *_medoids = nullptr; // defaults to 1 - size_t _num_medoids; + size_t _num_medoids = 1; // by default, it is empty. If there are multiple // centroids, we pick the medoid corresponding to the // closest centroid as the starting point of search diff --git a/include/scratch.h b/include/scratch.h index 2f43e3365..d7a3758aa 100644 --- a/include/scratch.h +++ b/include/scratch.h @@ -150,7 +150,7 @@ template class SSDQueryScratch : public AbstractScratch NeighborPriorityQueue retset; std::vector full_retset; - SSDQueryScratch(size_t aligned_dim, size_t visited_reserve); + SSDQueryScratch(size_t aligned_dim, size_t visited_reserve, size_t max_filters_per_query); ~SSDQueryScratch(); void reset(); @@ -162,7 +162,7 @@ template class SSDThreadData SSDQueryScratch scratch; IOContext ctx; - SSDThreadData(size_t aligned_dim, size_t visited_reserve); + SSDThreadData(size_t aligned_dim, size_t visited_reserve, size_t max_filters_per_query); void clear(); }; diff --git a/include/utils.h b/include/utils.h index 463331435..52137c8b0 100644 --- a/include/utils.h +++ b/include/utils.h @@ -57,7 +57,7 @@ typedef int FileHandle; #define PBSTR "||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||" #define PBWIDTH 60 -#define MULTIPLE_LABEL_SEPARATOR "|" +#define FILTER_OR_SEPARATOR "|" inline bool file_exists_impl(const std::string &name, bool dirCheck = false) { diff --git a/src/disk_utils.cpp b/src/disk_utils.cpp index 016560217..59fc95b06 100644 --- a/src/disk_utils.cpp +++ b/src/disk_utils.cpp @@ -1274,7 +1274,9 @@ int build_disk_index(const char *dataFilePath, const char *indexFilePath, const augmented_labels_file = index_prefix_path + "_augmented_labels.txt"; if (filter_threshold != 0) { - dummy_remap_file = index_prefix_path + "_dummy_remap.txt"; + //changing the dummy map file from _dummy_map.txt to _disk.index_dummy_map.txt to keep with the + //convention that the index files all have the _disk.index prefix. + dummy_remap_file = index_prefix_path + "_disk.index_dummy_map.txt"; breakup_dense_points(data_file_to_use, labels_file_to_use, filter_threshold, augmented_data_file, augmented_labels_file, dummy_remap_file); // RKNOTE: This has large memory footprint, diff --git a/src/pq_flash_index.cpp b/src/pq_flash_index.cpp index a86609f1d..6700666b5 100644 --- a/src/pq_flash_index.cpp +++ b/src/pq_flash_index.cpp @@ -117,7 +117,7 @@ template inline T *PQFlashIndex::offset } template -void PQFlashIndex::setup_thread_data(uint64_t nthreads, uint64_t visited_reserve) +void PQFlashIndex::setup_thread_data(uint64_t nthreads, uint64_t visited_reserve, uint64_t max_filters_per_query) { diskann::cout << "Setting up thread-specific contexts for nthreads: " << nthreads << std::endl; // omp parallel for to generate unique thread IDs @@ -126,7 +126,7 @@ void PQFlashIndex::setup_thread_data(uint64_t nthreads, uint64_t visi { #pragma omp critical { - SSDThreadData *data = new SSDThreadData(this->_aligned_dim, visited_reserve); + SSDThreadData *data = new SSDThreadData(this->_aligned_dim, visited_reserve, max_filters_per_query); this->reader->register_thread(); data->ctx = this->reader->get_ctx(); this->_thread_data.push(data); @@ -598,7 +598,8 @@ LabelT PQFlashIndex::get_converted_label(const std::string &filter_la return _universal_filter_label; } std::stringstream stream; - stream << "Unable to find label in the Label Map"; + stream << "Unable to find label " << filter_label + << " in the Label Map "; diskann::cerr << stream.str() << std::endl; throw diskann::ANNException(stream.str(), -1, __FUNCSIG__, __FILE__, __LINE__); } @@ -674,8 +675,6 @@ inline bool PQFlashIndex::point_has_label(uint32_t point_id, LabelT l template bool PQFlashIndex::point_has_any_label(uint32_t point_id, const std::vector &label_ids) { - uint32_t start_vec = _pts_to_label_offsets[point_id]; - uint32_t num_lbls = _pts_to_label_counts[start_vec]; bool ret_val = false; for (auto &cur_lbl : label_ids) { @@ -950,10 +949,10 @@ template void PQFlashIndex::load_labels #ifdef EXEC_ENV_OLS template -int PQFlashIndex::load(MemoryMappedFiles &files, uint32_t num_threads, const char *index_prefix) +int PQFlashIndex::load(MemoryMappedFiles &files, uint32_t num_threads, const char *index_prefix, uint32_t max_filters_per_query) { #else -template int PQFlashIndex::load(uint32_t num_threads, const char *index_prefix) +template int PQFlashIndex::load(uint32_t num_threads, const char *index_prefix, uint32_t max_filters_per_query) { #endif std::string pq_table_bin = std::string(index_prefix) + "_pq_pivots.bin"; @@ -961,10 +960,10 @@ template int PQFlashIndex::load(uint32_ std::string _disk_index_file = std::string(index_prefix) + "_disk.index"; #ifdef EXEC_ENV_OLS return load_from_separate_paths(files, num_threads, _disk_index_file.c_str(), pq_table_bin.c_str(), - pq_compressed_vectors.c_str()); + pq_compressed_vectors.c_str(), max_filters_per_query); #else return load_from_separate_paths(num_threads, _disk_index_file.c_str(), pq_table_bin.c_str(), - pq_compressed_vectors.c_str()); + pq_compressed_vectors.c_str(), max_filters_per_query); #endif } @@ -972,12 +971,13 @@ template int PQFlashIndex::load(uint32_ template int PQFlashIndex::load_from_separate_paths(diskann::MemoryMappedFiles &files, uint32_t num_threads, const char *index_filepath, const char *pivots_filepath, - const char *compressed_filepath) + const char *compressed_filepath, uint32_t max_filters_per_query) { #else template int PQFlashIndex::load_from_separate_paths(uint32_t num_threads, const char *index_filepath, - const char *pivots_filepath, const char *compressed_filepath) + const char *pivots_filepath, const char *compressed_filepath, + uint32_t max_filters_per_query) { #endif std::string pq_table_bin = pivots_filepath; @@ -1067,7 +1067,7 @@ int PQFlashIndex::load_from_separate_paths(uint32_t num_threads, cons // bytes are needed to store the header and read in that many using our // 'standard' aligned file reader approach. reader->open(_disk_index_file); - this->setup_thread_data(num_threads); + this->setup_thread_data(num_threads, DEFAULT_VISITED_RESERVE_SIZE, max_filters_per_query); this->_max_nthreads = num_threads; char *bytes = getHeaderBytes(); @@ -1294,7 +1294,7 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t const bool use_filter, const LabelT &filter_label, const bool use_reorder_data, QueryStats *stats) { - std::vector filters(1); + std::vector filters; filters.push_back(filter_label); cached_beam_search(query1, k_search, l_search, indices, distances, beam_width, use_filter, filters, std::numeric_limits::max(), use_reorder_data, stats); @@ -1480,8 +1480,6 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t const std::vector& label_ids = filter_labels; //avoid renaming. std::vector lbl_vec; - retset.sort(); - while (retset.has_unexpanded_node() && num_ios < max_ios_for_query) { // clear iteration state @@ -1495,8 +1493,7 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t for (const auto &lbl : label_ids) - { // assuming that number of OR labels is - // less than max frontier size allowed + { uint32_t lbl_marker = 0; while (lbl_marker < cur_list_size) { @@ -1607,25 +1604,13 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t cur_expanded_dist = _disk_pq_table.l2_distance( // disk_pq does not support OPQ yet query_float, (uint8_t *)node_fp_coords_copy); } - if (use_filters) - { - location_t real_id = cached_nhood.first; - if (_dummy_pts.find(real_id) != _dummy_pts.end()) - { - real_id = _dummy_to_real_map[real_id]; - } - if (full_retset_ids.find(real_id) == full_retset_ids.end()) - { - full_retset.push_back(Neighbor((uint32_t)real_id, cur_expanded_dist)); - full_retset_ids.insert(real_id); - } - } - else + location_t real_id = cached_nhood.first; + if (full_retset_ids.find(real_id) == full_retset_ids.end()) { - full_retset.push_back(Neighbor((unsigned)cached_nhood.first, cur_expanded_dist)); + full_retset.push_back(Neighbor((uint32_t)real_id, cur_expanded_dist)); + full_retset_ids.insert(real_id); } - uint64_t nnbrs = cached_nhood.second.first; uint32_t *node_nbrs = cached_nhood.second.second; @@ -1689,23 +1674,11 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t else cur_expanded_dist = _disk_pq_table.l2_distance(query_float, (uint8_t *)data_buf); } - if (use_filters) - { - location_t real_id = frontier_nhood.first; - if (_dummy_pts.find(real_id) != _dummy_pts.end()) - { - real_id = _dummy_to_real_map[real_id]; - } - - if (full_retset_ids.find(real_id) == full_retset_ids.end()) - { - full_retset.push_back(Neighbor(real_id, cur_expanded_dist)); - full_retset_ids.insert(real_id); - } - } - else + location_t real_id = frontier_nhood.first; + if (full_retset_ids.find(real_id) == full_retset_ids.end()) { - full_retset.push_back(Neighbor(frontier_nhood.first, cur_expanded_dist)); + full_retset.push_back(Neighbor(real_id, cur_expanded_dist)); + full_retset_ids.insert(real_id); } uint32_t *node_nbrs = (node_buf + 1); diff --git a/src/scratch.cpp b/src/scratch.cpp index c3836ccf1..287d41db8 100644 --- a/src/scratch.cpp +++ b/src/scratch.cpp @@ -93,13 +93,16 @@ template void SSDQueryScratch::reset() full_retset.clear(); } -template SSDQueryScratch::SSDQueryScratch(size_t aligned_dim, size_t visited_reserve) +template SSDQueryScratch::SSDQueryScratch(size_t aligned_dim, size_t visited_reserve, size_t max_filters_per_query) { size_t coord_alloc_size = ROUND_UP(sizeof(T) * aligned_dim, 256); diskann::alloc_aligned((void **)&coord_scratch, coord_alloc_size, 256); - diskann::alloc_aligned((void **)§or_scratch, defaults::MAX_N_SECTOR_READS * defaults::SECTOR_LEN, + + size_t max_sectors_in_scratch = (std::max)(defaults::MAX_N_SECTOR_READS, max_filters_per_query); + diskann::alloc_aligned((void **)§or_scratch, max_sectors_in_scratch * defaults::SECTOR_LEN, defaults::SECTOR_LEN); + diskann::alloc_aligned((void **)&this->_aligned_query_T, aligned_dim * sizeof(T), 8 * sizeof(T)); this->_pq_scratch = new PQScratch(defaults::MAX_GRAPH_DEGREE, aligned_dim); @@ -121,7 +124,7 @@ template SSDQueryScratch::~SSDQueryScratch() } template -SSDThreadData::SSDThreadData(size_t aligned_dim, size_t visited_reserve) : scratch(aligned_dim, visited_reserve) +SSDThreadData::SSDThreadData(size_t aligned_dim, size_t visited_reserve, size_t max_filters_per_query) : scratch(aligned_dim, visited_reserve, max_filters_per_query) { } From 7544f8d91ef9ab986ab1e77b55e4574c4a258f98 Mon Sep 17 00:00:00 2001 From: Gopal Srinivasa Date: Thu, 11 Jul 2024 13:12:23 +0530 Subject: [PATCH 5/5] Fixed issue with code hanging when num_threads == 1 --- apps/search_disk_index.cpp | 28 ++++++++++++++++++++++++++++ include/percentile_stats.h | 4 ++++ src/pq_flash_index.cpp | 28 ++++++++++++++++++++++++---- 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/apps/search_disk_index.cpp b/apps/search_disk_index.cpp index b3312de58..9987c02ff 100644 --- a/apps/search_disk_index.cpp +++ b/apps/search_disk_index.cpp @@ -33,6 +33,31 @@ namespace po = boost::program_options; +#ifdef DISKANN_DEBUG_PRINT_RETSET +void dump_retset(uint64_t test_id, uint64_t query_num, diskann::QueryStats *stats, const std::string &result_output_prefix) +{ + std::stringstream ss; + if (stats != nullptr) + { + for (int i = 0; i < query_num; i++) + { + ss << i << "\t"; + for (int j = 0; j < (stats + i)->query_retset.size(); j++) + { + ss << "(" << (stats + i)->query_retset[j].id << ", " << (stats + i)->query_retset[j].distance + << "), "; + } + ss << std::endl; + } + + } + std::string results_file = result_output_prefix + "_L" + std::to_string(test_id) + "_retset.tsv"; + std::ofstream writer(results_file); + writer << ss.str() << std::endl; + writer.close(); +} +#endif + #ifdef DISKANN_DEBUG_INDIVIDUAL_RESULTS void dump_individual_results(uint64_t test_id, uint64_t query_num, uint32_t *gt_ids, float *gt_dists, uint64_t gt_dim, const std::vector &query_result_ids, @@ -414,6 +439,9 @@ int search_disk_index(diskann::Metric &metric, const std::string &index_path_pre dump_individual_results(test_id, query_num, gt_ids, gt_dists, gt_dim, query_result_ids[test_id], query_result_dists[test_id], recall_at, result_output_prefix); #endif +#ifdef DISKANN_DEBUG_PRINT_RETSET + dump_retset(test_id, query_num, stats, result_output_prefix); +#endif diskann::cout << std::setw(6) << L << std::setw(12) << optimized_beamwidth << std::setw(16) << qps << std::setw(16) << mean_latency << std::setw(16) << latency_999 << std::setw(16) << mean_ios diff --git a/include/percentile_stats.h b/include/percentile_stats.h index 793257577..361b0109e 100644 --- a/include/percentile_stats.h +++ b/include/percentile_stats.h @@ -33,6 +33,10 @@ struct QueryStats unsigned n_cmps = 0; // # cmps unsigned n_cache_hits = 0; // # cache_hits unsigned n_hops = 0; // # search hops + +#ifdef DISKANN_DEBUG_PRINT_RETSET + std::vector query_retset; //copy of the retset to debug PQ distances. +#endif }; template diff --git a/src/pq_flash_index.cpp b/src/pq_flash_index.cpp index 6700666b5..486888a7a 100644 --- a/src/pq_flash_index.cpp +++ b/src/pq_flash_index.cpp @@ -132,6 +132,7 @@ void PQFlashIndex::setup_thread_data(uint64_t nthreads, uint64_t visi this->_thread_data.push(data); } } + this->_thread_data.push_notify_all(); _load_flag = true; } @@ -501,10 +502,6 @@ template void PQFlashIndex::use_medoids alloc_aligned(((void **)&_centroid_data), _num_medoids * _aligned_dim * sizeof(float), 32); std::memset(_centroid_data, 0, _num_medoids * _aligned_dim * sizeof(float)); - // borrow ctx - ScratchStoreManager> manager(this->_thread_data); - auto data = manager.scratch_space(); - IOContext &ctx = data->ctx; diskann::cout << "Loading centroid data from medoids vector data of " << _num_medoids << " medoid(s)" << std::endl; std::vector nodes_to_read; @@ -1432,6 +1429,9 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t } compute_dists(&best_medoid, 1, dist_scratch); retset.insert(Neighbor(best_medoid, dist_scratch[0])); +#ifdef DISKANN_DEBUG_PRINT_RETSET + stats->query_retset.push_back(Neighbor(best_medoid, dist_scratch[0])); +#endif visited.insert(best_medoid); cur_list_size = 1; } else { @@ -1639,6 +1639,10 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t float dist = dist_scratch[m]; Neighbor nn(id, dist); retset.insert(nn); +#ifdef DISKANN_DEBUG_PRINT_RETSET + stats->query_retset.push_back(nn); +#endif + } } } @@ -1713,6 +1717,10 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t Neighbor nn(id, dist); retset.insert(nn); +#ifdef DISKANN_DEBUG_PRINT_RETSET + stats->query_retset.push_back(nn); +#endif + } } @@ -1726,6 +1734,18 @@ void PQFlashIndex::cached_beam_search(const T *query1, const uint64_t // re-sort by distance std::sort(full_retset.begin(), full_retset.end()); +#ifdef DISKANN_DEBUG_PRINT_RETSET + { + for (int i = 0; i < retset.size(); i++) + { + if (stats != nullptr) + { + stats->query_retset.push_back(retset[i]); + } + } + } +#endif + if (use_reorder_data) { if (!(this->_reorder_data_exists))