From f7d10a845de68cd84fe01f247b8fc3d887edbb67 Mon Sep 17 00:00:00 2001 From: Aaron Wang Date: Thu, 25 May 2023 09:24:52 +0800 Subject: [PATCH] feat: implement fast-failover for MessageRecvManager and DataClientManager (#243) --- .../core/network/DataClientManager.java | 3 +- .../netty/ChannelFutureListenerOnWrite.java | 2 +- .../core/receiver/MessageRecvManager.java | 59 +++++++++---------- .../core/sender/MessageSendManager.java | 6 +- .../computer/core/sender/MessageSender.java | 8 +++ .../core/sender/QueuedMessageSender.java | 10 ++++ .../core/compute/MockMessageSender.java | 7 +++ 7 files changed, 58 insertions(+), 37 deletions(-) diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java index 34f7043af..8e90c7946 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/DataClientManager.java @@ -118,10 +118,9 @@ public void onChannelInactive(ConnectionId connectionId) { @Override public void exceptionCaught(TransportException cause, ConnectionId connectionId) { - // TODO: implement failover LOG.error("Channel for connectionId {} occurred exception", connectionId, cause); - DataClientManager.this.connManager.closeClient(connectionId); + DataClientManager.this.sender.transportExceptionCaught(cause, connectionId); } } } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java index c37155740..9e54a1693 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java +++ 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/network/netty/ChannelFutureListenerOnWrite.java @@ -55,7 +55,7 @@ public void onDone(Channel channel, ChannelFuture future) { } } - public void onSuccess(Channel channel, ChannelFuture future) { + public void onSuccess(Channel channel, ChannelFuture future) { if (LOG.isDebugEnabled()) { LOG.debug("Successfully send data to '{}'", TransportUtil.remoteAddress(channel)); diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java index 5e51320ad..b77ffa807 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/receiver/MessageRecvManager.java @@ -18,8 +18,11 @@ package org.apache.hugegraph.computer.core.receiver; import java.util.Map; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hugegraph.computer.core.common.ComputerContext; import org.apache.hugegraph.computer.core.common.Constants; @@ -60,7 +63,9 @@ public class MessageRecvManager implements Manager, MessageHandler { private int workerCount; private int expectedFinishMessages; - private CountDownLatch finishMessagesLatch; + private CompletableFuture finishMessagesFuture; + private AtomicInteger finishMessagesCount; + private long waitFinishMessagesTimeout; private long superstep; @@ -71,6 +76,7 @@ public MessageRecvManager(ComputerContext context, this.fileManager = fileManager; this.sortManager = sortManager; this.superstep = Constants.INPUT_SUPERSTEP; + this.finishMessagesCount = new AtomicInteger(); } @Override @@ -90,8 +96,9 @@ public 
void init(Config config) { this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT); // One for vertex and one for edge. this.expectedFinishMessages = this.workerCount * 2; - this.finishMessagesLatch = new CountDownLatch( - this.expectedFinishMessages); + this.finishMessagesFuture = new CompletableFuture<>(); + this.finishMessagesCount.set(this.expectedFinishMessages); + this.waitFinishMessagesTimeout = config.get( ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT); } @@ -103,8 +110,9 @@ public void beforeSuperstep(Config config, int superstep) { this.messagePartitions = new ComputeMessageRecvPartitions( this.context, fileGenerator, this.sortManager); this.expectedFinishMessages = this.workerCount; - this.finishMessagesLatch = new CountDownLatch( - this.expectedFinishMessages); + this.finishMessagesFuture = new CompletableFuture<>(); + this.finishMessagesCount.set(this.expectedFinishMessages); + this.superstep = superstep; if (this.superstep == Constants.INPUT_SUPERSTEP + 1) { @@ -138,35 +146,21 @@ public void onChannelInactive(ConnectionId connectionId) { @Override public void exceptionCaught(TransportException cause, ConnectionId connectionId) { - // TODO: implement failover - LOG.warn("Exception caught for connection:{}, root cause:", + LOG.error("Exception caught for connection:{}, root cause:", connectionId, cause); + this.finishMessagesFuture.completeExceptionally(cause); } public void waitReceivedAllMessages() { try { - boolean status = this.finishMessagesLatch.await( - this.waitFinishMessagesTimeout, - TimeUnit.MILLISECONDS); - if (!status) { - throw new ComputerException( - "Expect %s finish-messages received in %s ms, " + - "%s absence in superstep %s", - this.expectedFinishMessages, - this.waitFinishMessagesTimeout, - this.finishMessagesLatch.getCount(), - this.superstep); - } - } catch (InterruptedException e) { - throw new ComputerException( - "Thread is interrupted while waiting %s " + - "finish-messages received in %s ms, " + - "%s absence 
in superstep %s", - e, - this.expectedFinishMessages, - this.waitFinishMessagesTimeout, - this.finishMessagesLatch.getCount(), - this.superstep); + this.finishMessagesFuture.get(this.waitFinishMessagesTimeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + throw new ComputerException("Time out while waiting %s finish-messages " + + "received in %s ms in superstep %s", + this.expectedFinishMessages, this.waitFinishMessagesTimeout, this.superstep, e); + } catch (InterruptedException | ExecutionException e) { + throw new ComputerException("Error while waiting %s finish-messages in superstep %s", + this.expectedFinishMessages, this.superstep, e); } } @@ -214,7 +208,10 @@ public void onStarted(ConnectionId connectionId) { @Override public void onFinished(ConnectionId connectionId) { LOG.debug("ConnectionId {} finished", connectionId); - this.finishMessagesLatch.countDown(); + int currentCount = this.finishMessagesCount.decrementAndGet(); + if (currentCount == 0) { + this.finishMessagesFuture.complete(null); + } } /** diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java index ee981c025..48fa687fd 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSendManager.java @@ -154,7 +154,7 @@ public void startSend(MessageType type) { } /** - * Finsih send message, send the last buffer and put an END signal + * Finish send message, send the last buffer and put an END signal * into queue * @param type the message type */ @@ -277,10 +277,10 @@ private void sendControlMessageToWorkers(Set workerIds, } } catch (TimeoutException e) { throw new ComputerException("Timeout(%sms) to wait for " + - "controling message(%s) to finished", + "controlling message(%s) to finished", e, timeout, type); } 
catch (InterruptedException | ExecutionException e) { - throw new ComputerException("Failed to wait for controling " + + throw new ComputerException("Failed to wait for controlling " + "message(%s) to finished", e, type); } } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java index 864951e82..a700b22c9 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/MessageSender.java @@ -19,6 +19,8 @@ import java.util.concurrent.CompletableFuture; +import org.apache.hugegraph.computer.core.common.exception.TransportException; +import org.apache.hugegraph.computer.core.network.ConnectionId; import org.apache.hugegraph.computer.core.network.message.MessageType; public interface MessageSender { @@ -37,4 +39,10 @@ CompletableFuture send(int workerId, MessageType type) * @param message message payload */ void send(int workerId, QueuedMessage message) throws InterruptedException; + + /** + * Invoked when the channel associated with the given connectionId + * throws an exception while processing a message. 
+ */ + void transportExceptionCaught(TransportException cause, ConnectionId connectionId); } diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java index 810d3b9a6..b2006b886 100644 --- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java +++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/sender/QueuedMessageSender.java @@ -24,6 +24,7 @@ import org.apache.hugegraph.computer.core.common.exception.TransportException; import org.apache.hugegraph.computer.core.config.ComputerOptions; import org.apache.hugegraph.computer.core.config.Config; +import org.apache.hugegraph.computer.core.network.ConnectionId; import org.apache.hugegraph.computer.core.network.TransportClient; import org.apache.hugegraph.computer.core.network.message.MessageType; import org.apache.hugegraph.concurrent.BarrierEvent; @@ -103,6 +104,15 @@ public void send(int workerId, QueuedMessage message) channel.queue.put(message); } + @Override + public void transportExceptionCaught(TransportException cause, ConnectionId connectionId) { + for (WorkerChannel channel : this.channels) { + if (channel.client.connectionId().equals(connectionId)) { + channel.futureRef.get().completeExceptionally(cause); + } + } + } + public Runnable notBusyNotifier() { /* * DataClientHandler.sendAvailable() will call it when client diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java index 3535fd8a7..a19692008 100644 --- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java +++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/MockMessageSender.java @@ -19,6 +19,8 @@ import 
java.util.concurrent.CompletableFuture; +import org.apache.hugegraph.computer.core.common.exception.TransportException; +import org.apache.hugegraph.computer.core.network.ConnectionId; import org.apache.hugegraph.computer.core.network.message.MessageType; import org.apache.hugegraph.computer.core.sender.MessageSender; import org.apache.hugegraph.computer.core.sender.QueuedMessage; @@ -36,4 +38,9 @@ public CompletableFuture send(int workerId, MessageType type) { public void send(int workerId, QueuedMessage message) { // pass } + + @Override + public void transportExceptionCaught(TransportException cause, ConnectionId connectionId) { + // pass + } }