From 8fd43693df4ceb14002ca0c3961ea4c74a631f03 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Fri, 13 Sep 2024 18:25:56 +0800 Subject: [PATCH] [Fix][Connector-V2] Fix some throwable error not be caught --- .../external/IncrementalSourceScanFetcher.java | 2 +- .../external/IncrementalSourceStreamFetcher.java | 2 +- .../rocketmq/source/RocketMqSourceReader.java | 4 ++-- .../seatunnel/sls/source/SlsSourceReader.java | 15 +++------------ .../storage/api/AbstractCheckpointStorage.java | 2 +- 5 files changed, 8 insertions(+), 17 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java index da048b47e54..7f927af5878 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java @@ -93,7 +93,7 @@ public void submitTask(FetchTask fetchTask) { currentSnapshotSplit, taskContext.isExactlyOnce()); snapshotSplitReadTask.execute(taskContext); - } catch (Exception e) { + } catch (Throwable e) { log.error( String.format( "Execute snapshot read task for snapshot split %s fail", diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java index 338cb657b3b..16e45376566 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java @@ -104,7 +104,7 @@ public void submitTask(FetchTask fetchTask) { currentIncrementalSplit, taskContext.isExactlyOnce()); streamFetchTask.execute(taskContext); - } catch (Exception e) { + } catch (Throwable e) { log.error( String.format( "Execute stream read task for incremental split %s fail", diff --git a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java index 90bc8f32315..0dfa0b179d1 100644 --- a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java +++ b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java @@ -98,7 +98,7 @@ public void pollNext(Collector output) throws Exception { Thread.sleep(THREAD_WAIT_TIME); return; } - while (pendingPartitionsQueue.size() != 0) { + while (!pendingPartitionsQueue.isEmpty()) { sourceSplits.add(pendingPartitionsQueue.poll()); } sourceSplits.forEach( @@ -166,7 +166,7 @@ record -> // just for bounded mode sourceSplit.setEndOffset(lastOffset); } - } catch (Exception e) { + } catch (Throwable e) { completableFuture.completeExceptionally(e); } completableFuture.complete(null); diff --git a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java index 43cb75328a1..819b3b07d60 100644 --- a/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java +++ b/seatunnel-connectors-v2/connector-sls/src/main/java/org/apache/seatunnel/connectors/seatunnel/sls/source/SlsSourceReader.java @@ -127,12 +127,8 @@ public void pollNext(Collector collector) throws Exception { sourceSplit.setStartCursor( response.getNextCursor()); completableFuture.complete(true); - } catch (LogException e) { - e.printStackTrace(); - completableFuture.completeExceptionally(e); - throw new RuntimeException(e); - } catch (IOException e) { - e.printStackTrace(); + } catch (Throwable e) { + log.error("pull logs failed", e); completableFuture.completeExceptionally(e); throw new RuntimeException(e); } @@ -141,11 +137,7 @@ public void pollNext(Collector collector) throws Exception { if (completableFuture.get()) { finishedSplits.add(sourceSplit); } - } catch (InterruptedException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - e.printStackTrace(); + } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }); @@ -215,7 +207,6 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { slsSourceSplit .getStartCursor()); } catch (LogException e) { - e.printStackTrace(); log.error( "LogException: commit cursor to sls failed", e); diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java index a466408d9c0..ca912ee2b70 100644 --- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java +++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-api/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/api/AbstractCheckpointStorage.java @@ -195,7 +195,7 @@ public void asyncStoreCheckPoint(PipelineState state) { () -> { try { storeCheckPoint(state); - } catch (Exception e) { + } catch (Throwable e) { log.error( String.format( "store checkpoint failed, job id : %s, pipeline id : %d",