From dbfebe545487f86cf9d4a311f9cbbdacb3e74f0e Mon Sep 17 00:00:00 2001 From: zou Date: Thu, 20 Apr 2023 17:51:07 +0800 Subject: [PATCH] upgrade flink to 1.15.1 --- pom.xml | 20 +++---- .../connectors/redis/table/SQLTest.java | 55 +++++++++++++++++++ 2 files changed, 62 insertions(+), 13 deletions(-) diff --git a/pom.xml b/pom.xml index f472381..cdaa64f 100644 --- a/pom.xml +++ b/pom.xml @@ -28,7 +28,7 @@ under the License. ${java.version} UTF-8 UTF-8 - 1.14.4 + 1.15.1 2.12 1.7.15 3.2.4 @@ -37,26 +37,26 @@ under the License. io.github.jeff-zou flink-connector-redis - 1.2.6 + 1.2.7 jar org.apache.flink - flink-test-utils_${scala.binary.version} + flink-test-utils ${flink.version} test org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} provided org.apache.flink - flink-table-api-java-bridge_${scala.binary.version} + flink-table-api-java-bridge ${flink.version} provided @@ -71,7 +71,7 @@ under the License. org.apache.flink - flink-streaming-java_${scala.binary.version} + flink-streaming-java ${flink.version} test test-jar @@ -84,12 +84,6 @@ under the License. ${flink.version} provided - - org.apache.flink - flink-streaming-java_${scala.binary.version} - ${flink.version} - provided - org.apache.flink flink-streaming-scala_${scala.binary.version} @@ -106,7 +100,7 @@ under the License. org.apache.flink - flink-table-api-java-bridge_${scala.binary.version} + flink-table-api-java-bridge ${flink.version} provided diff --git a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLTest.java b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLTest.java index c045cde..94a0ec9 100644 --- a/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLTest.java +++ b/src/test/java/org/apache/flink/streaming/connectors/redis/table/SQLTest.java @@ -567,4 +567,59 @@ public void testSinkValueWithExpire() throws Exception { Thread.sleep(wait * 1000); Preconditions.condition(clusterCommands.exists("1") == 0, ""); } + + @Test + public void testIncryBy() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String dim = + "create table sink_redis(name varchar, level bigint) with ( 'connector'='redis', " + + "'cluster-nodes'='" + + CLUSTERNODES + + "','redis-mode'='cluster', 'password'='" + + CLUSTER_PASSWORD + + "','" + + REDIS_COMMAND + + "'='" + + RedisCommand.INCRBY + + "' )"; + + tEnv.executeSql(dim); + String sql = " insert into sink_redis select * from (values ('1', 1))"; + tEnv.executeSql(sql); + TableResult tableResult = tEnv.executeSql(sql); + tableResult.getJobClient().get().getJobExecutionResult().get(); + System.out.println(sql); + + Preconditions.condition(clusterCommands.get("1").toString().equals("2"), ""); + } + + @Test + public void testIncryBy2() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String dim = + "create table sink_redis(name varchar, level bigint) with ( 'connector'='redis', " + + "'cluster-nodes'='" + + CLUSTERNODES + + "','redis-mode'='cluster', 'password'='" + + CLUSTER_PASSWORD + + "','" + + REDIS_COMMAND + + "'='" + + RedisCommand.INCRBY + + "' )"; + + tEnv.executeSql(dim); + String sql = dim + "; insert into sink_redis select * from (values ('1', 1));"; + System.out.println(tEnv.explainSql(sql)); + tEnv.executeSql(sql); + TableResult tableResult = tEnv.executeSql(sql); + tableResult.getJobClient().get().getJobExecutionResult().get(); + System.out.println(sql); + + Preconditions.condition(clusterCommands.get("1").toString().equals("2"), ""); + } }