Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Robin Hood Hashing 源码分析 | SF-Zhou's Blog #11065

Open
guevara opened this issue Apr 8, 2024 · 0 comments
Open

Robin Hood Hashing 源码分析 | SF-Zhou's Blog #11065

guevara opened this issue Apr 8, 2024 · 0 comments

Comments

@guevara
Copy link
Owner

guevara commented Apr 8, 2024

Robin Hood Hashing 源码分析 | SF-Zhou's Blog



https://ift.tt/xqPtNnW



SF-Zhou


从 C++11 开始,STL 会提供哈希表 std::unordered_map 的实现,用起来确实很方便,不过性能上就差强人意了。robin_hood::unordered_map 作为 std::unordered_map 的替代品,提供了与标准库中一致的接口,同时带来 2 到 3 倍的性能提升,着实让人心动。笔者年前尝试使用该哈希表,但由于其内部的 Bug 导致低概率的抛出异常,不得已又退回使用标准库。今年 3 月底的时候其作者修复了该 Bug,笔者也第一时间测试使用,并上线到现网环境,截止目前无任何故障。安全起见,笔者分析了该哈希表的具体实现,分析的代码版本为 3.11.1,目前也没有发现潜在的安全隐患。依笔者之见,Robin Hood 高性能的秘诀是开放寻址、平坦化和限制冲突。

1. 开放寻址

目前主流的 STL 实现均使用闭式寻址(Closed Addressing),当发生冲突时,需要使用额外的数据结构处理冲突。例如 GCC 中使用的是链表,查询时会先对 key 进行哈希确定桶的位置,再比对桶对应的链表中的元素。闭式寻址的优势是删除简单,相同负载系数下对比开放寻址性能更好。但冲突剧烈时,查询的复杂度也会从 退化到 ,此时也依赖 Rehash 减少冲突。

Bucket Collision Chain
0
1
2 ②②②
3
4
5
6
7 ⑦⑦

而 Robin Hood 中使用的是开放寻址(Open Addressing),发生冲突时会尝试找下一个空桶的位置,每个桶至多存放一个元素,这也就限制了其负载系数至多为 1。其优势是有更好的缓存局部性,负载系数低时性能优异,劣势是删除时复杂度更高,负载系数高时冲突剧烈。单纯使用开放寻址无法应对复杂的现实 需求,为了提高性能还需要额外的优化策略。

Bucket Open Addressing
0
1
2
3
4
5
6
7

2. 平坦化

平坦化(Flatten)是指将哈希表中的元素直接存储在哈希桶数组中。非平坦化的实现会在哈希桶数组中存放元素的指针,查询时先读桶中的数据,再访问对应的元素,会产生一次间接寻址。平坦化则可以减少一次寻址操作,确定桶的位置后就可以直接访问元素。其优势自然是获得更好的性能和缓存局部性,劣势是需要使用更多的内存空间,以 80% 的负载系数为例,Rehash 后 60% 的内存空间存放的是空桶。另外平坦化要求键值对支持移动构造和移动复制,Robin Hood 对符合该条件并且键值对总大小小于 6 个 size_t 的会启用平坦化的实现:

template <typename Key, typename T, typename Hash = hash<Key>,
          typename KeyEqual = std::equal_to<Key>, size_t MaxLoadFactor100 = 80>
using unordered_map = detail::Table<
    sizeof(robin_hood::pair<Key, T>) <= sizeof(size_t) * 6 &&
        std::is_nothrow_move_constructible<robin_hood::pair<Key, T>>::value &&
        std::is_nothrow_move_assignable<robin_hood::pair<Key, T>>::value,
    MaxLoadFactor100, Key, T, Hash, KeyEqual>;

对于不符合条件的键值对,Robin Hood 中也提供了非平坦化实现。简单压测可以发现,相同的键值对类型平坦化相较于非平坦化可以提升一倍多的性能。

3. 限制冲突

Robin Hood 中使用了 uint8_t 类型的 Info 字段记录 key 的目标桶与实际存放桶之间的距离,使用该字段实现:

  1. 检查桶是否为空桶;
  2. 限制目标桶与实际桶之间的距离小于 256,使查询的复杂度收敛;
  3. 保证桶中实际存放的键值对的顺序始终与键值对目标桶的顺序一致。
