[CELEBORN-1846] Fix the StreamHandler usage in fetching chunk when task attempt is odd #3079
base: main
Changes from 1 commit
```diff
@@ -38,7 +38,9 @@
 import org.apache.celeborn.client.compress.Decompressor;
 import org.apache.celeborn.common.CelebornConf;
 import org.apache.celeborn.common.exception.CelebornIOException;
+import org.apache.celeborn.common.network.client.TransportClient;
 import org.apache.celeborn.common.network.client.TransportClientFactory;
+import org.apache.celeborn.common.network.protocol.TransportMessage;
 import org.apache.celeborn.common.protocol.*;
 import org.apache.celeborn.common.unsafe.Platform;
 import org.apache.celeborn.common.util.ExceptionMaker;
```
```diff
@@ -322,14 +324,10 @@ private boolean isExcluded(PartitionLocation location) {

   private PartitionReader createReaderWithRetry(
       PartitionLocation location, PbStreamHandler pbStreamHandler) throws IOException {
-    // For the first time, the location will be selected according to attemptNumber
-    if (fetchChunkRetryCnt == 0 && attemptNumber % 2 == 1 && location.hasPeer()) {
-      location = location.getPeer();
-      logger.debug("Read peer {} for attempt {}.", location, attemptNumber);
-    }
     Exception lastException = null;
     while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
       try {
+        logger.debug("Create reader for location {}", location);
         if (isExcluded(location)) {
           throw new CelebornIOException("Fetch data from excluded worker! " + location);
         }
```
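For context, this hunk removes the logic that made odd-numbered task attempts start fetching from the peer replica. Per the PR title and the thread at the end of this page, the problem is that a `PbStreamHandler` already obtained for the original location ends up pointing at a different worker than the location actually read. A minimal, self-contained model of the removed selection (the `Location` type here is a hypothetical stand-in for Celeborn's `PartitionLocation`):

```java
// Hypothetical stand-in for Celeborn's PartitionLocation; not the real class.
interface Location {
  boolean hasPeer();
  Location getPeer();
}

final class InitialLocationModel {
  // Pre-PR behavior: on the first fetch (fetchChunkRetryCnt == 0) of an
  // odd-numbered task attempt, start reading from the peer replica.
  static Location select(Location primary, int attemptNumber, int fetchChunkRetryCnt) {
    if (fetchChunkRetryCnt == 0 && attemptNumber % 2 == 1 && primary.hasPeer()) {
      // A stream handle already opened against `primary` now points at a
      // different worker than the location being read; this is the bug.
      return primary.getPeer();
    }
    return primary;
  }
}
```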
```diff
@@ -351,6 +349,28 @@ private PartitionReader createReaderWithRetry(
             location,
             e);
         location = location.getPeer();
+        if (pbStreamHandler != null) {
+          try {
+            TransportClient client =
+                clientFactory.createClient(location.getHost(), location.getFetchPort());
+            TransportMessage bufferStreamEnd =
+                new TransportMessage(
+                    MessageType.BUFFER_STREAM_END,
+                    PbBufferStreamEnd.newBuilder()
+                        .setStreamType(StreamType.ChunkStream)
+                        .setStreamId(pbStreamHandler.getStreamId())
+                        .build()
+                        .toByteArray());
+            client.sendRpc(bufferStreamEnd.toByteBuffer());
+          } catch (InterruptedException | IOException ex) {
+            logger.warn(
+                "Close {} stream {} failed",
+                location.hostAndFetchPort(),
+                pbStreamHandler.getStreamId(),
+                ex);
+          }
+          pbStreamHandler = null;
+        }
       } else {
         logger.warn(
             "CreatePartitionReader failed {}/{} times for location {}, retry the same location",
```

Review comments on this hunk:

- Why do we need to send BUFFER_STREAM_END to the replica location?

- Thanks, fixed.

- I wonder what will happen if we send a BufferStreamEnd message to a Celeborn worker that did not open the stream, because we may reach here when the location is excluded, without having opened a stream first.
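Given the first comment above and the author's "Thanks, fixed", the follow-up presumably ends the stream on the worker that actually opened it, before `location` is switched to its peer. A hedged sketch of that ordering, reusing only the calls already present in this diff; `closeStream` is a hypothetical helper name, and `clientFactory` and `logger` are assumed to be the surrounding class's existing fields:

```java
// Hedged sketch, not the committed follow-up: end the chunk stream on the
// worker that opened it *before* switching `location` to its peer.
private void closeStream(PartitionLocation location, PbStreamHandler pbStreamHandler) {
  try {
    TransportClient client =
        clientFactory.createClient(location.getHost(), location.getFetchPort());
    TransportMessage bufferStreamEnd =
        new TransportMessage(
            MessageType.BUFFER_STREAM_END,
            PbBufferStreamEnd.newBuilder()
                .setStreamType(StreamType.ChunkStream)
                .setStreamId(pbStreamHandler.getStreamId())
                .build()
                .toByteArray());
    client.sendRpc(bufferStreamEnd.toByteBuffer());
  } catch (InterruptedException | IOException ex) {
    logger.warn(
        "Close {} stream {} failed",
        location.hostAndFetchPort(),
        pbStreamHandler.getStreamId(),
        ex);
  }
}

// At the retry site, close against the *current* location first:
//   if (pbStreamHandler != null) {
//     closeStream(location, pbStreamHandler);
//     pbStreamHandler = null;
//   }
//   location = location.getPeer();
```

The second concern in the thread (sending BufferStreamEnd for a stream that was never opened, e.g. when the location is excluded before any open) would still need a guard, such as only sending when a stream id was actually issued.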
```diff
@@ -422,7 +442,6 @@ private PartitionReader createReader(
       int fetchChunkRetryCnt,
       int fetchChunkMaxRetry)
       throws IOException, InterruptedException {
-    logger.debug("Create reader for location {}", location);

     StorageInfo storageInfo = location.getStorageInfo();
     switch (storageInfo.getType()) {
```
Review thread on the removed attemptNumber-based peer selection:

- IMO, it would be better to keep this. Switching peers based on the attemptNumber may avoid a PartitionLocation that already failed in a previous attempt.

- If there is a problem with the primary location, then it has most likely already been switched to the peer in the last task attempt, which still failed; in that case it is not so relevant which replica the new task attempt starts fetching from. If there is no problem with the primary location and the task is retried for other reasons, the starting replica matters even less. So I think we could always fetch chunks starting from the primary location (see the sketch after this thread). WDYT?

- Maybe we should open streams for both the primary and replica locations?

- Sounds reasonable. Also, if we switch the location to its peer here, pbStreamHandler and location become inconsistent when createReader is called, which may cause issues in some shuffle scenarios.

- That would be a bit wasteful, because most tasks do not need to switch to the replica location when the cluster is stable.

- Sounds reasonable.
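To make the "always start from the primary" proposal concrete, here is a minimal, self-contained model of a retry loop under that policy. All names are hypothetical, not Celeborn's API, and failure handling is reduced to a single exception type:

```java
// Hypothetical model: every task attempt begins at the primary location and
// alternates to the peer replica only after a fetch failure.
final class FetchRetryModel {
  interface Location {
    Location peer(); // null when replication is disabled
  }

  interface ReaderFactory {
    AutoCloseable create(Location location) throws Exception;
  }

  static AutoCloseable createReaderWithRetry(
      Location primary, ReaderFactory factory, int maxRetry) throws Exception {
    Location location = primary; // always begin at the primary
    Exception lastException = null;
    for (int retry = 0; retry < maxRetry; retry++) {
      try {
        return factory.create(location);
      } catch (Exception e) {
        lastException = e;
        // After a failure, switch to the peer when one exists;
        // otherwise retry the same location.
        if (location.peer() != null) {
          location = location.peer();
        }
      }
    }
    if (lastException != null) {
      throw lastException;
    }
    throw new IllegalStateException("maxRetry must be at least 1");
  }
}
```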