Skip to content

Commit

Permalink
address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
m-trieu committed Sep 19, 2024
1 parent d03b0bc commit d95d1b5
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,20 @@ synchronized Optional<ExecutableWork> completeWorkAndGetNextWorkForKey(
@Nullable Queue<ExecutableWork> workQueue = activeWork.get(shardedKey);
if (workQueue == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workId);
LOG.warn(
"Unable to complete inactive work for key={} and token={}. Work queue for key does not exist.",
shardedKey,
workId);
return Optional.empty();
}

removeCompletedWorkFromQueue(workQueue, shardedKey, workId);
return getNextWork(workQueue, shardedKey);
}

private synchronized void removeCompletedWorkFromQueue(
Queue<ExecutableWork> workQueue, ShardedKey shardedKey, WorkId workId) {
// avoid Preconditions.checkState here to prevent eagerly evaluating the
// format string parameters for the error message.
ExecutableWork completedWork = workQueue.peek();
@Nullable ExecutableWork completedWork = workQueue.peek();
if (completedWork == null) {
// Work may have been completed due to clearing of stuck commits.
LOG.warn("Active key {} without work, expected token {}", shardedKey, workId);
Expand Down Expand Up @@ -346,7 +348,7 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) {
+ "<th>Active For</th>"
+ "<th>State</th>"
+ "<th>State Active For</th>"
+ "<th>Produced By</th>"
+ "<th>Backend</th>"
+ "</tr>");
// Use StringBuilder because we are appending in loop.
StringBuilder activeWorkStatus = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,59 +269,53 @@ public synchronized void shutdown() {
channelCachingStubFactory.shutdown();
}

private void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
CompletableFuture<Void> closeStaleStreams;

synchronized (this) {
LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
closeStaleStreams =
closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams());
ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams =
createAndStartNewStreams(newWindmillConnections.values()).join();
StreamingEngineConnectionState newConnectionsState =
StreamingEngineConnectionState.builder()
.setWindmillConnections(newWindmillConnections)
.setWindmillStreams(newStreams)
.setGlobalDataStreams(
createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
.build();
LOG.info(
"Setting new connections: {}. Previous connections: {}.",
newConnectionsState,
connections.get());
connections.set(newConnectionsState);
getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
}

// Close the streams outside the lock.
closeStaleStreams.join();
/**
 * Replaces the current set of windmill connections and streams with ones built from {@code
 * newWindmillEndpoints}.
 *
 * <p>The whole method runs while holding this object's monitor. Steps, in order: build the new
 * connections, request closing of streams whose connection is not in the new set, create and
 * start the new streams (blocking until that future completes), publish the new connection state,
 * and finally distribute {@code totalGetWorkBudget} across the new streams.
 *
 * @param newWindmillEndpoints the endpoints to connect to; both its windmill endpoints and its
 *     global data endpoints are read.
 */
private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
createNewWindmillConnections(newWindmillEndpoints.windmillEndpoints());
// Stale streams are computed against the state that is still current at this point, i.e.
// before connections.set(...) below replaces it.
closeStaleStreams(newWindmillConnections.values(), connections.get().windmillStreams());
// join() blocks the calling thread (with the monitor held) until the new streams are ready.
ImmutableMap<WindmillConnection, WindmillStreamSender> newStreams =
createAndStartNewStreams(newWindmillConnections.values()).join();
StreamingEngineConnectionState newConnectionsState =
StreamingEngineConnectionState.builder()
.setWindmillConnections(newWindmillConnections)
.setWindmillStreams(newStreams)
.setGlobalDataStreams(
createNewGlobalDataStreams(newWindmillEndpoints.globalDataEndpoints()))
.build();
LOG.info(
"Setting new connections: {}. Previous connections: {}.",
newConnectionsState,
connections.get());
// Publish the new state first, then hand out budget to the streams that were just started.
connections.set(newConnectionsState);
getWorkBudgetDistributor.distributeBudget(newStreams.values(), totalGetWorkBudget);
}

