Skip to content

Commit

Permalink
feat: implement fast-failover for MessageRecvManager and DataClientMa…
Browse files Browse the repository at this point in the history
…nager (#243)
  • Loading branch information
Radeity authored May 25, 2023
1 parent 31b63b2 commit f7d10a8
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,9 @@ public void onChannelInactive(ConnectionId connectionId) {
@Override
public void exceptionCaught(TransportException cause,
ConnectionId connectionId) {
// TODO: implement failover
LOG.error("Channel for connectionId {} occurred exception",
connectionId, cause);
DataClientManager.this.connManager.closeClient(connectionId);
DataClientManager.this.sender.transportExceptionCaught(cause, connectionId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void onDone(Channel channel, ChannelFuture future) {
}
}

public void onSuccess(Channel channel, ChannelFuture future) {
public void onSuccess(Channel channel, ChannelFuture future) {
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully send data to '{}'",
TransportUtil.remoteAddress(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.hugegraph.computer.core.receiver;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.common.Constants;
Expand Down Expand Up @@ -60,7 +63,9 @@ public class MessageRecvManager implements Manager, MessageHandler {

private int workerCount;
private int expectedFinishMessages;
private CountDownLatch finishMessagesLatch;
private CompletableFuture<Void> finishMessagesFuture;
private AtomicInteger finishMessagesCount;

private long waitFinishMessagesTimeout;
private long superstep;

Expand All @@ -71,6 +76,7 @@ public MessageRecvManager(ComputerContext context,
this.fileManager = fileManager;
this.sortManager = sortManager;
this.superstep = Constants.INPUT_SUPERSTEP;
this.finishMessagesCount = new AtomicInteger();
}

@Override
Expand All @@ -90,8 +96,9 @@ public void init(Config config) {
this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT);
// One for vertex and one for edge.
this.expectedFinishMessages = this.workerCount * 2;
this.finishMessagesLatch = new CountDownLatch(
this.expectedFinishMessages);
this.finishMessagesFuture = new CompletableFuture<>();
this.finishMessagesCount.set(this.expectedFinishMessages);

this.waitFinishMessagesTimeout = config.get(
ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT);
}
Expand All @@ -103,8 +110,9 @@ public void beforeSuperstep(Config config, int superstep) {
this.messagePartitions = new ComputeMessageRecvPartitions(
this.context, fileGenerator, this.sortManager);
this.expectedFinishMessages = this.workerCount;
this.finishMessagesLatch = new CountDownLatch(
this.expectedFinishMessages);
this.finishMessagesFuture = new CompletableFuture<>();
this.finishMessagesCount.set(this.expectedFinishMessages);

this.superstep = superstep;

if (this.superstep == Constants.INPUT_SUPERSTEP + 1) {
Expand Down Expand Up @@ -138,35 +146,21 @@ public void onChannelInactive(ConnectionId connectionId) {
@Override
public void exceptionCaught(TransportException cause,
ConnectionId connectionId) {
// TODO: implement failover
LOG.warn("Exception caught for connection:{}, root cause:",
LOG.error("Exception caught for connection:{}, root cause:",
connectionId, cause);
this.finishMessagesFuture.completeExceptionally(cause);
}

public void waitReceivedAllMessages() {
try {
boolean status = this.finishMessagesLatch.await(
this.waitFinishMessagesTimeout,
TimeUnit.MILLISECONDS);
if (!status) {
throw new ComputerException(
"Expect %s finish-messages received in %s ms, " +
"%s absence in superstep %s",
this.expectedFinishMessages,
this.waitFinishMessagesTimeout,
this.finishMessagesLatch.getCount(),
this.superstep);
}
} catch (InterruptedException e) {
throw new ComputerException(
"Thread is interrupted while waiting %s " +
"finish-messages received in %s ms, " +
"%s absence in superstep %s",
e,
this.expectedFinishMessages,
this.waitFinishMessagesTimeout,
this.finishMessagesLatch.getCount(),
this.superstep);
this.finishMessagesFuture.get(this.waitFinishMessagesTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new ComputerException("Time out while waiting %s finish-messages " +
"received in %s ms in superstep %s",
this.expectedFinishMessages, this.waitFinishMessagesTimeout, this.superstep, e);
} catch (InterruptedException | ExecutionException e) {
throw new ComputerException("Error while waiting %s finish-messages in superstep %s",
this.expectedFinishMessages, this.superstep, e);
}
}

Expand Down Expand Up @@ -214,7 +208,10 @@ public void onStarted(ConnectionId connectionId) {
@Override
public void onFinished(ConnectionId connectionId) {
LOG.debug("ConnectionId {} finished", connectionId);
this.finishMessagesLatch.countDown();
int currentCount = this.finishMessagesCount.decrementAndGet();
if (currentCount == 0) {
this.finishMessagesFuture.complete(null);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void startSend(MessageType type) {
}

/**
* Finsih send message, send the last buffer and put an END signal
* Finish send message, send the last buffer and put an END signal
* into queue
* @param type the message type
*/
Expand Down Expand Up @@ -277,10 +277,10 @@ private void sendControlMessageToWorkers(Set<Integer> workerIds,
}
} catch (TimeoutException e) {
throw new ComputerException("Timeout(%sms) to wait for " +
"controling message(%s) to finished",
"controlling message(%s) to finished",
e, timeout, type);
} catch (InterruptedException | ExecutionException e) {
throw new ComputerException("Failed to wait for controling " +
throw new ComputerException("Failed to wait for controlling " +
"message(%s) to finished", e, type);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.util.concurrent.CompletableFuture;

import org.apache.hugegraph.computer.core.common.exception.TransportException;
import org.apache.hugegraph.computer.core.network.ConnectionId;
import org.apache.hugegraph.computer.core.network.message.MessageType;

public interface MessageSender {
Expand All @@ -37,4 +39,10 @@ CompletableFuture<Void> send(int workerId, MessageType type)
* @param message message payload
*/
void send(int workerId, QueuedMessage message) throws InterruptedException;

/**
* Invoked when the channel associated with the given connectionId has
* an exception is thrown processing message.
*/
void transportExceptionCaught(TransportException cause, ConnectionId connectionId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hugegraph.computer.core.common.exception.TransportException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.network.ConnectionId;
import org.apache.hugegraph.computer.core.network.TransportClient;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.concurrent.BarrierEvent;
Expand Down Expand Up @@ -103,6 +104,15 @@ public void send(int workerId, QueuedMessage message)
channel.queue.put(message);
}

@Override
public void transportExceptionCaught(TransportException cause, ConnectionId connectionId) {
for (WorkerChannel channel : this.channels) {
if (channel.client.connectionId().equals(connectionId)) {
channel.futureRef.get().completeExceptionally(cause);
}
}
}

public Runnable notBusyNotifier() {
/*
* DataClientHandler.sendAvailable() will call it when client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.util.concurrent.CompletableFuture;

import org.apache.hugegraph.computer.core.common.exception.TransportException;
import org.apache.hugegraph.computer.core.network.ConnectionId;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.sender.MessageSender;
import org.apache.hugegraph.computer.core.sender.QueuedMessage;
Expand All @@ -36,4 +38,9 @@ public CompletableFuture<Void> send(int workerId, MessageType type) {
public void send(int workerId, QueuedMessage message) {
// pass
}

@Override
public void transportExceptionCaught(TransportException cause, ConnectionId connectionId) {
// pass
}
}

0 comments on commit f7d10a8

Please sign in to comment.