Skip to content

Commit

Permalink
util: Account for hash collisions in ObjectSharedPool (envoyproxy#25027)
Browse files Browse the repository at this point in the history
Fixes envoyproxy#24617

Commit Message: util: Account for hash collisions in ObjectSharedPool
Additional Description:
Risk Level: Low
Testing: new test added
Docs Changes: N/A
Release Notes: N/A
Platform Specific Features: N/A

Signed-off-by: pcrao <[email protected]>
  • Loading branch information
pradeepcrao authored Jan 24, 2023
1 parent 84810b1 commit 6aba5bf
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 58 deletions.
13 changes: 6 additions & 7 deletions source/common/config/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,12 @@ bool Metadata::metadataLabelMatch(const LabelSet& label_set,

// Fetches the ObjectSharedPool of const Metadata messages that is registered with the singleton
// manager under the name const_metadata_shared_pool. The lambda passed to getTyped() constructs a
// pool bound to `dispatcher` (presumably invoked only when no instance is registered yet — confirm
// against Singleton::Manager).
// NOTE(review): this diff view shows two return statements. The first one (two template
// arguments) is the pre-change form; the second one (MessageUtil supplied as both HashFunc and
// EqualFunc) is the post-change form. Only one of them belongs in the final file.
ConstMetadataSharedPoolSharedPtr
Metadata::getConstMetadataSharedPool(Singleton::Manager& manager, Event::Dispatcher& dispatcher) {
return manager
.getTyped<SharedPool::ObjectSharedPool<const envoy::config::core::v3::Metadata, MessageUtil>>(
SINGLETON_MANAGER_REGISTERED_NAME(const_metadata_shared_pool), [&dispatcher] {
return std::make_shared<
SharedPool::ObjectSharedPool<const envoy::config::core::v3::Metadata, MessageUtil>>(
dispatcher);
});
return manager.getTyped<SharedPool::ObjectSharedPool<const envoy::config::core::v3::Metadata,
MessageUtil, MessageUtil>>(
SINGLETON_MANAGER_REGISTERED_NAME(const_metadata_shared_pool), [&dispatcher] {
return std::make_shared<SharedPool::ObjectSharedPool<
const envoy::config::core::v3::Metadata, MessageUtil, MessageUtil>>(dispatcher);
});
}

} // namespace Config
Expand Down
5 changes: 3 additions & 2 deletions source/common/config/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
namespace Envoy {
namespace Config {

// Shared pointer to the pool that de-duplicates const Metadata messages.
// NOTE(review): two alias declarations appear here because this is a diff view. The first is the
// pre-change form (MessageUtil as HashFunc only); the second is the post-change form, which also
// passes MessageUtil as the pool's EqualFunc.
using ConstMetadataSharedPoolSharedPtr = std::shared_ptr<
SharedPool::ObjectSharedPool<const envoy::config::core::v3::Metadata, MessageUtil>>;
using ConstMetadataSharedPoolSharedPtr =
std::shared_ptr<SharedPool::ObjectSharedPool<const envoy::config::core::v3::Metadata,
MessageUtil, MessageUtil>>;

/**
* MetadataKey presents the key name and path to retrieve value from metadata.
Expand Down
147 changes: 101 additions & 46 deletions source/common/shared_pool/shared_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "source/common/common/non_copyable.h"
#include "source/common/common/thread_synchronizer.h"

#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"

namespace Envoy {
namespace SharedPool {
Expand All @@ -30,60 +30,37 @@ namespace SharedPool {
* The destructor of ObjectSharedPool must also run on the main thread, or the ObjectSharedPool
* must be destroyed before the program exits.
*/
template <typename T, typename HashFunc = std::hash<T>,
template <typename T, typename HashFunc = std::hash<T>, typename EqualFunc = std::equal_to<T>,
class = typename std::enable_if<std::is_copy_constructible<T>::value>::type>
class ObjectSharedPool : public Singleton::Instance,
public std::enable_shared_from_this<ObjectSharedPool<T, HashFunc>>,
NonCopyable {
class ObjectSharedPool
: public Singleton::Instance,
public std::enable_shared_from_this<ObjectSharedPool<T, HashFunc, EqualFunc>>,
NonCopyable {
public:
// Records the id of the constructing thread (the thread on which getObject() and the pool
// bookkeeping are asserted to run) and keeps a reference to the dispatcher, which is used to post
// deletions back to that thread.
ObjectSharedPool(Event::Dispatcher& dispatcher)
: thread_id_(std::this_thread::get_id()), dispatcher_(dispatcher) {}

// Drops the pool entry stored under `hash_key` once no shared_ptr refers to it any more.
//
// When called on the thread that constructed the pool the entry is removed directly; otherwise
// the call is re-posted to the dispatcher so that object_pool_ is only touched from that thread.
//
// @param hash_key hash of the pooled object whose last shared_ptr has just been destroyed.
void deleteObject(const size_t hash_key) {
  if (std::this_thread::get_id() != thread_id_) {
    // Most of the time, the object's destructor occurs in the main thread, but with some
    // exceptions, it is destructed in the worker thread. In order to keep the object_pool_ thread
    // safe, the deleteObject needs to be delivered to the main thread.
    auto this_shared_ptr = this->shared_from_this();
    // Used for testing to simulate some race condition scenarios
    sync_.syncPoint(DeleteObjectOnMainThread);
    dispatcher_.post([hash_key, this_shared_ptr] { this_shared_ptr->deleteObject(hash_key); });
    return;
  }

  // There may be new inserts with the same hash value before deleting the old element, in which
  // case the entry is live again and must be kept. A single find() replaces the previous
  // find() + operator[] + erase(key) sequence, which hashed the key three times.
  auto entry = object_pool_.find(hash_key);
  if (entry != object_pool_.end() && entry->second.use_count() == 0) {
    object_pool_.erase(entry);
  }
}

// Returns a shared_ptr to the pooled instance equal to `obj`, copying `obj` into the pool when no
// live instance is found. Asserts that the caller is on the thread that constructed the pool.
// NOTE(review): this diff view interleaves the pre-change lines (lookup keyed by hashed_value,
// try_emplace with overwrite, deleter that deletes ptr itself and then calls
// deleteObject(hashed_value)) with the post-change lines (lookup by &obj, emplace, deleter that
// hands ptr to deleteObject(ptr)). Read them as two alternative versions of the body, not as one
// function: taken literally the braces do not balance and ptr would be released twice.
std::shared_ptr<T> getObject(const T& obj) {
ASSERT(std::this_thread::get_id() == thread_id_);
auto hashed_value = HashFunc{}(obj);
auto object_it = object_pool_.find(hashed_value);
if (object_it != object_pool_.end()) {
auto lock_object = object_it->second.lock();
if (lock_object) {

// Return from the object pool if we find the object there.
if (auto iter = object_pool_.find(&obj); iter != object_pool_.end()) {
if (auto lock_object = iter->lock(); static_cast<bool>(lock_object) == true) {
return lock_object;
} else {
// Remove the weak_ptr since all associated shared_ptrs have been
// destroyed.
object_pool_.erase(iter);
}
}

// Create a shared_ptr and add the object to the object_pool.
auto this_shared_ptr = this->shared_from_this();
std::shared_ptr<T> obj_shared(new T(obj), [hashed_value, this_shared_ptr](T* ptr) {
std::shared_ptr<T> obj_shared(new T(obj), [this_shared_ptr](T* ptr) {
this_shared_ptr->sync().syncPoint(ObjectSharedPool<T>::ObjectDeleterEntry);
// release ptr as early as possible to avoid exposure of ptr, resulting in undefined behavior.
delete ptr;
this_shared_ptr->deleteObject(hashed_value);
this_shared_ptr->deleteObject(ptr);
});

// When inserted, it is possible that the old elements still exist before they can be deleted,
// and the insertion will fail and therefore need to be overwritten.
auto ret = object_pool_.try_emplace(hashed_value, obj_shared);
if (!ret.second) {
ASSERT(ret.first->second.use_count() == 0);
ret.first->second = obj_shared;
}
object_pool_.emplace(obj_shared);
return obj_shared;
}

Expand All @@ -99,18 +76,96 @@ class ObjectSharedPool : public Singleton::Instance,
static const char DeleteObjectOnMainThread[];
static const char ObjectDeleterEntry[];

friend class SharedPoolTest;

private:
// Releases a pooled object whose last shared_ptr has gone away.
//
// On the pool's owning thread the work is done immediately. On any other thread it is handed to
// the dispatcher, because object_pool_ may only be modified from the owning thread.
//
// @param ptr the object to remove from the pool and delete.
void deleteObject(T* ptr) {
  const bool on_owning_thread = std::this_thread::get_id() == thread_id_;
  if (on_owning_thread) {
    deleteObjectOnMainThread(ptr);
    return;
  }

  // Objects are usually destroyed on the main thread, but occasionally a worker thread drops the
  // last reference. Hold a reference to the pool so it outlives the posted callback.
  auto self = this->shared_from_this();
  // Test hook used to simulate race conditions between threads.
  sync_.syncPoint(DeleteObjectOnMainThread);
  dispatcher_.post([ptr, self] { self->deleteObjectOnMainThread(ptr); });
}

// Removes the pool entry matching `*ptr` if it has expired, then frees `ptr`.
// Must run on the thread that constructed the pool.
//
// @param ptr the object whose last shared_ptr has been destroyed; deleted before returning.
void deleteObjectOnMainThread(T* ptr) {
  ASSERT(std::this_thread::get_id() == thread_id_);

  auto entry = object_pool_.find(ptr);
  const bool found = entry != object_pool_.end();
  // The matching entry is not necessarily the one created for `ptr`: a shared_ptr can be
  // destroyed on another thread while getObject() on the main thread has already replaced the
  // expired entry with a fresh one. Only an entry with no remaining owners is removed.
  if (found && entry->use_count() == 0) {
    object_pool_.erase(entry);
  }

  // The object is freed last so that its address cannot be handed out again by the allocator
  // while the lookup above is still in progress.
  delete ptr;
}

// Entry stored in object_pool_: a weak reference to a pooled object plus the raw address of that
// object. The raw address lets the hash and equality functors read the object's value even after
// the weak_ptr has expired; this is valid because the shared_ptr deleter does not free the object
// itself but hands it to deleteObject(), which frees it only after the entry has been looked up.
class Element {
public:
  // Left implicit on purpose: getObject() passes a std::shared_ptr<T> straight to
  // object_pool_.emplace(). NOTE(review): confirm the container does not rely on this implicit
  // conversion before marking the constructor explicit.
  Element(const std::shared_ptr<T>& ptr) : ptr_{ptr.get()}, weak_ptr_{ptr} {}

  Element() = delete;
  Element(const Element&) = delete;

  Element(Element&&) = default;

  // Returns an owning pointer to the pooled object, or an empty pointer if it has expired.
  std::shared_ptr<T> lock() const { return weak_ptr_.lock(); }
  // Number of shared_ptr instances currently owning the pooled object.
  long use_count() const { return weak_ptr_.use_count(); }

  // No friend declarations are needed for Hash/Compare: as nested classes they already have
  // access to Element's private members. (A `friend struct Hash;` placed before the nested
  // definition would name a namespace-scope type instead of the nested one.)

  // Hashes an element, or a raw pointer used as a lookup key, by the pointed-to value.
  struct Hash {
    using is_transparent = void; // NOLINT(readability-identifier-naming)
    constexpr size_t operator()(const T* ptr) const { return HashFunc{}(*ptr); }
    constexpr size_t operator()(const Element& element) const {
      return HashFunc{}(*element.ptr_);
    }
  };

  // Equality by address first (cheap), then by value via EqualFunc.
  struct Compare {
    using is_transparent = void; // NOLINT(readability-identifier-naming)
    bool operator()(const Element& a, const Element& b) const {
      ASSERT(a.ptr_ != nullptr && b.ptr_ != nullptr);
      // The null checks are repeated in the expression so the value comparison never
      // dereferences a null pointer even if the ASSERT above is compiled out.
      return a.ptr_ == b.ptr_ ||
             (a.ptr_ != nullptr && b.ptr_ != nullptr && EqualFunc{}(*a.ptr_, *b.ptr_));
    }
    bool operator()(const Element& a, const T* ptr) const {
      ASSERT(a.ptr_ != nullptr && ptr != nullptr);
      return a.ptr_ == ptr || (a.ptr_ != nullptr && ptr != nullptr && EqualFunc{}(*a.ptr_, *ptr));
    }
  };

private:
  const T* const ptr_ = nullptr; ///< This is only used to speed up
                                 ///< comparisons and should never be
                                 ///< made available outside this class.
  std::weak_ptr<T> weak_ptr_;
};

const std::thread::id thread_id_;
absl::flat_hash_map<size_t, std::weak_ptr<T>> object_pool_;
absl::flat_hash_set<Element, typename Element::Hash, typename Element::Compare> object_pool_;
// Use a multimap to allow for multiple objects with the same hash key.
// std::unordered_multimap<size_t, std::weak_ptr<T>> object_pool_;
Event::Dispatcher& dispatcher_;
Thread::ThreadSynchronizer sync_;
};

// Name of the sync point signalled just before a deletion is posted to the dispatcher from a
// thread other than the pool's owning thread.
// NOTE(review): each definition below appears twice because this is a diff view; the
// three-parameter template header is the pre-change form and the four-parameter header (with
// EqualFunc) is the post-change form.
template <typename T, typename HashFunc, class V>
const char ObjectSharedPool<T, HashFunc, V>::DeleteObjectOnMainThread[] = "delete-object-on-main";
template <typename T, typename HashFunc, typename EqualFunc, class V>
const char ObjectSharedPool<T, HashFunc, EqualFunc, V>::DeleteObjectOnMainThread[] =
"delete-object-on-main";

// Name of the sync point signalled on entry to the shared_ptr deleter of a pooled object.
template <typename T, typename HashFunc, class V>
const char ObjectSharedPool<T, HashFunc, V>::ObjectDeleterEntry[] = "deleter-entry";
template <typename T, typename HashFunc, typename EqualFunc, class V>
const char ObjectSharedPool<T, HashFunc, EqualFunc, V>::ObjectDeleterEntry[] = "deleter-entry";

} // namespace SharedPool
} // namespace Envoy
54 changes: 51 additions & 3 deletions test/common/shared_pool/shared_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ class SharedPoolTest : public testing::Test {
});
}

template <typename T> void deleteObject(std::shared_ptr<ObjectSharedPool<T>> pool, T* ptr) {
pool->deleteObject(ptr);
}

~SharedPoolTest() override {
dispatcher_->exit();
dispatcher_thread_->join();
Expand Down Expand Up @@ -94,20 +98,22 @@ TEST_F(SharedPoolTest, ThreadSafeForDeleteObject) {
std::shared_ptr<ObjectSharedPool<int>> pool;
{
// same thread
int* an_int = new int(4);
createObjectSharedPool(pool);
dispatcher_->post([&pool, this]() {
pool->deleteObject(std::hash<int>{}(4));
dispatcher_->post([&pool, this, &an_int]() {
deleteObject(pool, an_int);
go_.Notify();
});
go_.WaitForNotification();
}

{
// different threads
int* an_int = new int(4);
createObjectSharedPool(pool);
Thread::ThreadFactory& thread_factory = Thread::threadFactoryForTest();
auto thread =
thread_factory.createThread([&pool]() { pool->deleteObject(std::hash<int>{}(4)); });
thread_factory.createThread([this, &pool, &an_int]() { deleteObject(pool, an_int); });
thread->join();
}
}
Expand Down Expand Up @@ -174,5 +180,47 @@ TEST_F(SharedPoolTest, RaceCondtionForGetObjectWithObjectDeleter) {
deferredDeleteSharedPoolOnMainThread(pool);
}

// Values whose hashes collide must still be pooled as distinct entries, equal values must share
// one entry, and every entry must disappear once its last shared_ptr is released.
TEST_F(SharedPoolTest, HashCollision) {
  Event::MockDispatcher dispatcher;

  // Deliberately coarse hash with only two buckets: everything below 10 collides on 0 and
  // everything else collides on 1.
  struct CoarseHash {
    constexpr size_t operator()(int x) const { return x < 10 ? 0 : 1; }
  };
  // Verify that the hash function works as intended.
  static_assert(CoarseHash{}(4) == 0);
  static_assert(CoarseHash{}(3) == 0);
  static_assert(CoarseHash{}(15) == 1);
  static_assert(CoarseHash{}(12) == 1);

  auto pool = std::make_shared<ObjectSharedPool<int, CoarseHash>>(dispatcher);
  {
    // Two different values that land in the same hash bucket.
    auto four = pool->getObject(4);
    auto three = pool->getObject(3);

    // Each of them must have its own entry in the pool.
    EXPECT_EQ(2, pool->poolSize());
    EXPECT_EQ(*four, 4);
    EXPECT_EQ(*three, 3);

    // More colliding values, including a repeat of an already pooled one.
    auto fifteen = pool->getObject(15);
    auto twelve = pool->getObject(12);
    auto three_again = pool->getObject(3);
    auto one = pool->getObject(1);

    // The repeated value is served from the existing entry.
    EXPECT_EQ(three_again.get(), three.get());
    EXPECT_EQ(*fifteen, 15);
    EXPECT_EQ(*twelve, 12);
    EXPECT_EQ(*three_again, 3);
    EXPECT_EQ(*one, 1);

    // Five distinct values are pooled: 4, 3, 15, 12 and 1.
    EXPECT_EQ(5, pool->poolSize());
  }

  // All handles went out of scope above, so the pool must be empty again.
  EXPECT_EQ(0, pool->poolSize());
}

} // namespace SharedPool
} // namespace Envoy

0 comments on commit 6aba5bf

Please sign in to comment.