diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp index 2b5b2104ea8c..53ce251331f3 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -96,7 +96,7 @@ void TTable::AddTuple( ui64 * intColumns, char ** stringColumns, ui32 * strings keyIntVals.insert(keyIntVals.end(), TempTuple.begin() + NullsBitmapSize_, TempTuple.end()); if (IsAny_) { - if ( !AddKeysToHashTable(kh, keyIntVals.begin() + offset) ) { + if ( !AddKeysToHashTable(kh, keyIntVals.begin() + offset, iColumns) ) { keyIntVals.resize(offset); return; } @@ -221,9 +221,15 @@ inline bool CompareIColumns( const ui32* stringSizes1, const char * vals1, for (ui32 i = 0; i < nStringColumns; i ++) { currSize1 = *(stringSizes1 + i); currSize2 = *(stringSizes2 + i); + if (currSize1 != currSize2) + return false; currOffset1 += currSize1 + sizeof(ui32); currOffset2 += currSize2 + sizeof(ui32); } + + if (0 != std::memcmp(vals1, vals2, currOffset1)) + return false; + for (ui32 i = 0; i < nIColumns; i ++) { currSize1 = *(stringSizes1 + nStringColumns + i ); @@ -245,6 +251,43 @@ inline bool CompareIColumns( const ui32* stringSizes1, const char * vals1, return true; } +/* Checks whether the packed key payload at vals1 matches a candidate key whose string columns are packed at vals2 and whose interface-typed columns are given unpacked in iColumns. Both buffers are read as nStringColumns entries of a sizeof(ui32)-byte size prefix followed by that many bytes; vals1 then continues with nIColumns entries of the same shape holding packed values. Returns false on the first differing string size, on any byte difference in the string region, or when an unpacked vals1 column does not Equals() iColumns[i]; returns true otherwise. vals2 is never read beyond the string region. NOTE(review): every Unpack call uses the HolderFactory of colInterfaces[0] - presumably shared by all entries; confirm. */inline bool CompareIColumns( const char * vals1, + const char * vals2, + NYql::NUdf::TUnboxedValue * iColumns, + TColTypeInterface * colInterfaces, + ui64 nStringColumns, ui64 nIColumns) { + ui32 currOffset1 = 0; + NYql::NUdf::TUnboxedValue val1; + TStringBuf str1; + + for (ui32 i = 0; i < nStringColumns; i ++) { + auto currSize1 = ReadUnaligned(vals1 + currOffset1); + auto currSize2 = ReadUnaligned(vals2 + currOffset1); + if (currSize1 != currSize2) + return false; + currOffset1 += currSize1 + sizeof(ui32); + } + + /* All sizes matched, so one memcmp can cover every size prefix and string payload. */if (0 != std::memcmp(vals1, vals2, currOffset1)) + return false; + + for (ui32 i = 0; i < nIColumns; i ++) { + + auto currSize1 = ReadUnaligned(vals1 + currOffset1); + currOffset1 += 
sizeof(ui32); + str1 = TStringBuf(vals1 + currOffset1, currSize1); + /* Unpack the stored bytes so the column can be compared by value through EquateI. */val1 = (colInterfaces + i)->Packer->Unpack(str1, colInterfaces->HolderFactory); + auto &val2 = iColumns[i]; + if ( ! ((colInterfaces + i)->EquateI->Equals(val1,val2)) ) { + return false; + } + + currOffset1 += currSize1; + + } + return true; +} + // Resizes KeysHashTable to new slots, keeps old content. void ResizeHashTable(KeysHashTable &t, ui64 newSlots){ @@ -257,7 +300,7 @@ void ResizeHashTable(KeysHashTable &t, ui64 newSlots){ auto newIt = newTable.begin() + t.SlotSize * newSlotNum; while (*newIt != 0) { newIt += t.SlotSize; - if (newIt >= newTable.end()) { + if (newIt == newTable.end()) { newIt = newTable.begin(); } } @@ -294,13 +337,9 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef ui64 tuplesFound = 0; - std::vector> joinSlots, spillSlots, slotToIdx; - std::vector> stringsOffsets1, stringsOffsets2; + std::vector> joinSlots; ui64 reservedSize = 6 * (DefaultTupleBytes * DefaultTuplesNum) / sizeof(ui64); joinSlots.reserve( reservedSize ); - spillSlots.reserve( reservedSize ); - stringsOffsets1.reserve(JoinTable1->NumberOfStringColumns + JoinTable1->NumberOfIColumns + 1); - stringsOffsets2.reserve(JoinTable2->NumberOfStringColumns + JoinTable2->NumberOfIColumns + 1); std::vector> joinResults; @@ -334,9 +373,10 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef std::swap(tuplesNum1, tuplesNum2); } - joinResults.reserve(3 * tuplesNum1 ); + if (tuplesNum2 == 0) + continue; - ui64 slotSize = headerSize2; + /* One word more than the header: the build loop keeps the tuple index in the last word of each slot. */ui64 slotSize = headerSize2 + 1; ui64 avgStringsSize = ( 3 * (bucket2->KeyIntVals.size() - tuplesNum2 * headerSize2) ) / ( 2 * tuplesNum2 + 1) + 1; @@ -345,173 +385,134 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef } - ui64 nSlots = 3 * tuplesNum2 + 1; + /* The | 1 forces the slot count to be odd. */ui64 nSlots = (3 * tuplesNum2 + 1) | 1; joinSlots.clear(); - spillSlots.clear(); - slotToIdx.clear(); joinSlots.resize(nSlots*slotSize, 0); 
- slotToIdx.resize(nSlots, 0); + + /* Returns the iterator to the home slot of a hash (hash % nSlots). */auto firstSlot = [begin = joinSlots.begin(), slotSize, nSlots](auto hash) { + ui64 slotNum = hash % nSlots; + return begin + slotNum * slotSize; + }; + + /* Steps to the following slot, wrapping from the end of joinSlots to its start. */auto nextSlot = [begin = joinSlots.begin(), end = joinSlots.end(), slotSize](auto it) { + it += slotSize; + if (it == end) + it = begin; + return it; + }; ui32 tuple2Idx = 0; auto it2 = bucket2->KeyIntVals.begin(); - while (it2 != bucket2->KeyIntVals.end() ) { - - ui64 keysValSize; + for (ui64 keysValSize = headerSize2; it2 != bucket2->KeyIntVals.end(); it2 += keysValSize, ++tuple2Idx) { if ( table2HasKeyStringColumns || table2HasKeyIColumns) { keysValSize = headerSize2 + *(it2 + headerSize2 - 1) ; - } else { - keysValSize = headerSize2; } ui64 hash = *it2; ui64 * nullsPtr = it2+1; - if (!HasBitSet(nullsPtr, 1)) - { + if (HasBitSet(nullsPtr, 1)) + continue; - ui64 slotNum = hash % nSlots; - auto slotIt = joinSlots.begin() + slotNum * slotSize; + auto slotIt = firstSlot(hash); - while (*slotIt != 0) - { - slotIt += slotSize; - if (slotIt == joinSlots.end()) - slotIt = joinSlots.begin(); - } + for (; *slotIt != 0; slotIt = nextSlot(slotIt)) + { + } - if (keysValSize <= slotSize) - { - std::copy_n(it2, keysValSize, slotIt); - } - else - { - std::copy_n(it2, headerSize2, slotIt); - ui64 stringsPos = spillSlots.size(); - spillSlots.insert(spillSlots.end(), it2 + headerSize2, it2 + keysValSize); - *(slotIt + headerSize2) = stringsPos; - } - ui64 currSlotNum = (slotIt - joinSlots.begin()) / slotSize; - slotToIdx[currSlotNum] = tuple2Idx; + if (keysValSize <= slotSize - 1) + { + std::copy_n(it2, keysValSize, slotIt); } - it2 += keysValSize; - tuple2Idx ++; + else + { + std::copy_n(it2, headerSize2, slotIt); + + /* Key is too long to store inline: record the offset of its string part inside bucket2->KeyIntVals instead of copying it. */*(slotIt + headerSize2) = it2 + headerSize2 - bucket2->KeyIntVals.begin(); + } + /* The last word of every slot remembers which table2 tuple filled it. */slotIt[slotSize - 1] = tuple2Idx; } ui32 tuple1Idx = 0; auto it1 = bucket1->KeyIntVals.begin(); - while ( it1 < bucket1->KeyIntVals.end() ) { + // /-------headerSize---------------------------\ + // 
hash nulls-bitmap keyInt[] KeyIHash[] strSize| [strPos | strs] slotIdx + // \---------------------------------------slotSize ---------------------/ + // bit0 of nulls bitmap denotes key-with-nulls + // strSize only present if HasKeyStrCol || HasKeyICol + // strPos is only present if (HasKeyStrCol || HasKeyICol) && strSize + headerSize >= slotSize + // slotSize, slotIdx and strPos are only for hashtable (table2) + + for (ui64 keysValSize = headerSize1; it1 != bucket1->KeyIntVals.end(); it1 += keysValSize, ++tuple1Idx ) { - ui64 keysValSize; if ( table1HasKeyStringColumns || table1HasKeyIColumns ) { keysValSize = headerSize1 + *(it1 + headerSize1 - 1) ; - } else { - keysValSize = headerSize1; } - ui64 hash = *it1; ui64 * nullsPtr = it1+1; if (HasBitSet(nullsPtr, 1)) { - it1 += keysValSize; - tuple1Idx ++; continue; } - ui64 slotNum = hash % nSlots; - auto slotIt = joinSlots.begin() + slotNum * slotSize; - while (*slotIt != 0 && slotIt != joinSlots.end()) + auto slotIt = firstSlot(hash); + for (; *slotIt != 0; slotIt = nextSlot(slotIt) ) { + /* Different hash in this slot: keep probing. */if (*slotIt != hash) + continue; - bool matchFound = false; - if (((keysValSize - nullsSize1) <= (slotSize - nullsSize2)) && !table1HasKeyIColumns ) { - if (std::equal(it1 + keyIntOffset1, it1 + keysValSize, slotIt + keyIntOffset2)) { - tuplesFound++; - matchFound = true; - } - } - - if (((keysValSize - nullsSize1) > (slotSize - nullsSize2)) && !table1HasKeyIColumns) { - if (std::equal(it1 + keyIntOffset1, it1 + headerSize1, slotIt + keyIntOffset2)) { - ui64 stringsPos = *(slotIt + headerSize2); - ui64 stringsSize = *(it1 + headerSize1 - 1); - if (std::equal(it1 + headerSize1, it1 + headerSize1 + stringsSize, spillSlots.begin() + stringsPos)) { - tuplesFound++; - matchFound = true; - } - } - } - - - if (table1HasKeyIColumns) - { - bool headerMatch = false; - bool stringsMatch = false; - bool iValuesMatch = false; + if (table1HasKeyIColumns || !(keysValSize - nullsSize1 <= slotSize - 1 - nullsSize2)) { + // 2nd condition cannot be 
true unless HasKeyStringColumns or HasKeyIColumns, hence size at the end of header is present - if (std::equal(it1 + keyIntOffset1, it1 + headerSize1 - 1, slotIt + keyIntOffset2)) { - headerMatch = true; - } + if (!std::equal(it1 + keyIntOffset1, it1 + headerSize1 - 1, slotIt + keyIntOffset2)) + continue; auto slotStringsStart = slotIt + headerSize2; + ui64 slotStringsSize = *(slotIt + headerSize2 - 1); - if (keysValSize > slotSize ) { + /* Stored key did not fit inline, so the slot holds an offset into bucket2->KeyIntVals rather than the string words. */if (headerSize2 + slotStringsSize + 1 > slotSize) + { ui64 stringsPos = *(slotIt + headerSize2); - slotStringsStart = spillSlots.begin() + stringsPos; + slotStringsStart = bucket2->KeyIntVals.begin() + stringsPos; } - if ( !table1HasKeyStringColumns) { - stringsMatch = true; - } else { - ui64 stringsSize = *(it1 + headerSize1 - 1); - if (headerMatch && std::equal( it1 + headerSize1, it1 + headerSize1 + stringsSize, slotStringsStart )) { - stringsMatch = true; - } - } - - if (headerMatch && stringsMatch ) { - - - tuple2Idx = slotToIdx[(slotIt - joinSlots.begin()) / slotSize]; - i64 stringsOffsetsIdx1 = tuple1Idx * (JoinTable1->NumberOfStringColumns + JoinTable1->NumberOfIColumns + 2); + if (table1HasKeyIColumns) + { + /* NOTE(review): stringsOffsetsIdx2 on the next statement is derived from tuple2Idx, but in this version tuple2Idx is only reloaded from slotIt[slotSize - 1] after all comparisons pass. At this point it still holds the value left by the build loop or by an earlier match, so stringsSizesPtr2 may describe a different tuple than the slot being probed. Presumably the index should be read from the current slot before this point; confirm. */ui64 stringsOffsetsIdx1 = tuple1Idx * (JoinTable1->NumberOfStringColumns + JoinTable1->NumberOfIColumns + 2); ui64 stringsOffsetsIdx2 = tuple2Idx * (JoinTable2->NumberOfStringColumns + JoinTable2->NumberOfIColumns + 2); ui32 * stringsSizesPtr1 = bucket1->StringsOffsets.data() + stringsOffsetsIdx1 + 2; ui32 * stringsSizesPtr2 = bucket2->StringsOffsets.data() + stringsOffsetsIdx2 + 2; - - iValuesMatch = CompareIColumns( stringsSizesPtr1 , - (char *) (it1 + headerSize1 ), - stringsSizesPtr2, - (char *) (slotStringsStart), - JoinTable1 -> ColInterfaces, JoinTable1->NumberOfStringColumns, JoinTable1 -> NumberOfKeyIColumns ); - } - - if (headerMatch && stringsMatch && iValuesMatch) { - tuplesFound++; - matchFound = true; + if (!CompareIColumns( stringsSizesPtr1 , + (char *) (it1 + headerSize1 ), + stringsSizesPtr2, + (char 
*) (slotStringsStart), + JoinTable1 -> ColInterfaces, JoinTable1->NumberOfStringColumns, JoinTable1 -> NumberOfKeyIColumns )) + continue; + } else { + ui64 stringsSize = *(it1 + headerSize1 - 1); + if (stringsSize != slotStringsSize || !std::equal(it1 + headerSize1, it1 + headerSize1 + stringsSize, slotStringsStart)) + continue; } + } else { + if (!std::equal(it1 + keyIntOffset1, it1 + keysValSize, slotIt + keyIntOffset2)) + continue; } - if (matchFound) + /* Every comparison passed: recover the table2 tuple index kept in the last word of the slot. */tuple2Idx = slotIt[slotSize - 1]; + + tuplesFound++; + JoinTuplesIds joinIds; + joinIds.id1 = tuple1Idx; + joinIds.id2 = tuple2Idx; + if (JoinTable2->TableBucketsStats[bucket].TuplesNum > JoinTable1->TableBucketsStats[bucket].TuplesNum) { - JoinTuplesIds joinIds; - joinIds.id1 = tuple1Idx; - joinIds.id2 = slotToIdx[(slotIt - joinSlots.begin()) / slotSize]; - if (JoinTable2->TableBucketsStats[bucket].TuplesNum > JoinTable1->TableBucketsStats[bucket].TuplesNum) - { - std::swap(joinIds.id1, joinIds.id2); - } - joinResults.emplace_back(joinIds); + std::swap(joinIds.id1, joinIds.id2); } - - slotIt += slotSize; - if (slotIt == joinSlots.end()) - slotIt = joinSlots.begin(); + joinResults.emplace_back(joinIds); } - - it1 += keysValSize; - tuple1Idx ++; } std::sort(joinResults.begin(), joinResults.end(), [](JoinTuplesIds a, JoinTuplesIds b) @@ -676,7 +677,7 @@ inline bool TTable::HasJoinedTupleId(TTable *joinedTable, ui32 &tupleId2) { -inline bool TTable::AddKeysToHashTable(KeysHashTable& t, ui64* keys) { +inline bool TTable::AddKeysToHashTable(KeysHashTable& t, ui64* keys, NYql::NUdf::TUnboxedValue * iColumns) { if (t.NSlots == 0) { t.SlotSize = HeaderSize + NumberOfKeyStringColumns * 2; @@ -684,8 +685,8 @@ inline bool TTable::AddKeysToHashTable(KeysHashTable& t, ui64* keys) { t.NSlots = DefaultTuplesNum; } - if ( ( (t.NSlots - t.FillCount) * 100 ) / t.NSlots < 50 ) { - ResizeHashTable(t, 2 * t.NSlots); + /* Grow once more than half of the slots are filled; the requested size 2 * NSlots + 1 is always odd. */if ( t.FillCount > t.NSlots/2 ) { + ResizeHashTable(t, 2 * t.NSlots + 1); } if ( HasBitSet(keys + HashSize, 1)) // 
Keys with null value @@ -703,55 +704,50 @@ inline bool TTable::AddKeysToHashTable(KeysHashTable& t, ui64* keys) { keysSize = HeaderSize + keyStringsSize; } + /* Steps to the following slot with wrap-around; the table bounds and slot size are captured by value at this point. */auto nextSlot = [begin = t.Table.begin(), end = t.Table.end(), slotSize = t.SlotSize](auto it) { + it += slotSize; + if (it == end) + it = begin; + return it; + }; - while (*it != 0) { - - while (*it == hash) { - - ui64 storedStringsSize = 0; - ui64 storedKeysSize = HeaderSize; - if ( NumberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0) { - storedStringsSize = *(it + HeaderSize - 1); - storedKeysSize = HeaderSize + storedStringsSize; - } + /* NOTE(review): itValSize starts at HeaderSize and is only updated further down, after the NumberOfKeyIColumns == 0 fast path. Without interface-typed key columns the fast path is therefore always taken and compares header words only (including the strings-size word), so string payloads do not appear to be compared. With interface-typed key columns the CompareIColumns branch always continues or returns. The spill comparison at the end of the loop body therefore looks unreachable. Confirm whether itValSize should be recomputed per slot before the fast path. */for (auto itValSize = HeaderSize; *it != 0; it = nextSlot(it)) { - bool headerMatch = false; - bool stringsMatch = false; - headerMatch = std::equal(it + keyIntOffset, it + HeaderSize, keys + keyIntOffset); - if (!headerMatch) { - break; - } + if (*it != hash) + continue; - if ( headerMatch && !(NumberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0) ) { - return false; - } + if ( NumberOfKeyIColumns == 0 && (itValSize <= t.SlotSize)) { + if (!std::equal(it + keyIntOffset, it + itValSize, keys + keyIntOffset)) + continue; + return false; + } - if (storedStringsSize != keyStringsSize) { - break; - } + Y_DEBUG_ABORT_UNLESS( NumberOfKeyStringColumns > 0 || NumberOfKeyIColumns > 0); - ui64 * stringsStart; - if (storedKeysSize <= t.SlotSize) { - stringsStart = it + HeaderSize; - } else { - ui64 spillOffset = *(it + HeaderSize); - stringsStart = t.SpillData.begin() + spillOffset; - } + itValSize = HeaderSize + *(it + HeaderSize - 1); + auto slotStringsStart = it + HeaderSize; - stringsMatch = std::equal(keys + HeaderSize, keys + HeaderSize + keyStringsSize, stringsStart ); + if (!std::equal(it + keyIntOffset, it + HeaderSize - 1, keys + keyIntOffset)) + continue; - if ( headerMatch && stringsMatch ) { - return false; - } + if (NumberOfKeyIColumns > 0) { + if (!CompareIColumns( + (char *) (slotStringsStart), + (char *) (keys + HeaderSize ), + iColumns, + JoinTable1 -> 
ColInterfaces, JoinTable1->NumberOfStringColumns, JoinTable1 -> NumberOfKeyIColumns )) + continue; + return false; + } - break; + Y_DEBUG_ABORT_UNLESS(!(itValSize <= t.SlotSize)); - } + ui64 stringsPos = *(it + HeaderSize); + slotStringsStart = t.SpillData.begin() + stringsPos; - it += t.SlotSize; - if (it >= t.Table.end()) { - it = t.Table.begin(); - } + /* NOTE(review): itValSize is HeaderSize plus the stored strings size, so this std::equal spans HeaderSize more words than the string payload that starts at slotStringsStart and at keys + HeaderSize. The probe in TTable::Join compares exactly stringsSize words. Looks like the length should be itValSize - HeaderSize; confirm. */if (keysSize != itValSize || !std::equal(slotStringsStart, slotStringsStart + itValSize, keys + HeaderSize)) + continue; + return false; } if (keysSize > t.SlotSize) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h index 2709bf334f12..418616deef17 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h @@ -15,7 +15,7 @@ class TTableBucketSpiller; const ui64 BitsForNumberOfBuckets = 5; // 2^5 = 32 const ui64 BucketsMask = (0x00000001 << BitsForNumberOfBuckets) - 1; const ui64 NumberOfBuckets = (0x00000001 << BitsForNumberOfBuckets); // Number of hashed keys buckets to distribute incoming tables tuples -const ui64 DefaultTuplesNum = 100; // Default initial number of tuples in one bucket to allocate memory +const ui64 DefaultTuplesNum = 101; // Default initial number of tuples in one bucket to allocate memory const ui64 DefaultTupleBytes = 64; // Default size of all columns in table row for estimations const ui64 HashSize = 1; // Using ui64 hash size const ui64 SpillingSizeLimit = 1_MB; // Don't try to spill if net effect is lower than this size @@ -241,7 +241,7 @@ class TTable { inline bool HasJoinedTupleId(TTable* joinedTable, ui32& tupleId2); // Adds keys to KeysHashTable, return true if added, false if equal key already added - inline bool AddKeysToHashTable(KeysHashTable& t, ui64* keys); + inline bool AddKeysToHashTable(KeysHashTable& t, ui64* keys, NYql::NUdf::TUnboxedValue * iColumns); ui64 TotalPacked = 0; // Total number of packed tuples ui64 
TotalUnpacked = 0; // Total number of unpacked tuples