CNDB-12407: lazily load token in PrimaryKeyWithSource #1500

Open · wants to merge 9 commits into base: main
@@ -115,8 +115,7 @@ protected PrimaryKey computeNext()
if (rowId == PostingList.END_OF_STREAM)
return endOfData();

var primaryKey = primaryKeyMap.primaryKeyFromRowId(rowId);
return new PrimaryKeyWithSource(primaryKey, primaryKeyMap.getSSTableId(), rowId);
return new PrimaryKeyWithSource(primaryKeyMap, rowId, searcherContext.minimumKey, searcherContext.maximumKey);
}
catch (Throwable t)
{
@@ -28,16 +28,38 @@

public class PrimaryKeyWithSource implements PrimaryKey
{
private final PrimaryKey primaryKey;
private final PrimaryKeyMap primaryKeyMap;
private final SSTableId<?> sourceSstableId;
private final long sourceRowId;
private PrimaryKey delegatePrimaryKey;
private final PrimaryKey sourceSstableMinKey;
private final PrimaryKey sourceSstableMaxKey;

public PrimaryKeyWithSource(PrimaryKey primaryKey, SSTableId<?> sstableId, long sstableRowId)
public PrimaryKeyWithSource(PrimaryKeyMap primaryKeyMap, long sstableRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey)
{
assert primaryKey != null : "Cannot construct a PrimaryKeyWithSource with a null primaryKey";
this.primaryKey = primaryKey;
this.sourceSstableId = sstableId;
this.primaryKeyMap = primaryKeyMap;
this.sourceSstableId = primaryKeyMap.getSSTableId();
this.sourceRowId = sstableRowId;
this.sourceSstableMinKey = sourceSstableMinKey;
this.sourceSstableMaxKey = sourceSstableMaxKey;
}

public PrimaryKeyWithSource(PrimaryKey primaryKey, SSTableId<?> sourceSstableId, long sourceRowId, PrimaryKey sourceSstableMinKey, PrimaryKey sourceSstableMaxKey)
{
this.delegatePrimaryKey = primaryKey;
this.primaryKeyMap = null;
this.sourceSstableId = sourceSstableId;
this.sourceRowId = sourceRowId;
this.sourceSstableMinKey = sourceSstableMinKey;
this.sourceSstableMaxKey = sourceSstableMaxKey;
}

private PrimaryKey primaryKey()
{
if (delegatePrimaryKey == null)
delegatePrimaryKey = primaryKeyMap.primaryKeyFromRowId(sourceRowId);

return delegatePrimaryKey;
}

public long getSourceRowId()
@@ -53,43 +75,43 @@ public SSTableId<?> getSourceSstableId()
@Override
public Token token()
{
return primaryKey.token();
return primaryKey().token();
}

@Override
public DecoratedKey partitionKey()
{
return primaryKey.partitionKey();
return primaryKey().partitionKey();
}

@Override
public Clustering clustering()
{
return primaryKey.clustering();
return primaryKey().clustering();
}

@Override
public PrimaryKey loadDeferred()
{
return primaryKey.loadDeferred();
return primaryKey().loadDeferred();
}

@Override
public ByteSource asComparableBytes(ByteComparable.Version version)
{
return primaryKey.asComparableBytes(version);
return primaryKey().asComparableBytes(version);
}

@Override
public ByteSource asComparableBytesMinPrefix(ByteComparable.Version version)
{
return primaryKey.asComparableBytesMinPrefix(version);
return primaryKey().asComparableBytesMinPrefix(version);
}

@Override
public ByteSource asComparableBytesMaxPrefix(ByteComparable.Version version)
{
return primaryKey.asComparableBytesMaxPrefix(version);
return primaryKey().asComparableBytesMaxPrefix(version);
}

@Override
@@ -100,8 +122,16 @@ public int compareTo(PrimaryKey o)
var other = (PrimaryKeyWithSource) o;
if (sourceSstableId.equals(other.sourceSstableId))
return Long.compare(sourceRowId, other.sourceRowId);
// Compare against the other source sstable's min and max keys to determine whether the ordering
// can be decided without loading the actual keys. Note that these bounds are already loaded into
// memory as part of the segment's metadata, so the comparison is cheaper than loading the keys.
if (sourceSstableMinKey.compareTo(other.sourceSstableMaxKey) > 0)
return 1;
if (sourceSstableMaxKey.compareTo(other.sourceSstableMinKey) < 0)
return -1;
}
return primaryKey.compareTo(o);

return primaryKey().compareTo(o);
}

@Override
@@ -110,21 +140,28 @@ public boolean equals(Object o)
if (o instanceof PrimaryKeyWithSource)
{
var other = (PrimaryKeyWithSource) o;
// If they are from the same source sstable, we can compare the row ids directly.
if (sourceSstableId.equals(other.sourceSstableId))
return sourceRowId == other.sourceRowId;

// If the two source sstables' key ranges do not overlap, the keys cannot be equal.
if (sourceSstableMinKey.compareTo(other.sourceSstableMaxKey) > 0
|| sourceSstableMaxKey.compareTo(other.sourceSstableMinKey) < 0)
return false;
}
return primaryKey.equals(o);

return primaryKey().equals(o);
}

@Override
public int hashCode()
{
return primaryKey.hashCode();
return primaryKey().hashCode();
}

@Override
public String toString()
{
return String.format("%s (source sstable: %s, %s)", primaryKey, sourceSstableId, sourceRowId);
return String.format("%s (source sstable: %s, %s)", delegatePrimaryKey, sourceSstableId, sourceRowId);
}
}
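Note on the pattern above: the rewritten class defers materializing the delegate PrimaryKey until token(), partitionKey(), etc. are first called, and short-circuits compareTo()/equals() on row ids (same source sstable) or on the in-memory min/max bounds (different sstables). The following is a minimal, self-contained sketch of that idea under simplified assumptions: longs stand in for keys/tokens, a String stands in for SSTableId, and LazyKey/loader are hypothetical names rather than Cassandra API.

import java.util.function.LongSupplier;

final class LazyKey implements Comparable<LazyKey>
{
    private final String sourceSstableId;
    private final long sourceRowId;
    private final long sourceMinKey;   // source sstable's min key, already in memory via segment metadata
    private final long sourceMaxKey;   // source sstable's max key, already in memory via segment metadata
    private final LongSupplier loader; // stands in for PrimaryKeyMap#primaryKeyFromRowId
    private Long delegate;             // materialized only when actually needed

    LazyKey(String sourceSstableId, long sourceRowId, long sourceMinKey, long sourceMaxKey, LongSupplier loader)
    {
        this.sourceSstableId = sourceSstableId;
        this.sourceRowId = sourceRowId;
        this.sourceMinKey = sourceMinKey;
        this.sourceMaxKey = sourceMaxKey;
        this.loader = loader;
    }

    long key()
    {
        if (delegate == null)
            delegate = loader.getAsLong(); // lazy load, mirrors primaryKey() above
        return delegate;
    }

    @Override
    public int compareTo(LazyKey other)
    {
        // Same source sstable: row ids follow key order, so nothing needs to be loaded.
        if (sourceSstableId.equals(other.sourceSstableId))
            return Long.compare(sourceRowId, other.sourceRowId);
        // Different sstables: the in-memory bounds may already decide the ordering.
        if (sourceMinKey > other.sourceMaxKey)
            return 1;
        if (sourceMaxKey < other.sourceMinKey)
            return -1;
        // Overlapping bounds: fall back to materializing the keys.
        return Long.compare(key(), other.key());
    }
}

As in the diff, the cross-sstable bounds check only pays off because the segment's min/max keys are already resident in memory.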
@@ -32,10 +32,13 @@
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.index.sai.IndexContext;
import org.apache.cassandra.index.sai.SSTableContext;
import org.apache.cassandra.index.sai.disk.ModernResettableByteBuffersIndexOutput;
import org.apache.cassandra.index.sai.disk.PostingList;
import org.apache.cassandra.index.sai.disk.PrimaryKeyWithSource;
import org.apache.cassandra.index.sai.disk.format.IndexComponentType;
import org.apache.cassandra.index.sai.disk.format.Version;
import org.apache.cassandra.index.sai.disk.io.IndexInput;
@@ -44,6 +47,7 @@
import org.apache.cassandra.index.sai.plan.Expression;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.index.sai.utils.TypeUtil;
import org.apache.cassandra.io.sstable.SSTableId;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;

@@ -134,18 +138,36 @@ public class SegmentMetadata implements Comparable<SegmentMetadata>
private static final Logger logger = LoggerFactory.getLogger(SegmentMetadata.class);

@SuppressWarnings("resource")
private SegmentMetadata(IndexInput input, IndexContext context, Version version) throws IOException
private SegmentMetadata(IndexInput input, IndexContext context, Version version, SSTableContext sstableContext) throws IOException
{
PrimaryKey.Factory primaryKeyFactory = context.keyFactory();
AbstractType<?> termsType = context.getValidator();

this.version = version;
this.segmentRowIdOffset = input.readLong();
this.numRows = input.readLong();
this.minSSTableRowId = input.readLong();
this.maxSSTableRowId = input.readLong();
this.minKey = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)));
this.maxKey = primaryKeyFactory.createPartitionKeyOnly(DatabaseDescriptor.getPartitioner().decorateKey(readBytes(input)));

// The next values are the min/max partition keys. As a temporary test, we skip them because they are
// not as useful as the min/max row ids, which are always correct for both flushed and compacted sstables.
skipBytes(input);
skipBytes(input);

// Build fully qualified PrimaryKey min and max objects to avoid several edge cases caused by the
// confusing equality semantics where PrimaryKeyWithSource diverges slightly from a PrimaryKey that is
// just a partition key without a materializable clustering key.
final PrimaryKey min, max;
try (var pkm = sstableContext.primaryKeyMapFactory().newPerSSTablePrimaryKeyMap())
{
// We need to load eagerly so that the primary key map can be closed here. Otherwise tests still pass,
// but only because partitionKey() is called as a side effect under the -ea flag; they fail once that flag is removed.
min = pkm.primaryKeyFromRowId(minSSTableRowId).loadDeferred();
max = pkm.primaryKeyFromRowId(maxSSTableRowId).loadDeferred();
}

this.minKey = new PrimaryKeyWithSource(min, sstableContext.sstable.getId(), minSSTableRowId, min, max);
this.maxKey = new PrimaryKeyWithSource(max, sstableContext.sstable.getId(), maxSSTableRowId, min, max);

this.minTerm = readBytes(input);
this.maxTerm = readBytes(input);
TermsDistribution td = null;
@@ -164,7 +186,7 @@ private SegmentMetadata(IndexInput input, IndexContext context, Version version)
}

@SuppressWarnings("resource")
public static List<SegmentMetadata> load(MetadataSource source, IndexContext context) throws IOException
public static List<SegmentMetadata> load(MetadataSource source, IndexContext context, SSTableContext sstableContext) throws IOException
{

IndexInput input = source.get(NAME);
@@ -175,7 +197,7 @@ public static List<SegmentMetadata> load(MetadataSource source, IndexContext con

for (int i = 0; i < segmentCount; i++)
{
segmentMetadata.add(new SegmentMetadata(input, context, source.getVersion()));
segmentMetadata.add(new SegmentMetadata(input, context, source.getVersion(), sstableContext));
}

return segmentMetadata;
@@ -300,6 +322,12 @@ private static ByteBuffer readBytes(IndexInput input) throws IOException
return ByteBuffer.wrap(bytes);
}

private static void skipBytes(IndexInput input) throws IOException
{
int len = input.readVInt();
input.skipBytes(len);
}
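skipBytes pairs with readBytes above: each key appears to be stored as a length prefix (a variable-length int read via readVInt) followed by that many bytes, so skipping a key is just reading the length and advancing the input. Below is a stand-alone sketch of the same idea using plain java.io streams and a fixed 4-byte length; this is an intentional simplification and not the actual on-disk encoding.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class SkipBytesExample
{
    static void writeBlock(DataOutputStream out, byte[] payload) throws IOException
    {
        out.writeInt(payload.length); // the real format uses a variable-length int
        out.write(payload);
    }

    static void skipBlock(DataInputStream in) throws IOException
    {
        int len = in.readInt();
        in.skipBytes(len); // advance past the payload without copying it
    }

    public static void main(String[] args) throws IOException
    {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer))
        {
            writeBlock(out, new byte[]{ 1, 2, 3 }); // e.g. the min partition key
            writeBlock(out, new byte[]{ 4, 5 });    // e.g. the max partition key
            out.writeLong(42L);                     // the next metadata field
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())))
        {
            skipBlock(in); // skip min partition key
            skipBlock(in); // skip max partition key
            System.out.println(in.readLong()); // 42
        }
    }
}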

static void writeBytes(ByteBuffer buf, IndexOutput out)
{
try
@@ -41,6 +41,7 @@
import org.apache.cassandra.index.sai.plan.Expression;
import org.apache.cassandra.index.sai.plan.Orderer;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.index.sai.utils.PrimaryKeyListUtil;
import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey;
import org.apache.cassandra.index.sai.utils.TypeUtil;
import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -87,7 +88,7 @@ public V1SearchableIndex(SSTableContext sstableContext, IndexComponents.ForRead

final MetadataSource source = MetadataSource.loadMetadata(perIndexComponents);

metadatas = SegmentMetadata.load(source, indexContext);
metadatas = SegmentMetadata.load(source, indexContext, sstableContext);

for (SegmentMetadata metadata : metadatas)
{
@@ -230,7 +231,7 @@ public List<CloseableIterator<PrimaryKeyWithSortKey>> orderResultsBy(QueryContext
for (Segment segment : segments)
{
// Only pass the primary keys in a segment's range to the segment index.
var segmentKeys = getKeysInRange(keys, segment);
var segmentKeys = PrimaryKeyListUtil.getKeysInRange(keys, segment.metadata.minKey, segment.metadata.maxKey);
var segmentLimit = segment.proportionalAnnLimit(limit, totalRows);
results.add(segment.orderResultsBy(context, segmentKeys, orderer, segmentLimit));
}
@@ -287,37 +288,6 @@ public long estimateMatchingRowsCount(Expression predicate, AbstractBounds<Parti
return rowCount;
}

/** Create a sublist of the keys within (inclusive) the segment's bounds */
protected List<PrimaryKey> getKeysInRange(List<PrimaryKey> keys, Segment segment)
{
int minIndex = findBoundaryIndex(keys, segment, true);
int maxIndex = findBoundaryIndex(keys, segment, false);
return keys.subList(minIndex, maxIndex);
}

private int findBoundaryIndex(List<PrimaryKey> keys, Segment segment, boolean findMin)
{
// The minKey and maxKey are sometimes just partition keys (not primary keys), so binarySearch
// may not return the index of the least/greatest match.
var key = findMin ? segment.metadata.minKey : segment.metadata.maxKey;
int index = Collections.binarySearch(keys, key);
if (index < 0)
return -index - 1;
if (findMin)
{
while (index > 0 && keys.get(index - 1).equals(key))
index--;
}
else
{
while (index < keys.size() - 1 && keys.get(index + 1).equals(key))
index++;
// We must include the PrimaryKey at the boundary
index++;
}
return index;
}
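The two helpers removed here now appear to live in PrimaryKeyListUtil, which is imported above and also used from the memtable ordering path later in this PR. The exact implementation is not shown in this diff; below is a generic, hedged sketch of the boundary-inclusive sublist-by-binary-search idea the removed code implemented. KeysInRangeSketch is a purely illustrative name.

import java.util.Collections;
import java.util.List;

final class KeysInRangeSketch
{
    /** Returns the sublist of sorted keys k with min <= k <= max (both bounds inclusive). */
    static <T extends Comparable<? super T>> List<T> keysInRange(List<T> sortedKeys, T min, T max)
    {
        return sortedKeys.subList(lowerBound(sortedKeys, min), upperBound(sortedKeys, max));
    }

    /** Index of the first element >= key. */
    private static <T extends Comparable<? super T>> int lowerBound(List<T> keys, T key)
    {
        int index = Collections.binarySearch(keys, key);
        if (index < 0)
            return -index - 1;
        while (index > 0 && keys.get(index - 1).compareTo(key) == 0)
            index--; // binarySearch may land on any duplicate; walk back to the least match
        return index;
    }

    /** One past the index of the last element <= key. */
    private static <T extends Comparable<? super T>> int upperBound(List<T> keys, T key)
    {
        int index = Collections.binarySearch(keys, key);
        if (index < 0)
            return -index - 1;
        while (index < keys.size() - 1 && keys.get(index + 1).compareTo(key) == 0)
            index++; // walk forward to the greatest match
        return index + 1; // exclusive upper bound must include the boundary key itself
    }
}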

@Override
public void close() throws IOException
{
@@ -58,6 +58,7 @@
import org.apache.cassandra.index.sai.plan.Expression;
import org.apache.cassandra.index.sai.plan.Orderer;
import org.apache.cassandra.index.sai.utils.PrimaryKey;
import org.apache.cassandra.index.sai.utils.PrimaryKeyListUtil;
import org.apache.cassandra.index.sai.utils.PrimaryKeyWithScore;
import org.apache.cassandra.index.sai.utils.PrimaryKeyWithSortKey;
import org.apache.cassandra.index.sai.utils.RangeUtil;
@@ -288,10 +289,10 @@ public CloseableIterator<PrimaryKeyWithSortKey> orderResultsBy(QueryContext cont
// Compute the keys that exist in the current memtable and their corresponding graph ordinals
var keysInGraph = new HashSet<PrimaryKey>();
var relevantOrdinals = new IntHashSet();
keys.stream()
.dropWhile(k -> k.compareTo(minimumKey) < 0)
.takeWhile(k -> k.compareTo(maximumKey) <= 0)
.forEach(k ->

var keysInRange = PrimaryKeyListUtil.getKeysInRange(keys, minimumKey, maximumKey);

keysInRange.forEach(k ->
{
var v = graph.vectorForKey(k);
if (v == null)
@@ -163,8 +163,19 @@ protected KeyRangeIterator buildIterator()
return rangeIterators.get(0);

default:
//TODO Need to test whether an initial sort improves things
return new KeyRangeUnionIterator(statistics, rangeIterators);
rangeIterators.sort((a, b) -> a.getMinimum().compareTo(b.getMinimum()));
boolean isDisjoint = true;
for (int i = 0; i < rangeIterators.size() - 1; i++)
{
if (rangeIterators.get(i).getMaximum().compareTo(rangeIterators.get(i + 1).getMinimum()) > 0)
{
isDisjoint = false;
break;
}
}
// If the iterators do not overlap, we can use the concat iterator, which is more efficient
return isDisjoint ? new KeyRangeConcatIterator(statistics, rangeIterators)
: new KeyRangeUnionIterator(statistics, rangeIterators);
}
}
}
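The new default branch sorts the iterators by their minimum key and only falls back to the union iterator when some adjacent pair overlaps; fully disjoint ranges can be concatenated instead. A small stand-alone sketch of that check follows. Range and DisjointCheck are illustrative stand-ins for a KeyRangeIterator's getMinimum()/getMaximum() bounds, not Cassandra classes.

import java.util.Comparator;
import java.util.List;

final class DisjointCheck
{
    static final class Range
    {
        final long min;
        final long max;

        Range(long min, long max)
        {
            this.min = min;
            this.max = max;
        }
    }

    /** True if, once sorted by minimum, no range overlaps the one after it. */
    static boolean areDisjoint(List<Range> ranges)
    {
        ranges.sort(Comparator.comparingLong((Range r) -> r.min));
        for (int i = 0; i < ranges.size() - 1; i++)
        {
            if (ranges.get(i).max > ranges.get(i + 1).min)
                return false; // overlap: a union iterator is required
        }
        return true; // disjoint: the cheaper concat iterator is safe
    }
}

As in the diff, ranges that merely touch at a boundary (one maximum equal to the next minimum) still count as disjoint.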
@@ -627,8 +627,10 @@ public CloseableIterator<PrimaryKeyWithSortKey> getTopKRows(KeyRangeIterator sou
*/
private List<PrimaryKey> materializeKeys(KeyRangeIterator source)
{
// Skip to the first key in the range
source.skipTo(primaryKeyFactory().createTokenOnly(mergeRange.left.getToken()));
// Skip to the first key in the range if it is not the minimum token
if (!mergeRange.left.isMinimum())
source.skipTo(firstPrimaryKey);

if (!source.hasNext())
return List.of();
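This change only repositions the source iterator when the merge range does not start at the minimum token, and it reuses an existing firstPrimaryKey rather than allocating a token-only key for the skip. A hedged sketch of the guarded-skip idea follows, with a NavigableSet standing in for the key iterator; keysFrom and the long keys are illustrative only.

import java.util.Iterator;
import java.util.NavigableSet;
import java.util.TreeSet;

final class GuardedSkipExample
{
    /** Returns an iterator positioned at the start of the requested range. */
    static Iterator<Long> keysFrom(NavigableSet<Long> keys, long rangeStart, boolean rangeStartsAtMinimum)
    {
        // When the range starts at the minimum token there is nothing to skip past,
        // so the repositioning work is avoided entirely.
        if (rangeStartsAtMinimum)
            return keys.iterator();
        return keys.tailSet(rangeStart, true).iterator();
    }

    public static void main(String[] args)
    {
        NavigableSet<Long> keys = new TreeSet<>(java.util.List.of(1L, 5L, 9L));
        System.out.println(keysFrom(keys, 5L, false).next());            // 5
        System.out.println(keysFrom(keys, Long.MIN_VALUE, true).next()); // 1
    }
}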
