Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CNDB-12304: Parallelize ORDER BY row materialization/validation #1489

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3242,6 +3242,34 @@ public ViewFragment(List<SSTableReader> sstables, Iterable<Memtable> memtables)
this.sstables = sstables;
this.memtables = memtables;
}

public void sortSSTablesByMaxTimestampDescending()
{
sstables.sort(SSTableReader.maxTimestampDescending);
}
}

// A RefViewFragment that is sorted by max timestamp descending, which makes this object safe to reuse and to
// share across threads.
public static class SortedRefViewFragment extends RefViewFragment
{
private SortedRefViewFragment(RefViewFragment view)
{
// Copy sstable list to an immutable list to ensure there is not a future change that modifies the list
super(List.copyOf(view.sstables), view.memtables, view.refs);
}

// sstables are expected to be pre-sorted
@Override
public void sortSSTablesByMaxTimestampDescending()
{
}

public static SortedRefViewFragment sortThenCreateFrom(RefViewFragment view)
{
view.sortSSTablesByMaxTimestampDescending();
return new SortedRefViewFragment(view);
}
}

public static class RefViewFragment extends ViewFragment implements AutoCloseable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
if (Tracing.traceSinglePartitions())
Tracing.trace("Acquiring sstable references");

view.sstables.sort(SSTableReader.maxTimestampDescending);
view.sortSSTablesByMaxTimestampDescending();
ClusteringIndexFilter filter = clusteringIndexFilter();
long minTimestamp = Long.MAX_VALUE;
long mostRecentPartitionTombstone = Long.MIN_VALUE;
Expand Down Expand Up @@ -673,7 +673,7 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
* In other words, iterating in descending maxTimestamp order allow to do our mostRecentPartitionTombstone
* elimination in one pass, and minimize the number of sstables for which we read a partition tombstone.
*/
view.sstables.sort(SSTableReader.maxTimestampDescending);
view.sortSSTablesByMaxTimestampDescending();
int nonIntersectingSSTables = 0;
int includedDueToTombstones = 0;

Expand Down Expand Up @@ -903,7 +903,7 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
}

/* add the SSTables on disk */
view.sstables.sort(SSTableReader.maxTimestampDescending);
view.sortSSTablesByMaxTimestampDescending();
// read sorted sstables
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
for (SSTableReader sstable : view.sstables)
Expand Down
7 changes: 6 additions & 1 deletion src/java/org/apache/cassandra/index/sai/QueryContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,14 @@ public FilterSortOrder filterSortOrder()
return filterSortOrder;
}

public long approximateRemainingTimeNs()
{
return executionQuotaNano - totalQueryTimeNs();
}

public void checkpoint()
{
if (totalQueryTimeNs() >= executionQuotaNano && !DISABLE_TIMEOUT)
if (approximateRemainingTimeNs() < 0 && !DISABLE_TIMEOUT)
{
addQueryTimeouts(1);
throw new AbortedOperationException();
Expand Down
7 changes: 4 additions & 3 deletions src/java/org/apache/cassandra/index/sai/plan/QueryView.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@

public class QueryView implements AutoCloseable
{
final ColumnFamilyStore.RefViewFragment view;
// We use a SortedRefViewFragment because it can be safely shared across multiple threads when materializing rows.
final ColumnFamilyStore.SortedRefViewFragment view;
final Set<SSTableIndex> referencedIndexes;
final Set<MemtableIndex> memtableIndexes;
final IndexContext indexContext;

public QueryView(ColumnFamilyStore.RefViewFragment view,
public QueryView(ColumnFamilyStore.SortedRefViewFragment view,
Set<SSTableIndex> referencedIndexes,
Set<MemtableIndex> memtableIndexes,
IndexContext indexContext)
Expand Down Expand Up @@ -180,7 +181,7 @@ else if (MonotonicClock.approxTime.now() - failingSince > TimeUnit.MILLISECONDS.
// freeze referencedIndexes and memtableIndexes, so we can safely give access to them
// without risking something messes them up
// (this was added after KeyRangeTermIterator messed them up which led to a bug)
return new QueryView(refViewFragment,
return new QueryView(ColumnFamilyStore.SortedRefViewFragment.sortThenCreateFrom(refViewFragment),
Collections.unmodifiableSet(referencedIndexes),
Collections.unmodifiableSet(memtableIndexes),
indexContext);
Expand Down
Loading