CNDB-12407: lazily load token in PrimaryKeyWithSource #1500

Merged: 22 commits, May 9, 2025

Commits (all by michaeljmarshall; the diff below shows changes from all commits):

d85140c  CNDB-12407: lazily load token in PrimaryKeyWithSource (Jan 13, 2025)
2c69a27  Fix PrimaryKeyWithSource for min/max row in segment (Jan 14, 2025)
e5febf4  Load fully qualified PrK for SegmentMetadata extrema (Jan 15, 2025)
2ae0895  Remove unnecessary comparison logic (Jan 15, 2025)
409afb0  Ensure PrK is loaded to allow closure of PrKMap (Jan 17, 2025)
312e1b3  Fix source sstable key comparison logic; skip skipTo call with min token (Jan 17, 2025)
20043f2  Optimize sai union iterator for non-overlapping case (Jan 17, 2025)
0955f4a  Temp fix to workaround SegmentMetadata inconsistencies (Jan 17, 2025)
fbfc9e3  Improve PrimaryKeyWithSource#equals impl (Jan 21, 2025)
6599d33  Merge remote-tracking branch 'datastax/main' into cndb-12407 (Mar 18, 2025)
7dd099c  Fix compilation after merge commit broke it (Mar 18, 2025)
1db23d1  Merge remote-tracking branch 'datastax/main' into cndb-12407 (Apr 15, 2025)
1792246  Make tests pass (Apr 15, 2025)
c4aa3c0  Fix minor bug in union constructor to fix tests (Apr 18, 2025)
bd7bb94  Update SAITester to always reset QUERY_OPT_LEVEL to 1 (Apr 18, 2025)
068c095  Minor tweaks to comments and readability (Apr 18, 2025)
6f3b214  Merge remote-tracking branch 'datastax/main' into cndb-12407 (Apr 22, 2025)
90759a2  Isolate test scenario requiring special handling in SegmentMetadata (Apr 23, 2025)
e23c58f  Merge remote-tracking branch 'datastax/main' into cndb-12407 (Apr 24, 2025)
a996763  Merge remote-tracking branch 'datastax/main' into cndb-12407 (May 8, 2025)
2b1dd39  Fix comment typo (May 8, 2025)
ba15240  Tweak comment to be more helpful (May 8, 2025)
@@ -115,8 +115,7 @@ protected PrimaryKey computeNext()
if (rowId == PostingList.END_OF_STREAM)
return endOfData();

var primaryKey = primaryKeyMap.primaryKeyFromRowId(rowId);
return new PrimaryKeyWithSource(primaryKey, primaryKeyMap.getSSTableId(), rowId);
return new PrimaryKeyWithSource(primaryKeyMap, rowId, searcherContext.minimumKey, searcherContext.maximumKey);
}
catch (Throwable t)
{
@@ -29,16 +29,38 @@

public class PrimaryKeyWithSource implements PrimaryKey
{
private final PrimaryKey primaryKey;
private final PrimaryKeyMap primaryKeyMap;
private final SSTableId<?> sourceSstableId;
private final long sourceRowId;
private PrimaryKey delegatePrimaryKey;
private final PrimaryKey sourceSstableMinKey;
private final PrimaryKey sourceSstableMaxKey;

public PrimaryKeyWithSource(PrimaryKey primaryKey, SSTableId<?> sstableId, long sstableRowId)
public PrimaryKeyWithSource(PrimaryKeyMap primaryKeyMap, long sstableRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey)
{
assert primaryKey != null : "Cannot construct a PrimaryKeyWithSource with a null primaryKey";
this.primaryKey = primaryKey;
this.sourceSstableId = sstableId;
this.primaryKeyMap = primaryKeyMap;
this.sourceSstableId = primaryKeyMap.getSSTableId();
this.sourceRowId = sstableRowId;
this.sourceSstableMinKey = sourceSstableMinKey;
this.sourceSstableMaxKey = sourceSstableMaxKey;
}

public PrimaryKeyWithSource(PrimaryKey primaryKey, SSTableId<?> sourceSstableId, long sourceRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey)
{
this.delegatePrimaryKey = primaryKey;
this.primaryKeyMap = null;
this.sourceSstableId = sourceSstableId;
this.sourceRowId = sourceRowId;
this.sourceSstableMinKey = sourceSstableMinKey;
this.sourceSstableMaxKey = sourceSstableMaxKey;
}

private PrimaryKey primaryKey()
{
if (delegatePrimaryKey == null)
delegatePrimaryKey = primaryKeyMap.primaryKeyFromRowId(sourceRowId);

return delegatePrimaryKey;
}

public long getSourceRowId()
@@ -54,43 +76,43 @@ public SSTableId<?> getSourceSstableId()
@Override
public Token token()
{
return primaryKey.token();
return primaryKey().token();
}

@Override
public DecoratedKey partitionKey()
{
return primaryKey.partitionKey();
return primaryKey().partitionKey();
}

@Override
public Clustering clustering()
{
return primaryKey.clustering();
return primaryKey().clustering();
}

@Override
public PrimaryKey loadDeferred()
{
return primaryKey.loadDeferred();
return primaryKey().loadDeferred();
}

@Override
public ByteSource asComparableBytes(ByteComparable.Version version)
{
return primaryKey.asComparableBytes(version);
return primaryKey().asComparableBytes(version);
}

@Override
public ByteSource asComparableBytesMinPrefix(ByteComparable.Version version)
{
return primaryKey.asComparableBytesMinPrefix(version);
return primaryKey().asComparableBytesMinPrefix(version);
}

@Override
public ByteSource asComparableBytesMaxPrefix(ByteComparable.Version version)
{
return primaryKey.asComparableBytesMaxPrefix(version);
return primaryKey().asComparableBytesMaxPrefix(version);
}

@Override
@@ -101,8 +123,23 @@ public int compareTo(PrimaryKey o)
var other = (PrimaryKeyWithSource) o;
if (sourceSstableId.equals(other.sourceSstableId))
return Long.compare(sourceRowId, other.sourceRowId);
// Compare to the other source sstable's min and max keys to determine if the keys are comparable.
// Note that these are already loaded into memory as part of the segment's metadata, so the comparison
// is cheaper than loading the actual keys.
if (sourceSstableMinKey.compareTo(other.sourceSstableMaxKey) > 0)
return 1;
if (sourceSstableMaxKey.compareTo(other.sourceSstableMinKey) < 0)
return -1;
}
else
{
if (sourceSstableMinKey.compareTo(o) > 0)
return 1;
if (sourceSstableMaxKey.compareTo(o) < 0)
return -1;
}
return primaryKey.compareTo(o);

return primaryKey().compareTo(o);
}

@Override
@@ -111,22 +148,29 @@ public boolean equals(Object o)
if (o instanceof PrimaryKeyWithSource)
{
var other = (PrimaryKeyWithSource) o;
// If they are from the same source sstable, we can compare the row ids directly.
if (sourceSstableId.equals(other.sourceSstableId))
return sourceRowId == other.sourceRowId;

// If the source sstable primary key ranges do not intersect, the keys cannot be equal.
if (sourceSstableMinKey.compareTo(other.sourceSstableMaxKey) > 0
|| sourceSstableMaxKey.compareTo(other.sourceSstableMinKey) < 0)
return false;
}
return primaryKey.equals(o);

return primaryKey().equals(o);
}

@Override
public int hashCode()
{
return primaryKey.hashCode();
return primaryKey().hashCode();
}

@Override
public String toString()
{
return String.format("%s (source sstable: %s, %s)", primaryKey, sourceSstableId, sourceRowId);
return String.format("%s (source sstable: %s, %s)", delegatePrimaryKey, sourceSstableId, sourceRowId);
}

@Override
@@ -136,6 +180,6 @@ public long ramBytesUsed()
return RamUsageEstimator.NUM_BYTES_OBJECT_HEADER +
2L * RamUsageEstimator.NUM_BYTES_OBJECT_REF +
Long.BYTES +
primaryKey.ramBytesUsed();
primaryKey().ramBytesUsed();
}
}
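The changes above are the core of the PR: the delegate PrimaryKey (and therefore its token) is now materialized from the PrimaryKeyMap only when a comparison actually needs it, and compareTo and equals first try the cheap routes, same-source row id comparison and the source sstable's min/max key bounds, before forcing that load. A minimal self-contained sketch of the pattern, with simplified stand-in types (long keys, a LongFunction in place of PrimaryKeyMap), not the actual Cassandra classes:

import java.util.function.LongFunction;

// Sketch: a key that knows its (sourceId, rowId) and its source's key-range
// bounds, and only materializes the expensive form when cheaper checks
// cannot decide the order.
final class LazySourcedKey implements Comparable<LazySourcedKey>
{
    final String sourceId;          // stand-in for SSTableId
    final long rowId;               // stand-in for the sstable row id
    final long minBound, maxBound;  // stand-in for sourceSstableMinKey/MaxKey
    private final LongFunction<Long> loader; // stand-in for PrimaryKeyMap#primaryKeyFromRowId
    private Long materialized;      // the lazily resolved "real" key

    LazySourcedKey(String sourceId, long rowId, long minBound, long maxBound, LongFunction<Long> loader)
    {
        this.sourceId = sourceId;
        this.rowId = rowId;
        this.minBound = minBound;
        this.maxBound = maxBound;
        this.loader = loader;
    }

    private long key()
    {
        if (materialized == null)
            materialized = loader.apply(rowId); // the expensive lookup, done at most once
        return materialized;
    }

    @Override
    public int compareTo(LazySourcedKey o)
    {
        // Same source: row ids within one sstable are already in key order.
        if (sourceId.equals(o.sourceId))
            return Long.compare(rowId, o.rowId);
        // Disjoint key ranges: decide without loading either key.
        if (minBound > o.maxBound) return 1;
        if (maxBound < o.minBound) return -1;
        // Overlapping ranges: pay for the real keys.
        return Long.compare(key(), o.key());
    }
}

Same-source comparisons stay cheap because row ids within one sstable are assigned in primary key order, which is the invariant the first branch of compareTo relies on.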
@@ -27,15 +27,18 @@
import java.util.Objects;
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.disk.ModernResettableByteBuffersIndexOutput;
import org.apache.cassandra.index.sai.disk.PostingList;
import org.apache.cassandra.index.sai.disk.PrimaryKeyWithSource;
import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.io.IndexInput;
@@ -134,18 +137,52 @@ public class SegmentMetadata implements Comparable<SegmentMetadata>
private static final Logger logger = LoggerFactory.getLogger(SegmentMetadata.class);

@SuppressWarnings("resource")
private SegmentMetadata(IndexInput input, IndexContext context, Version version) throws IOException
private SegmentMetadata(IndexInput input, IndexContext context, Version version, SSTableContext sstableContext, boolean loadFullResolutionBounds) throws IOException
{
PrimaryKey.Factory primaryKeyFactory = context.keyFactory();
if (!loadFullResolutionBounds)
logger.warn("Loading segment metadata without full primary key boundary resolution. Some ORDER BY queries" +
" may not work correctly.");

AbstractType<?> termsType = context.getValidator();

this.version = version;
this.segmentRowIdOffset = input.readLong();
this.numRows = input.readLong();
this.minSSTableRowId = input.readLong();
this.maxSSTableRowId = input.readLong();
this.minKey = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)));
this.maxKey = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)));

if (loadFullResolutionBounds)
{
// Skip the min/max partition keys since we want the fully resolved PrimaryKey for better semantics.
// Also, these values are not always correct for flushed sstables, but the min/max row ids are, which
// provides further justification for skipping them.
skipBytes(input);
skipBytes(input);

// Get the fully qualified PrimaryKey min and max objects to ensure that we skip several edge cases related
// to possibly confusing equality semantics. The main issue is how we handle PrimaryKey objects that
// are not fully qualified when doing a binary search on a collection of PrimaryKeyWithSource objects.
// By materializing the fully qualified PrimaryKey objects, we get the right binary search result.
final PrimaryKey min, max;
try (var pkm = sstableContext.primaryKeyMapFactory().newPerSSTablePrimaryKeyMap())
{
// We need to load eagerly to allow us to close the partition key map.
min = pkm.primaryKeyFromRowId(minSSTableRowId).loadDeferred();
max = pkm.primaryKeyFromRowId(maxSSTableRowId).loadDeferred();
}

this.minKey = new PrimaryKeyWithSource(min, sstableContext.sstable.getId(), minSSTableRowId, min, max);
this.maxKey = new PrimaryKeyWithSource(max, sstableContext.sstable.getId(), maxSSTableRowId, min, max);
}
else
{
assert sstableContext == null;
// Only valid in some very specific tests.
PrimaryKey.Factory primaryKeyFactory = context.keyFactory();
this.minKey = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)));
this.maxKey = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)));
}

this.minTerm = readBytes(input);
this.maxTerm = readBytes(input);
TermsDistribution td = null;
@@ -164,7 +201,27 @@
}

@SuppressWarnings("resource")
public static List<SegmentMetadata> load(MetadataSource source, IndexContext context) throws IOException
public static List<SegmentMetadata> load(MetadataSource source, IndexContext context, SSTableContext sstableContext) throws IOException
{
return load(source, context, sstableContext, true);
}

/**
* This is only visible for testing because the SegmentFlushTest creates fake boundary scenarios that break
* normal assumptions about the min/max row ids mapping to specific positions in the per-sstable index components.
* Only set loadFullResolutionBounds to false in tests when you are sure that is the only possible solution.
*/
@VisibleForTesting
@SuppressWarnings("resource")
public static List<SegmentMetadata> loadForTesting(MetadataSource source, IndexContext context) throws IOException
{
return load(source, context, null, false);
}

/**
* Only set loadFullResolutionBounds to false in tests when you are sure that is exactly what you want.
*/
private static List<SegmentMetadata> load(MetadataSource source, IndexContext context, SSTableContext sstableContext, boolean loadFullResolutionBounds) throws IOException
{

IndexInput input = source.get(NAME);
@@ -175,7 +232,7 @@ public static List<SegmentMetadata> load(MetadataSource source, IndexContext con

for (int i = 0; i < segmentCount; i++)
{
segmentMetadata.add(new SegmentMetadata(input, context, source.getVersion()));
segmentMetadata.add(new SegmentMetadata(input, context, source.getVersion(), sstableContext, loadFullResolutionBounds));
}

return segmentMetadata;
@@ -300,6 +357,12 @@ private static ByteBuffer readBytes(IndexInput input) throws IOException
return ByteBuffer.wrap(bytes);
}

private static void skipBytes(IndexInput input) throws IOException
{
int len = input.readVInt();
input.skipBytes(len);
}

static void writeBytes(ByteBuffer buf, IndexOutput out)
{
try
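The new skipBytes helper mirrors the framing that readBytes already decodes, a vint length prefix followed by that many payload bytes, but advances the input without allocating a buffer. A standalone sketch of that length-prefixed framing over plain java.io streams (FramingSketch and its method names are hypothetical; the real code relies on Lucene-style IndexInput.readVInt and skipBytes):

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the assumed field framing: an unsigned varint length,
// then exactly that many payload bytes.
final class FramingSketch
{
    static void writeField(DataOutputStream out, byte[] payload) throws IOException
    {
        int len = payload.length;
        while ((len & ~0x7F) != 0)           // emit 7 bits at a time, high bit = "more"
        {
            out.writeByte((len & 0x7F) | 0x80);
            len >>>= 7;
        }
        out.writeByte(len);
        out.write(payload);
    }

    static void skipField(DataInputStream in) throws IOException
    {
        int len = 0, shift = 0, b;
        do
        {
            b = in.readUnsignedByte();
            len |= (b & 0x7F) << shift;
            shift += 7;
        }
        while ((b & 0x80) != 0);
        in.skipNBytes(len);                  // advance past the payload without reading it into memory
    }
}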
@@ -41,6 +41,7 @@
import org.apache.cassandra.index.sai.plan.Expression;
import org.apache.cassandra.index.sai.plan.Orderer;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.index.sai.utils.PrimaryKeyListUtil;
import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey;
import org.apache.cassandra.index.sai.utils.TypeUtil;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -87,7 +88,7 @@ public V1SearchableIndex(SSTableContext sstableContext, IndexComponents.ForRead

final MetadataSource source = MetadataSource.loadMetadata(perIndexComponents);

metadatas = SegmentMetadata.load(source, indexContext);
metadatas = SegmentMetadata.load(source, indexContext, sstableContext);

for (SegmentMetadata metadata : metadatas)
{
@@ -232,7 +233,7 @@ public List<CloseableIterator<PrimaryKeyWithSortKey>> orderResultsBy(QueryContex
for (Segment segment : segments)
{
// Only pass the primary keys in a segment's range to the segment index.
var segmentKeys = getKeysInRange(keys, segment);
var segmentKeys = PrimaryKeyListUtil.getKeysInRange(keys, segment.metadata.minKey, segment.metadata.maxKey);
var segmentLimit = segment.proportionalAnnLimit(limit, totalRows);
results.add(segment.orderResultsBy(context, segmentKeys, orderer, segmentLimit));
}
@@ -289,37 +290,6 @@ public long estimateMatchingRowsCount(Expression predicate, AbstractBounds<Parti
return rowCount;
}

/** Create a sublist of the keys within (inclusive) the segment's bounds */
protected List<PrimaryKey> getKeysInRange(List<PrimaryKey> keys, Segment segment)
{
int minIndex = findBoundaryIndex(keys, segment, true);
int maxIndex = findBoundaryIndex(keys, segment, false);
return keys.subList(minIndex, maxIndex);
}

private int findBoundaryIndex(List<PrimaryKey> keys, Segment segment, boolean findMin)
{
// The minKey and maxKey are sometimes just partition keys (not primary keys), so binarySearch
// may not return the index of the least/greatest match.
var key = findMin ? segment.metadata.minKey : segment.metadata.maxKey;
int index = Collections.binarySearch(keys, key);
if (index < 0)
return -index - 1;
if (findMin)
{
while (index > 0 && keys.get(index - 1).equals(key))
index--;
}
else
{
while (index < keys.size() - 1 && keys.get(index + 1).equals(key))
index++;
// We must include the PrimaryKey at the boundary
index++;
}
return index;
}

@Override
public void close() throws IOException
{
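The deleted getKeysInRange/findBoundaryIndex pair moves into PrimaryKeyListUtil so the memtable ordering path in the next file can share it. The utility itself is not shown in this diff; judging from the removed implementation, it presumably looks roughly like this (written generically over Comparable so the sketch compiles standalone):

import java.util.Collections;
import java.util.List;

// Sketch (assumed shape, not the actual class) of the helper extracted from
// V1SearchableIndex into PrimaryKeyListUtil.
final class PrimaryKeyListUtilSketch
{
    /** Sublist of the sorted keys that falls within [minKey, maxKey], inclusive on both ends. */
    static <K extends Comparable<? super K>> List<K> getKeysInRange(List<K> keys, K minKey, K maxKey)
    {
        int minIndex = findBoundaryIndex(keys, minKey, true);
        int maxIndex = findBoundaryIndex(keys, maxKey, false);
        return keys.subList(minIndex, maxIndex);
    }

    private static <K extends Comparable<? super K>> int findBoundaryIndex(List<K> keys, K key, boolean findMin)
    {
        // The bound may compare equal to several adjacent keys (e.g. a
        // partition-key-only bound against fully qualified keys), and
        // binarySearch may land on any of them, so widen outward to the edge.
        int index = Collections.binarySearch(keys, key);
        if (index < 0)
            return -index - 1;
        if (findMin)
        {
            while (index > 0 && keys.get(index - 1).equals(key))
                index--;
        }
        else
        {
            while (index < keys.size() - 1 && keys.get(index + 1).equals(key))
                index++;
            index++; // subList's end is exclusive, so step past the boundary key
        }
        return index;
    }
}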
@@ -60,6 +60,7 @@
import org.apache.cassandra.index.sai.plan.Expression;
import org.apache.cassandra.index.sai.plan.Orderer;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.index.sai.utils.PrimaryKeyListUtil;
import org.apache.cassandra.index.sai.utils.PrimaryKeyWithScore;
import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey;
import org.apache.cassandra.index.sai.utils.RangeUtil;
@@ -306,10 +307,10 @@ public CloseableIterator<PrimaryKeyWithSortKey> orderResultsBy(QueryContext cont
// Compute the keys that exist in the current memtable and their corresponding graph ordinals
var keysInGraph = new HashSet<PrimaryKey>();
var relevantOrdinals = new IntHashSet();
keys.stream()
.dropWhile(k -> k.compareTo(minimumKey) < 0)
.takeWhile(k -> k.compareTo(maximumKey) <= 0)
.forEach(k ->

var keysInRange = PrimaryKeyListUtil.getKeysInRange(keys, minimumKey, maximumKey);

keysInRange.forEach(k ->
{
var v = graph.vectorForKey(k);
if (v == null)
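In the memtable path this replaces a linear dropWhile/takeWhile scan with the shared binary-search clipping: two O(log n) boundary lookups instead of walking the key list from the head. A quick check that the two formulations agree, using integer stand-ins for keys and the PrimaryKeyListUtilSketch helper from the sketch above:

import java.util.List;
import java.util.stream.Collectors;

public class RangeClipCheck
{
    public static void main(String[] args)
    {
        List<Integer> keys = List.of(1, 3, 5, 7, 9, 11); // sorted, as the real key list is
        Integer minimumKey = 4, maximumKey = 9;

        // Old formulation: linear scan with dropWhile/takeWhile.
        List<Integer> streamed = keys.stream()
                                     .dropWhile(k -> k.compareTo(minimumKey) < 0)
                                     .takeWhile(k -> k.compareTo(maximumKey) <= 0)
                                     .collect(Collectors.toList());

        // New formulation: binary-search clipping.
        List<Integer> clipped = PrimaryKeyListUtilSketch.getKeysInRange(keys, minimumKey, maximumKey);

        System.out.println(streamed); // [5, 7, 9]
        System.out.println(clipped);  // [5, 7, 9]
    }
}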