Bucket Info=0 Info=1 Info=2 Info=3 Info=4
0
1
2
3
4
5
6
7
8 (Buffer)
9 (Buffer)

如上表所示,使用 Info=0 表示空桶,非空桶时 Info 记录键值对与哈希目标桶的距离,当超过限制时执行扩容。插入时根据距离判断键值对的目标桶位置并以此排序执行插入,删除时也根据该距离判断是否需要将键值对前移。申请哈希数组时至多会额外申请 0xFF 个空间存储尾端冲突的键值对。

执行插入的代码如下:

template <typename... Args>
std::pair<iterator, bool> emplace(Args&&... args) {
  ROBIN_HOOD_TRACE(this)

Node n{*this, std::forward<Args>(args)...};

auto idxAndState = insertKeyPrepareEmptySpot(getFirstConst(n));
switch (idxAndState.second) {
case InsertionState::key_found:
n.destroy(*this);
break;

<span>case</span> InsertionState<span>::</span>new_node<span>:</span>
  <span>::</span><span>new</span> <span>(</span><span><span>static_cast</span><span><span>&lt;</span><span>void</span><span>*</span><span>&gt;</span></span></span><span>(</span><span>&amp;</span>mKeyVals<span>[</span>idxAndState<span>.</span>first<span>]</span><span>)</span><span>)</span>
    <span>Node</span><span>(</span><span>*</span><span>this</span><span>,</span> std<span>::</span><span>move</span><span>(</span>n<span>)</span><span>)</span><span>;</span>
  <span>break</span><span>;</span>

<span>case</span> InsertionState<span>::</span>overwrite_node<span>:</span>
  mKeyVals<span>[</span>idxAndState<span>.</span>first<span>]</span> <span>=</span> std<span>::</span><span>move</span><span>(</span>n<span>)</span><span>;</span>
  <span>break</span><span>;</span>

<span>case</span> InsertionState<span>::</span>overflow_error<span>:</span>
  n<span>.</span><span>destroy</span><span>(</span><span>*</span><span>this</span><span>)</span><span>;</span>
  <span>throwOverflowError</span><span>(</span><span>)</span><span>;</span>
  <span>break</span><span>;</span>

}

return std::make_pair(
iterator(mKeyVals + idxAndState.first, mInfo + idxAndState.first),
InsertionState::key_found != idxAndState.second);
}

