CASSANDRA-15452: Read Ahead for Compaction, Repair, and Range Queries #3606
base: cassandra-5.0
Conversation
Force-pushed 2bd7757 to 9f815c8
@Override
public CompressedChunkReader setMode(boolean isScan)
{
    if (readAheadBuffer != null)
nit: Quick comment on the fact that isScan is there so we don't try to use the RAB on point reads.
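For illustration, the kind of clarifying comment being requested could look like the following sketch on a stripped-down stand-in class (everything except setMode, isScan, and readAheadBuffer is a placeholder, not the actual patch):

// Sketch only: a stand-in class showing where the requested comment would go.
final class ChunkReaderModeSketch
{
    private final Object readAheadBuffer; // stand-in for the real read-ahead buffer (RAB)
    private boolean scanMode;

    ChunkReaderModeSketch(Object readAheadBuffer)
    {
        this.readAheadBuffer = readAheadBuffer;
    }

    ChunkReaderModeSketch setMode(boolean isScan)
    {
        // Read-ahead only pays off for sequential access (compaction, repair, range
        // queries); isScan keeps point reads on the ordinary chunk-read path.
        if (readAheadBuffer != null)
            scanMode = isScan;
        return this;
    }
}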
int blockNo = (int) (realPosition / blockLength);
long blockPosition = blockNo * blockLength;

if (storedBlockNo.get() != blockNo)
nit: Quick comment on this being here/necessary to skip when we've already read 256KiB worth of 16KiB chunks and just need to advance blockBuffer
private final FastThreadLocal<ByteBuffer> blockBufferHolder;

private final FastThreadLocal<Integer> storedBlockNo;
bike-shedding: could do something like currentBlock to indicate this is kind of a pointer/cursor, but not hard to figure out what it is either way after looking for 2 minutes.
Will rename
}

@Override
public CompressedChunkReader setMode(boolean isScan)
nit: the style guide recommends avoiding boolean in favor of enums to help make the code more expressive.
Will create a ReadMode enum in CompressedChunkReader.
How strongly do you feel about this? I would have to plumb it through several layers (up to RebuffererFactory).
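For reference, a minimal sketch of what such a ReadMode enum might look like; the constant names and the revised signature below are assumptions, not the actual patch:

// Hypothetical sketch of the ReadMode enum discussed above.
public enum ReadMode
{
    POINT, // single-partition point reads: skip the read-ahead buffer
    SCAN   // compaction, repair, range queries: enable read-ahead
}
// setMode(boolean isScan) would then become something like setMode(ReadMode mode),
// plumbed through the intermediate layers up to RebuffererFactory, per the concern above.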
@@ -117,8 +141,23 @@ public void readChunk(long position, ByteBuffer uncompressed)
{
    ByteBuffer compressed = bufferHolder.getBuffer(length);

    if (channel.read(compressed, chunk.offset) != length)
        throw new CorruptBlockException(channel.filePath(), chunk);
    if (readAheadBuffer != null && readAheadBuffer.hasBuffer())
Why do you only handle this block? There are 2 blocks that logically would want to do this
@dcapwell apologies, not sure I totally grok this one either. Do you mean why do we only read one block at a time?
{
    int copied = 0;
    while (copied < length) {
        readAheadBuffer.fill(chunk.offset + copied);
this feels dangerous as you don't track the offsets... you are asked for position, and you assume the buffer is read from and mutated... this feels like it would be very easy to cause data corruption.
@dcapwell open to suggested improvements. Not totally sure I understand what you mean by "track" the offsets. I largely based it on the existing code that reads directly from the FileChannel and on how the compressed buffer is treated in the original code, to minimize changes.
else
    copied += readAheadBuffer.read(compressed, readAheadBuffer.remaining());
}
} else
Suggested change: replace
} else
with
}
else
if (readAheadBuffer != null && readAheadBuffer.hasBuffer())
{
    int copied = 0;
    while (copied < length) {
if I do the following test, everything should blow up:
- read ahead of 4k
- chunk size of 64k
I don't see any place where you check that the amount of data to read is safe to read?
There need to be configuration checks to make sure that read ahead is either disabled or some minimum multiple of chunk size, right? Even if your situation could be made not to explode, it's nonsensical?
That brings up an interesting point in terms of validation. The read-ahead buffer size is a cassandra.yaml setting but chunk size is table level. Some thoughts/questions:
- Do we make the buffer size a table-level setting with a default in the YAML? I don't love it as a table-level parameter because it's more driven by the disks than the data shape.
- If we leave it how it is, what do we do if someone creates or modifies a table such that chunk size > buffer size? This should be rare because buffer size should be 64k or greater, but we still have to handle it. One option I see is to log and warn.
Thoughts @maedhroz @dcapwell @rustyrazorblade?
-1 to making it a table-level option. No benefit, and it adds unnecessary complexity.
I think the problem here stems from allowing the user to specify a small buffer size. It doesn't make sense to use a buffer less than 256KB. This isn't just because of the way EBS works (mentioned in the ticket) but because of the way every disk works.
The buffer should be > chunk size but also >= 256KB; allowing less is an unnecessary configuration option that introduces both edge cases and sub-optimal performance.
@rustyrazorblade @maedhroz would you prefer a boolean to turn it on and off and a fixed buffer size, or validation to ensure it's >= 256KB? If we go with the latter we still have the chunk size / buffer size validation challenge, because the max chunk size is 2GB.
Yeah, unfortunately, whether or not people use it (which would be surprising), we allow it, so we have to handle it. I do like having it be a size. I also like ensuring it's not smaller than some value. And I think I agree: if the buffer size is < chunk size, just don't take the new code path (chunk size will be sufficient anyway since it's so large in that case). The only open question then is whether we want to log when it happens, and whether we log when the table metadata is changed or when the scanner is created (I lean towards the former)?
Hmm... hard for me to say. I was previously under the impression that there wasn't a good reason to use a larger size for LZ4 because it had some upper limit on what it would actually compress, but when I looked at the docs it gave me the impression that it can handle fairly large buffers, so maybe there's a good reason to use it. If you've got some fairly large partitions and you're only slicing them, there could be a benefit to it. I lean towards no warning, but if you feel strongly about it I'm not opposed. I don't know what action a user would take if they have a valid use case for the large buffer, and there shouldn't be any real perf loss from not hitting the internal RA code path if the reads are large enough.
would you prefer a boolean to turn it on and off and a fixed buffer size.
I think it could be useful to test larger sizes. We might find that a 1MB buffer works a little better in some situations, such as spinning disks or a slower NAS / SAN. The higher the latency the bigger the buffer, if you will.
OK, I think we will go with:
- the config is a buffer size we validate to be >= 256k
- if chunk size >= buffer size we will just skip the scan reader silently, because it's not needed.
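A minimal sketch of that agreed behaviour (the class, method, and message names below are hypothetical, not the actual patch):

// Sketch only: validate the yaml read-ahead buffer size once, and skip the read-ahead
// path when a table's chunk size already covers it.
import org.apache.cassandra.exceptions.ConfigurationException;

public final class ReadAheadConfigSketch
{
    static final int MIN_READ_AHEAD_BUFFER_SIZE = 256 * 1024; // 256 KiB

    static void validate(int readAheadBufferSize)
    {
        if (readAheadBufferSize < MIN_READ_AHEAD_BUFFER_SIZE)
            throw new ConfigurationException("read-ahead buffer size must be >= 256KiB");
    }

    static boolean useReadAhead(int readAheadBufferSize, int chunkSize)
    {
        // If the chunk size already reaches the buffer size, the plain chunk read is
        // enough, so the scan reader is skipped silently (no warning, per the discussion).
        return chunkSize < readAheadBufferSize;
    }
}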
I also made it a hot property
Force-pushed 53ba213 to 24b938b
Force-pushed e11affc to 5fe763c
Patch by Jordan West, Jon Haddad; Reviewed by David Capwell, Caleb Rackliffe for CASSANDRA-15452
Force-pushed 5fe763c to 1739afa
private final FastThreadLocal<ByteBuffer> blockBufferHolder;

private final FastThreadLocal<Integer> currentBlock;
What about grouping the currentBlock and blockBufferHolder values into a POJO stored in a single thread local, and doing a single thread-local lookup instead of 2? It will also save one thread-local set to update the currentBlock value, and we will be able to use int instead of Integer for the value and avoid boxing.
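A minimal sketch of that grouping (the BlockState name and its fields are assumptions for illustration):

// Sketch only: one FastThreadLocal holding a small mutable state object instead of two
// separate thread locals; currentBlock becomes a primitive int, so no Integer boxing.
import java.nio.ByteBuffer;
import io.netty.util.concurrent.FastThreadLocal;

final class ReadAheadStateSketch
{
    static final class BlockState
    {
        ByteBuffer buffer;      // the read-ahead block buffer for this thread
        int currentBlock = -1;  // index of the block currently held in buffer
    }

    private final FastThreadLocal<BlockState> blockState = new FastThreadLocal<BlockState>()
    {
        @Override
        protected BlockState initialValue()
        {
            return new BlockState();
        }
    };

    BlockState state()
    {
        return blockState.get(); // a single thread-local lookup serves each access
    }
}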
I like that. Will take a stab at it hopefully later today and get another test run going.
this.channel = channel;
this.bufferSize = bufferSize;
this.bufferType = bufferType;
blockBufferHolders.putIfAbsent(channel.filePath(), new FastThreadLocal<>()
I'm afraid we have a kind of leak here: for every constructor call we create a new instance of FastThreadLocal, and FastThreadLocal allocates an index (to store itself in a thread-local array): io.netty.util.concurrent.FastThreadLocal#index.
Eventually we will consume all values for this int and get IllegalStateException("too many thread-local indexed variables") from io.netty.util.internal.InternalThreadLocalMap#nextVariableIndex.
Dynamic allocation of FastThreadLocal instances is a dangerous thing; we should probably reconsider the logic so that only a static number of FastThreadLocal instances is used.
Have you actually hit that error testing this, or is it theoretical? Let me think about cleaning this up. I was looking at it yesterday as I addressed your other comment, ironically.
Thinking about it out loud: for the case where the thread local is absent, we really need the thread local created, so it's OK in that case. For the case where the thread local is not absent, the created thread-local instance would immediately be a candidate for garbage collection after the constructor exits. We also clean up the map when a scanner / file is closed (although last night I was also noting I should double-check this logic for leaks, unless this is what you are referring to). So we would need Integer.MAX_VALUE files open (or a true leak) to hit the case you are talking about.
Thinking out loud more: we could allocate it for the first time when we allocate the buffer instead of in the constructor...
io.netty.util.internal.InternalThreadLocalMap#nextVariableIndex is only incrementing, so it does not matter how many FastThreadLocal instances we have alive (non-GCed).
The following check:
for (long i = 0; i < Integer.MAX_VALUE + 1L; i++)
{
new FastThreadLocal<>();
}
eventually produces:
java.lang.IllegalStateException: too many thread-local indexed variables
at io.netty.util.internal.InternalThreadLocalMap.nextVariableIndex(InternalThreadLocalMap.java:140)
at io.netty.util.concurrent.FastThreadLocal.<init>(FastThreadLocal.java:128)
at org.apache.cassandra.io.util.ThreadLocalReadAheadBufferTest.test(ThreadLocalReadAheadBufferTest.java:43)
Thinking out loud about it too :-)
On one hand, it does not look like a leak which will kill the server: if we create a ThreadLocalReadAheadBuffer instance only when we open a CompressedChunkReader for an SSTable, and let's assume it happens even every second, then to consume all index values it will take MAX_INT / (3600 * 24 * 365) ~ 68 years. Originally, I thought that we create a ThreadLocalReadAheadBuffer for every read access and the creation rate can be very high, but I was wrong.
On the other hand, every time we increment the index and then use the FastThreadLocal to set a value, we also expand the capacity of the io.netty.util.internal.InternalThreadLocalMap#indexedVariables array. So it will be a kind of slow memory leak through the extension of this array, which is kept permanently by every thread which accesses the FastThreadLocal.
A possible way in case of a static FastThreadLocal:
static FastThreadLocal<Map<ThreadLocalReadAheadBuffer, BufferState>> threadLocalBufferState
threadLocalBufferState.get().get(this)
The Map can be non-thread-safe, and BufferState is the POJO which I suggested previously. The downside of this approach is an extra hash map lookup for every buffer access.
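A sketch of that shape (BufferState here is the hypothetical POJO from the earlier suggestion, and the stand-in class and method names are illustrative):

// Sketch only: a single static FastThreadLocal whose per-thread value maps each reader
// instance to its BufferState, so only one FastThreadLocal index is ever allocated no
// matter how many readers are opened.
import java.util.IdentityHashMap;
import java.util.Map;
import io.netty.util.concurrent.FastThreadLocal;

final class ThreadLocalReadAheadBufferSketch
{
    static final class BufferState { /* block buffer + current block index */ }

    private static final FastThreadLocal<Map<ThreadLocalReadAheadBufferSketch, BufferState>> threadLocalBufferState =
        new FastThreadLocal<Map<ThreadLocalReadAheadBufferSketch, BufferState>>()
        {
            @Override
            protected Map<ThreadLocalReadAheadBufferSketch, BufferState> initialValue()
            {
                // only ever touched by the owning thread, so a plain non-thread-safe map is fine
                return new IdentityHashMap<>();
            }
        };

    BufferState state()
    {
        // the trade-off: one extra hash map lookup per buffer access
        return threadLocalBufferState.get().computeIfAbsent(this, ignored -> new BufferState());
    }
}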
I agree with all of the above. It would take a LONG time to hit it, but since we've identified it let's try to address it by adjusting where the thread local is. I will try these proposals later this week (likely Thursday or Friday). I also have changes for @netudima's other comment locally that I will push by then as well.
See 133fd90
Just a thought: do we have any case where the same thread opens the same SSTable for read more than once before closing the previous attempt? If that is possible, then the file path as a map key can be unsafe.
It wouldn't be unsafe; it would just be a waste of reading, as we would throw away a block (because we always fill before we read). I don't believe there is a case like that anyway, as each thread opens a single data reader for each SSTable.
Yes, you are right: even if close is invoked we still re-create a buffer on the fill step, so it is safe even if that happened.
private static File writeFile(int seed, int length)
{
    String fileName = JAVA_IO_TMPDIR.getString() + "data+" + length + ".bin";
I was very, very unlucky :-) and got the same random number twice while running the test; in this case the test fails due to repeating file names. It would be better to use a sequence to name the files:
at org.apache.cassandra.io.util.ThreadLocalReadAheadBufferTest.cleanup(ThreadLocalReadAheadBufferTest.java:67)
...
Caused by: java.nio.file.NoSuchFileException: /var/folders/d2/pzjmpmzd71j7qp55ry4b98qw0000gn/T/data+2097151.bin
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:244)
at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
at java.nio.file.Files.delete(Files.java:1126)
at org.apache.cassandra.io.util.PathUtils.delete(PathUtils.java:246)
... 21 more
I think we could just catch and ignore this exception. If we get two files with the same length we just have a bit less entropy in the test because there will be one less file. We just need to account for it happening.
See 38d47aa
Yes, I think catching and ignoring the exception is OK. I would also suggest logging the seed used by the Random, to be able to reproduce other test errors if we get them in the future.
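Something along these lines (a sketch only; the logger setup and the nanoTime-based seed are assumptions, not the committed test code):

// Sketch only: log the seed so a failing run can be reproduced with the same data.
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThreadLocalReadAheadBufferTest
{
    private static final Logger logger = LoggerFactory.getLogger(ThreadLocalReadAheadBufferTest.class);
    private static final long SEED = System.nanoTime();

    static
    {
        logger.info("ThreadLocalReadAheadBufferTest random seed: {}", SEED);
    }

    private final Random random = new Random(SEED);
}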
Added log
{
    String fileName = JAVA_IO_TMPDIR.getString() + "data+" + length + ".bin";

    byte[] data = new byte[(int) length];
Just an idea: it could be a bit safer from an OOM point of view, and slightly faster, to use a constant-size buffer to fill the test data, like:
byte[] dataChunk = new byte[4096 * 8];
java.util.Random random = new Random(seed);
int writtenData = 0;
File file = new File(fileName);
try (FileOutputStream fos = new FileOutputStream(file.toJavaIOFile()))
{
while (writtenData < length)
{
random.nextBytes(dataChunk);
int toWrite = Math.min((length - writtenData), dataChunk.length);
fos.write(dataChunk, 0, toWrite);
writtenData += toWrite;
}
fos.flush();
}
See 38d47aa
{
    Block block = getBlock();
    ByteBuffer blockBuffer = block.buffer;
    long channelSize = channel.size();
See 38d47aa
ByteBuffer compressed = bufferHolder.getBuffer(length);

int copied = 0;
while (copied < length) {
The last idea, please feel free to ignore it :-)
We can move the whole loop into the readAheadBuffer logic and do a single thread-local lookup for a block in that case (currently we do 3 or 4 lookups per loop iteration). It will also remove duplicated code from a test.
like:
public void read(ByteBuffer target, long offset, int length)
{
int copied = 0;
Block block = getBlock();
while (copied < length)
{
fill(block, offset + copied);
int leftToRead = length - copied;
if (block.buffer.remaining() >= leftToRead)
copied += read(block, target, leftToRead);
else
copied += read(block, target, block.buffer.remaining());
}
}
LGTM (non-binding). Actually, I have taken this logic to my fork for 4.1 to get this feature earlier. @jrwest thank you a lot for addressing the comments!
Had a quick conversation on a couple items w/ @jrwest, but assuming those are tied up, LGTM