diff --git a/gradle.properties b/gradle.properties index b3434b40..f8a1d348 100644 --- a/gradle.properties +++ b/gradle.properties @@ -30,7 +30,7 @@ jacocoVersion=0.8.2 # Version and base tags can be overridden at build time. connectorVersion=0.8.1-SNAPSHOT -pravegaVersion=0.8.0 +pravegaVersion=0.8.1-2644.ca6290d-SNAPSHOT apacheCommonsVersion=3.7 # flag to indicate if Pravega sub-module should be used instead of the version defined in 'pravegaVersion' diff --git a/pravega b/pravega index e4c436ba..ca6290d7 160000 --- a/pravega +++ b/pravega @@ -1 +1 @@ -Subproject commit e4c436ba95f235ca4dfcfe4b5ec8c6c28c72f699 +Subproject commit ca6290d73e1de2aaf77b04b73012396d809c24e5 diff --git a/src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java b/src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java index fe6c578c..7b68d2e2 100644 --- a/src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java +++ b/src/main/java/io/pravega/connectors/flink/FlinkPravegaReader.java @@ -10,6 +10,7 @@ package io.pravega.connectors.flink; import io.pravega.client.EventStreamClientFactory; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import io.pravega.client.ClientConfig; import io.pravega.client.admin.ReaderGroupManager; @@ -360,19 +361,48 @@ public void open(Configuration parameters) throws Exception { @Override public void close() throws Exception { + Throwable ex = null; if (eventStreamClientFactory != null) { - log.info("Closing Pravega eventStreamClientFactory"); - eventStreamClientFactory.close(); + try { + log.info("Closing Pravega eventStreamClientFactory"); + eventStreamClientFactory.close(); + } catch (Throwable e) { + if (e instanceof InterruptedException) { + log.warn("Interrupted while waiting for eventStreamClientFactory to close, retrying ..."); + eventStreamClientFactory.close(); + } else { + ex = ExceptionUtils.firstOrSuppressed(e, ex); + } + } } - if (readerGroupManager != null) { log.info("Closing Pravega ReaderGroupManager"); - readerGroupManager.close(); + try { + readerGroupManager.close(); + } catch (Throwable e) { + if (e instanceof InterruptedException) { + log.warn("Interrupted while waiting for ReaderGroupManager to close, retrying ..."); + readerGroupManager.close(); + } else { + ex = ExceptionUtils.firstOrSuppressed(e, ex); + } + } } - if (readerGroup != null) { - log.info("Closing Pravega ReaderGroup"); - readerGroup.close(); + try { + log.info("Closing Pravega ReaderGroup"); + readerGroup.close(); + } catch (Throwable e) { + if (e instanceof InterruptedException) { + log.warn("Interrupted while waiting for ReaderGroup to close, retrying ..."); + readerGroup.close(); + } else { + ex = ExceptionUtils.firstOrSuppressed(e, ex); + } + } + } + if (ex != null && ex instanceof Exception) { + throw (Exception) ex; } }