Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSANDRA-15452: Read Ahead for Compaction, Repair, and Range Queries #3606

Open
wants to merge 8 commits into
base: cassandra-5.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 107 additions & 107 deletions .circleci/config.yml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/cache/ChunkCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ public void invalidate(long position)
}

@Override
public Rebufferer instantiateRebufferer()
public Rebufferer instantiateRebufferer(boolean isScan)
{
return this;
}
Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ public MemtableOptions()
@Replaces(oldName = "min_free_space_per_drive_in_mb", converter = Converters.MEBIBYTES_DATA_STORAGE_INT, deprecated = true)
public DataStorageSpec.IntMebibytesBound min_free_space_per_drive = new DataStorageSpec.IntMebibytesBound("50MiB");

public DataStorageSpec.IntKibibytesBound compressed_read_ahead_buffer_size = new DataStorageSpec.IntKibibytesBound("256KiB");

// fraction of free disk space available for compaction after min free space is subtracted
public volatile Double max_space_usable_for_compactions_in_percentage = .95;

Expand Down
21 changes: 21 additions & 0 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -934,6 +934,9 @@ else if (conf.max_value_size.toMebibytes() >= 2048)
break;
}

if (conf.compressed_read_ahead_buffer_size.toKibibytes() > 0 && conf.compressed_read_ahead_buffer_size.toKibibytes() < 256)
throw new ConfigurationException("compressed_read_ahead_buffer_size must be at least 256KiB (set to 0 to disable), but was " + conf.compressed_read_ahead_buffer_size, false);

if (conf.server_encryption_options != null)
{
conf.server_encryption_options.applyConfig();
Expand Down Expand Up @@ -2520,6 +2523,24 @@ public static void setConcurrentViewBuilders(int value)
conf.concurrent_materialized_view_builders = value;
}

/**
 * @return the compressed read-ahead buffer size in bytes (0 means read-ahead is disabled).
 */
public static int getCompressedReadAheadBufferSize()
{
    return conf.compressed_read_ahead_buffer_size.toBytes();
}

/**
 * @return the compressed read-ahead buffer size in kibibytes (0 means read-ahead is disabled).
 */
public static int getCompressedReadAheadBufferSizeInKB()
{
    return conf.compressed_read_ahead_buffer_size.toKibibytes();
}

/**
 * Sets the compressed read-ahead buffer size.
 *
 * @param sizeInKb new size in kibibytes; 0 disables read-ahead, any other value must be at least 256
 * @throws IllegalArgumentException if {@code sizeInKb} is non-zero and below 256
 */
public static void setCompressedReadAheadBufferSizeInKb(int sizeInKb)
{
    // Mirror the startup validation: 0 explicitly disables read-ahead, so only
    // reject non-zero values below the 256 KiB minimum.
    if (sizeInKb != 0 && sizeInKb < 256)
        throw new IllegalArgumentException("compressed_read_ahead_buffer_size must be at least 256KiB (set to 0 to disable), but was " + sizeInKb + "KiB");

    conf.compressed_read_ahead_buffer_size = createIntKibibyteBoundAndEnsureItIsValidForByteConversion(sizeInKb, "compressed_read_ahead_buffer_size");
}

public static long getMinFreeSpacePerDriveInMebibytes()
{
return conf.min_free_space_per_drive.toMebibytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,11 @@ public RandomAccessReader openDataReader()
return dfile.createReader();
}

/**
 * Opens a data-file reader configured for sequential scans (compaction, repair,
 * range queries) by delegating to the data file's scan-oriented reader factory.
 *
 * @return a new {@link RandomAccessReader} over this sstable's data file
 */
public RandomAccessReader openDataReaderForScan()
{
    return dfile.createReaderForScan();
}

