Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Sep 22, 2024
1 parent b8c595f commit acf0406
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package io.numaproj.numaflow.examples.mapstream.flatmapstream;

import io.numaproj.numaflow.mapstreamer.*;
import io.numaproj.numaflow.mapstreamer.Datum;
import io.numaproj.numaflow.mapstreamer.MapStreamer;
import io.numaproj.numaflow.mapstreamer.Message;
import io.numaproj.numaflow.mapstreamer.OutputObserver;
import io.numaproj.numaflow.mapstreamer.Server;


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void ack(AckRequest request) {

@Override
public long getPending() {
// pending messages will be zero for a simple source
// number of messages not acknowledged yet
return messages.size();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import io.numaproj.numaflow.examples.map.flatmap.FlatMapFunction;
import io.numaproj.numaflow.examples.reduce.sum.SumFactory;
import io.numaproj.numaflow.examples.sink.simple.SimpleSink;
import io.numaproj.numaflow.examples.source.simple.SimpleSource;
import io.numaproj.numaflow.examples.sourcetransformer.eventtimefilter.EventTimeFilterFunction;
import io.numaproj.numaflow.mapper.MapperTestKit;
import io.numaproj.numaflow.mapper.Message;
Expand All @@ -14,17 +13,14 @@
import io.numaproj.numaflow.sinker.Response;
import io.numaproj.numaflow.sinker.ResponseList;
import io.numaproj.numaflow.sinker.SinkerTestKit;
import io.numaproj.numaflow.sourcer.SourcerTestKit;
import io.numaproj.numaflow.sourcetransformer.SourceTransformerTestKit;
import lombok.extern.slf4j.Slf4j;
import org.junit.Ignore;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package io.numaproj.numaflow.examples.source.simple;

import com.google.common.primitives.Longs;
import io.numaproj.numaflow.sourcer.Message;
import io.numaproj.numaflow.sourcer.Offset;
import io.numaproj.numaflow.sourcer.SourcerTestKit;
import org.junit.Ignore;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -59,18 +57,8 @@ public void test_ReadAndAck() {
@Test
public void testPending() {
SimpleSource simpleSource = new SimpleSource();
// simple source getPending always returns 0.
// since we haven't read any messages, the pending should be 0
Assertions.assertEquals(0, simpleSource.getPending());
}

@Test
public void testLong() {
Long l = 1L;
byte[] bytes = Longs.toByteArray(l);

Long x = Longs.fromByteArray(bytes);
Assertions.assertEquals(l, x);
}

}

2 changes: 1 addition & 1 deletion src/main/java/io/numaproj/numaflow/sourcer/AckRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/
public interface AckRequest {
/**
* @return the offsets to be acknowledged
* @return the offset to be acknowledged
*/
Offset getOffset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.junit.Rule;
import org.junit.Test;

import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
Expand Down
46 changes: 42 additions & 4 deletions src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;


public class ServerTest {
Expand Down Expand Up @@ -65,6 +66,11 @@ public void tearDown() throws Exception {
public void TestSourcer() {
var stub = SourceGrpc.newStub(inProcessChannel);

// Create a handshake request
SourceOuterClass.ReadRequest handshakeRequest = SourceOuterClass.ReadRequest.newBuilder()
.setHandshake(SourceOuterClass.Handshake.newBuilder().setSot(true).build())
.build();

// Test readFn, source has 10 messages
// we read 5 messages, ack them, then read another 5 messages
SourceOuterClass.ReadRequest request = SourceOuterClass.ReadRequest.newBuilder()
Expand All @@ -74,14 +80,23 @@ public void TestSourcer() {
.setTimeoutInMs(1000)
.build())
.build();

List<SourceOuterClass.AckRequest> ackRequests = new ArrayList<>();

StreamObserver<SourceOuterClass.ReadRequest> readRequestObserver = stub.readFn(new StreamObserver<>() {
int count = 0;

boolean handshake = false;
boolean eot = false;
@Override
public void onNext(SourceOuterClass.ReadResponse readResponse) {
// Handle handshake response
if (readResponse.hasHandshake() && readResponse.getHandshake().getSot()) {
handshake = true;
return;
}
if (readResponse.getStatus().getEot()) {
eot = true;
return;
}
count++;
SourceOuterClass.Offset offset = readResponse.getResult().getOffset();
SourceOuterClass.AckRequest.Request ackRequest = SourceOuterClass.AckRequest
Expand All @@ -103,16 +118,31 @@ public void onError(Throwable throwable) {

@Override
public void onCompleted() {
// we should have read 10 messages and 2 eot messages
assertEquals(12, count);
// we should have read 10 messages
assertEquals(10, count);
assertTrue(handshake);
assertTrue(eot);
}
});

// Send handshake request
readRequestObserver.onNext(handshakeRequest);

// Send other read requests
readRequestObserver.onNext(request);

List<SourceOuterClass.AckResponse> ackResponses = new ArrayList<>();
StreamObserver<SourceOuterClass.AckRequest> ackRequestObserver = stub.ackFn(new StreamObserver<>() {
boolean handshake = false;
int count = 0;
@Override
public void onNext(SourceOuterClass.AckResponse ackResponse) {
if (ackResponse.hasHandshake() && ackResponse.getHandshake().getSot()) {
handshake = true;
return;
}
count++;
ackResponses.add(ackResponse);
}

@Override
Expand All @@ -121,9 +151,17 @@ public void onError(Throwable throwable) {

@Override
public void onCompleted() {
assertEquals(5, count);
assertTrue(handshake);
}
});

// Send handshake request
ackRequestObserver.onNext(SourceOuterClass.AckRequest.newBuilder()
.setHandshake(SourceOuterClass.Handshake.newBuilder().setSot(true).build())
.build());

// Send other ack requests
ackRequests.forEach(ackRequestObserver::onNext);

// get pending messages
Expand Down

0 comments on commit acf0406

Please sign in to comment.