Skip to content

Commit

Permalink
Fix the HashTable::storeRowPointer to accept int64 bucketoffset (face…
Browse files Browse the repository at this point in the history
…bookincubator#7308)

Summary:
The bucket offsets in a HashTable could cross INT32_MAX. This leads to incorrect address calculation in `HashTable::storeRowPoint` while storing a row pointer in the table.

We can reproduce this on a TPC-DS 1K dataset.
Incorrect results:
```
select count(ws_order_number) as "order count" from web_sales where ws_order_number in (select ws_order_number from web_sales);
 order count
-------------
   289805994
(1 row)
```

Correct results:
```
presto:tpcds_sf_1000> select count(ws_order_number) from web_sales_on where ws_order_number in (select ws_order_number from web_sales_on);
   _col0
-----------
 720000376
(1 row)
```

Resolves  prestodb/presto#20972

Pull Request resolved: facebookincubator#7308

Reviewed By: Yuhta

Differential Revision: D50988774

Pulled By: xiaoxmeng

fbshipit-source-id: 5233deec0dc803e80509cce7541053e501f8e859
  • Loading branch information
Karteekmurthys authored and facebook-github-bot committed Nov 3, 2023
1 parent cc52535 commit 7ea06dc
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 8 deletions.
12 changes: 6 additions & 6 deletions velox/exec/HashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ void HashTable<ignoreNullKeys>::storeKeys(

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::storeRowPointer(
int32_t index,
uint64_t index,
uint64_t hash,
char* row) {
if (hashMode_ == HashMode::kArray) {
Expand All @@ -307,7 +307,7 @@ void HashTable<ignoreNullKeys>::storeRowPointer(
template <bool ignoreNullKeys>
char* HashTable<ignoreNullKeys>::insertEntry(
HashLookup& lookup,
int32_t index,
uint64_t index,
vector_size_t row) {
char* group = rows_->newRow();
lookup.hits[row] = group; // NOLINT
Expand Down Expand Up @@ -374,8 +374,8 @@ FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::fullProbe(
return RowContainer::normalizedKey(group) ==
lookup.normalizedKeys[row];
},
[&](int32_t index, int32_t row) {
return isJoin ? nullptr : insertEntry(lookup, row, index);
[&](int32_t row, uint64_t index) {
return isJoin ? nullptr : insertEntry(lookup, index, row);
},
numTombstones_,
!isJoin && extraCheck);
Expand All @@ -386,8 +386,8 @@ FOLLY_ALWAYS_INLINE void HashTable<ignoreNullKeys>::fullProbe(
*this,
0,
[&](char* group, int32_t row) { return compareKeys(group, lookup, row); },
[&](int32_t index, int32_t row) {
return isJoin ? nullptr : insertEntry(lookup, row, index);
[&](int32_t row, uint64_t index) {
return isJoin ? nullptr : insertEntry(lookup, index, row);
},
numTombstones_,
!isJoin && extraCheck);
Expand Down
16 changes: 14 additions & 2 deletions velox/exec/HashTable.h
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,18 @@ class HashTable : public BaseHashTable {
setHashMode(mode, numNew);
}

void testingStoreRowPointer(uint64_t index, uint64_t hash, char* row) {
storeRowPointer(index, hash, row);
}

auto testingLoadTags(uint64_t bucketOffset) {
return loadTags(bucketOffset);
}

auto testingGetRow(uint64_t bucketOffset, uint64_t slotIndex) {
return row(bucketOffset, slotIndex);
}

auto& testingOtherTables() const {
return otherTables_;
}
Expand Down Expand Up @@ -622,7 +634,7 @@ class HashTable : public BaseHashTable {
void rehash(bool initNormalizedKeys);
void storeKeys(HashLookup& lookup, vector_size_t row);

void storeRowPointer(int32_t index, uint64_t hash, char* row);
void storeRowPointer(uint64_t index, uint64_t hash, char* row);

// Allocates new tables for tags and payload pointers. The size must
// a power of 2.
Expand Down Expand Up @@ -711,7 +723,7 @@ class HashTable : public BaseHashTable {
bool initNormalizedKeys,
raw_vector<uint64_t>& hashes);

char* insertEntry(HashLookup& lookup, int32_t index, vector_size_t row);
char* insertEntry(HashLookup& lookup, uint64_t index, vector_size_t row);

bool compareKeys(const char* group, HashLookup& lookup, vector_size_t row);

Expand Down
25 changes: 25 additions & 0 deletions velox/exec/tests/HashTableTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,31 @@ TEST_P(HashTableTest, offsetOverflowLoadTags) {
}
}

TEST_P(HashTableTest, offsetOverflowStoreRowPointer) {
std::vector<std::unique_ptr<VectorHasher>> hashers;
hashers.push_back(std::make_unique<VectorHasher>(TINYINT(), 0));
auto table = HashTable<false>::createForJoin(
std::move(hashers),
{} /*dependentTypes=*/,
true,
false,
1'000,
pool_.get());
table->testingSetHashMode(
BaseHashTable::HashMode::kNormalizedKey, 700'000'000);
// Set greater than INT_MAX bucketOffset.
uint64_t bucketOffset = 2'848'622'336;
// Setting random value and hash value.
uint8_t value = 10;
uint64_t hash = 168'520'4148'842'825'593;
table->testingStoreRowPointer(bucketOffset, hash, (char*)&value);
auto expectedTag = BaseHashTable::hashTag(hash);
auto actualTagSimdBatch = table->testingLoadTags(bucketOffset);
ASSERT_EQ(expectedTag, actualTagSimdBatch.get(0));
auto slotIndex = bucketOffset & (sizeof(BaseHashTable::TagVector) - 1);
ASSERT_EQ(value, *(uint8_t*)table->testingGetRow(bucketOffset, slotIndex));
}

DEBUG_ONLY_TEST_P(HashTableTest, failureInCreateRowPartitions) {
// This tests an issue in parallelJoinBuild where an exception in
// createRowPartitions could lead to concurrency issues in async table
Expand Down

0 comments on commit 7ea06dc

Please sign in to comment.