-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HPCC4J-606 DFSClient Allow ReadBuffer size to be set #788
base: candidate-9.10.x
Are you sure you want to change the base?
Conversation
- Created new CircularByteBuffer class - Created CircularByteBufferTest - Made changes to allow read buffer size to be smaller than the read request size Signed-off-by: James McMullan [email protected]
Jira Issue: https://hpccsystems.atlassian.net/browse/HPCC4J-606 Jirabot Action Result: |
dfsclient/src/main/java/org/hpccsystems/dfs/client/CircularByteBuffer.java
Fixed
Show resolved
Hide resolved
@@ -0,0 +1,217 @@ | |||
/******************************************************************************* | |||
* HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2025?
|
||
public CircularByteBuffer(int bufferSize) | ||
{ | ||
buffer = new byte[bufferSize]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are 0 and negative ints ok here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we consider an upper limit as well?
buffer = new byte[bufferSize]; | ||
} | ||
|
||
public int getCurrentNumberOfBytes() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the meaning of current bytes? I assume this is the number of occupied bytes ? is there a more self documenting name for the method and the member?
return getSpace() > 0; | ||
} | ||
|
||
public int getSpace() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getAvailableByteCount?
{ | ||
if (currentNumberOfBytes + length > buffer.length) | ||
{ | ||
throw new IOException("Not enough space available"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Who is the target of the message, and will this be enough information to trivially identify the problem, and formulate a solution?
@@ -1190,28 +1179,6 @@ private void readDataInFetch() | |||
// Loop here while data is being consumed quickly enough | |||
while (remainingDataInCurrentRequest > 0) | |||
{ | |||
if (CompileTimeConstants.PROFILE_CODE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what replaces this removed logic? L1219?
dis.readFully(this.readBuffer, currentBufferLen, bytesToRead); | ||
synchronized (readBuffer) | ||
{ | ||
// int contiguousCapacity = readBuffer.getContiguousSpace(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we plan to remove the comments?
{ | ||
availBytes = readBuffer.getCurrentNumberOfBytes(); | ||
} | ||
|
||
// Do the check for closed first here to avoid data races |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this old comment mean you shouldn't getcurrentnumberofbytes first?
@@ -0,0 +1,163 @@ | |||
/******************************************************************************* | |||
* HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor, but 2025?
import org.junit.experimental.categories.Category; | ||
|
||
@Category(org.hpccsystems.commons.annotations.RemoteTests.class) | ||
@FixMethodOrder(MethodSorters.NAME_ASCENDING) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just confirming we need to execute the tests in alphabetical order
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jpmcmu we're very close, I had a couple of questions.
@@ -737,6 +738,15 @@ private void readIntoScratchBuffer(int offset, int dataLen) throws IOException | |||
throw e; | |||
} | |||
|
|||
if (bytesRead == 0) | |||
{ | |||
try |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this just hoping we'll start getting data after the sleep?
Also, is it possible to get stuck in this loop if it doesn't receive any data for a long time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could get stuck in this loop until the socket timeout kicks in. After which the call to read() will either throw an exception or return -1 which will cause an exception to be thrown. (Which case depends on how the socket was closed.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To Rodrigo's point should add an indicator / counter if we get stuck in this loop for a while
|
||
public CircularByteBuffer(int bufferSize) | ||
{ | ||
buffer = new byte[bufferSize]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we consider an upper limit as well?
if (availBytes == 0) | ||
{ | ||
String prefix = "RowServiceInputStream.available(), file " + dataPart.getFileName() + " part " + dataPart.getThisPart() + " on IP " + getIP() + ":"; | ||
|
||
// this.bufferWriteMutex.release(); | ||
IOException wrappedException = new IOException(prefix + "End of input stream, bufferLen:" + bufferLen + ", this.readPos:" + this.readPos + ", availableBytes=0"); | ||
IOException wrappedException = new IOException(prefix + "End of input stream"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this enough information for debugger to understand the problem and how to fix?
@@ -1583,50 +1508,11 @@ public void close() throws IOException | |||
@Override | |||
public void mark(int readLim) | |||
{ | |||
if (CompileTimeConstants.PROFILE_CODE) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we dropping all profiling code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the other profiling code will remain, I removed code that tracking if the buffer mutex was being held for too long because it was no longer useful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jpmcmu approved
Signed-off-by: James McMullan [email protected]
Type of change:
Checklist:
Testing: