
CASSANDRA-15452: Read Ahead for Compaction, Repair, and Range Queries #3606

Open · wants to merge 8 commits into base: cassandra-5.0

Conversation


@jrwest jrwest commented Oct 8, 2024

No description provided.

@jrwest jrwest requested a review from maedhroz October 8, 2024 23:01
@jrwest jrwest force-pushed the jwest/15452-5.0 branch 2 times, most recently from 2bd7757 to 9f815c8 Compare October 9, 2024 17:03
@Override
public CompressedChunkReader setMode(boolean isScan)
{
if (readAheadBuffer != null)
Contributor:

nit: add a quick comment on the fact that isScan is there so we don't try to use the RAB (read-ahead buffer) on point reads

int blockNo = (int) (realPosition / blockLength);
long blockPosition = blockNo * blockLength;

if (storedBlockNo.get() != blockNo)
Contributor:

nit: add a quick comment on why this check is here/necessary: to skip the read when we've already read 256KiB worth of 16KiB chunks and just need to advance blockBuffer


private final FastThreadLocal<ByteBuffer> blockBufferHolder;

private final FastThreadLocal<Integer> storedBlockNo;
Contributor:

bike-shedding: could use something like currentBlock to indicate this is a kind of pointer/cursor, but it's not hard to figure out what it is either way after looking for 2 minutes

Contributor Author:

Will rename

}

@Override
public CompressedChunkReader setMode(boolean isScan)
Contributor:

nit: the style guide recommends avoiding booleans in favor of enums to help make the code more expressive

Contributor Author (@jrwest, Oct 23, 2024):

Will create a ReadMode enum in CompressedChunkReader

Contributor Author:

How strongly do you feel about this? I would have to plumb it through several layers (up to RebuffererFactory)
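For reference, a minimal sketch of what that ReadMode enum and the reworked setMode might look like (the names, values, and field here are assumptions, not the merged code):

    public enum ReadMode { POINT, SCAN }

    private volatile ReadMode readMode = ReadMode.POINT;

    public CompressedChunkReader setMode(ReadMode mode)
    {
        // SCAN (compaction, repair, range queries) opts into the read-ahead buffer;
        // POINT keeps single-partition reads on the existing path
        this.readMode = mode;
        return this;
    }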

@@ -117,8 +141,23 @@ public void readChunk(long position, ByteBuffer uncompressed)
{
ByteBuffer compressed = bufferHolder.getBuffer(length);

if (channel.read(compressed, chunk.offset) != length)
throw new CorruptBlockException(channel.filePath(), chunk);
if (readAheadBuffer != null && readAheadBuffer.hasBuffer())
Contributor:

Why do you only handle this block? There are two blocks that logically would want to do this.

Contributor Author:

@dcapwell apologies, not sure I totally grok this one either. Do you mean why do we only read one block at a time?

{
int copied = 0;
while (copied < length) {
readAheadBuffer.fill(chunk.offset + copied);
Contributor:

this feels dangerous as you don't track the offsets... you are asked for position, and you assume the buffer is read from and mutated... this feels like it would be very easy to cause data corruption.

Contributor Author:

@dcapwell open to suggested improvements. Not totally sure I understand what you mean by "track" the offsets. I largely based it on the existing code that reads directly from the FileChannel and on how the compressed buffer is treated in the original code, to minimize changes.

else
copied += readAheadBuffer.read(compressed, readAheadBuffer.remaining());
}
} else
Contributor:

Suggested change:

-} else
+}
+else

if (readAheadBuffer != null && readAheadBuffer.hasBuffer())
{
int copied = 0;
while (copied < length) {
Contributor:

if I do the following test everything should blow up:

  1. read ahead of 4k
  2. chunk size of 64k

I don't see any place where you check that the amount of data to read is safe to read?

Contributor:

There need to be configuration checks to make sure that read ahead is either disabled or some minimum multiple of the chunk size, right? Even if your situation could be made not to explode, it's nonsensical?

Contributor Author:

That brings up an interesting point in terms of validation: the read-ahead buffer size is a cassandra.yaml setting, but chunk size is table-level. Some thoughts/questions:

  • Do we make the buffer size a table-level setting with a default in the YAML? I don't love it as a table-level parameter because it's driven more by the disks than by the data shape.
  • If we leave it how it is, what do we do if someone creates or modifies a table such that chunk size > buffer size? This should be rare because buffer size should be 64KiB or greater, but we still have to handle it. One option I see is to log a warning.

Thoughts @maedhroz @dcapwell @rustyrazorblade?

Contributor:

-1 to making it a table-level option. No benefit, and it adds unnecessary complexity.

I think the problem here stems from allowing the user to specify a small buffer size. It doesn't make sense to use a buffer smaller than 256KB. This isn't just because of the way EBS works (mentioned in the ticket) but because of the way every disk works.

The buffer should be > chunk size but also >= 256KB, and allowing less is an unnecessary configuration option that introduces both edge cases and sub-optimal performance.

Contributor Author:

@rustyrazorblade @maedhroz would you prefer a boolean to turn it on and off with a fixed buffer size, or validation to ensure it's >= 256KB? If we go with the latter we still have the chunk size / buffer size validation challenge, because the max chunk size is 2GB.

Contributor Author:

Yeah, unfortunately, whether or not people use it (which would be surprising), we allow it, so we have to handle it. I do like having it be a size. I also like ensuring it's not smaller than some value. And I think I agree that if the buffer size is < chunk size we just don't take the new code path (as the chunk size will be sufficient anyway, since it's so large in that case). The only open question then is: do we want to log when it happens? And do we log when the table metadata is changed or when the scanner is created (I lean towards the former)?

Contributor:

Hmm... hard for me to say. I was previously under the impression that there wasn't a good reason to use a larger size for LZ4 because it had some upper limit on what it would actually compress, but when I looked at the docs it gave me the impression that it can handle fairly large buffers, so maybe there's a good reason to use it. If you've got some fairly large partitions and you're only slicing them, there could be a benefit to it. I lean towards no warning, but if you feel strongly about it I'm not opposed. I don't know what action a user would take if they have a valid use case for the large buffer, and there shouldn't be any real perf loss from not hitting the internal RA code path if the reads are large enough.

Contributor (@rustyrazorblade, Nov 20, 2024):

would you prefer a boolean to turn it on and off and a fixed buffer size.

I think it could be useful to test larger sizes. We might find that a 1MB buffer works a little better in some situations, such as spinning disks or a slower NAS / SAN. The higher the latency the bigger the buffer, if you will.

Contributor Author:

OK, I think we will go with the following (sketched below):

  • the config is a buffer size that we validate to be >= 256KiB
  • if chunk size >= buffer size we will just skip the scan reader silently, because it's not needed
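A rough sketch of that validation and fallback, using plain ints for illustration (the method names and exact config type are assumptions, not the committed change):

    // startup validation of the hypothetical read-ahead buffer size setting
    static void validateReadAheadBufferSize(int readAheadBufferSizeKiB)
    {
        if (readAheadBufferSizeKiB != 0 && readAheadBufferSizeKiB < 256)
            throw new IllegalArgumentException("read-ahead buffer size must be 0 (disabled) or >= 256KiB");
    }

    // when creating a scan reader: silently skip read-ahead when the table's chunk
    // length already covers a whole buffer, since each chunk read is large enough on its own
    static boolean useReadAhead(int readAheadBufferSizeKiB, int chunkLength)
    {
        return readAheadBufferSizeKiB > 0 && (long) readAheadBufferSizeKiB * 1024 > chunkLength;
    }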

Contributor Author:

I also made it a hot property

@jrwest jrwest force-pushed the jwest/15452-5.0 branch 2 times, most recently from e11affc to 5fe763c Compare December 27, 2024 01:41

private final FastThreadLocal<ByteBuffer> blockBufferHolder;

private final FastThreadLocal<Integer> currentBlock;
Contributor:

What about grouping the currentBlock and blockBufferHolder values into a POJO stored in a single thread local, so we do a single thread-local lookup instead of two? It would also save one thread-local set to update the currentBlock value, and we would be able to use int instead of Integer for the value and avoid boxing.
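A minimal sketch of that grouping (class and field names are illustrative only):

    // one thread-local lookup instead of two, and a primitive int instead of a boxed Integer
    private static final class BlockState
    {
        ByteBuffer buffer;
        int currentBlock = -1;
    }

    private final FastThreadLocal<BlockState> blockState = new FastThreadLocal<BlockState>()
    {
        @Override
        protected BlockState initialValue()
        {
            return new BlockState();
        }
    };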

Contributor Author:

I like that. Will take a stab at it hopefully later today and get another test run going.

Contributor Author:

@netudima added a83fae3 if you want to take a look. Still need to address your other comment if you prefer to wait.

this.channel = channel;
this.bufferSize = bufferSize;
this.bufferType = bufferType;
blockBufferHolders.putIfAbsent(channel.filePath(), new FastThreadLocal<>()
Contributor (@netudima, Jan 21, 2025):

I'm afraid we have a kind of leak here: for every constructor call we create a new instance of FastThreadLocal. FastThreadLocal allocates an index (to store itself in a thread-local array): io.netty.util.concurrent.FastThreadLocal#index
Eventually we will consume all values for this int and get IllegalStateException("too many thread-local indexed variables") from io.netty.util.internal.InternalThreadLocalMap#nextVariableIndex.

Dynamic allocation of FastThreadLocal instances is a dangerous thing; we should probably reconsider the logic so that it uses only a static number of FastThreadLocal instances.

Contributor Author:

Have you actually hit that error testing this, or is it theoretical? Let me think about cleaning this up. I was looking at it yesterday as I addressed your other comment, ironically.

Thinking about it out loud: for the case where the thread local is absent, we really need the thread local created, so it's ok in that case. For the case where the thread local is not absent, the created thread-local instance would immediately be a candidate for garbage collection after the constructor exits. We also clean up the map when a scanner / file is closed (although last night I was also noting I should double-check this logic for leaks, unless this is what you are referring to). So we would need Integer.MAX_VALUE files opened (or a true leak) to hit the case you are talking about.

Thinking out loud more: we could allocate it for the first time when we allocate the buffer instead of in the constructor...

Contributor:

io.netty.util.internal.InternalThreadLocalMap#nextVariableIndex only ever increments, so it does not matter how many FastThreadLocal instances we have alive (non-GCed).
The following check:

            for (long i = 0; i < Integer.MAX_VALUE + 1L; i++)
            {
                new FastThreadLocal<>();
            }

eventually produces:

java.lang.IllegalStateException: too many thread-local indexed variables

	at io.netty.util.internal.InternalThreadLocalMap.nextVariableIndex(InternalThreadLocalMap.java:140)
	at io.netty.util.concurrent.FastThreadLocal.<init>(FastThreadLocal.java:128)
	at org.apache.cassandra.io.util.ThreadLocalReadAheadBufferTest.test(ThreadLocalReadAheadBufferTest.java:43)

Contributor (@netudima, Jan 21, 2025):

Thinking out loud about it too :-)
On the one hand, it does not look like a leak that will kill the server. If we create a ThreadLocalReadAheadBuffer instance only when we open a CompressedChunkReader for an SSTable, and we assume that happens even every second, then to consume all index values it will take MAX_INT / (3600 * 24 * 365) ~ 68 years.
Originally, I thought we created a ThreadLocalReadAheadBuffer for every read access and the creation rate can be very high, but I was wrong.

On the other hand, every time we increment the index and then use FastThreadLocal to set a value, we also expand the capacity of the io.netty.util.internal.InternalThreadLocalMap#indexedVariables array. So it will be a kind of slow memory leak via the growth of this array, which is kept permanently by every thread that accesses the FastThreadLocal.

Contributor:

A possible approach in the case of a static FastThreadLocal:

    static FastThreadLocal<Map<ThreadLocalReadAheadBuffer, BufferState>> threadLocalBufferState
    threadLocalBufferState.get().get(this)

The Map can be non-thread-safe, and BufferState is the POJO I suggested previously. The downside of this approach is an extra hash map lookup for every buffer access.
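A slightly fuller sketch of that idea (the BufferState fields and the helper method are assumptions, not the committed change):

    private static final FastThreadLocal<Map<ThreadLocalReadAheadBuffer, BufferState>> threadLocalBufferState =
        new FastThreadLocal<Map<ThreadLocalReadAheadBuffer, BufferState>>()
        {
            @Override
            protected Map<ThreadLocalReadAheadBuffer, BufferState> initialValue()
            {
                // the map is per-thread, so it never needs to be thread-safe
                return new HashMap<>();
            }
        };

    private static final class BufferState
    {
        ByteBuffer blockBuffer;
        int currentBlock = -1;
    }

    private BufferState state()
    {
        // only one static FastThreadLocal index is ever allocated, regardless of how many readers are created
        return threadLocalBufferState.get().computeIfAbsent(this, b -> new BufferState());
    }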

Contributor Author:

I agree with all of the above. It would take a LONG time to hit it, but since we've identified it let's try to address it by adjusting where the thread local is. I will try these proposals later this week (likely Thursday or Friday). I also have changes for @netudima's other comment locally that I will push by then as well.

Contributor Author:

See 133fd90

Contributor:

Just a thought: do we have any case where the same thread opens the same SSTable for read more than once before closing the previous attempt? If that is possible, then the file path as a map key can be unsafe.

Contributor Author:

It wouldn't be unsafe; it would just be a wasted read, as we would throw away a block (because we always fill before we read). I don't believe there is a case like that anyway, as each thread opens a single data reader for each SSTable.

Contributor:

Yes, you are right: even if close is invoked we still re-create a buffer on the fill step, so it is safe even if that happened.


private static File writeFile(int seed, int length)
{
String fileName = JAVA_IO_TMPDIR.getString() + "data+" + length + ".bin";
Contributor (@netudima, Jan 28, 2025):

I was very, very unlucky :-) and got the same random number twice while running the test; in this case the test fails due to repeated file names. It would be better to use a sequence to name the files:

at org.apache.cassandra.io.util.ThreadLocalReadAheadBufferTest.cleanup(ThreadLocalReadAheadBufferTest.java:67)
...
Caused by: java.nio.file.NoSuchFileException: /var/folders/d2/pzjmpmzd71j7qp55ry4b98qw0000gn/T/data+2097151.bin
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:244)
	at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
	at java.nio.file.Files.delete(Files.java:1126)
	at org.apache.cassandra.io.util.PathUtils.delete(PathUtils.java:246)
	... 21 more
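For reference, a sketch of the sequence-based naming suggested here (the AtomicInteger counter is illustrative only; the PR ultimately handles the collision differently, as discussed below):

    private static final AtomicInteger FILE_SEQ = new AtomicInteger();

    private static String uniqueFileName(int length)
    {
        // a monotonically increasing sequence keeps names unique even when two random lengths collide
        return JAVA_IO_TMPDIR.getString() + "data-" + FILE_SEQ.incrementAndGet() + '-' + length + ".bin";
    }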

Contributor Author:

I think we could just catch and ignore this exception. If we get two files with the same length we just have a bit less entropy in the test, because there will be one fewer file. We just need to account for it happening.

Contributor Author:

See 38d47aa

Contributor:

Yes, I think catching and ignoring the exception is ok. I would also suggest logging the seed used by the Random so we can reproduce other test errors if we get them in the future.

Contributor Author:

Added log

{
String fileName = JAVA_IO_TMPDIR.getString() + "data+" + length + ".bin";

byte[] data = new byte[(int) length];
Contributor:

just an idea: it could be a bit safer from an OOM point of view, and slightly faster, to use a constant-size buffer to fill the test data, like:

        byte[] dataChunk = new byte[4096 * 8];
        java.util.Random random = new Random(seed);
        int writtenData = 0;

        File file = new File(fileName);
        try (FileOutputStream fos = new FileOutputStream(file.toJavaIOFile()))
        {
            while (writtenData < length)
            {
                random.nextBytes(dataChunk);
                int toWrite = Math.min((length - writtenData), dataChunk.length);
                fos.write(dataChunk, 0, toWrite);
                writtenData += toWrite;
            }
            fos.flush();
        }

Contributor Author:

See 38d47aa

{
Block block = getBlock();
ByteBuffer blockBuffer = block.buffer;
long channelSize = channel.size();
Contributor:

channel.size() causes a system call; assuming we have a read-only file, we can get it once and store it as a field of ThreadLocalReadAheadBuffer.

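A minimal sketch of that caching (field, parameter, and method names are illustrative assumptions):

    public class ThreadLocalReadAheadBuffer
    {
        private final ChannelProxy channel;
        // cached once; SSTable data files are immutable, so the size never changes
        private final long fileLength;

        public ThreadLocalReadAheadBuffer(ChannelProxy channel, int bufferSize, BufferType bufferType)
        {
            this.channel = channel;
            this.fileLength = channel.size(); // single system call at construction time
            // existing bufferSize/bufferType assignments elided
        }

        private void fill(long position)
        {
            long channelSize = fileLength; // was: channel.size() on every fill
            // existing fill logic elided
        }
    }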

Contributor Author:

see 38d47aa

ByteBuffer compressed = bufferHolder.getBuffer(length);

int copied = 0;
while (copied < length) {
Contributor (@netudima, Feb 2, 2025):

one last idea, please feel free to ignore it :-)
we can move the whole loop into the readAheadBuffer logic and do a single thread-local lookup for a block in that case (currently we do 3 or 4 lookups per loop iteration). It will also remove duplicated code from the test,
like:

    public void read(ByteBuffer target, long offset, int length)
    {
        int copied = 0;
        Block block = getBlock();
        while (copied < length)
        {
            fill(block, offset + copied);
            int leftToRead = length - copied;
            if (block.buffer.remaining() >= leftToRead)
                copied += read(block, target, leftToRead);
            else
                copied += read(block, target, block.buffer.remaining());
        }
    }

Contributor

netudima commented Feb 2, 2025

LGTM (non-binding). Actually, I have taken this logic into my fork for 4.1 to get this feature earlier. @jrwest thank you a lot for addressing the comments!

Contributor

@maedhroz maedhroz left a comment

Had a quick conversation on a couple items w/ @jrwest, but assuming those are tied up, LGTM