/** Close the streams that are no longer valid asynchronously. */
private CompletableFuture<Void> closeStaleStreams(
@SuppressWarnings("FutureReturnValueIgnored")
private void closeStaleStreams(
Collection<WindmillConnection> newWindmillConnections,
ImmutableMap<WindmillConnection, WindmillStreamSender> currentStreams) {
return CompletableFuture.allOf(
currentStreams.entrySet().stream()
.filter(
connectionAndStream ->
!newWindmillConnections.contains(connectionAndStream.getKey()))
.map(
entry ->
CompletableFuture.runAsync(
() -> {
LOG.debug("Closing streams to {}", entry);
entry.getValue().closeAllStreams();
entry
.getKey()
.directEndpoint()
.ifPresent(channelCachingStubFactory::remove);
LOG.debug("Successfully closed streams to {}", entry);
},
windmillStreamManager))
.toArray(CompletableFuture[]::new));
// Close, on the windmillStreamManager executor, every stream whose connection is absent from the
// new connection set. The returned futures are intentionally not awaited.
currentStreams.entrySet().stream()
    .filter(
        connectionAndStream -> !newWindmillConnections.contains(connectionAndStream.getKey()))
    .forEach(
        entry ->
            CompletableFuture.runAsync(
                () -> {
                  LOG.debug("Closing streams to {}", entry);
                  try {
                    entry.getValue().closeAllStreams();
                    entry
                        .getKey()
                        .directEndpoint()
                        .ifPresent(channelCachingStubFactory::remove);
                    LOG.debug("Successfully closed streams to {}", entry);
                  } catch (Exception e) {
                    // Nobody observes the future, so this log is the only place the failure
                    // surfaces. Pass the exception as the trailing argument so the logger
                    // records the cause and stack trace instead of dropping them.
                    LOG.error("Error closing streams to {}", entry, e);
                  }
                },
                windmillStreamManager));
}

private synchronized CompletableFuture<ImmutableMap<WindmillConnection, WindmillStreamSender>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void adjustBudget(long itemsDelta, long bytesDelta) {
GetWorkBudget.builder().setItems(itemsDelta).setBytes(bytesDelta).build();
getWorkBudget.getAndSet(adjustment);
if (started.get()) {
getWorkStream.get().adjustBudget(adjustment);
getWorkStream.get().setBudget(adjustment);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
private final String backendWorkerToken;
private final ResettableRequestObserver<RequestT> requestObserver;
private final AtomicBoolean isShutdown;
private final AtomicReference<DateTime> shutdownTime;

/**
* Indicates if the current {@link ResettableRequestObserver} was closed by calling {@link
Expand Down Expand Up @@ -140,6 +141,7 @@ protected AbstractWindmillStream(
new AbstractWindmillStream<RequestT, ResponseT>.ResponseObserver()));
this.sleeper = Sleeper.DEFAULT;
this.logger = logger;
this.shutdownTime = new AtomicReference<>();
}

private static String createThreadName(String streamType, String backendWorkerToken) {
Expand Down Expand Up @@ -293,11 +295,14 @@ public final void appendSummaryHtml(PrintWriter writer) {
writer.format(", %dms backoff remaining", sleepLeft);
}
writer.format(
", current stream is %dms old, last send %dms, last response %dms, closed: %s",
", current stream is %dms old, last send %dms, last response %dms, closed: %s, "
+ "isShutdown: %s, shutdown time: %s",
debugDuration(nowMs, startTimeMs.get()),
debugDuration(nowMs, lastSendTimeMs.get()),
debugDuration(nowMs, lastResponseTimeMs.get()),
streamClosed.get());
streamClosed.get(),
isShutdown.get(),
shutdownTime.get());
}

/**
Expand All @@ -307,7 +312,7 @@ public final void appendSummaryHtml(PrintWriter writer) {
protected abstract void appendSpecificHtml(PrintWriter writer);

@Override
public final void halfClose() {
public final synchronized void halfClose() {
clientClosed.set(true);
requestObserver.onCompleted();
streamClosed.set(true);
Expand Down Expand Up @@ -336,6 +341,7 @@ public final void shutdown() {
requestObserver()
.onError(new WindmillStreamShutdownException("Explicit call to shutdown stream."));
shutdownInternal();
shutdownTime.set(DateTime.now());
}
}

Expand All @@ -362,7 +368,7 @@ private static class ResettableRequestObserver<RequestT> implements StreamObserv
private final Supplier<StreamObserver<RequestT>> requestObserverSupplier;

@GuardedBy("this")
private volatile @Nullable StreamObserver<RequestT> delegateRequestObserver;
private @Nullable StreamObserver<RequestT> delegateRequestObserver;

private ResettableRequestObserver(Supplier<StreamObserver<RequestT>> requestObserverSupplier) {
this.requestObserverSupplier = requestObserverSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ public interface WindmillStream {
@ThreadSafe
interface GetWorkStream extends WindmillStream {
/** Adjusts the {@link GetWorkBudget} for the stream. */
void adjustBudget(long itemsDelta, long bytesDelta);
void setBudget(long itemsDelta, long bytesDelta);

default void adjustBudget(GetWorkBudget newBudget) {
adjustBudget(newBudget.items(), newBudget.bytes());
default void setBudget(GetWorkBudget newBudget) {
setBudget(newBudget.items(), newBudget.bytes());
}

/** Returns the remaining in-flight {@link GetWorkBudget}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ protected boolean hasPendingRequests() {
public void appendSpecificHtml(PrintWriter writer) {
// Number of buffers is same as distinct workers that sent work on this stream.
writer.format(
"GetWorkStream: %d buffers, in-flight budget: %s; last sent request: %s.",
workItemAssemblers.size(), inFlightBudget.get(), lastRequest.get());
"GetWorkStream: %d buffers, max budget: %s, in-flight budget: %s, last sent request: %s.",
workItemAssemblers.size(), maxGetWorkBudget.get(), inFlightBudget.get(), lastRequest.get());
}

@Override
Expand Down Expand Up @@ -283,7 +283,7 @@ protected void startThrottleTimer() {
}

@Override
public void adjustBudget(long itemsDelta, long bytesDelta) {
public void setBudget(long itemsDelta, long bytesDelta) {
maxGetWorkBudget.getAndSet(
GetWorkBudget.builder().setItems(itemsDelta).setBytes(bytesDelta).build());
sendRequestExtension();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,15 +342,15 @@ private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<Respon
} catch (AppendableInputStream.InvalidInputStreamStateException
| VerifyException
| CancellationException e) {
handleShutdown(request);
handleShutdown(request, e);
if (!(e instanceof CancellationException)) {
throw e;
}
} catch (IOException e) {
LOG.error("Parsing GetData response failed: ", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
handleShutdown(request);
handleShutdown(request, e);
throw new RuntimeException(e);
} finally {
pending.remove(request.id());
Expand All @@ -363,10 +363,13 @@ private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<Respon
"Cannot send request=[" + request + "] on closed stream.");
}

private void handleShutdown(QueuedRequest request) {
private void handleShutdown(QueuedRequest request, Throwable cause) {
if (isShutdown()) {
throw new WindmillStreamShutdownException(
"Cannot send request=[" + request + "] on closed stream.");
WindmillStreamShutdownException shutdownException =
new WindmillStreamShutdownException(
"Cannot send request=[" + request + "] on closed stream.");
shutdownException.addSuppressed(cause);
throw shutdownException;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationGetDataRequest;
Expand All @@ -37,6 +36,7 @@
/** Utility data classes for {@link GrpcGetDataStream}. */
final class GrpcGetDataStreamRequests {
private static final Logger LOG = LoggerFactory.getLogger(GrpcGetDataStreamRequests.class);
private static final int STREAM_CANCELLED_ERROR_LOG_LIMIT = 3;

private GrpcGetDataStreamRequests() {}

Expand Down Expand Up @@ -154,7 +154,7 @@ void waitForSendOrFailNotification() throws InterruptedException {
LOG.error("Requests failed for the following batches: {}", cancelledRequests);
throw new WindmillStreamShutdownException(
"Requests failed for batch containing "
+ cancelledRequests.stream().limit(3).collect(Collectors.joining(", "))
+ String.join(", ", cancelledRequests)
+ " ... requests. This is most likely due to the stream being explicitly closed"
+ " which happens when the work is marked as invalid on the streaming"
+ " backend when key ranges shuffle around. This is transient and corresponding"
Expand Down Expand Up @@ -186,6 +186,7 @@ ImmutableList<String> createStreamCancelledErrorMessage() {
throw new IllegalStateException();
}
})
.limit(STREAM_CANCELLED_ERROR_LOG_LIMIT)
.collect(toImmutableList());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ protected void startThrottleTimer() {
}

@Override
public void adjustBudget(long itemsDelta, long bytesDelta) {
public void setBudget(long itemsDelta, long bytesDelta) {
// no-op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@

import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.PrintWriter;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkerMetadataResponse;
Expand Down Expand Up @@ -171,13 +169,9 @@ protected void sendHealthCheck() {
@Override
protected void appendSpecificHtml(PrintWriter writer) {
synchronized (metadataLock) {
List<String> backendWorkerTokens =
latestResponse.getWorkEndpointsList().stream()
.map(WorkerMetadataResponse.Endpoint::getBackendWorkerToken)
.collect(Collectors.toList());
writer.format(
"GetWorkerMetadataStream: job_header=[%s], current_metadata=[%s]",
workerMetadataRequest.getHeader(), backendWorkerTokens);
workerMetadataRequest.getHeader(), latestResponse);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.auto.value.AutoBuilder;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.Timer;
Expand Down Expand Up @@ -325,7 +324,6 @@ private StreamObserverFactory newStreamObserverFactory(long deadline) {
public void appendSummaryHtml(PrintWriter writer) {
writer.write("Active Streams:<br>");
streamRegistry.stream()
.sorted(Comparator.comparing(AbstractWindmillStream::backendWorkerToken))
.collect(
toImmutableListMultimap(
AbstractWindmillStream::backendWorkerToken, Function.identity()))
Expand Down
Loading

0 comments on commit d95d1b5

Please sign in to comment.