template <typename OtherKey>
std::pair<size_t, InsertionState> insertKeyPrepareEmptySpot(OtherKey&& key) {
for (int i = 0; i < 256; ++i) {
size_t idx{};
InfoType info{};

<span>keyToIdx</span><span>(</span>key<span>,</span> <span>&amp;</span>idx<span>,</span> <span>&amp;</span>info<span>)</span><span>;</span>

<span>nextWhileLess</span><span>(</span><span>&amp;</span>info<span>,</span> <span>&amp;</span>idx<span>)</span><span>;</span>


<span>while</span> <span>(</span>info <span>==</span> mInfo<span>[</span>idx<span>]</span><span>)</span> <span>{</span>
  
  <span>if</span> <span>(</span><span>WKeyEqual</span><span>::</span><span>operator</span><span>(</span><span>)</span><span>(</span>key<span>,</span> mKeyVals<span>[</span>idx<span>]</span><span>.</span><span>getFirst</span><span>(</span><span>)</span><span>)</span><span>)</span> <span>{</span>
    
    
    <span>return</span> std<span>::</span><span>make_pair</span><span>(</span>idx<span>,</span> InsertionState<span>::</span>key_found<span>)</span><span>;</span>
  <span>}</span>
  
  <span>next</span><span>(</span><span>&amp;</span>info<span>,</span> <span>&amp;</span>idx<span>)</span><span>;</span>
<span>}</span>


<span>if</span> <span>(</span><span>ROBIN_HOOD_UNLIKELY</span><span>(</span>mNumElements <span>&gt;=</span> mMaxNumElementsAllowed<span>)</span><span>)</span> <span>{</span>
  
  <span>if</span> <span>(</span><span>!</span><span>increase_size</span><span>(</span><span>)</span><span>)</span> <span>{</span>
    <span>return</span> std<span>::</span><span>make_pair</span><span>(</span><span>size_t</span><span>(</span><span>0</span><span>)</span><span>,</span> InsertionState<span>::</span>overflow_error<span>)</span><span>;</span>
  <span>}</span>
  <span>continue</span><span>;</span>
<span>}</span>



<span>auto</span> <span>const</span> insertion_idx <span>=</span> idx<span>;</span>
<span>auto</span> <span>const</span> insertion_info <span>=</span> info<span>;</span>
<span>if</span> <span>(</span><span>ROBIN_HOOD_UNLIKELY</span><span>(</span>insertion_info <span>+</span> mInfoInc <span>&gt;</span> <span>0xFF</span><span>)</span><span>)</span> <span>{</span>
  
  mMaxNumElementsAllowed <span>=</span> <span>0</span><span>;</span>
<span>}</span>



<span>while</span> <span>(</span><span>0</span> <span>!=</span> mInfo<span>[</span>idx<span>]</span><span>)</span> <span>{</span>
  <span>next</span><span>(</span><span>&amp;</span>info<span>,</span> <span>&amp;</span>idx<span>)</span><span>;</span>
<span>}</span>


<span>if</span> <span>(</span>idx <span>!=</span> insertion_idx<span>)</span> <span>{</span>
  
  <span>shiftUp</span><span>(</span>idx<span>,</span> insertion_idx<span>)</span><span>;</span>
<span>}</span>


mInfo<span>[</span>insertion_idx<span>]</span> <span>=</span> <span><span>static_cast</span><span><span>&lt;</span><span>uint8_t</span><span>&gt;</span></span></span><span>(</span>insertion_info<span>)</span><span>;</span>
<span>++</span>mNumElements<span>;</span>

<span>return</span> std<span>::</span><span>make_pair</span><span>(</span>
  insertion_idx<span>,</span> idx <span>==</span> insertion_idx <span>?</span> InsertionState<span>::</span>new_node
  <span>:</span> InsertionState<span>::</span>overwrite_node<span>)</span><span>;</span>

}

return std::make_pair(size_t(0), InsertionState::overflow_error);
}

template <typename HashKey>
void keyToIdx(HashKey&& key, size_t idx, InfoType info) const {

auto h = static_cast<uint64_t>(WHash::operator()(key));

h *= mHashMultiplier;
h ^= h >> 33U;

info = mInfoInc + static_cast<InfoType>((h & InfoMask) >> mInfoHashShift);
idx = (static_cast<size_t>(h) >> InitialInfoNumBits) & mMask;
}

void next(InfoType info, size_t idx) const noexcept {

idx = idx + 1;

*info += mInfoInc;
}

void nextWhileLess(InfoType info, size_t idx) const noexcept {

while (info < mInfo[idx]) {
next(info, idx);
}
}

执行删除的代码:


iterator erase(iterator pos) {
  ROBIN_HOOD_TRACE(this)

auto const idx = static_cast<size_t>(pos.mKeyVals - mKeyVals);

shiftDown(idx);
--mNumElements;

if (*pos.mInfo) {

<span>return</span> pos<span>;</span>

}

return ++pos;
}

void shiftDown(size_t idx) noexcept(
std::is_nothrow_move_assignable<Node>::value) {

mKeyVals[idx].destroy(*this);

while (mInfo[idx + 1] >= 2 * mInfoInc) {
ROBIN_HOOD_COUNT(shiftDown)

mInfo<span>[</span>idx<span>]</span> <span>=</span> <span><span>static_cast</span><span><span>&lt;</span><span>uint8_t</span><span>&gt;</span></span></span><span>(</span>mInfo<span>[</span>idx <span>+</span> <span>1</span><span>]</span> <span>-</span> mInfoInc<span>)</span><span>;</span>
mKeyVals<span>[</span>idx<span>]</span> <span>=</span> std<span>::</span><span>move</span><span>(</span>mKeyVals<span>[</span>idx <span>+</span> <span>1</span><span>]</span><span>)</span><span>;</span>
<span>++</span>idx<span>;</span>

}

mInfo[idx] = 0;

mKeyVals[idx].~Node();
}

