Skip to content

Commit

Permalink
close connection with join
Browse files Browse the repository at this point in the history
  • Loading branch information
jeff-zou committed Nov 5, 2024
1 parent 6ac8f3b commit 942ab18
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 31 deletions.
6 changes: 3 additions & 3 deletions README-en.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ After executing mvn package on the command line, import the generated package fl


<br/>
The project depends on Lettuce(6.2.1) and netty-transport-native-epoll(4.1.82.Final),flink-connection-redis-1.4.2.jar if these packages are available.
Otherwise, use flink-connector-redis-1.4.2-jar-with-dependencies.jar.
The project depends on Lettuce(6.2.1) and netty-transport-native-epoll(4.1.82.Final),flink-connection-redis-1.4.3.jar if these packages are available.
Otherwise, use flink-connector-redis-1.4.3-jar-with-dependencies.jar.
<br/>

Development environment engineering direct reference:
Expand All @@ -55,7 +55,7 @@ Development environment engineering direct reference:
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.4.2</version>
<version>1.4.3</version>
<!-- When the Lettuce netty-transport-native-epoll dependency is not imported separately -->
<!-- <classifier>jar-with-dependencies</classifier>-->
</dependency>
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@

# 2 使用方法:
## 2.1 工程直接引用
项目依赖Lettuce(6.2.1)及netty-transport-native-epoll(4.1.82.Final),如flink环境有这两个包,则使用flink-connector-redis-1.4.2.jar,
否则使用flink-connector-redis-1.4.2-jar-with-dependencies.jar。
项目依赖Lettuce(6.2.1)及netty-transport-native-epoll(4.1.82.Final),如flink环境有这两个包,则使用flink-connector-redis-1.4.3.jar,
否则使用flink-connector-redis-1.4.3-jar-with-dependencies.jar。
<br/>
```
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<!-- 没有单独引入项目依赖Lettuce netty-transport-native-epoll依赖时 -->
<!-- <classifier>jar-with-dependencies</classifier>-->
<version>1.4.2</version>
<version>1.4.3</version>
</dependency>
```
## 2.2 自行打包
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,21 @@

package org.apache.flink.streaming.connectors.redis.container;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.lettuce.core.Range;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/** Redis command container if we want to connect to a Redis cluster. */
public class RedisClusterContainer implements RedisCommandsContainer, Closeable {
Expand Down Expand Up @@ -67,13 +66,7 @@ public void open() {
/** Closes the {@link RedisClusterClient}. */
@Override
public void close() {
try {
CompletableFuture completableFuture = this.connection.closeAsync();
completableFuture.get();
LOG.info("close async connection success!");
} catch (Exception e) {
LOG.error("close async connection error!", e);
}
this.connection.close();
this.redisClusterClient.shutdown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,19 @@

package org.apache.flink.streaming.connectors.redis.container;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.lettuce.core.Range;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Redis command container if we want to connect to a single Redis server or to Redis sentinels If
Expand All @@ -43,8 +42,7 @@ public class RedisContainer implements RedisCommandsContainer, Closeable {
private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(RedisContainer.class);

private transient RedisClient redisClient;
private final transient RedisClient redisClient;
protected transient StatefulRedisConnection<String, String> connection;
protected transient RedisAsyncCommands asyncCommands;

Expand All @@ -60,14 +58,8 @@ public RedisContainer(RedisClient redisClient) {
/** Closes the redisClient instances. */
@Override
public void close() {
try {
CompletableFuture completableFuture = connection.closeAsync();
completableFuture.get();
LOG.info("close async connection success!");
} catch (Exception e) {
LOG.info("close async connection error!", e);
}
redisClient.shutdown();
this.connection.close();
this.redisClient.shutdown();
}

@Override
Expand Down

0 comments on commit 942ab18

Please sign in to comment.