Mutex 2 bits #324

Open · wants to merge 5 commits into base: feature_HNSW_tiered_index

This PR replaces the per-element std::mutex that guards each HNSW neighbor list with
vecsim_stl::one_byte_mutex, a one-byte lock that packs two bits of state into a single
byte, and updates the index size estimators and the allocator test accordingly.
4 changes: 2 additions & 2 deletions .github/workflows/default.yml
@@ -48,8 +48,8 @@ jobs:
         with:
           mode: start
           github-token: ${{ secrets.GH_PERSONAL_ACCESS_TOKEN }}
-          # Ubuntu 20.04 AMI
-          ec2-image-id: ami-0f4feb99425e13b50
+          # Ubuntu 22.04 AMI
+          ec2-image-id: ami-09b2a1e33ce552e68
           ec2-instance-type: t3.xlarge
           subnet-id: ${{ secrets.AWS_EC2_SUBNET_ID }}
           security-group-id: ${{ secrets.AWS_EC2_SG_ID }}
45 changes: 24 additions & 21 deletions src/VecSim/algorithms/hnsw/hnsw.h
@@ -103,7 +103,7 @@ class HNSWIndex : public VecSimIndexAbstract<DistType>,
     mutable VisitedNodesHandlerPool visited_nodes_handler_pool;
     mutable std::mutex entry_point_guard_;
     mutable std::mutex index_data_guard_;
-    mutable vecsim_stl::vector<std::mutex> element_neighbors_locks_;
+    mutable vecsim_stl::vector<vecsim_stl::one_byte_mutex> element_neighbors_locks_;
 
 #ifdef BUILD_TESTS
 #include "VecSim/algorithms/hnsw/hnsw_base_tests_friends.h"
@@ -159,11 +159,10 @@ class HNSWIndex : public VecSimIndexAbstract<DistType>,
                                            const std::pair<DistType, idType> &neighbor_data,
                                            idType *new_node_neighbors_list,
                                            idType *neighbor_neighbors_list,
-                                           std::unique_lock<std::mutex> &node_lock,
-                                           std::unique_lock<std::mutex> &neighbor_lock);
-    inline idType mutuallyConnectNewElement(idType new_node_id,
-                                            candidatesMaxHeap<DistType> &top_candidates,
-                                            size_t level);
+                                           std::unique_lock<vecsim_stl::one_byte_mutex> &node_lock,
+                                           std::unique_lock<vecsim_stl::one_byte_mutex> &neighbor_lock);
+    idType mutuallyConnectNewElement(idType new_node_id,
+                                     candidatesMaxHeap<DistType> &top_candidates, size_t level);
     template <bool with_timeout>
     void greedySearchLevel(const void *vector_data, size_t level, idType &curObj, DistType &curDist,
                            void *timeoutCtx = nullptr, VecSimQueryResult_Code *rc = nullptr) const;
@@ -521,7 +520,7 @@ DistType HNSWIndex<DataType, DistType>::processCandidate(
     tag_t *elements_tags, vecsim_stl::abstract_priority_queue<DistType, Identifier> &top_candidates,
     candidatesMaxHeap<DistType> &candidate_set, DistType lowerBound) const {
 
-    std::unique_lock<std::mutex> lock(element_neighbors_locks_[curNodeId]);
+    std::unique_lock<vecsim_stl::one_byte_mutex> lock(element_neighbors_locks_[curNodeId]);
     idType *node_links = get_linklist_at_level(curNodeId, layer);
     linkListSize links_num = getListCount(node_links);
@@ -573,7 +572,7 @@ void HNSWIndex<DataType, DistType>::processCandidate_RangeSearch(
     tag_t *elements_tags, std::unique_ptr<vecsim_stl::abstract_results_container> &results,
     candidatesMaxHeap<DistType> &candidate_set, DistType dyn_range, double radius) const {
 
-    std::unique_lock<std::mutex> lock(element_neighbors_locks_[curNodeId]);
+    std::unique_lock<vecsim_stl::one_byte_mutex> lock(element_neighbors_locks_[curNodeId]);
     idType *node_links = get_linklist_at_level(curNodeId, layer);
     linkListSize links_num = getListCount(node_links);
@@ -703,7 +702,8 @@ template <typename DataType, typename DistType>
 void HNSWIndex<DataType, DistType>::revisitNeighborConnections(
     size_t level, idType new_node_id, const std::pair<DistType, idType> &neighbor_data,
     idType *new_node_neighbors_list, idType *neighbor_neighbors_list,
-    std::unique_lock<std::mutex> &node_lock, std::unique_lock<std::mutex> &neighbor_lock) {
+    std::unique_lock<vecsim_stl::one_byte_mutex> &node_lock,
+    std::unique_lock<vecsim_stl::one_byte_mutex> &neighbor_lock) {
     // Note - expect that node_lock and neighbor_lock are locked at that point.
 
     // Collect the existing neighbors and the new node as the neighbor's neighbors candidates.
@@ -760,9 +760,10 @@ void HNSWIndex<DataType, DistType>::revisitNeighborConnections(
 
     std::sort(nodes_to_update.begin(), nodes_to_update.end());
     size_t nodes_to_update_count = nodes_to_update.size();
-    std::unique_lock<std::mutex> locks[nodes_to_update_count];
+    std::unique_lock<vecsim_stl::one_byte_mutex> locks[nodes_to_update_count];
     for (size_t i = 0; i < nodes_to_update_count; i++) {
-        locks[i] = std::unique_lock<std::mutex>(element_neighbors_locks_[nodes_to_update[i]]);
+        locks[i] = std::unique_lock<vecsim_stl::one_byte_mutex>(
+            element_neighbors_locks_[nodes_to_update[i]]);
     }
 
     auto *neighbour_incoming_edges = getIncomingEdgesPtr(selected_neighbor, level);
@@ -855,17 +856,19 @@ idType HNSWIndex<DataType, DistType>::mutuallyConnectNewElement(
 
     for (auto &neighbor_data : selected_neighbors) {
         idType selected_neighbor = neighbor_data.second; // neighbor's id
-        std::unique_lock<std::mutex> node_lock;
-        std::unique_lock<std::mutex> neighbor_lock;
+        std::unique_lock<vecsim_stl::one_byte_mutex> node_lock;
+        std::unique_lock<vecsim_stl::one_byte_mutex> neighbor_lock;
         idType lower_id = (new_node_id < selected_neighbor) ? new_node_id : selected_neighbor;
         if (lower_id == new_node_id) {
-            node_lock = std::unique_lock<std::mutex>(element_neighbors_locks_[new_node_id]);
-            neighbor_lock =
-                std::unique_lock<std::mutex>(element_neighbors_locks_[selected_neighbor]);
+            node_lock =
+                std::unique_lock<vecsim_stl::one_byte_mutex>(element_neighbors_locks_[new_node_id]);
+            neighbor_lock = std::unique_lock<vecsim_stl::one_byte_mutex>(
+                element_neighbors_locks_[selected_neighbor]);
         } else {
-            neighbor_lock =
-                std::unique_lock<std::mutex>(element_neighbors_locks_[selected_neighbor]);
-            node_lock = std::unique_lock<std::mutex>(element_neighbors_locks_[new_node_id]);
+            neighbor_lock = std::unique_lock<vecsim_stl::one_byte_mutex>(
+                element_neighbors_locks_[selected_neighbor]);
+            node_lock =
+                std::unique_lock<vecsim_stl::one_byte_mutex>(element_neighbors_locks_[new_node_id]);
         }
 
         // get the updated count - this may change between iterations due to releasing the lock.
@@ -1124,7 +1127,7 @@ void HNSWIndex<DataType, DistType>::greedySearchLevel(const void *vector_data, s
             return;
         }
         changed = false;
-        std::unique_lock<std::mutex> lock(element_neighbors_locks_[curObj]);
+        std::unique_lock<vecsim_stl::one_byte_mutex> lock(element_neighbors_locks_[curObj]);
         idType *node_links = get_linklist(curObj, level);
         linkListSize links_count = getListCount(node_links);
@@ -1150,7 +1153,7 @@ void HNSWIndex<DataType, DistType>::resizeIndexInternal(size_t new_max_elements)
     element_levels_.shrink_to_fit();
     resizeLabelLookup(new_max_elements);
     visited_nodes_handler_pool.resize(new_max_elements);
-    vecsim_stl::vector<std::mutex>(new_max_elements, this->allocator)
+    vecsim_stl::vector<vecsim_stl::one_byte_mutex>(new_max_elements, this->allocator)
         .swap(element_neighbors_locks_);
     // Reallocate base layer
     char *data_level0_memory_new = (char *)this->allocator->reallocate(
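Since vecsim_stl::one_byte_mutex only has to expose lock() and unlock() (BasicLockable), every std::unique_lock in the hunks above keeps working unchanged, including the deadlock-avoidance pattern in mutuallyConnectNewElement that always acquires the lock of the lower element id first. A minimal standalone sketch of that ordering idea follows; the helper name and signature are illustrative and not part of the PR.

#include <cstddef>
#include <mutex>

#include "VecSim/utils/vecsim_stl.h" // vecsim_stl::one_byte_mutex

// Hypothetical helper: lock two per-element mutexes in a globally consistent
// order (lower id first) so that concurrent inserts touching the same pair of
// elements can never deadlock against each other.
void with_pair_locked(std::size_t node_id, vecsim_stl::one_byte_mutex &node_mutex,
                      std::size_t neighbor_id, vecsim_stl::one_byte_mutex &neighbor_mutex) {
    std::unique_lock<vecsim_stl::one_byte_mutex> node_lock;
    std::unique_lock<vecsim_stl::one_byte_mutex> neighbor_lock;
    if (node_id < neighbor_id) {
        node_lock = std::unique_lock<vecsim_stl::one_byte_mutex>(node_mutex);
        neighbor_lock = std::unique_lock<vecsim_stl::one_byte_mutex>(neighbor_mutex);
    } else {
        neighbor_lock = std::unique_lock<vecsim_stl::one_byte_mutex>(neighbor_mutex);
        node_lock = std::unique_lock<vecsim_stl::one_byte_mutex>(node_mutex);
    }
    // ... update both neighbor lists here; both locks are released on return.
}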
5 changes: 3 additions & 2 deletions src/VecSim/algorithms/hnsw/hnsw_factory.cpp
@@ -63,7 +63,8 @@ size_t EstimateInitialSize(const HNSWParams *params) {
         est += sizeof(size_t) * params->initialCapacity + sizeof(size_t); // element level
         est += sizeof(size_t) * params->initialCapacity +
                sizeof(size_t); // Labels lookup hash table buckets.
-        est += sizeof(std::mutex) * params->initialCapacity + sizeof(size_t); // lock per vector
+        est += sizeof(vecsim_stl::one_byte_mutex) * params->initialCapacity +
+               sizeof(size_t); // lock per vector
     }
 
     // Explicit allocation calls - always allocate a header.
@@ -116,7 +117,7 @@ size_t EstimateElementSize(const HNSWParams *params) {
     // lookup hash map.
     size_t size_meta_data =
         sizeof(tag_t) + sizeof(size_t) + sizeof(size_t) + size_label_lookup_node;
-    size_t size_lock = sizeof(std::mutex);
+    size_t size_lock = sizeof(vecsim_stl::one_byte_mutex);
 
     /* Disclaimer: we are neglecting two additional factors that consume memory:
      * 1. The overall bucket size in labels_lookup hash table is usually higher than the number of
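The estimator change above is where the memory win shows up: sizeof(std::mutex) is platform dependent (40 bytes is typical with 64-bit glibc), while the new lock is a single byte, so the per-vector lock array for, say, one million vectors shrinks from roughly 40 MB to roughly 1 MB. A self-contained sanity check of that arithmetic, where the capacity value is only an example:

#include <cstddef>
#include <cstdio>
#include <mutex>

#include "VecSim/utils/vecsim_stl.h" // vecsim_stl::one_byte_mutex

int main() {
    constexpr std::size_t capacity = 1000000; // hypothetical initialCapacity
    std::printf("per-lock size: std::mutex = %zu bytes, one_byte_mutex = %zu bytes\n",
                sizeof(std::mutex), sizeof(vecsim_stl::one_byte_mutex));
    std::printf("lock array for %zu vectors: %zu KiB -> %zu KiB\n", capacity,
                capacity * sizeof(std::mutex) / 1024,
                capacity * sizeof(vecsim_stl::one_byte_mutex) / 1024);
    return 0;
}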
19 changes: 19 additions & 0 deletions src/VecSim/utils/vecsim_stl.h
@@ -91,4 +91,23 @@ class unordered_set
                             alloc) {}
 };
 
+struct one_byte_mutex {
+    void lock() {
+        if (state.exchange(locked, std::memory_order_acquire) == unlocked)
+            return;
+        while (state.exchange(sleeper, std::memory_order_acquire) != unlocked)
+            state.wait(sleeper, std::memory_order_relaxed);
+    }
+    void unlock() {
+        if (state.exchange(unlocked, std::memory_order_release) == sleeper)
+            state.notify_one();
+    }
+
+private:
+    std::atomic<uint8_t> state{unlocked};
+
+    static constexpr uint8_t unlocked = 0;
+    static constexpr uint8_t locked = 0b01;
+    static constexpr uint8_t sleeper = 0b10;
+};
 } // namespace vecsim_stl
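The new lock packs two bits of state into a single byte, which is where the PR title comes from: 0 means unlocked, 0b01 means locked with no known waiters, and 0b10 means locked with a possible sleeper. The uncontended acquire and release are therefore a single atomic exchange each, and notify_one() is issued only when someone may actually be parked; the parking itself relies on C++20 std::atomic::wait/notify_one. Because the type is BasicLockable it drops straight into std::lock_guard and std::unique_lock. A small standalone usage sketch (illustrative only, not part of the PR):

#include <cassert>
#include <mutex>
#include <thread>

#include "VecSim/utils/vecsim_stl.h" // vecsim_stl::one_byte_mutex

// The factory estimates above assume the lock really is one byte.
static_assert(sizeof(vecsim_stl::one_byte_mutex) == 1, "per-element lock should be one byte");

int main() {
    vecsim_stl::one_byte_mutex mtx;
    long counter = 0;

    auto worker = [&] {
        for (int i = 0; i < 100000; i++) {
            std::lock_guard<vecsim_stl::one_byte_mutex> guard(mtx); // lock()/unlock() via RAII
            ++counter;
        }
    };
    std::thread t1(worker), t2(worker);
    t1.join();
    t2.join();

    assert(counter == 200000); // no lost updates when the mutex is correct
    return 0;
}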
2 changes: 1 addition & 1 deletion tests/unit/test_allocator.cpp
@@ -389,7 +389,7 @@ TYPED_TEST(IndexAllocatorTest, test_hnsw_reclaim_memory) {
     // except for the bucket count of the labels_lookup hash table that is calculated separately.
     size_t size_total_data_per_element = hnswIndex->size_data_per_element_;
     expected_mem_delta += (sizeof(tag_t) + sizeof(void *) + sizeof(size_t) +
-                           size_total_data_per_element + sizeof(std::mutex)) *
+                           size_total_data_per_element + sizeof(vecsim_stl::one_byte_mutex)) *
                           block_size;
     expected_mem_delta +=
         (hnswIndex->label_lookup_.bucket_count() - prev_bucket_count) * sizeof(size_t);