diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/ContinuousStreamSplitEnumeratorContext.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/ContinuousStreamSplitEnumeratorContext.java new file mode 100644 index 000000000000..3ca5b1cc7dab --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/ContinuousStreamSplitEnumeratorContext.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.translation.spark.source.partition.continuous; + +import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext; +import org.apache.seatunnel.api.common.metrics.MetricsContext; +import org.apache.seatunnel.api.event.EventListener; +import org.apache.seatunnel.api.source.SourceEvent; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class ContinuousStreamSplitEnumeratorContext + implements SourceSplitEnumerator.Context { + private final int parallelism; + private final EventListener eventListener; + private final Set readers = new HashSet<>(); + + public ContinuousStreamSplitEnumeratorContext(int parallelism, EventListener eventListener) { + this.parallelism = parallelism; + this.eventListener = eventListener; + } + + @Override + public int currentParallelism() { + return this.parallelism; + } + + @Override + public Set registeredReaders() { + return this.readers; + } + + @Override + public void assignSplit(int subtaskId, List splits) {} + + @Override + public void signalNoMoreSplits(int subtask) {} + + @Override + public void sendEventToSourceReader(int subtaskId, SourceEvent event) { + throw new UnsupportedOperationException( + "The Spark continuous stream source doesn't support sending SourceEvent. 
" + + "Please implement the `SupportCoordinate` marker interface on the SeaTunnel source."); + } + + @Override + public MetricsContext getMetricsContext() { + // TODO Waiting for Flink and Spark to implement MetricsContext + // https://github.com/apache/seatunnel/issues/3431 + return new AbstractMetricsContext() {}; + } + + @Override + public EventListener getEventListener() { + return eventListener; + } +} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelContinuousPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelContinuousPartitionReader.java index 4f0fc401607b..84d203fc1bf4 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelContinuousPartitionReader.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelContinuousPartitionReader.java @@ -25,10 +25,15 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.Handover; import org.apache.seatunnel.translation.spark.serialization.InternalMultiRowCollector; +import org.apache.seatunnel.translation.spark.source.partition.continuous.source.rpc.RpcSourceReaderContext; +import org.apache.seatunnel.translation.spark.source.partition.continuous.source.rpc.RpcSplitEnumeratorContext; +import org.apache.spark.SparkEnv; +import org.apache.spark.rpc.RpcEndpointRef; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReader; import 
org.apache.spark.sql.connector.read.streaming.PartitionOffset; +import org.apache.spark.util.RpcUtils; import java.io.IOException; import java.io.Serializable; @@ -37,42 +42,39 @@ public class SeaTunnelContinuousPartitionReader< SplitT extends SourceSplit, StateT extends Serializable> implements ContinuousPartitionReader { - private final SeaTunnelSource source; - private final String jobId; - private final Integer subtaskId; - private final Integer parallelism; - private final SourceSplitEnumerator splitEnumerator; - private final SourceSplitEnumerator.Context splitEnumeratorCtx; - protected final List restoredSplitState; - protected final SourceReader reader; + private final SeaTunnelSource source; + private final SeaTunnelInputPartition inputPartition; + private final RpcEndpointRef driverRef; + protected List restoredSplitState; + protected SourceReader reader; + protected SourceSplitEnumerator splitEnumerator; - protected final Serializer splitSerializer; - protected final Serializer enumeratorStateSerializer; + protected Serializer splitSerializer; + protected Serializer enumeratorStateSerializer; private InternalMultiRowCollector collector; Handover handover; public SeaTunnelContinuousPartitionReader( - SeaTunnelSource source, - String jobId, - Integer subtaskId, - Integer parallelism, - SourceSplitEnumerator splitEnumerator, - SourceSplitEnumerator.Context splitEnumeratorCtx, - List restoredSplitState, - SourceReader reader, - Serializer splitSerializer, - Serializer enumeratorStateSerializer, - int subTaskId) { + SeaTunnelSource source, + SeaTunnelInputPartition inputPartition) { this.source = source; - this.jobId = jobId; - this.subtaskId = subtaskId; - this.parallelism = parallelism; - this.splitEnumerator = splitEnumerator; - this.splitEnumeratorCtx = splitEnumeratorCtx; - this.restoredSplitState = restoredSplitState; - this.reader = reader; - this.splitSerializer = splitSerializer; - this.enumeratorStateSerializer = enumeratorStateSerializer; + 
this.inputPartition = inputPartition; + this.driverRef = + RpcUtils.makeDriverRef( + inputPartition.getEndpointName(), + SparkEnv.get().conf(), + SparkEnv.get().rpcEnv()); + RpcSourceReaderContext readerCtx = new RpcSourceReaderContext(this.driverRef); + RpcSplitEnumeratorContext splitEnumeratorContext = + new RpcSplitEnumeratorContext(this.driverRef); + try { + reader = (SourceReader) this.source.createReader(readerCtx); + splitEnumerator = + (SourceSplitEnumerator) + this.source.createEnumerator(splitEnumeratorContext); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override @@ -86,7 +88,7 @@ public boolean next() throws IOException { if (handover.isEmpty()) { reader.pollNext(collector); if (handover.isEmpty()) { - // splitEnumeratorCtx.assignSplit(); + splitEnumerator.run(); } } return true; diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelContinuousPartitionReaderFactory.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelContinuousPartitionReaderFactory.java index d8cd730bc34a..c706ef0de455 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelContinuousPartitionReaderFactory.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelContinuousPartitionReaderFactory.java @@ -17,20 +17,26 @@ package org.apache.seatunnel.translation.spark.source.partition.continuous; -import org.apache.spark.SparkEnv; -import org.apache.spark.rpc.RpcEndpointRef; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import 
org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.read.InputPartition; import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReader; import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReaderFactory; -import org.apache.spark.util.RpcUtils; public class SeaTunnelContinuousPartitionReaderFactory implements ContinuousPartitionReaderFactory { + private final SeaTunnelSource source; + + public SeaTunnelContinuousPartitionReaderFactory(SeaTunnelSource source) { + this.source = source; + } + @Override public ContinuousPartitionReader createReader(InputPartition partition) { - RpcEndpointRef endpointRef = - RpcUtils.makeDriverRef("", SparkEnv.get().conf(), SparkEnv.get().rpcEnv()); - - return null; + SeaTunnelInputPartition inputPartition = (SeaTunnelInputPartition) partition; + return new SeaTunnelContinuousPartitionReader<>( + (SeaTunnelSource) source, inputPartition); } } diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelContinuousStream.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelContinuousStream.java index 6285ab91011a..b74acbad8f89 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelContinuousStream.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelContinuousStream.java @@ -18,26 +18,32 @@ package org.apache.seatunnel.translation.spark.source.partition.continuous; 
import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.translation.spark.execution.MultiTableManager; +import org.apache.seatunnel.translation.spark.source.partition.continuous.source.endpoint.EndpointSplitEnumeratorContext; import org.apache.seatunnel.translation.spark.utils.CaseInsensitiveStringMap; +import org.apache.spark.rpc.RpcEndpointRef; import org.apache.spark.sql.connector.read.InputPartition; import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReaderFactory; import org.apache.spark.sql.connector.read.streaming.ContinuousStream; import org.apache.spark.sql.connector.read.streaming.Offset; import org.apache.spark.sql.connector.read.streaming.PartitionOffset; +import org.apache.spark.sql.execution.streaming.continuous.seatunnel.rpc.SplitEnumeratorEndpoint; public class SeaTunnelContinuousStream implements ContinuousStream { - private final SeaTunnelSource source; + private final SeaTunnelSource source; private final int parallelism; private final String jobId; private final String checkpointLocation; private final CaseInsensitiveStringMap caseInsensitiveStringMap; private final MultiTableManager multiTableManager; + private RpcEndpointRef endpointRef; public SeaTunnelContinuousStream( - SeaTunnelSource source, + SeaTunnelSource source, int parallelism, String jobId, String checkpointLocation, @@ -53,7 +59,23 @@ public SeaTunnelContinuousStream( @Override public InputPartition[] planInputPartitions(Offset start) { - return new InputPartition[0]; + SourceSplitEnumerator.Context enumeratorContext = + new EndpointSplitEnumeratorContext<>(parallelism, jobId); + + try { + SourceSplitEnumerator enumerator = source.createEnumerator(enumeratorContext); + } catch (Exception e) { + throw new RuntimeException(e); + } + + SplitEnumeratorEndpoint 
enumeratorEndpoint = new SplitEnumeratorEndpoint(); + String endpointName = "SplitEnumeratorEndpoint-" + java.util.UUID.randomUUID(); + endpointRef = enumeratorEndpoint.rpcEnv().setupEndpoint(endpointName, enumeratorEndpoint); + InputPartition[] inputPartitions = new SeaTunnelInputPartition[parallelism]; + for (int idx = 0; idx < inputPartitions.length; idx++) { + inputPartitions[idx] = new SeaTunnelInputPartition(endpointName, idx); + } + return inputPartitions; } @Override diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelInputPartition.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelInputPartition.java index 44f88ec664b4..273a67650433 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelInputPartition.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/SeaTunnelInputPartition.java @@ -19,4 +19,20 @@ import org.apache.spark.sql.connector.read.InputPartition; -public class SeaTunnelInputPartition implements InputPartition {} +public class SeaTunnelInputPartition implements InputPartition { + private final String endpointName; + private final int partitionId; + + public SeaTunnelInputPartition(String endpointName, int partitionId) { + this.endpointName = endpointName; + this.partitionId = partitionId; + } + + public String getEndpointName() { + return endpointName; + } + + public int getPartitionId() { + return partitionId; + } +} diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/endpoint/EndpointSource.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/endpoint/EndpointSource.java new file mode 100644 index 000000000000..241c39afb473 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/endpoint/EndpointSource.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.translation.spark.source.partition.continuous.source.endpoint; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.seatunnel.translation.source.BaseSourceFunction; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +public class EndpointSource + implements BaseSourceFunction { + private final SeaTunnelSource source; + protected final SourceSplitEnumerator splitEnumerator; + protected final Integer parallelism; + protected final String jobId; + + public EndpointSource( + SeaTunnelSource source, Integer parallelism, String jobId) { + this.source = source; + this.parallelism = parallelism; + this.jobId = jobId; + SourceSplitEnumerator.Context enumeratorContext = + new EndpointSplitEnumeratorContext<>(parallelism, jobId); + try { + splitEnumerator = source.restoreEnumerator(enumeratorContext, null); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void open() throws Exception {} + + @Override + public void run(Collector collector) throws Exception {} + + @Override + public Map> snapshotState(long checkpointId) throws Exception { + return null; + } + + @Override + public void close() throws Exception {} + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception {} +} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/spark/sql/execution/streaming/continuous/seatunnel/rpc/SplitEnumeratorEndPoint.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/endpoint/EndpointSplitEnumerator.java similarity index 59% rename from 
seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/spark/sql/execution/streaming/continuous/seatunnel/rpc/SplitEnumeratorEndPoint.java rename to seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/endpoint/EndpointSplitEnumerator.java index 224d1d3053a5..c0f1a0a6479f 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/spark/sql/execution/streaming/continuous/seatunnel/rpc/SplitEnumeratorEndPoint.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/endpoint/EndpointSplitEnumerator.java @@ -15,24 +15,6 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.streaming.continuous.seatunnel.rpc; +package org.apache.seatunnel.translation.spark.source.partition.continuous.source.endpoint; -import org.apache.spark.rpc.RpcCallContext; -import org.apache.spark.rpc.RpcEnv; -import org.apache.spark.rpc.ThreadSafeRpcEndpoint; - -import scala.PartialFunction; -import scala.runtime.BoxedUnit; - -public class SplitEnumeratorEndPoint implements ThreadSafeRpcEndpoint { - - @Override - public RpcEnv rpcEnv() { - return null; - } - - @Override - public PartialFunction receiveAndReply(RpcCallContext context) { - return null; - } -} +public class EndpointSplitEnumerator {} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/endpoint/EndpointSplitEnumeratorContext.java 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/endpoint/EndpointSplitEnumeratorContext.java new file mode 100644 index 000000000000..a4a0d76521c6 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/endpoint/EndpointSplitEnumeratorContext.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.translation.spark.source.partition.continuous.source.endpoint; + +import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext; +import org.apache.seatunnel.api.common.metrics.MetricsContext; +import org.apache.seatunnel.api.event.DefaultEventProcessor; +import org.apache.seatunnel.api.event.EventListener; +import org.apache.seatunnel.api.source.SourceEvent; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.LinkedBlockingDeque; + +public class EndpointSplitEnumeratorContext + implements SourceSplitEnumerator.Context { + protected final int parallelism; + protected final EventListener eventListener; + protected volatile boolean running = false; + private final LinkedBlockingDeque[] queues; + + public EndpointSplitEnumeratorContext(int parallelism, String jobId) { + this.parallelism = parallelism; + this.queues = new LinkedBlockingDeque[this.parallelism]; + this.eventListener = new DefaultEventProcessor(jobId); + } + + @Override + public int currentParallelism() { + return parallelism; + } + + @Override + public Set registeredReaders() { + Set readers = new HashSet<>(); + if (running) { + for (int i = 0; i < queues.length; i++) { + if (queues[i] != null) { + readers.add(i); + } + } + } + return readers; + } + + @Override + public void assignSplit(int subtaskId, List splits) { + if (subtaskId < 0 || subtaskId >= parallelism) { + return; + } + + LinkedBlockingDeque queue = queues[subtaskId]; + if (queue == null) { + queue = new LinkedBlockingDeque<>(); + } + + queue.addAll(splits); + } + + @Override + public void signalNoMoreSplits(int subtask) { + queues[subtask] = null; + } + + @Override + public void sendEventToSourceReader(int subtaskId, SourceEvent event) { + throw new UnsupportedOperationException("Unsupported"); + } + + @Override + public 
MetricsContext getMetricsContext() { + return new AbstractMetricsContext() {}; + } + + @Override + public EventListener getEventListener() { + return eventListener; + } +} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/rpc/RpcSourceReaderContext.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/rpc/RpcSourceReaderContext.java new file mode 100644 index 000000000000..fd3b0fdb0428 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/rpc/RpcSourceReaderContext.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.translation.spark.source.partition.continuous.source.rpc; + +import org.apache.seatunnel.api.common.metrics.MetricsContext; +import org.apache.seatunnel.api.event.EventListener; +import org.apache.seatunnel.api.source.Boundedness; +import org.apache.seatunnel.api.source.SourceEvent; +import org.apache.seatunnel.api.source.SourceReader; + +import org.apache.spark.rpc.RpcEndpointRef; + +public class RpcSourceReaderContext implements SourceReader.Context { + private final RpcEndpointRef driverRef; + + public RpcSourceReaderContext(RpcEndpointRef driverRef) { + this.driverRef = driverRef; + } + + @Override + public int getIndexOfSubtask() { + return 0; + } + + @Override + public Boundedness getBoundedness() { + return null; + } + + @Override + public void signalNoMoreElement() {} + + @Override + public void sendSplitRequest() {} + + @Override + public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {} + + @Override + public MetricsContext getMetricsContext() { + return null; + } + + @Override + public EventListener getEventListener() { + return null; + } +} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/rpc/RpcSplitEnumerator.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/rpc/RpcSplitEnumerator.java new file mode 100644 index 000000000000..cfb9b63c1ac9 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/rpc/RpcSplitEnumerator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.translation.spark.source.partition.continuous.source.rpc; + +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; + +import org.apache.spark.rpc.RpcEndpointRef; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +public class RpcSplitEnumerator + implements SourceSplitEnumerator { + private final RpcEndpointRef driverRef; + + public RpcSplitEnumerator(RpcEndpointRef driverRef) { + this.driverRef = driverRef; + } + + @Override + public void open() {} + + @Override + public void run() throws Exception {} + + @Override + public void close() throws IOException {} + + @Override + public void addSplitsBack(List splits, int subtaskId) {} + + @Override + public int currentUnassignedSplitSize() { + return 0; + } + + @Override + public void handleSplitRequest(int subtaskId) {} + + @Override + public void registerReader(int subtaskId) {} + + @Override + public StateT snapshotState(long checkpointId) throws Exception { + return null; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception {} +} diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/rpc/RpcSplitEnumeratorContext.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/rpc/RpcSplitEnumeratorContext.java new file mode 100644 index 000000000000..388fc6f111f9 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/continuous/source/rpc/RpcSplitEnumeratorContext.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.translation.spark.source.partition.continuous.source.rpc; + +import org.apache.seatunnel.api.common.metrics.MetricsContext; +import org.apache.seatunnel.api.event.EventListener; +import org.apache.seatunnel.api.source.SourceEvent; +import org.apache.seatunnel.api.source.SourceSplit; +import org.apache.seatunnel.api.source.SourceSplitEnumerator; + +import org.apache.spark.rpc.RpcEndpointRef; + +import java.util.List; +import java.util.Set; + +public class RpcSplitEnumeratorContext + implements SourceSplitEnumerator.Context { + private final RpcEndpointRef driverRef; + + public RpcSplitEnumeratorContext(RpcEndpointRef driverRef) { + this.driverRef = driverRef; + } + + @Override + public int currentParallelism() { + return 0; + } + + @Override + public Set registeredReaders() { + return null; + } + + @Override + public void assignSplit(int subtaskId, List splits) {} + + @Override + public void signalNoMoreSplits(int subtask) {} + + @Override + public void sendEventToSourceReader(int subtaskId, SourceEvent event) {} + + @Override + public MetricsContext getMetricsContext() { + return null; + } + + @Override + public EventListener getEventListener() { + return null; + } +} diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/scan/SeaTunnelScan.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/scan/SeaTunnelScan.java index 4107853ff351..c4cf4803dded 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/scan/SeaTunnelScan.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/scan/SeaTunnelScan.java @@ -18,6 +18,7 @@ package 
org.apache.seatunnel.translation.spark.source.scan; import org.apache.seatunnel.api.source.SeaTunnelSource; +import org.apache.seatunnel.api.source.SourceSplit; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.translation.spark.execution.MultiTableManager; import org.apache.seatunnel.translation.spark.source.partition.batch.SeaTunnelBatch; @@ -82,7 +83,7 @@ public MicroBatchStream toMicroBatchStream(String checkpointLocation) { @Override public ContinuousStream toContinuousStream(String checkpointLocation) { return new SeaTunnelContinuousStream( - source, + (SeaTunnelSource) source, parallelism, jobId, checkpointLocation, diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/spark/sql/execution/streaming/continuous/seatunnel/rpc/SplitEnumeratorEndpoint.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/spark/sql/execution/streaming/continuous/seatunnel/rpc/SplitEnumeratorEndpoint.java new file mode 100644 index 000000000000..e4932f6ad444 --- /dev/null +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/spark/sql/execution/streaming/continuous/seatunnel/rpc/SplitEnumeratorEndpoint.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming.continuous.seatunnel.rpc;

import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;

import org.apache.spark.SparkEnv;
import org.apache.spark.rpc.RpcCallContext;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.rpc.RpcEnv;
import org.apache.spark.rpc.ThreadSafeRpcEndpoint;

import scala.PartialFunction;
import scala.reflect.ClassTag;
import scala.runtime.BoxedUnit;

import java.io.Serializable;

/**
 * Driver-side RPC endpoint through which executor-side readers request their next split from the
 * {@link SourceSplitEnumerator}.
 *
 * <p>NOTE(review): the endpoint currently replies with a {@code null} split (see
 * {@link #receiveAndReply(RpcCallContext)}) — the enumerator wiring looks unfinished; confirm
 * before use.
 */
public class SplitEnumeratorEndpoint implements ThreadSafeRpcEndpoint {

    /** Uses the RPC environment of the current Spark process. */
    @Override
    public RpcEnv rpcEnv() {
        return SparkEnv.get().rpcEnv();
    }

    /**
     * Handles {@link PollNext} requests. Currently a stub: it always replies with a
     * {@link PollResponse} that echoes the subtask id and carries a {@code null} split.
     */
    @Override
    public PartialFunction<Object, BoxedUnit> receiveAndReply(RpcCallContext context) {
        return new PartialFunction<Object, BoxedUnit>() {
            @Override
            public boolean isDefinedAt(Object message) {
                // Only poll requests are understood by this endpoint.
                return message instanceof PollNext;
            }

            @Override
            public BoxedUnit apply(Object message) {
                PollNext request = (PollNext) message;
                // TODO(review): no split is produced yet; reply echoes the subtask id only.
                context.reply(new PollResponse(request.subTaskId(), null));
                return BoxedUnit.UNIT;
            }
        };
    }

    /** Request message: reader {@code subTaskId} asks for its next split. */
    public static class PollNext implements Serializable {
        private final int subTaskId;

        public PollNext(int subTaskId) {
            this.subTaskId = subTaskId;
        }

        public int subTaskId() {
            return this.subTaskId;
        }
    }

    /** Reply message: the split assigned to {@code subTaskId}, or {@code null} when none. */
    public static class PollResponse implements Serializable {
        private final int subTaskId;
        private final SourceSplit split;

        public PollResponse(int subTaskId, SourceSplit split) {
            this.subTaskId = subTaskId;
            this.split = split;
        }

        public int getSubTaskId() {
            return subTaskId;
        }

        public SourceSplit getSplit() {
            return split;
        }
    }

    /** Common contract for the driver-local enumerator service and its executor-side proxy. */
    public interface ISplitEnumerator {
        /** Returns the next split for {@code subTaskId}, or {@code null} when none available. */
        SourceSplit pollNext(int subTaskId);
    }

    /** Executor-side proxy that forwards {@link #pollNext(int)} to the driver endpoint. */
    public static class SplitEnumeratorStub implements ISplitEnumerator {
        private final RpcEndpointRef endpointRef;

        public SplitEnumeratorStub(RpcEndpointRef endpointRef) {
            this.endpointRef = endpointRef;
        }

        @Override
        public SourceSplit pollNext(int subTaskId) {
            PollNext request = new PollNext(subTaskId);
            // Blocking ask; the ClassTag tells Spark's RPC layer the expected reply type.
            PollResponse response =
                    endpointRef.askSync(request, ClassTag.apply(PollResponse.class));
            return response.getSplit();
        }
    }

    /** Driver-side service wrapping the real {@link SourceSplitEnumerator}. */
    public static class SplitEnumeratorSrv implements ISplitEnumerator {
        private SourceSplitEnumerator<SourceSplit, ?> splitEnumerator;

        /** Kept so existing no-arg construction keeps compiling; inject via the setter. */
        public SplitEnumeratorSrv() {}

        public SplitEnumeratorSrv(SourceSplitEnumerator<SourceSplit, ?> splitEnumerator) {
            this.splitEnumerator = splitEnumerator;
        }

        /** Late injection of the wrapped enumerator. */
        public void setSplitEnumerator(SourceSplitEnumerator<SourceSplit, ?> splitEnumerator) {
            this.splitEnumerator = splitEnumerator;
        }

        @Override
        public SourceSplit pollNext(int subTaskId) {
            // Fail fast instead of the NPE that an unassigned enumerator used to cause.
            if (splitEnumerator == null) {
                throw new IllegalStateException("SourceSplitEnumerator has not been set");
            }
            try {
                splitEnumerator.run();
            } catch (Exception e) {
                // Preserve the original cause for diagnosis.
                throw new RuntimeException("SplitEnumerator run failed", e);
            }
            // TODO(review): run() does not yield a split; the poll protocol looks unfinished.
            return null;
        }
    }
}