Skip to content

Commit

Permalink
[issue-416] Retry close when InterruptedException is caught in Praveg…
Browse files Browse the repository at this point in the history
…a reader (#421)

Signed-off-by: Brian Zhou <[email protected]>
  • Loading branch information
crazyzhou authored Nov 6, 2020
1 parent cf5a5e2 commit bdc6d5a
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 9 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jacocoVersion=0.8.2

# Version and base tags can be overridden at build time.
connectorVersion=0.8.1-SNAPSHOT
pravegaVersion=0.8.0
pravegaVersion=0.8.1-2644.ca6290d-SNAPSHOT
apacheCommonsVersion=3.7

# flag to indicate if Pravega sub-module should be used instead of the version defined in 'pravegaVersion'
Expand Down
44 changes: 37 additions & 7 deletions src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package io.pravega.connectors.flink;

import io.pravega.client.EventStreamClientFactory;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import io.pravega.client.ClientConfig;
import io.pravega.client.admin.ReaderGroupManager;
Expand Down Expand Up @@ -360,19 +361,48 @@ public void open(Configuration parameters) throws Exception {

@Override
public void close() throws Exception {
Throwable ex = null;
if (eventStreamClientFactory != null) {
log.info("Closing Pravega eventStreamClientFactory");
eventStreamClientFactory.close();
try {
log.info("Closing Pravega eventStreamClientFactory");
eventStreamClientFactory.close();
} catch (Throwable e) {
if (e instanceof InterruptedException) {
log.warn("Interrupted while waiting for eventStreamClientFactory to close, retrying ...");
eventStreamClientFactory.close();
} else {
ex = ExceptionUtils.firstOrSuppressed(e, ex);
}
}
}

if (readerGroupManager != null) {
log.info("Closing Pravega ReaderGroupManager");
readerGroupManager.close();
try {
readerGroupManager.close();
} catch (Throwable e) {
if (e instanceof InterruptedException) {
log.warn("Interrupted while waiting for ReaderGroupManager to close, retrying ...");
readerGroupManager.close();
} else {
ex = ExceptionUtils.firstOrSuppressed(e, ex);
}
}
}

if (readerGroup != null) {
log.info("Closing Pravega ReaderGroup");
readerGroup.close();
try {
log.info("Closing Pravega ReaderGroup");
readerGroup.close();
} catch (Throwable e) {
if (e instanceof InterruptedException) {
log.warn("Interrupted while waiting for ReaderGroup to close, retrying ...");
readerGroup.close();
} else {
ex = ExceptionUtils.firstOrSuppressed(e, ex);
}
}
}
if (ex != null && ex instanceof Exception) {
throw (Exception) ex;
}
}

Expand Down

0 comments on commit bdc6d5a

Please sign in to comment.