[CELEBORN-1846] Fix the StreamHandler usage in fetching chunk when task attempt is odd #3079

Open · wants to merge 2 commits into main
@@ -38,7 +38,9 @@
import org.apache.celeborn.client.compress.Decompressor;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.network.client.TransportClient;
import org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.network.protocol.TransportMessage;
import org.apache.celeborn.common.protocol.*;
import org.apache.celeborn.common.unsafe.Platform;
import org.apache.celeborn.common.util.ExceptionMaker;
@@ -322,14 +324,10 @@ private boolean isExcluded(PartitionLocation location) {

private PartitionReader createReaderWithRetry(
PartitionLocation location, PbStreamHandler pbStreamHandler) throws IOException {
// For the first time, the location will be selected according to attemptNumber
if (fetchChunkRetryCnt == 0 && attemptNumber % 2 == 1 && location.hasPeer()) {
location = location.getPeer();
logger.debug("Read peer {} for attempt {}.", location, attemptNumber);
}
Exception lastException = null;
Contributor:
IMO, it would be better to keep this: switching peers based on the attemptNumber may help avoid a PartitionLocation that failed previously.

Contributor Author:

If there is a problem with the primary location, then it is likely that it has already been changed to the peer in the last task attempt but still failed. In this case, it is not so relevant which one to use to start fetching in the new task attempt.
If there is no problem with the primary location and the task is retried due to other problems, this situation is even less relevant.
So I think we could always fetch chunk by starting from the primary location. WDYT?

Contributor:

Maybe we should open streams for both primary and replica locations?

Contributor:

> If there is a problem with the primary location, then it is likely that it has already been changed to the peer in the last task attempt but still failed. In this case, it is not so relevant which one to use to start fetching in the new task attempt. If there is no problem with the primary location and the task is retried due to other problems, this situation is even less relevant. So I think we could always fetch chunk by starting from the primary location. WDYT?

Sounds reasonable. Also, changing location to its peer here would leave pbStreamHandler and location inconsistent when createReader is called, which may cause issues in some shuffle scenarios.
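
To make this point concrete, here is a hedged annotation of the branch removed above, reusing the identifiers from the diff; it is an illustration of the concern, not code from the PR.

// Sketch only: pbStreamHandler (when non-null) carries a streamId issued by
// the worker behind the original `location`.
if (fetchChunkRetryCnt == 0 && attemptNumber % 2 == 1 && location.hasPeer()) {
  // After this swap, `location` points at the peer worker while pbStreamHandler
  // still refers to a stream opened on the original worker, so a subsequent
  // createReader call would pair a peer location with a stream handle that
  // the peer never issued.
  location = location.getPeer();
}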

Contributor Author:

> Maybe we should open streams for both primary and replica locations?

This would be a bit wasteful, because most tasks do not need to switch to the replica location if the cluster is stable.

Contributor:

> Maybe we should open streams for both primary and replica locations?
>
> This would be a bit wasteful, because most tasks do not need to switch to the replica location if the cluster is stable.

Sounds reasonable.

while (fetchChunkRetryCnt < fetchChunkMaxRetry) {
try {
logger.debug("Create reader for location {}", location);
if (isExcluded(location)) {
throw new CelebornIOException("Fetch data from excluded worker! " + location);
}
@@ -350,6 +348,28 @@ private PartitionReader createReaderWithRetry(
fetchChunkMaxRetry,
location,
e);
if (pbStreamHandler != null) {
try {
TransportClient client =
clientFactory.createClient(location.getHost(), location.getFetchPort());
TransportMessage bufferStreamEnd =
new TransportMessage(
MessageType.BUFFER_STREAM_END,
Contributor:

Why do we need to send BUFFER_STREAM_END to the replica location?

Contributor Author:

Thanks, fixed.

Contributor:

I wonder what will happen if we send a buffer stream end message to a Celeborn worker that did not open the stream, because we may reach here when the location is excluded without a stream having been opened first.

PbBufferStreamEnd.newBuilder()
.setStreamType(StreamType.ChunkStream)
.setStreamId(pbStreamHandler.getStreamId())
.build()
.toByteArray());
client.sendRpc(bufferStreamEnd.toByteBuffer());
} catch (InterruptedException | IOException ex) {
logger.warn(
"Close {} stream {} failed",
location.hostAndFetchPort(),
pbStreamHandler.getStreamId(),
ex);
}
pbStreamHandler = null;
}
location = location.getPeer();
} else {
logger.warn(
@@ -422,7 +442,6 @@ private PartitionReader createReader(
int fetchChunkRetryCnt,
int fetchChunkMaxRetry)
throws IOException, InterruptedException {
logger.debug("Create reader for location {}", location);

StorageInfo storageInfo = location.getStorageInfo();
switch (storageInfo.getType()) {
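
For reference, the cleanup RPC added in the catch block of createReaderWithRetry can be read as a small standalone helper. The following is a hedged restatement of the diff's logic using the same Celeborn classes and imports; the ChunkStreamCleanup class and closeChunkStream method names are illustrative and are not part of the PR.

import java.io.IOException;

import org.apache.celeborn.common.network.client.TransportClient;
import org.apache.celeborn.common.network.client.TransportClientFactory;
import org.apache.celeborn.common.network.protocol.TransportMessage;
import org.apache.celeborn.common.protocol.*;

class ChunkStreamCleanup {
  // Tell the worker that issued pbStreamHandler to end the chunk stream, so the
  // caller can safely retry the fetch on the peer location afterwards.
  static void closeChunkStream(
      TransportClientFactory clientFactory,
      PartitionLocation location,
      PbStreamHandler pbStreamHandler) {
    try {
      TransportClient client =
          clientFactory.createClient(location.getHost(), location.getFetchPort());
      TransportMessage bufferStreamEnd =
          new TransportMessage(
              MessageType.BUFFER_STREAM_END,
              PbBufferStreamEnd.newBuilder()
                  .setStreamType(StreamType.ChunkStream)
                  .setStreamId(pbStreamHandler.getStreamId())
                  .build()
                  .toByteArray());
      client.sendRpc(bufferStreamEnd.toByteBuffer());
    } catch (InterruptedException | IOException e) {
      // Best-effort cleanup: the PR logs a warning here, and the retry on the
      // peer proceeds regardless of whether this RPC succeeded.
    }
  }
}

In the PR itself this logic is inlined in createReaderWithRetry's catch block and is immediately followed by pbStreamHandler = null and location = location.getPeer(), so the next retry no longer reuses the old stream handle.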