public void trySkipFileCacheBefore(DecoratedKey key)
{
long position = getPosition(key, SSTableReader.Operator.GE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected SSTableScanner(S sstable,
{
assert sstable != null;

this.dfile = sstable.openDataReader();
this.dfile = sstable.openDataReaderForScan();
this.sstable = sstable;
this.columns = columns;
this.dataRange = dataRange;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ protected BufferManagingRebufferer(ChunkReader wrapped)
public void closeReader()
{
    // Return the chunk buffer to the pool, then let the source drop any
    // per-thread resources (e.g. read-ahead buffers) it holds for this reader.
    BufferPools.forChunkCache().put(buffer);
    source.releaseUnderlyingResources();
    // Mark the rebufferer as holding no valid data.
    offset = -1;
}

Expand Down
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/io/util/ChunkReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ public interface ChunkReader extends RebuffererFactory
* This is not guaranteed to be fulfilled.
*/
BufferType preferredBufferType();

/**
 * Releases any per-thread resources this reader holds for the calling thread
 * (e.g. read-ahead buffers). Default implementation is a no-op.
 */
default void releaseUnderlyingResources() {}
}
201 changes: 173 additions & 28 deletions src/java/org/apache/cassandra/io/util/CompressedChunkReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.io.compress.CorruptBlockException;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.utils.ChecksumType;
import org.apache.cassandra.utils.Closeable;

public abstract class CompressedChunkReader extends AbstractReaderFileProxy implements ChunkReader
{
Expand All @@ -47,6 +49,11 @@ protected CompressedChunkReader(ChannelProxy channel, CompressionMetadata metada
assert Integer.bitCount(metadata.chunkLength()) == 1; //must be a power of two
}

/**
 * Returns the reader variant to use for sequential scans.
 * Base implementation has no dedicated scan path and returns this reader unchanged.
 */
protected CompressedChunkReader forScan()
{
    return this;
}

@VisibleForTesting
public double getCrcCheckChance()
{
Expand Down Expand Up @@ -83,20 +90,167 @@ public BufferType preferredBufferType()
}

@Override
public Rebufferer instantiateRebufferer()
public Rebufferer instantiateRebufferer(boolean isScan)
{
return new BufferManagingRebufferer.Aligned(this);
return new BufferManagingRebufferer.Aligned(isScan ? forScan() : this);
}

public static class Standard extends CompressedChunkReader
/**
 * Strategy for fetching a chunk's raw (still compressed) bytes from disk.
 * Implementations may hold per-thread resources; the allocate/deallocate
 * hooks let callers opt in and out of those (all defaults are no-ops).
 */
protected interface CompressedReader extends Closeable
{
    // Acquire any per-thread resources (e.g. a read-ahead buffer). No-op by default.
    default void allocateResources()
    {
    }

    // Release any per-thread resources. No-op by default.
    default void deallocateResources()
    {
    }

    // Whether the calling thread currently holds allocated resources.
    default boolean allocated()
    {
        return false;
    }

    // Close long-lived resources owned by this reader. No-op by default.
    default void close()
    {

    }


    /**
     * Reads the chunk's compressed bytes.
     *
     * @param chunk          the chunk to read
     * @param shouldCheckCrc whether to also read and verify the trailing CRC32
     * @return a buffer positioned at 0 with limit == chunk.length containing the compressed payload
     * @throws CorruptBlockException on short read or checksum mismatch
     */
    ByteBuffer read(CompressionMetadata.Chunk chunk, boolean shouldCheckCrc) throws CorruptBlockException;
}

/**
 * Default chunk reader: issues one positioned read per chunk directly against
 * the channel, staging the bytes in a per-thread scratch buffer.
 */
private static class RandomAccessCompressedReader implements CompressedReader
{
    private final ChannelProxy channel;
    // Per-thread scratch buffer, resized on demand to fit the compressed chunk.
    private final ThreadLocalByteBufferHolder bufferHolder;

    private RandomAccessCompressedReader(ChannelProxy channel, CompressionMetadata metadata)
    {
        this.channel = channel;
        this.bufferHolder = new ThreadLocalByteBufferHolder(metadata.compressor().preferredBufferType());
    }

    @Override
    public ByteBuffer read(CompressionMetadata.Chunk chunk, boolean shouldCheckCrc) throws CorruptBlockException
    {
        // When CRC checking, also read the 4-byte checksum stored right after the chunk.
        int length = shouldCheckCrc ? chunk.length + Integer.BYTES // compressed length + checksum length
                     : chunk.length;
        ByteBuffer compressed = bufferHolder.getBuffer(length);
        if (channel.read(compressed, chunk.offset) != length)
            throw new CorruptBlockException(channel.filePath(), chunk);
        compressed.flip();
        compressed.limit(chunk.length);

        if (shouldCheckCrc)
        {
            // CRC is computed over the compressed payload only (limit excludes the stored checksum).
            int checksum = (int) ChecksumType.CRC32.of(compressed);
            compressed.limit(length);
            if (compressed.getInt() != checksum)
                throw new CorruptBlockException(channel.filePath(), chunk);
            // Rewind so callers see exactly [0, chunk.length).
            compressed.position(0).limit(chunk.length);
        }
        return compressed;
    }
}

private static class ScanCompressedReader implements CompressedReader
{
// We read the raw compressed bytes into this buffer, then uncompress them into the provided one.
private final ChannelProxy channel;
private final ThreadLocalByteBufferHolder bufferHolder;
private final ThreadLocalReadAheadBuffer readAheadBuffer;

/**
 * @param channel             the sstable data file channel
 * @param metadata            compression metadata for the file
 * @param readAheadBufferSize size in bytes of the per-thread read-ahead buffer
 */
private ScanCompressedReader(ChannelProxy channel, CompressionMetadata metadata, int readAheadBufferSize)
{
    this.channel = channel;
    this.bufferHolder = new ThreadLocalByteBufferHolder(metadata.compressor().preferredBufferType());
    this.readAheadBuffer = new ThreadLocalReadAheadBuffer(channel, readAheadBufferSize, metadata.compressor().preferredBufferType());
}

@Override
public ByteBuffer read(CompressionMetadata.Chunk chunk, boolean shouldCheckCrc) throws CorruptBlockException
{
int length = shouldCheckCrc ? chunk.length + Integer.BYTES // compressed length + checksum length
: chunk.length;
ByteBuffer compressed = bufferHolder.getBuffer(length);

int copied = 0;
while (copied < length) {
Copy link
Contributor

@netudima netudima Feb 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the last idea, please fill free to ignore it :-)
we can move the whole loop into the readAheadBuffer logic and do a single thread local lookup for a block in such case (currently we do 3 or 4 lookups per loop iteration). It also will remove a duplicated code from a test.
like:

    public void read(ByteBuffer target, long offset, int length)
    {
        int copied = 0;
        Block block = getBlock();
        while (copied < length)
        {
            fill(block, offset + copied);
            int leftToRead = length - copied;
            if (block.buffer.remaining() >= leftToRead)
                copied += read(block, target, leftToRead);
            else
                copied += read(block, target, block.buffer.remaining());
        }
    }

readAheadBuffer.fill(chunk.offset + copied);
int leftToRead = length - copied;
if (readAheadBuffer.remaining() >= leftToRead)
copied += readAheadBuffer.read(compressed, leftToRead);
else
copied += readAheadBuffer.read(compressed, readAheadBuffer.remaining());
}

compressed.flip();
compressed.limit(chunk.length);

if (shouldCheckCrc)
{
int checksum = (int) ChecksumType.CRC32.of(compressed);
compressed.limit(length);
if (compressed.getInt() != checksum)
throw new CorruptBlockException(channel.filePath(), chunk);
compressed.position(0).limit(chunk.length);
}
return compressed;
}

@Override
public void allocateResources()
{
    // Eagerly materialize this thread's read-ahead block buffer so that
    // allocated() reports true and reads route through the scan path.
    readAheadBuffer.getBlockBuffer();
}

@Override
public void deallocateResources()
{
    // Drop this thread's read-ahead block; clear(true) presumably also
    // returns the underlying buffer — TODO confirm against ThreadLocalReadAheadBuffer.
    readAheadBuffer.clear(true);
}

@Override
public boolean allocated()
{
    // True once this thread holds a read-ahead block buffer
    // (i.e. allocateResources() has run on this thread).
    return readAheadBuffer.hasBuffer();
}

// Close the long-lived read-ahead buffer owned by this reader.
// @Override added for consistency with the sibling CompressedReader overrides.
@Override
public void close()
{
    readAheadBuffer.close();
}
}

public static class Standard extends CompressedChunkReader
{

private final CompressedReader reader;
private final CompressedReader scanReader;

/**
 * @param channel                the sstable data file channel
 * @param metadata               compression metadata for the file
 * @param crcCheckChanceSupplier supplies the live crc_check_chance table option
 */
public Standard(ChannelProxy channel, CompressionMetadata metadata, Supplier<Double> crcCheckChanceSupplier)
{
    super(channel, metadata, crcCheckChanceSupplier);
    reader = new RandomAccessCompressedReader(channel, metadata);

    // Read-ahead only pays off when the buffer spans more than one compressed
    // chunk; a configured size of 0 disables the scan path entirely.
    int readAheadBufferSize = DatabaseDescriptor.getCompressedReadAheadBufferSize();
    scanReader = (readAheadBufferSize > 0 && readAheadBufferSize > metadata.chunkLength())
                 ? new ScanCompressedReader(channel, metadata, readAheadBufferSize) : null;
}

/**
 * Prepares this reader for sequential scanning by pre-allocating the scan
 * reader's per-thread read-ahead resources, when a scan reader is configured.
 *
 * @return this reader (readChunk will prefer the scan path once allocated)
 */
protected CompressedChunkReader forScan()
{
    if (scanReader == null)
        return this;

    scanReader.allocateResources();
    return this;
}

/**
 * Releases the calling thread's read-ahead resources; no-op when no scan
 * reader was configured.
 */
@Override
public void releaseUnderlyingResources()
{
    final CompressedReader scan = scanReader;
    if (scan == null)
        return;
    scan.deallocateResources();
}

@Override
Expand All @@ -110,31 +264,13 @@ public void readChunk(long position, ByteBuffer uncompressed)

CompressionMetadata.Chunk chunk = metadata.chunkFor(position);
boolean shouldCheckCrc = shouldCheckCrc();
int length = shouldCheckCrc ? chunk.length + Integer.BYTES // compressed length + checksum length
: chunk.length;

CompressedReader readFrom = (scanReader != null && scanReader.allocated()) ? scanReader : reader;
if (chunk.length < maxCompressedLength)
{
ByteBuffer compressed = bufferHolder.getBuffer(length);

if (channel.read(compressed, chunk.offset) != length)
throw new CorruptBlockException(channel.filePath(), chunk);

compressed.flip();
compressed.limit(chunk.length);
ByteBuffer compressed = readFrom.read(chunk, shouldCheckCrc);
uncompressed.clear();

if (shouldCheckCrc)
{
int checksum = (int) ChecksumType.CRC32.of(compressed);

compressed.limit(length);
if (compressed.getInt() != checksum)
throw new CorruptBlockException(channel.filePath(), chunk);

compressed.position(0).limit(chunk.length);
}

try
{
metadata.compressor().uncompress(compressed, uncompressed);
Expand All @@ -155,10 +291,9 @@ public void readChunk(long position, ByteBuffer uncompressed)
uncompressed.flip();
int checksum = (int) ChecksumType.CRC32.of(uncompressed);

ByteBuffer scratch = bufferHolder.getBuffer(Integer.BYTES);

ByteBuffer scratch = ByteBuffer.allocate(Integer.BYTES);
if (channel.read(scratch, chunk.offset + chunk.length) != Integer.BYTES
|| scratch.getInt(0) != checksum)
|| scratch.getInt(0) != checksum)
throw new CorruptBlockException(channel.filePath(), chunk);
}
}
Expand All @@ -171,6 +306,16 @@ public void readChunk(long position, ByteBuffer uncompressed)
throw new CorruptSSTableException(e, channel.filePath());
}
}

@Override
public void close()
{
    // Release both reader strategies before closing the shared channel/metadata
    // via super.close(); super must run last so the readers can still use them.
    reader.close();
    if (scanReader != null)
        scanReader.close();

    super.close();
}
}

public static class Mmap extends CompressedChunkReader
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/io/util/EmptyRebufferer.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void closeReader()
}

@Override
public Rebufferer instantiateRebufferer()
public Rebufferer instantiateRebufferer(boolean isScan)
{
return this;
}
Expand Down
Loading