From c24f0bcaae26f99972a03306d1a470e24fae4795 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Tue, 5 Nov 2024 20:10:36 +0800 Subject: [PATCH 1/2] [Fix][Zeta] Remove duplicate `LIFECYCLE_WRITER_CLOSE` events --- .../FakeSourceToConsoleWithEventReportIT.java | 29 ++++++++++++------- .../server/task/flow/SinkFlowLifeCycle.java | 2 -- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java index 8e45bbf9de5..79a4dbe1f81 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleWithEventReportIT.java @@ -44,7 +44,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Set; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -109,16 +109,23 @@ public void testEventReport() throws IOException, InterruptedException { arrayNode.elements().forEachRemaining(jsonNode -> events.add(jsonNode)); } } - Assertions.assertEquals(10, events.size()); - Set eventTypes = - events.stream().map(e -> e.get("eventType").asText()).collect(Collectors.toSet()); + Map eventMap = + events.stream() + .map(e -> e.get("eventType").asText()) + .collect(Collectors.groupingBy(e -> e, Collectors.summingInt(e -> 1))); Assertions.assertTrue( - eventTypes.containsAll( - Arrays.asList( - EventType.LIFECYCLE_ENUMERATOR_OPEN.name(), - EventType.LIFECYCLE_ENUMERATOR_CLOSE.name(), - EventType.LIFECYCLE_READER_OPEN.name(), - EventType.LIFECYCLE_READER_CLOSE.name(), - EventType.LIFECYCLE_WRITER_CLOSE.name()))); + eventMap.keySet() + .containsAll( + Arrays.asList( + EventType.LIFECYCLE_ENUMERATOR_OPEN.name(), + EventType.LIFECYCLE_ENUMERATOR_CLOSE.name(), + EventType.LIFECYCLE_READER_OPEN.name(), + EventType.LIFECYCLE_READER_CLOSE.name(), + EventType.LIFECYCLE_WRITER_CLOSE.name()))); + Assertions.assertEquals(2, eventMap.get(EventType.LIFECYCLE_READER_OPEN.name())); + Assertions.assertEquals(1, eventMap.get(EventType.LIFECYCLE_ENUMERATOR_OPEN.name())); + Assertions.assertEquals(1, eventMap.get(EventType.LIFECYCLE_ENUMERATOR_CLOSE.name())); + Assertions.assertEquals(2, eventMap.get(EventType.LIFECYCLE_READER_CLOSE.name())); + Assertions.assertEquals(2, eventMap.get(EventType.LIFECYCLE_WRITER_CLOSE.name())); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 295328d8210..cf056df4612 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -25,7 +25,6 @@ import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SinkWriter.Context; import org.apache.seatunnel.api.sink.SupportResourceShare; -import org.apache.seatunnel.api.sink.event.WriterCloseEvent; import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; @@ -175,7 +174,6 @@ private Address getCommitterTaskAddress() throws ExecutionException, Interrupted public void close() throws IOException { super.close(); writer.close(); - writerContext.getEventListener().onEvent(new WriterCloseEvent()); try { if (resourceManager != null) { resourceManager.close(); From 32907bd3973f08f6a1f4013bc3e44b68f2488665 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Tue, 5 Nov 2024 20:23:18 +0800 Subject: [PATCH 2/2] [Fix][Zeta] Remove duplicate `LIFECYCLE_WRITER_CLOSE` events --- .../api/sink/multitablesink/MultiTableSinkWriter.java | 5 ----- .../seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java | 2 ++ 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java index f5b30be5370..e1e773ff061 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkWriter.java @@ -20,7 +20,6 @@ import org.apache.seatunnel.api.sink.MultiTableResourceManager; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; -import org.apache.seatunnel.api.sink.event.WriterCloseEvent; import org.apache.seatunnel.api.table.event.SchemaChangeEvent; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.tracing.MDCTracer; @@ -318,10 +317,6 @@ public void close() throws IOException { (identifier, sinkWriter) -> { try { sinkWriter.close(); - sinkWritersContext - .get(identifier) - .getEventListener() - .onEvent(new WriterCloseEvent()); } catch (Throwable e) { if (firstE[0] == null) { firstE[0] = e; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index cf056df4612..295328d8210 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SinkWriter.Context; import org.apache.seatunnel.api.sink.SupportResourceShare; +import org.apache.seatunnel.api.sink.event.WriterCloseEvent; import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; @@ -174,6 +175,7 @@ private Address getCommitterTaskAddress() throws ExecutionException, Interrupted public void close() throws IOException { super.close(); writer.close(); + writerContext.getEventListener().onEvent(new WriterCloseEvent()); try { if (resourceManager != null) { resourceManager.close();