Skip to content

Commit

Permalink
feat: container-type level version compatibility check
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Sep 26, 2024
1 parent f66f67c commit cfbe276
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 3 deletions.
40 changes: 40 additions & 0 deletions src/main/java/io/numaproj/numaflow/info/ContainerType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.numaproj.numaflow.info;

import com.fasterxml.jackson.annotation.JsonValue;

/**
* the string content matches the corresponding server info file name.
* DO NOT change it unless the server info file name is changed.
*/
public enum ContainerType {
Sourcer("sourcer"),
Sourcetransformer("sourcetransformer"),
Sinker("sinker"),
Mapper("mapper"),
Reducer("reducer"),
Reducestreamer("reducestreamer"),
Sessionreducer("sessionreducer"),
Sideinput("sideinput"),
Fbsinker("fb-sinker"),
Unknown("unknown");

private final String name;

ContainerType(String name) {
this.name = name;
}

@JsonValue
public String getName() {
return name;

Check warning on line 29 in src/main/java/io/numaproj/numaflow/info/ContainerType.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/info/ContainerType.java#L29

Added line #L29 was not covered by tests
}

public static ContainerType fromString(String text) {
for (ContainerType b : ContainerType.values()) {
if (b.name.equalsIgnoreCase(text)) {
return b;
}
}
return Unknown;

Check warning on line 38 in src/main/java/io/numaproj/numaflow/info/ContainerType.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/numaproj/numaflow/info/ContainerType.java#L38

Added line #L38 was not covered by tests
}
}
15 changes: 14 additions & 1 deletion src/main/java/io/numaproj/numaflow/info/ServerInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import java.util.Map;

import static java.util.Map.entry;

/**
* Server Information to be used by client to determine:
* - protocol: what is right protocol to use (UDS or TCP)
Expand All @@ -24,7 +26,18 @@ public class ServerInfo {
// Specify the minimum Numaflow version required by the current SDK version
// To update this value, please follow the instructions for MINIMUM_NUMAFLOW_VERSION in
// https://github.com/numaproj/numaflow-rs/blob/main/src/shared.rs
public static final String MINIMUM_NUMAFLOW_VERSION = "1.3.1-z";
public static final Map<ContainerType, String> MINIMUM_NUMAFLOW_VERSION = Map.ofEntries(
entry(ContainerType.Sourcer, "1.3.1-z"),
entry(ContainerType.Sourcetransformer, "1.3.1-z"),
entry(ContainerType.Sinker, "1.3.1-z"),
entry(ContainerType.Mapper, "1.3.1-z"),
entry(ContainerType.Reducer, "1.3.1-z"),
entry(ContainerType.Reducestreamer, "1.3.1-z"),
entry(ContainerType.Sessionreducer, "1.3.1-z"),
entry(ContainerType.Sideinput, "1.3.1-z"),
entry(ContainerType.Fbsinker, "1.3.1-z"),
entry(ContainerType.Unknown, "1.3.1-z")
);
@JsonProperty("protocol")
private Protocol protocol;
@JsonProperty("language")
Expand Down
22 changes: 21 additions & 1 deletion src/main/java/io/numaproj/numaflow/shared/GrpcServerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerDomainSocketChannel;
import io.netty.channel.unix.DomainSocketAddress;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.Language;
import io.numaproj.numaflow.info.Protocol;
import io.numaproj.numaflow.info.ServerInfo;
Expand All @@ -30,6 +31,8 @@
import java.util.HashMap;
import java.util.Map;

import static io.numaproj.numaflow.info.ServerInfo.MINIMUM_NUMAFLOW_VERSION;

/**
* GrpcServerUtils is the utility class for netty server channel.
*/
Expand Down Expand Up @@ -111,7 +114,7 @@ public static void writeServerInfo(
ServerInfo serverInfo = new ServerInfo(
Protocol.UDS_PROTOCOL,
Language.JAVA,
ServerInfo.MINIMUM_NUMAFLOW_VERSION,
MINIMUM_NUMAFLOW_VERSION.get(getContainerType(infoFilePath)),
serverInfoAccessor.getSDKVersion(),
metaData);
log.info("Writing server info {} to {}", serverInfo, infoFilePath);
Expand Down Expand Up @@ -185,4 +188,21 @@ private void handleException(
"netty-worker"))
.intercept(interceptor);
}

/**
* Returns the container type from the server info file path.
* serverInfoFilePath is in the format of "/var/run/numaflow/{ContainerType}-server-info"
*
* @param serverInfoFilePath the file path from which to extract the container type
*
* @return the ContainerType derived from the file path
*/
public static ContainerType getContainerType(String serverInfoFilePath) {
String fileName = Paths.get(serverInfoFilePath).getFileName().toString();
if (fileName.endsWith("-server-info")) {
String containerTypeName = fileName.substring(0, fileName.indexOf("-server-info"));
return ContainerType.fromString(containerTypeName);
}
return ContainerType.Unknown;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void given_writeServerInfo_when_read_then_returnExactSame() {
ServerInfo testServerInfo = new ServerInfo(
Protocol.TCP_PROTOCOL,
Language.JAVA,
ServerInfo.MINIMUM_NUMAFLOW_VERSION,
"1.3.1-z",
"0.4.3",
new HashMap<>() {{
put("key1", "value1");
Expand Down
10 changes: 10 additions & 0 deletions src/test/java/io/numaproj/numaflow/shared/GrpcServerUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.grpc.ServerBuilder;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.numaproj.numaflow.info.ContainerType;
import io.numaproj.numaflow.info.ServerInfoAccessor;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -35,6 +36,15 @@ public void testWriteServerInfo() throws Exception {
.write(Mockito.any(), Mockito.eq("infoFilePath"));
}

@Test
public void testGetContainerType() {
ContainerType expectMapper = GrpcServerUtils.getContainerType(
"/var/run/numaflow/mapper-server-info");
Assert.assertEquals(ContainerType.Mapper, expectMapper);
ContainerType expectUnknown = GrpcServerUtils.getContainerType("/var/run/numaflow/malformed");
Assert.assertEquals(ContainerType.Unknown, expectUnknown);
}

@Test
public void testCreateServerBuilder() {
ServerBuilder<?> serverBuilder = GrpcServerUtils.createServerBuilder(
Expand Down

0 comments on commit cfbe276

Please sign in to comment.