Skip to content

Commit

Permalink
chore: send one EOF response only for reduce stream (#102)
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang authored Mar 26, 2024
1 parent 502262f commit 3dae164
Show file tree
Hide file tree
Showing 13 changed files with 77 additions and 71 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/build-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ on:

jobs:
docker_publish:
# run it only on numaproj/numaflow-java repository
# forked repositories normally don't have the proper permission setup.
if: ${{ github.repository }} == "numaproj/numaflow-java"
name: Build, Tag, and Push Image
runs-on: ubuntu-latest

Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Java SDK for Numaflow

[![Build](https://github.com/numaproj/numaflow-java/actions/workflows/ci.yaml/badge.svg?branch=main)](https://github.com/numaproj/numaflow-java/actions/workflows/ci.yaml)
[![Build](https://github.com/numaproj/numaflow-java/actions/workflows/run-tests.yaml/badge.svg?branch=main)](https://github.com/numaproj/numaflow-java/actions/workflows/run-tests.yaml)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](LICENSE)
[![Release Version](https://img.shields.io/github/v/release/numaproj/numaflow-java?label=numaflow-java)](https://github.com/numaproj/numaflow-java/releases/latest)
[![Maven Central](https://img.shields.io/maven-central/v/io.numaproj.numaflow/numaflow-java.svg?label=Maven%20Central)](https://central.sonatype.com/search?q=numaflow+java&smo=true)
Expand Down Expand Up @@ -57,6 +57,8 @@ mvn clean install
* [MapStream](examples/src/main/java/io/numaproj/numaflow/examples/mapstream/flatmapstream)
* [Map](examples/src/main/java/io/numaproj/numaflow/examples/map)
* [Reduce](examples/src/main/java/io/numaproj/numaflow/examples/reduce)
* [ReduceStream](examples/src/main/java/io/numaproj/numaflow/examples/reducestreamer)
* [SessionReduce](examples/src/main/java/io/numaproj/numaflow/examples/reducesession)

* **User Defined Sink(UDSink)**
* [Sink](examples/src/main/java/io/numaproj/numaflow/examples/sink/simple)
Expand Down
6 changes: 3 additions & 3 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ you can take the desired image and test it in a pipeline

If you want to build and push all the example images at once, you can run:
```shell
./hack/update_examples -bp -t <tag>
./hack/update_examples.sh -bp -t <tag>
```
The default tag is `stable`, but it is recommended you specify your own for testing purposes, as the Github Actions CI uses the `stable` tag.
This consistent tag name is used so that the tags in the [E2E test pipelines](https://github.com/numaproj/numaflow/tree/main/test) do not need to be
updated each time an SDK change is made.
updated each time an SDK change is made.

You can alternatively build and push a specific example image by running the following:
```shell
./hack/update_examples -bpe <example-execution-id> -t <tag>
./hack/update_examples.sh -bpe <example-execution-id> -t <tag>
```
Both `-bpe` and `-bp` first build a local image with the naming convention
`numaflow-java-examples/<example-execution-id>:<tag>`, which then gets pushed as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,17 @@ private void responseListener(ActorResponse actorResponse) {
if there are no entries in the map, that means processing is
done we can close the stream.
*/
responseObserver.onNext(actorResponse.getResponse());
if (actorResponse.getResponse().getEOF()) {
actorsMap.remove(actorResponse.getUniqueIdentifier());
if (actorsMap.isEmpty()) {
// only send the last EOF to the response gRPC output stream.
responseObserver.onNext(actorResponse.getResponse());
responseObserver.onCompleted();
getContext().getSystem().stop(getSelf());
}
} else {
// send non-EOF responses to the output stream.
responseObserver.onNext(actorResponse.getResponse());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,12 @@

/**
* The actor response holds the final EOF response for a particular key set.
* <p>
* The isLast attribute indicates whether the response is globally the last one to be sent to
* the output gRPC stream, if set to true, it means the response is the very last response among
* all key sets. When output stream actor receives an isLast response, it sends the response and immediately
* closes the output stream.
*/
@Getter
@Setter
@AllArgsConstructor
class ActorResponse {
ReduceOuterClass.ReduceResponse response;
boolean isLast;

// TODO - do we need to include window information in the id?
// for aligned reducer, there is always single window.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Receive createReceive() {
}

private void handleResponse(ActorResponse actorResponse) {
if (actorResponse.isLast()) {
if (actorResponse.getResponse().getEOF()) {
// send the very last response.
responseObserver.onNext(actorResponse.getResponse());
// close the output stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ private ActorResponse buildResponse(Message message) {
.addAllTags(
message.getTags() == null ? new ArrayList<>():List.of(message.getTags()))
.build());
return new ActorResponse(responseBuilder.build(), false);
return new ActorResponse(responseBuilder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ private ActorResponse buildEOFResponse() {
.newBuilder()
.addAllKeys(List.of(this.keys))
.build());
return new ActorResponse(responseBuilder.build(), false);
return new ActorResponse(responseBuilder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public Receive createReceive() {
.create()
.match(ActorRequest.class, this::invokeActor)
.match(String.class, this::sendEOF)
.match(ActorResponse.class, this::handleActorResponse)
.match(ActorResponse.class, this::handleActorEOFResponse)
.build();
}

Expand Down Expand Up @@ -114,16 +114,15 @@ private void sendEOF(String EOF) {
}
}

private void handleActorResponse(ActorResponse actorResponse) {
private void handleActorEOFResponse(ActorResponse actorResponse) {
// when the supervisor receives an actor response, it means the corresponding
// reduce streamer actor has finished its job.
// we remove the entry from the actors map.
actorsMap.remove(actorResponse.getActorUniqueIdentifier());
if (actorsMap.isEmpty()) {
// since the actors map is empty, this particular actor response is the last response to forward to output gRPC stream.
actorResponse.setLast(true);
this.outputActor.tell(actorResponse, getSelf());
} else {
// for reduce streamer, we only send to output stream one single EOF response, which is the last one.
// we don't care about per-key-set EOFs.
this.outputActor.tell(actorResponse, getSelf());
}
}
Expand Down
34 changes: 17 additions & 17 deletions src/test/java/io/numaproj/numaflow/reducer/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.junit.Rule;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;

import static io.numaproj.numaflow.shared.GrpcServerUtils.WIN_END_KEY;
Expand Down Expand Up @@ -111,7 +110,7 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
.setPayload(ReduceOuterClass.ReduceRequest.Payload
.newBuilder()
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.addAllKeys(Arrays.asList(reduceKey))
.addAllKeys(List.of(reduceKey))
.build())
.build();
inputStreamObserver.onNext(request);
Expand All @@ -123,31 +122,28 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
// sum of first 10 numbers 1 to 10 -> 55
ByteString expectedValue = ByteString.copyFromUtf8(String.valueOf(55));
while (!outputStreamObserver.completed.get()) ;
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();

// Expect 2 responses, one containing the aggregated data and the other indicating EOF.
assertEquals(2, outputStreamObserver.resultDatum.get().size());
assertEquals(2, result.size());
assertEquals(
expectedKeys,
outputStreamObserver.resultDatum
.get()
expectedKeys, result
.get(0)
.getResult()
.getKeysList()
.toArray(new String[0]));
assertEquals(
expectedValue,
outputStreamObserver.resultDatum
.get()
expectedValue, result
.get(0)
.getResult()
.getValue());
assertTrue(outputStreamObserver.resultDatum.get().get(1).getEOF());
assertTrue(result.get(1).getEOF());
}

@Test
public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then_requestsGetAggregatedSeparately() {
String reduceKey = "reduce-key";
int keyCount = 3;
int keyCount = 10;

Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
Expand All @@ -167,7 +163,7 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then
ReduceOuterClass.ReduceRequest request = ReduceOuterClass.ReduceRequest
.newBuilder()
.setPayload(ReduceOuterClass.ReduceRequest.Payload.newBuilder()
.addAllKeys(Arrays.asList(reduceKey + j))
.addAllKeys(List.of(reduceKey + j))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build();
Expand All @@ -182,10 +178,14 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then

while (!outputStreamObserver.completed.get()) ;
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();
// the outputStreamObserver should have observed 2*keyCount responses, because for each key set, one response for the aggregated result, the other for EOF.
assertEquals(keyCount * 2, result.size());
result.forEach(response -> {
assertTrue(response.getResult().getValue().equals(expectedValue) || response.getEOF());
});

// the outputStreamObserver should have observed keyCount+ 1 responses, one with real output sum data per key, one as the final single EOF response.
assertEquals(keyCount + 1, result.size());
for (int i = 0; i < keyCount; i++) {
ReduceOuterClass.ReduceResponse response = result.get(i);
assertEquals(response.getResult().getValue(), expectedValue);
}
// verify the last one is the EOF.
assertTrue(result.get(keyCount).getEOF());
}
}
33 changes: 17 additions & 16 deletions src/test/java/io/numaproj/numaflow/reducer/SupervisorActorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

Expand All @@ -22,7 +23,7 @@ public class SupervisorActorTest {
@Test
public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then_onlyOneReducerActorGetsCreatedAndAggregatesAllRequests() throws RuntimeException {
final ActorSystem actorSystem = ActorSystem.create("test-system-1");
CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
CompletableFuture<Void> completableFuture = new CompletableFuture<>();

ActorRef shutdownActor = actorSystem
.actorOf(ReduceShutdownActor
Expand Down Expand Up @@ -53,16 +54,15 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then

try {
completableFuture.get();
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();
// the observer should receive 2 messages, one is the aggregated result, the other is the EOF response.
assertEquals(2, outputStreamObserver.resultDatum.get().size());
assertEquals("10", outputStreamObserver.resultDatum
.get()
assertEquals(2, result.size());
assertEquals("10", result
.get(0)
.getResult()
.getValue()
.toStringUtf8());
assertEquals(true, outputStreamObserver.resultDatum
.get()
assertTrue(result
.get(1)
.getEOF());
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -73,7 +73,8 @@ public void given_inputRequestsShareSameKeys_when_supervisorActorBroadcasts_then
@Test
public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcasts_then_multipleReducerActorsHandleKeySetsSeparately() throws RuntimeException {
final ActorSystem actorSystem = ActorSystem.create("test-system-2");
CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
int keyCount = 10;

ActorRef shutdownActor = actorSystem
.actorOf(ReduceShutdownActor
Expand All @@ -92,7 +93,7 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
outputStreamObserver)
);

for (int i = 1; i <= 10; i++) {
for (int i = 1; i <= keyCount; i++) {
ActorRequest reduceRequest = new ActorRequest(ReduceOuterClass.ReduceRequest
.newBuilder()
.setPayload(ReduceOuterClass.ReduceRequest.Payload
Expand All @@ -108,15 +109,15 @@ public void given_inputRequestsHaveDifferentKeySets_when_supervisorActorBroadcas
supervisorActor.tell(Constants.EOF, ActorRef.noSender());
try {
completableFuture.get();
// each reduce request generates two reduce responses, one containing the data and the other one indicating EOF.
assertEquals(20, outputStreamObserver.resultDatum.get().size());
for (int i = 0; i < 20; i++) {
ReduceOuterClass.ReduceResponse response = outputStreamObserver.resultDatum
.get()
.get(i);
assertTrue(response.getResult().getValue().toStringUtf8().equals("1")
|| response.getEOF());
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();
// expect keyCount number of responses with data, plus one final EOF response.
assertEquals(keyCount + 1, result.size());
for (int i = 0; i < keyCount; i++) {
ReduceOuterClass.ReduceResponse response = result.get(i);
assertEquals("1", response.getResult().getValue().toStringUtf8());
}
// verify the last one is the EOF.
assertTrue(result.get(keyCount).getEOF());
} catch (InterruptedException | ExecutionException e) {
fail("Expected the future to complete without exception");
}
Expand Down
21 changes: 11 additions & 10 deletions src/test/java/io/numaproj/numaflow/reducestreamer/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
.setPayload(ReduceOuterClass.ReduceRequest.Payload
.newBuilder()
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.addAllKeys(Arrays.asList(reduceKey))
.addAllKeys(List.of(reduceKey))
.build())
.build();
inputStreamObserver.onNext(request);
Expand Down Expand Up @@ -169,7 +169,7 @@ public void given_inputReduceRequestsShareSameKey_when_serverStarts_then_allRequ
@Test
public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then_requestsGetAggregatedSeparately() {
String reduceKey = "reduce-key";
int keyCount = 3;
int keyCount = 10;

Metadata metadata = new Metadata();
metadata.put(Metadata.Key.of(WIN_START_KEY, Metadata.ASCII_STRING_MARSHALLER), "60000");
Expand All @@ -189,7 +189,7 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then
ReduceOuterClass.ReduceRequest request = ReduceOuterClass.ReduceRequest
.newBuilder()
.setPayload(ReduceOuterClass.ReduceRequest.Payload.newBuilder()
.addAllKeys(Arrays.asList(reduceKey + j))
.addAllKeys(List.of(reduceKey + j))
.setValue(ByteString.copyFromUtf8(String.valueOf(i)))
.build())
.build();
Expand All @@ -206,14 +206,15 @@ public void given_inputReduceRequestsHaveDifferentKeySets_when_serverStarts_then

while (!outputStreamObserver.completed.get()) ;
List<ReduceOuterClass.ReduceResponse> result = outputStreamObserver.resultDatum.get();
// the outputStreamObserver should have observed 3*keyCount responses, 2 with real output sum data, one as EOF.
assertEquals(keyCount * 3, result.size());
result.forEach(response -> {
// the outputStreamObserver should have observed (keyCount * 2 + 1) responses, 2 with real output sum data per key, 1 as the final single EOF response.
assertEquals(keyCount * 2 + 1, result.size());
for (int i = 0; i < keyCount * 2; i++) {
ReduceOuterClass.ReduceResponse response = result.get(i);
assertTrue(response.getResult().getValue().equals(expectedFirstResponse) ||
response.getResult().getValue().equals(expectedSecondResponse)
|| response.getEOF());

});
response.getResult().getValue().equals(expectedSecondResponse));
}
// verify the last one is the EOF.
assertTrue(result.get(keyCount * 2).getEOF());
}

public static class ReduceStreamerTestFactory extends ReduceStreamerFactory<ServerTest.ReduceStreamerTestFactory.TestReduceStreamHandler> {
Expand Down
Loading

0 comments on commit 3dae164

Please sign in to comment.