执行扩容的代码如下:


bool increase_size() {

if (0 == mMask) {
initData(InitialNumElements);
return true;
}

auto const maxNumElementsAllowed = calcMaxNumElementsAllowed(mMask + 1);
if (mNumElements < maxNumElementsAllowed && try_increase_info()) {
return true;
}

ROBIN_HOOD_LOG("mNumElements="
<< mNumElements << ", maxNumElementsAllowed="
<< maxNumElementsAllowed << ", load="
<< (static_cast<double>(mNumElements) * 100.0 /
(static_cast<double>(mMask) + 1)))

nextHashMultiplier();
if (mNumElements * 2 < calcMaxNumElementsAllowed(mMask + 1)) {

<span>rehashPowerOfTwo</span><span>(</span>mMask <span>+</span> <span>1</span><span>,</span> <span>true</span><span>)</span><span>;</span>

} else {

<span>rehashPowerOfTwo</span><span>(</span><span>(</span>mMask <span>+</span> <span>1</span><span>)</span> <span>*</span> <span>2</span><span>,</span> <span>false</span><span>)</span><span>;</span>

}
return true;
}

void nextHashMultiplier() {

mHashMultiplier += UINT64_C(0xc4ceb9fe1a85ec54);
}

void rehashPowerOfTwo(size_t numBuckets, bool forceFree) {
ROBIN_HOOD_TRACE(this)

Node const oldKeyVals = mKeyVals;
uint8_t const
const oldInfo = mInfo;

const size_t oldMaxElementsWithBuffer =
calcNumElementsWithBuffer(mMask + 1);

initData(numBuckets);
if (oldMaxElementsWithBuffer > 1) {
for (size_t i = 0; i < oldMaxElementsWithBuffer; ++i) {
if (oldInfo[i] != 0) {

    <span>insert_move</span><span>(</span>std<span>::</span><span>move</span><span>(</span>oldKeyVals<span>[</span>i<span>]</span><span>)</span><span>)</span><span>;</span>
    
    oldKeyVals<span>[</span>i<span>]</span><span>.</span><span>~</span><span>Node</span><span>(</span><span>)</span><span>;</span>
  <span>}</span>
<span>}</span>




<span>if</span> <span>(</span>oldKeyVals <span>!=</span> <span><span>reinterpret_cast_no_cast_align_warning</span><span><span>&lt;</span>Node<span>*</span><span>&gt;</span></span></span><span>(</span><span>&amp;</span>mMask<span>)</span><span>)</span> <span>{</span>
  
  <span>if</span> <span>(</span>forceFree<span>)</span> <span>{</span>
    std<span>::</span><span>free</span><span>(</span>oldKeyVals<span>)</span><span>;</span>
  <span>}</span> <span>else</span> <span>{</span>
    <span>DataPool</span><span>::</span><span>addOrFree</span><span>(</span>oldKeyVals<span>,</span>
                        <span>calcNumBytesTotal</span><span>(</span>oldMaxElementsWithBuffer<span>)</span><span>)</span><span>;</span>
  <span>}</span>
<span>}</span>

}
}

实际上 info 字段有限的位中还存储了部分哈希的信息用于加速找 key,随着扩容其位数会逐渐降低,这里就不再详述了。

4. 使用建议

并不存在某一种哈希表可以适用所有场景,不过大部分场景下 Robin Hood 的性能都不错,推荐尝试。笔者之前优化的项目中哈希表占用整个服务 8% 左右的 CPU,替换为 Robin Hood 后哈希表部分的 CPU 占用降低到 3%。

References

  1. "Hash table", Wikipedia
  2. "martinus/robin-hood-hashing", GitHub






via sf-zhou.github.io https://ift.tt/n8RZidg

April 8, 2024 at 04:25PM
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant