diff --git a/docs/en/connector-v2/source/Paimon.md b/docs/en/connector-v2/source/Paimon.md index 32155abde02..e586a4fd9d8 100644 --- a/docs/en/connector-v2/source/Paimon.md +++ b/docs/en/connector-v2/source/Paimon.md @@ -9,7 +9,7 @@ Read data from Apache Paimon. ## Key features - [x] [batch](../../concept/connector-v2-features.md) -- [ ] [stream](../../concept/connector-v2-features.md) +- [x] [stream](../../concept/connector-v2-features.md) - [ ] [exactly-once](../../concept/connector-v2-features.md) - [ ] [column projection](../../concept/connector-v2-features.md) - [ ] [parallelism](../../concept/connector-v2-features.md) @@ -157,9 +157,30 @@ source { ``` ## Changelog +If you want to read the changelog of a Paimon table with this connector, the table must be written with the table option `changelog-producer=input`; for details, refer to [Paimon changelog](https://paimon.apache.org/docs/master/primary-key-table/changelog-producer/). +Currently, we only support the `input` and `none` modes of the changelog producer. If the changelog producer is `input`, the streaming read of the connector will generate -U, +U, +I and -D data. If the changelog producer is `none`, the streaming read of the connector will generate +I, +U and -D data. -### next version +### Streaming read example +```hocon +env { + parallelism = 1 + job.mode = "Streaming" +} -- Add Paimon Source Connector -- Support projection for Paimon Source +source { + Paimon { + warehouse = "/tmp/paimon" + database = "full_type" + table = "st_test" + } +} +sink { + Paimon { + warehouse = "/tmp/paimon" + database = "full_type" + table = "st_test_sink" + paimon.table.primary-keys = "c_tinyint" + } +} +``` diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java index d0a0c4a7937..d5c31ff2354 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSource.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.source; +import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; @@ -26,18 +27,23 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.constants.JobMode; import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog; import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSourceConfig; import org.apache.seatunnel.connectors.seatunnel.paimon.source.converter.SqlToPaimonPredicateConverter; +import org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator.PaimonBatchSourceSplitEnumerator; +import org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator.PaimonStreamSourceSplitEnumerator; import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowTypeConverter; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.types.RowType; import 
net.sf.jsqlparser.statement.select.PlainSelect; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Objects; @@ -58,12 +64,12 @@ public class PaimonSource private Table paimonTable; - private Predicate predicate; - - private int[] projectionIndex; + private JobContext jobContext; private CatalogTable catalogTable; + protected final ReadBuilder readBuilder; + public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog) { this.readonlyConfig = readonlyConfig; PaimonSourceConfig paimonSourceConfig = new PaimonSourceConfig(readonlyConfig); @@ -76,17 +82,22 @@ public PaimonSource(ReadonlyConfig readonlyConfig, PaimonCatalog paimonCatalog) PlainSelect plainSelect = convertToPlainSelect(filterSql); RowType paimonRowType = this.paimonTable.rowType(); String[] filedNames = paimonRowType.getFieldNames().toArray(new String[0]); + + Predicate predicate = null; + int[] projectionIndex = null; if (!Objects.isNull(plainSelect)) { - this.projectionIndex = convertSqlSelectToPaimonProjectionIndex(filedNames, plainSelect); + projectionIndex = convertSqlSelectToPaimonProjectionIndex(filedNames, plainSelect); if (!Objects.isNull(projectionIndex)) { this.catalogTable = paimonCatalog.getTableWithProjection(tablePath, projectionIndex); } - this.predicate = + predicate = SqlToPaimonPredicateConverter.convertSqlWhereToPaimonPredicate( paimonRowType, plainSelect); } - seaTunnelRowType = RowTypeConverter.convert(paimonRowType, projectionIndex); + this.seaTunnelRowType = RowTypeConverter.convert(paimonRowType, projectionIndex); + this.readBuilder = + paimonTable.newReadBuilder().withProjection(projectionIndex).withFilter(predicate); } @Override @@ -99,23 +110,34 @@ public List getProducedCatalogTables() { return Collections.singletonList(catalogTable); } + @Override + public void setJobContext(JobContext jobContext) { + this.jobContext = jobContext; + } + @Override public Boundedness getBoundedness() { - return Boundedness.BOUNDED; + return JobMode.BATCH.equals(jobContext.getJobMode()) + ? 
Boundedness.BOUNDED + : Boundedness.UNBOUNDED; } @Override public SourceReader createReader( SourceReader.Context readerContext) throws Exception { return new PaimonSourceReader( - readerContext, paimonTable, seaTunnelRowType, predicate, projectionIndex); + readerContext, paimonTable, seaTunnelRowType, readBuilder.newRead()); } @Override public SourceSplitEnumerator createEnumerator( SourceSplitEnumerator.Context enumeratorContext) throws Exception { - return new PaimonSourceSplitEnumerator( - enumeratorContext, paimonTable, predicate, projectionIndex); + if (getBoundedness() == Boundedness.BOUNDED) { + return new PaimonBatchSourceSplitEnumerator( + enumeratorContext, new LinkedList<>(), null, readBuilder.newScan(), 1); + } + return new PaimonStreamSourceSplitEnumerator( + enumeratorContext, new LinkedList<>(), null, readBuilder.newStreamScan(), 1); } @Override @@ -123,7 +145,19 @@ public SourceSplitEnumerator restoreEnumer SourceSplitEnumerator.Context enumeratorContext, PaimonSourceState checkpointState) throws Exception { - return new PaimonSourceSplitEnumerator( - enumeratorContext, paimonTable, checkpointState, predicate, projectionIndex); + if (getBoundedness() == Boundedness.BOUNDED) { + return new PaimonBatchSourceSplitEnumerator( + enumeratorContext, + checkpointState.getAssignedSplits(), + checkpointState.getCurrentSnapshotId(), + readBuilder.newScan(), + 1); + } + return new PaimonStreamSourceSplitEnumerator( + enumeratorContext, + checkpointState.getAssignedSplits(), + checkpointState.getCurrentSnapshotId(), + readBuilder.newStreamScan(), + 1); } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java index 3cfa5ee8b90..50de479b4b2 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceReader.java @@ -17,18 +17,21 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.source; +import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.source.SourceReader; +import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowConverter; +import org.apache.seatunnel.connectors.seatunnel.paimon.utils.RowKindConverter; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.TableRead; import lombok.extern.slf4j.Slf4j; @@ -48,20 +51,14 @@ public class PaimonSourceReader implements SourceReader output) throws Exception { if (Objects.nonNull(split)) { // read logic try (final RecordReader reader = - table.newReadBuilder() - .withProjection(projection) - .withFilter(predicate) - .newRead() - .executeFilter() - .createReader(split.getSplit())) { + tableRead.executeFilter().createReader(split.getSplit())) { final RecordReaderIterator rowIterator = 
new RecordReaderIterator<>(reader); while (rowIterator.hasNext()) { @@ -94,16 +86,31 @@ public void pollNext(Collector output) throws Exception { final SeaTunnelRow seaTunnelRow = RowConverter.convert( row, seaTunnelRowType, ((FileStoreTable) table).schema()); + if (Boundedness.UNBOUNDED.equals(context.getBoundedness())) { + RowKind rowKind = + RowKindConverter.convertPaimonRowKind2SeatunnelRowkind( + row.getRowKind()); + if (rowKind != null) { + seaTunnelRow.setRowKind(rowKind); + } + } output.collect(seaTunnelRow); } } - } else if (noMoreSplit && sourceSplits.isEmpty()) { + } + + if (noMoreSplit + && sourceSplits.isEmpty() + && Boundedness.BOUNDED.equals(context.getBoundedness())) { // signal to the source that we have reached the end of the data. - log.info("Closed the bounded flink table store source"); + log.info("Closed the bounded table store source"); context.signalNoMoreElement(); } else { - log.warn("Waiting for flink table source split, sleeping 1s"); - Thread.sleep(1000L); + context.sendSplitRequest(); + if (sourceSplits.isEmpty()) { + log.debug("Waiting for table source split, sleeping 1s"); + Thread.sleep(1000L); + } } } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java index d7b7b96f48f..eba167eadd0 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplit.java @@ -21,22 +21,22 @@ import org.apache.paimon.table.source.Split; +import lombok.AllArgsConstructor; +import lombok.Getter; + /** Paimon source split, wrapped the {@link Split} of paimon table. */ +@Getter +@AllArgsConstructor public class PaimonSourceSplit implements SourceSplit { private static final long serialVersionUID = 1L; - private final Split split; + /** The unique ID of the split. Unique within the scope of this source. */ + private final String id; - public PaimonSourceSplit(Split split) { - this.split = split; - } + private final Split split; @Override public String splitId() { return split.toString(); } - - public Split getSplit() { - return split; - } } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java deleted file mode 100644 index 7b0f14c3ab8..00000000000 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.paimon.source; - -import org.apache.seatunnel.api.source.SourceSplitEnumerator; - -import org.apache.paimon.predicate.Predicate; -import org.apache.paimon.table.Table; -import org.apache.paimon.table.source.Split; - -import lombok.extern.slf4j.Slf4j; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -/** Paimon source split enumerator, used to calculate the splits for every reader. */ -@Slf4j -public class PaimonSourceSplitEnumerator - implements SourceSplitEnumerator { - - /** Source split enumerator context */ - private final Context context; - - /** The splits that has assigned */ - private final Set assignedSplit; - - /** The splits that have not assigned */ - private Set pendingSplit; - - /** The table that wants to read */ - private final Table table; - - private final Predicate predicate; - - private int[] projection; - - public PaimonSourceSplitEnumerator( - Context context, - Table table, - Predicate predicate, - int[] projection) { - this.context = context; - this.table = table; - this.assignedSplit = new HashSet<>(); - this.predicate = predicate; - this.projection = projection; - } - - public PaimonSourceSplitEnumerator( - Context context, - Table table, - PaimonSourceState sourceState, - Predicate predicate, - int[] projection) { - this.context = context; - this.table = table; - this.assignedSplit = sourceState.getAssignedSplits(); - this.predicate = predicate; - this.projection = projection; - } - - @Override - public void open() { - this.pendingSplit = new HashSet<>(); - } - - @Override - public void run() throws Exception { - // do nothing - } - - @Override - public void close() throws IOException { - // do nothing - } - - @Override - public void addSplitsBack(List splits, int subtaskId) { - if (!splits.isEmpty()) { - pendingSplit.addAll(splits); - assignSplit(subtaskId); - } - } - - @Override - public int currentUnassignedSplitSize() { - return pendingSplit.size(); - } - - @Override - public void registerReader(int subtaskId) { - pendingSplit = getTableSplits(); - assignSplit(subtaskId); - } - - @Override - public PaimonSourceState snapshotState(long checkpointId) throws Exception { - return new PaimonSourceState(assignedSplit); - } - - @Override - public void notifyCheckpointComplete(long checkpointId) throws Exception { - // do nothing - } - - @Override - public void handleSplitRequest(int subtaskId) { - // do nothing - } - - /** Assign split by reader task id */ - private void assignSplit(int taskId) { - ArrayList currentTaskSplits = new ArrayList<>(); - if (context.currentParallelism() == 1) { - // if parallelism == 1, we should assign all the splits to reader - currentTaskSplits.addAll(pendingSplit); - } else { - // if parallelism > 1, according to hashCode of split's id to determine whether to - // allocate the current task - for (PaimonSourceSplit fileSourceSplit : pendingSplit) { - final int splitOwner = - getSplitOwner(fileSourceSplit.splitId(), 
context.currentParallelism()); - if (splitOwner == taskId) { - currentTaskSplits.add(fileSourceSplit); - } - } - } - // assign splits - context.assignSplit(taskId, currentTaskSplits); - // save the state of assigned splits - assignedSplit.addAll(currentTaskSplits); - // remove the assigned splits from pending splits - currentTaskSplits.forEach(split -> pendingSplit.remove(split)); - log.info( - "SubTask {} is assigned to [{}]", - taskId, - currentTaskSplits.stream() - .map(PaimonSourceSplit::splitId) - .collect(Collectors.joining(","))); - context.signalNoMoreSplits(taskId); - } - - /** Get all splits of table */ - private Set getTableSplits() { - final Set tableSplits = new HashSet<>(); - final List splits = - table.newReadBuilder() - .withProjection(projection) - .withFilter(predicate) - .newScan() - .plan() - .splits(); - splits.forEach(split -> tableSplits.add(new PaimonSourceSplit(split))); - return tableSplits; - } - - /** Hash algorithm for assigning splits to readers */ - private static int getSplitOwner(String tp, int numReaders) { - return (tp.hashCode() & Integer.MAX_VALUE) % numReaders; - } -} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java new file mode 100644 index 00000000000..93b64c13a21 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitGenerator.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.source; + +import org.apache.paimon.table.source.TableScan; + +import java.util.List; +import java.util.stream.Collectors; + +public class PaimonSourceSplitGenerator { + /** + * The current Id as a mutable string representation. This covers more values than the integer + * value range, so we should never overflow. + */ + private final char[] currentId = "0000000000".toCharArray(); + + public List createSplits(TableScan.Plan plan) { + return plan.splits().stream() + .map(s -> new PaimonSourceSplit(getNextId(), s)) + .collect(Collectors.toList()); + } + + protected final String getNextId() { + // because we just increment numbers, we increment the char representation directly, + // rather than incrementing an integer and converting it to a string representation + // every time again (requires quite some expensive conversion logic). 
+ incrementCharArrayByOne(currentId, currentId.length - 1); + return new String(currentId); + } + + private static void incrementCharArrayByOne(char[] array, int pos) { + if (pos < 0) { + throw new RuntimeException("Produce too many splits."); + } + + char c = array[pos]; + c++; + + if (c > '9') { + c = '0'; + incrementCharArrayByOne(array, pos - 1); + } + array[pos] = c; + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java index c8336b0d03c..db6392520c3 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java @@ -17,21 +17,22 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.source; +import lombok.AllArgsConstructor; +import lombok.Getter; + +import javax.annotation.Nullable; + import java.io.Serializable; -import java.util.Set; +import java.util.Deque; /** Paimon connector source state, saves the splits has assigned to readers. */ +@Getter +@AllArgsConstructor public class PaimonSourceState implements Serializable { private static final long serialVersionUID = 1L; - private final Set assignedSplits; - - public PaimonSourceState(Set assignedSplits) { - this.assignedSplits = assignedSplits; - } + private final Deque assignedSplits; - public Set getAssignedSplits() { - return assignedSplits; - } + private final @Nullable Long currentSnapshotId; } diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java new file mode 100644 index 00000000000..278381a24a9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/AbstractSplitEnumerator.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator; + +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit; +import org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplitGenerator; +import org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceState; + +import org.apache.paimon.table.source.EndOfScanException; +import org.apache.paimon.table.source.StreamTableScan; +import org.apache.paimon.table.source.TableScan; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +@Slf4j +public abstract class AbstractSplitEnumerator + implements SourceSplitEnumerator { + + /** Source split enumerator context */ + protected final Context context; + + protected final Set readersAwaitingSplit; + + protected final PaimonSourceSplitGenerator splitGenerator; + + /** The splits that have not assigned */ + protected Deque pendingSplits; + + protected final TableScan tableScan; + + private final int splitMaxNum; + + @Nullable protected Long nextSnapshotId; + + protected boolean finished = false; + + private ExecutorService executorService; + + public AbstractSplitEnumerator( + Context context, + Deque pendingSplits, + @Nullable Long nextSnapshotId, + TableScan tableScan, + int splitMaxPerTask) { + this.context = context; + this.pendingSplits = new LinkedList<>(pendingSplits); + this.nextSnapshotId = nextSnapshotId; + this.readersAwaitingSplit = new LinkedHashSet<>(); + this.splitGenerator = new PaimonSourceSplitGenerator(); + this.tableScan = tableScan; + this.splitMaxNum = context.currentParallelism() * splitMaxPerTask; + this.executorService = + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setNameFormat("Seatunnel-PaimonSourceSplitEnumerator-%d") + .build()); + if (tableScan instanceof StreamTableScan && nextSnapshotId != null) { + ((StreamTableScan) tableScan).restore(nextSnapshotId); + } + } + + @Override + public void open() {} + + @Override + public void run() throws Exception { + loadNewSplits(); + } + + @Override + public void close() throws IOException { + if (Objects.nonNull(executorService) && !executorService.isShutdown()) { + executorService.shutdown(); + } + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + log.debug("Paimon Source Enumerator adds splits back: {}", splits); + this.pendingSplits.addAll(splits); + if (context.registeredReaders().contains(subtaskId)) { + assignSplits(); + } + } + + @Override + public int currentUnassignedSplitSize() { + return pendingSplits.size(); + } + + @Override + public void registerReader(int subtaskId) { + readersAwaitingSplit.add(subtaskId); + } + + @Override + public PaimonSourceState snapshotState(long checkpointId) throws Exception { + return new PaimonSourceState(pendingSplits, nextSnapshotId); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception 
{} + + private void addSplits(Collection newSplits) { + this.pendingSplits.addAll(newSplits); + } + + /** + * Method should be synchronized because {@link #handleSplitRequest} and {@link + * #processDiscoveredSplits} have thread conflicts. + */ + protected synchronized void assignSplits() { + Iterator pendingReaderIterator = readersAwaitingSplit.iterator(); + while (pendingReaderIterator.hasNext()) { + Integer pendingReader = pendingReaderIterator.next(); + if (!context.registeredReaders().contains(pendingReader)) { + pendingReaderIterator.remove(); + continue; + } + LinkedList assignedTaskSplits = new LinkedList<>(); + for (PaimonSourceSplit fileSourceSplit : pendingSplits) { + final int splitOwner = + getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism()); + if (splitOwner == pendingReader) { + assignedTaskSplits.add(fileSourceSplit); + } + } + + if (!assignedTaskSplits.isEmpty()) { + log.info("Assign splits {} to reader {}", assignedTaskSplits, pendingReader); + try { + context.assignSplit(pendingReader, assignedTaskSplits); + // remove the assigned splits from pending splits + assignedTaskSplits.forEach(pendingSplits::remove); + } catch (Exception e) { + log.error( + "Failed to assign splits {} to reader {}", + assignedTaskSplits, + pendingReader, + e); + pendingSplits.addAll(assignedTaskSplits); + } + } + } + } + + protected void loadNewSplits() { + CompletableFuture.supplyAsync(this::scanNextSnapshot, executorService) + .whenComplete(this::processDiscoveredSplits); + } + + /** Hash algorithm for assigning splits to readers */ + protected static int getSplitOwner(String tp, int numReaders) { + return (tp.hashCode() & Integer.MAX_VALUE) % numReaders; + } + + // ------------------------------------------------------------------------ + + // This need to be synchronized because scan object is not thread safe. handleSplitRequest and + // CompletableFuture.supplyAsync will invoke this. + protected synchronized Optional scanNextSnapshot() { + if (pendingSplits.size() >= splitMaxNum) { + return Optional.empty(); + } + TableScan.Plan plan = tableScan.plan(); + Long nextSnapshotId = null; + if (tableScan instanceof StreamTableScan) { + nextSnapshotId = ((StreamTableScan) tableScan).checkpoint(); + } + return Optional.of(new PlanWithNextSnapshotId(plan, nextSnapshotId)); + } + + // This method could not be synchronized, because it runs in coordinatorThread, which will make + // it serializable execution. + protected void processDiscoveredSplits( + Optional planWithNextSnapshotIdOptional, Throwable error) { + if (error != null) { + if (error instanceof EndOfScanException) { + log.debug("Catching EndOfStreamException, the stream is finished."); + finished = true; + assignSplits(); + } else { + log.error("Failed to enumerate files", error); + throw new SeaTunnelException(error); + } + return; + } + if (!planWithNextSnapshotIdOptional.isPresent()) { + return; + } + PlanWithNextSnapshotId planWithNextSnapshotId = planWithNextSnapshotIdOptional.get(); + nextSnapshotId = planWithNextSnapshotId.nextSnapshotId; + TableScan.Plan plan = planWithNextSnapshotId.plan; + + if (plan.splits().isEmpty()) { + return; + } + + addSplits(splitGenerator.createSplits(plan)); + assignSplits(); + } + + /** The result of scan. 
*/ + @Getter + protected static class PlanWithNextSnapshotId { + private final TableScan.Plan plan; + private final Long nextSnapshotId; + + public PlanWithNextSnapshotId(TableScan.Plan plan, Long nextSnapshotId) { + this.plan = plan; + this.nextSnapshotId = nextSnapshotId; + } + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java new file mode 100644 index 00000000000..b00b38587a8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonBatchSourceSplitEnumerator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator; + +import org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit; +import org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceState; + +import org.apache.paimon.table.source.TableScan; + +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Nullable; + +import java.util.Deque; +import java.util.Set; + +/** Paimon source split enumerator, used to calculate the splits for every reader. */ +@Slf4j +public class PaimonBatchSourceSplitEnumerator extends AbstractSplitEnumerator { + + public PaimonBatchSourceSplitEnumerator( + Context context, + Deque pendingSplits, + @Nullable Long nextSnapshotId, + TableScan tableScan, + int splitMaxPerTask) { + super(context, pendingSplits, nextSnapshotId, tableScan, splitMaxPerTask); + } + + @Override + public void run() throws Exception { + this.processDiscoveredSplits(this.scanNextSnapshot(), null); + Set readers = context.registeredReaders(); + log.debug( + "No more splits to assign." 
+ " Sending NoMoreSplitsEvent to reader {}.", readers); + readers.forEach(context::signalNoMoreSplits); + } + + @Override + public PaimonSourceState snapshotState(long checkpointId) throws Exception { + return new PaimonSourceState(pendingSplits, null); + } + + @Override + public void handleSplitRequest(int subtaskId) { + // do nothing + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java new file mode 100644 index 00000000000..2cce57be931 --- /dev/null +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/enumerator/PaimonStreamSourceSplitEnumerator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.paimon.source.enumerator; + +import org.apache.seatunnel.connectors.seatunnel.paimon.source.PaimonSourceSplit; + +import org.apache.paimon.table.source.TableScan; + +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Nullable; + +import java.util.Deque; + +/** Paimon source split enumerator, used to calculate the splits for every reader. 
*/ +@Slf4j +public class PaimonStreamSourceSplitEnumerator extends AbstractSplitEnumerator { + + public PaimonStreamSourceSplitEnumerator( + Context context, + Deque pendingSplits, + @Nullable Long nextSnapshotId, + TableScan tableScan, + int splitMaxPerTask) { + super(context, pendingSplits, nextSnapshotId, tableScan, splitMaxPerTask); + } + + @Override + public void handleSplitRequest(int subtaskId) { + readersAwaitingSplit.add(subtaskId); + assignSplits(); + if (readersAwaitingSplit.contains(subtaskId)) { + loadNewSplits(); + } + } +} diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java index adb77c637da..4a3833e6a00 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/utils/RowKindConverter.java @@ -19,12 +19,11 @@ import org.apache.seatunnel.api.table.type.RowKind; -import org.apache.paimon.data.InternalRow; - public class RowKindConverter { /** - * Convert SeaTunnel RowKind {@link RowKind} to Paimon RowKind {@link InternalRow} + * Convert SeaTunnel RowKind {@link RowKind} to Paimon RowKind {@link + * org.apache.paimon.types.RowKind} * * @param seaTunnelRowKind The kind of change that a row describes in a changelog. * @return @@ -44,4 +43,27 @@ public static org.apache.paimon.types.RowKind convertSeaTunnelRowKind2PaimonRowK return null; } } + + /** + * Convert Paimon RowKind {@link org.apache.paimon.types.RowKind} to SeaTunnel RowKind {@link + * RowKind} + * + * @param paimonRowKind + * @return + */ + public static RowKind convertPaimonRowKind2SeatunnelRowkind( + org.apache.paimon.types.RowKind paimonRowKind) { + switch (paimonRowKind) { + case DELETE: + return RowKind.DELETE; + case UPDATE_AFTER: + return RowKind.UPDATE_AFTER; + case UPDATE_BEFORE: + return RowKind.UPDATE_BEFORE; + case INSERT: + return RowKind.INSERT; + default: + return null; + } + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java new file mode 100644 index 00000000000..7da7afaa0b4 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonRecordWithFullType.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.e2e.connector.paimon; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.Timestamp; + +import lombok.AllArgsConstructor; +import lombok.Data; + +import java.util.Map; + +@Data +@AllArgsConstructor +public class PaimonRecordWithFullType { + public Map c_map; + public int[] c_array; + public BinaryString c_string; + public boolean c_boolean; + public short c_tinyint; + public short c_smallint; + public int c_int; + public long c_bigint; + public float c_float; + public double c_double; + public Decimal c_decimal; + public BinaryString c_bytes; + public int c_date; + public Timestamp c_timestamp; +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java index 0168cc8f534..dc6bfc9eba3 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkCDCIT.java @@ -637,7 +637,7 @@ private List loadPaimonData(String dbName, String tbName) throws E return result; } - private Table getTable(String dbName, String tbName) { + protected Table getTable(String dbName, String tbName) { try { return getCatalog().getTable(getIdentifier(dbName, tbName)); } catch (Catalog.TableNotExistException e) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java new file mode 100644 index 00000000000..ede9f7c3b38 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonStreamReadIT.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.e2e.connector.paimon; + +import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; + +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.ReadBuilder; +import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.table.source.TableScan; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.given; + +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = + "Spark and Flink engine can not auto create paimon table on worker node in local file(e.g flink tm) by savemode feature which can lead error") +@Slf4j +public class PaimonStreamReadIT extends PaimonSinkCDCIT { + + @TestTemplate + public void testStreamReadPaimon(TestContainer container) throws Exception { + Container.ExecResult writeResult = + container.executeJob("/fake_to_paimon_with_full_type.conf"); + Assertions.assertEquals(0, writeResult.getExitCode()); + + CompletableFuture.runAsync( + () -> { + try { + container.executeJob("/paimon_to_paimon.conf"); + } catch (Exception e) { + throw new SeaTunnelException(e); + } + }); + + given().ignoreExceptions() + .await() + .atLeast(100L, TimeUnit.MILLISECONDS) + .atMost(400L, TimeUnit.SECONDS) + .untilAsserted( + () -> { + container.executeExtraCommands(containerExtendedFactory); + List paimonSourceRecords = + loadPaimonDataWithFullType("full_type", "st_test"); + List paimonSinkRecords = + loadPaimonDataWithFullType("full_type", "st_test_sink"); + Assertions.assertEquals( + paimonSourceRecords.size(), paimonSinkRecords.size()); + Assertions.assertIterableEquals(paimonSourceRecords, paimonSinkRecords); + }); + + // write cdc data + Container.ExecResult writeResult1 = + container.executeJob("/fake_to_paimon_with_full_type_cdc_data.conf"); + Assertions.assertEquals(0, writeResult1.getExitCode()); + + given().ignoreExceptions() + .await() + .atLeast(100L, TimeUnit.MILLISECONDS) + .atMost(400L, TimeUnit.SECONDS) + .untilAsserted( + () -> { + container.executeExtraCommands(containerExtendedFactory); + List paimonSourceRecords = + loadPaimonDataWithFullType("full_type", "st_test"); + List paimonSinkRecords = + loadPaimonDataWithFullType("full_type", "st_test_sink"); + Assertions.assertEquals( + paimonSourceRecords.size(), paimonSinkRecords.size()); + Assertions.assertIterableEquals(paimonSourceRecords, paimonSinkRecords); + }); + } + + protected List loadPaimonDataWithFullType( + String dbName, String tbName) { + FileStoreTable table = (FileStoreTable) getTable(dbName, tbName); + ReadBuilder readBuilder = table.newReadBuilder(); + TableScan.Plan plan = readBuilder.newScan().plan(); + TableRead tableRead = readBuilder.newRead(); + List result = new ArrayList<>(); + try (RecordReader reader = tableRead.createReader(plan)) { + 
reader.forEachRemaining( + row -> { + InternalMap internalMap = row.getMap(0); + InternalArray keyArray = internalMap.keyArray(); + InternalArray valueArray = internalMap.valueArray(); + HashMap map = new HashMap<>(internalMap.size()); + for (int i = 0; i < internalMap.size(); i++) { + map.put(keyArray.getString(i), valueArray.getString(i)); + } + InternalArray internalArray = row.getArray(1); + int[] intArray = internalArray.toIntArray(); + PaimonRecordWithFullType paimonRecordWithFullType = + new PaimonRecordWithFullType( + map, + intArray, + row.getString(2), + row.getBoolean(3), + row.getShort(4), + row.getShort(5), + row.getInt(6), + row.getLong(7), + row.getFloat(8), + row.getDouble(9), + row.getDecimal(10, 30, 8), + row.getString(11), + row.getInt(12), + row.getTimestamp(13, 6)); + result.add(paimonRecordWithFullType); + }); + } catch (IOException e) { + throw new SeaTunnelException(e); + } + return result; + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf new file mode 100644 index 00000000000..c5b881c2ce6 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon_with_full_type_cdc_data.conf @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + primaryKey { + name = "c_tinyint" + columnNames = [c_tinyint] + } + } + rows = [ + { + kind = INSERT + fields = [{"a": "b"}, [101], "c_string", true, 121, 15987, 563873951, 7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211", "bWlJWmo=", "2023-04-21", "2023-04-21T23:20:58"] + } + { + kind = INSERT + fields = [{"a": "b"}, [101], "c_string1", true, 122, 15987, 563873952, 7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211", "bWlJWmo=", "2023-04-25", "2023-04-25T23:20:58"] + } + { + kind = UPDATE_BEFORE + fields = [{"a": "c"}, [102], "c_string2", true, 117, 15987, 563873953, 7084913402530365002, 1.22, 1.232, "2924137191386439303744.39292212", "bWlJWmo=", "2023-04-26", "2023-04-26T23:20:58"] + } + { + kind = UPDATE_AFTER + fields = [{"a": "e"}, [103], "c_string3", false, 117, 15989, 563873951, 7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213", "bWlJWmo=", "2023-04-27", "2023-04-27T23:20:58"] + } + { + kind = DELETE + fields = [{"a": "e"}, [103], "c_string2", true, 119, 15987, 563873953, 7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213", "bWlJWmo=", "2023-04-23", "2023-04-23T23:20:58"] + } + ] + result_table_name = "fake" + } +} + +sink { + Paimon { + warehouse = "/tmp/paimon" + database = "full_type" + table = "st_test" + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf new file mode 100644 index 00000000000..50728871afc --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_paimon.conf @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "Streaming" +} + +source { + Paimon { + warehouse = "/tmp/paimon" + database = "full_type" + table = "st_test" + } +} + +sink { + Paimon { + warehouse = "/tmp/paimon" + database = "full_type" + table = "st_test_sink" + paimon.table.primary-keys = "c_tinyint" + } +}
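As a companion to the changelog note in the documentation above: for the streaming read to emit -U/+U records, the Paimon table being read must have been written with `changelog-producer=input`. The sketch below shows one possible way to do that from a SeaTunnel sink job; it assumes the Paimon sink forwards Paimon table options through a `paimon.table.write-props` map (verify the option name against the Paimon sink documentation of your SeaTunnel version), and the FakeSource schema is only illustrative.

```hocon
# Hedged sketch (assumptions noted above): write a Paimon table whose
# changelog can later be read in streaming mode by the Paimon source.
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    schema = {
      fields {
        c_tinyint = tinyint
        c_string = string
      }
    }
  }
}

sink {
  Paimon {
    warehouse = "/tmp/paimon"
    database = "full_type"
    table = "st_test"
    paimon.table.primary-keys = "c_tinyint"
    # Assumed pass-through of Paimon table options at table creation time.
    paimon.table.write-props = {
      changelog-producer = "input"
    }
  }
}
```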