Skip to content

Commit

Permalink
upgrade flink to 1.15.1
Browse files Browse the repository at this point in the history
  • Loading branch information
jszouxue committed Apr 20, 2023
1 parent 07280f8 commit dbfebe5
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 13 deletions.
20 changes: 7 additions & 13 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ under the License.
<maven.compiler.target>${java.version}</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<flink.version>1.14.4</flink.version>
<flink.version>1.15.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.15</slf4j.version>
<maven.shade.version>3.2.4</maven.shade.version>
Expand All @@ -37,26 +37,26 @@ under the License.

<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.2.6</version>
<version>1.2.7</version>
<packaging>jar</packaging>


<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>flink-test-utils</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Expand All @@ -71,7 +71,7 @@ under the License.

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
Expand All @@ -84,12 +84,6 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
Expand All @@ -106,7 +100,7 @@ under the License.
<!-- Either... -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,4 +567,59 @@ public void testSinkValueWithExpire() throws Exception {
Thread.sleep(wait * 1000);
Preconditions.condition(clusterCommands.exists("1") == 0, "");
}

@Test
public void testIncryBy() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String dim =
"create table sink_redis(name varchar, level bigint) with ( 'connector'='redis', "
+ "'cluster-nodes'='"
+ CLUSTERNODES
+ "','redis-mode'='cluster', 'password'='"
+ CLUSTER_PASSWORD
+ "','"
+ REDIS_COMMAND
+ "'='"
+ RedisCommand.INCRBY
+ "' )";

tEnv.executeSql(dim);
String sql = " insert into sink_redis select * from (values ('1', 1))";
tEnv.executeSql(sql);
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
System.out.println(sql);

Preconditions.condition(clusterCommands.get("1").toString().equals("2"), "");
}

@Test
public void testIncryBy2() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String dim =
"create table sink_redis(name varchar, level bigint) with ( 'connector'='redis', "
+ "'cluster-nodes'='"
+ CLUSTERNODES
+ "','redis-mode'='cluster', 'password'='"
+ CLUSTER_PASSWORD
+ "','"
+ REDIS_COMMAND
+ "'='"
+ RedisCommand.INCRBY
+ "' )";

tEnv.executeSql(dim);
String sql = dim + "; insert into sink_redis select * from (values ('1', 1));";
System.out.println(tEnv.explainSql(sql));
tEnv.executeSql(sql);
TableResult tableResult = tEnv.executeSql(sql);
tableResult.getJobClient().get().getJobExecutionResult().get();
System.out.println(sql);

Preconditions.condition(clusterCommands.get("1").toString().equals("2"), "");
}
}

0 comments on commit dbfebe5

Please sign in to comment.