Skip to content

Commit

Permalink
[WIP][feature][spark] Support streaming
Browse files Browse the repository at this point in the history
- Support Spark Continuous Processing
  • Loading branch information
CheneyYin committed Sep 20, 2024
1 parent 2e82482 commit b522c35
Show file tree
Hide file tree
Showing 13 changed files with 652 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.translation.spark.source.partition.continuous;

import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.event.EventListener;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ContinuousStreamSplitEnumeratorContext<SplitT extends SourceSplit>
implements SourceSplitEnumerator.Context<SplitT> {
private final int parallelism;
private final EventListener eventListener;
private final Set<Integer> readers = new HashSet<>();

public ContinuousStreamSplitEnumeratorContext(int parallelism, EventListener eventListener) {
this.parallelism = parallelism;
this.eventListener = eventListener;
}

@Override
public int currentParallelism() {
return this.parallelism;
}

@Override
public Set<Integer> registeredReaders() {
return this.readers;
}

@Override
public void assignSplit(int subtaskId, List<SplitT> splits) {}

@Override
public void signalNoMoreSplits(int subtask) {}

@Override
public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
throw new UnsupportedOperationException(
"Flink ParallelSource don't support sending SourceEvent. "
+ "Please implement the `SupportCoordinate` marker interface on the SeaTunnel source.");
}

@Override
public MetricsContext getMetricsContext() {
// TODO Waiting for Flink and Spark to implement MetricsContext
// https://github.com/apache/seatunnel/issues/3431
return new AbstractMetricsContext() {};
}

@Override
public EventListener getEventListener() {
return eventListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Handover;
import org.apache.seatunnel.translation.spark.serialization.InternalMultiRowCollector;
import org.apache.seatunnel.translation.spark.source.partition.continuous.source.rpc.RpcSourceReaderContext;
import org.apache.seatunnel.translation.spark.source.partition.continuous.source.rpc.RpcSplitEnumeratorContext;

import org.apache.spark.SparkEnv;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReader;
import org.apache.spark.sql.connector.read.streaming.PartitionOffset;
import org.apache.spark.util.RpcUtils;

import java.io.IOException;
import java.io.Serializable;
Expand All @@ -37,42 +42,39 @@
public class SeaTunnelContinuousPartitionReader<
SplitT extends SourceSplit, StateT extends Serializable>
implements ContinuousPartitionReader<InternalRow> {
private final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
private final String jobId;
private final Integer subtaskId;
private final Integer parallelism;
private final SourceSplitEnumerator splitEnumerator;
private final SourceSplitEnumerator.Context splitEnumeratorCtx;
protected final List<SplitT> restoredSplitState;
protected final SourceReader<SeaTunnelRow, SplitT> reader;
private final SeaTunnelSource<SeaTunnelRow, SourceSplit, ?> source;
private final SeaTunnelInputPartition inputPartition;
private final RpcEndpointRef driverRef;
protected List<SplitT> restoredSplitState;
protected SourceReader<SeaTunnelRow, SourceSplit> reader;
protected SourceSplitEnumerator<SourceSplit, Serializable> splitEnumerator;

protected final Serializer<SplitT> splitSerializer;
protected final Serializer<StateT> enumeratorStateSerializer;
protected Serializer<SplitT> splitSerializer;
protected Serializer<StateT> enumeratorStateSerializer;
private InternalMultiRowCollector collector;
Handover<InternalRow> handover;

public SeaTunnelContinuousPartitionReader(
SeaTunnelSource<SeaTunnelRow, ?, ?> source,
String jobId,
Integer subtaskId,
Integer parallelism,
SourceSplitEnumerator splitEnumerator,
SourceSplitEnumerator.Context splitEnumeratorCtx,
List<SplitT> restoredSplitState,
SourceReader<SeaTunnelRow, SplitT> reader,
Serializer<SplitT> splitSerializer,
Serializer<StateT> enumeratorStateSerializer,
int subTaskId) {
SeaTunnelSource<SeaTunnelRow, SourceSplit, ?> source,
SeaTunnelInputPartition inputPartition) {
this.source = source;
this.jobId = jobId;
this.subtaskId = subtaskId;
this.parallelism = parallelism;
this.splitEnumerator = splitEnumerator;
this.splitEnumeratorCtx = splitEnumeratorCtx;
this.restoredSplitState = restoredSplitState;
this.reader = reader;
this.splitSerializer = splitSerializer;
this.enumeratorStateSerializer = enumeratorStateSerializer;
this.inputPartition = inputPartition;
this.driverRef =
RpcUtils.makeDriverRef(
inputPartition.getEndpointName(),
SparkEnv.get().conf(),
SparkEnv.get().rpcEnv());
RpcSourceReaderContext readerCtx = new RpcSourceReaderContext(this.driverRef);
RpcSplitEnumeratorContext<SourceSplit> splitEnumeratorContext =
new RpcSplitEnumeratorContext<SourceSplit>(this.driverRef);
try {
reader = (SourceReader<SeaTunnelRow, SourceSplit>) this.source.createReader(readerCtx);
splitEnumerator =
(SourceSplitEnumerator<SourceSplit, Serializable>)
this.source.createEnumerator(splitEnumeratorContext);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
Expand All @@ -86,7 +88,7 @@ public boolean next() throws IOException {
if (handover.isEmpty()) {
reader.pollNext(collector);
if (handover.isEmpty()) {
// splitEnumeratorCtx.assignSplit();
splitEnumerator.run();
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,26 @@

package org.apache.seatunnel.translation.spark.source.partition.continuous;

import org.apache.spark.SparkEnv;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReader;
import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReaderFactory;
import org.apache.spark.util.RpcUtils;

public class SeaTunnelContinuousPartitionReaderFactory implements ContinuousPartitionReaderFactory {
private final SeaTunnelSource<SeaTunnelRow, ?, ?> source;

public SeaTunnelContinuousPartitionReaderFactory(SeaTunnelSource<SeaTunnelRow, ?, ?> source) {
this.source = source;
}

@Override
public ContinuousPartitionReader<InternalRow> createReader(InputPartition partition) {
RpcEndpointRef endpointRef =
RpcUtils.makeDriverRef("", SparkEnv.get().conf(), SparkEnv.get().rpcEnv());

return null;
SeaTunnelInputPartition inputPartition = (SeaTunnelInputPartition) partition;
return new SeaTunnelContinuousPartitionReader<>(
(SeaTunnelSource<SeaTunnelRow, SourceSplit, ?>) source, inputPartition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,32 @@
package org.apache.seatunnel.translation.spark.source.partition.continuous;

import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.spark.execution.MultiTableManager;
import org.apache.seatunnel.translation.spark.source.partition.continuous.source.endpoint.EndpointSplitEnumeratorContext;
import org.apache.seatunnel.translation.spark.utils.CaseInsensitiveStringMap;

import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.PartitionOffset;
import org.apache.spark.sql.execution.streaming.continuous.seatunnel.rpc.SplitEnumeratorEndpoint;

public class SeaTunnelContinuousStream implements ContinuousStream {
private final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
private final SeaTunnelSource<SeaTunnelRow, SourceSplit, ?> source;
private final int parallelism;
private final String jobId;
private final String checkpointLocation;
private final CaseInsensitiveStringMap caseInsensitiveStringMap;
private final MultiTableManager multiTableManager;
private RpcEndpointRef endpointRef;

public SeaTunnelContinuousStream(
SeaTunnelSource<SeaTunnelRow, ?, ?> source,
SeaTunnelSource<SeaTunnelRow, SourceSplit, ?> source,
int parallelism,
String jobId,
String checkpointLocation,
Expand All @@ -53,7 +59,23 @@ public SeaTunnelContinuousStream(

@Override
public InputPartition[] planInputPartitions(Offset start) {
return new InputPartition[0];
SourceSplitEnumerator.Context<SourceSplit> enumeratorContext =
new EndpointSplitEnumeratorContext<>(parallelism, jobId);

try {
SourceSplitEnumerator enumerator = source.createEnumerator(enumeratorContext);
} catch (Exception e) {
throw new RuntimeException(e);
}

SplitEnumeratorEndpoint enumeratorEndpoint = new SplitEnumeratorEndpoint();
String endpointName = "SplitEnumeratorEndpoint-" + java.util.UUID.randomUUID();
endpointRef = enumeratorEndpoint.rpcEnv().setupEndpoint(endpointName, enumeratorEndpoint);
InputPartition[] inputPartitions = new SeaTunnelInputPartition[parallelism];
for (int idx = 0; idx < inputPartitions.length; idx++) {
inputPartitions[idx] = new SeaTunnelInputPartition(endpointName, idx);
}
return inputPartitions;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,20 @@

import org.apache.spark.sql.connector.read.InputPartition;

public class SeaTunnelInputPartition implements InputPartition {}
public class SeaTunnelInputPartition implements InputPartition {
private final String endpointName;
private final int partitionId;

public SeaTunnelInputPartition(String endpointName, int partitionId) {
this.endpointName = endpointName;
this.partitionId = partitionId;
}

public String getEndpointName() {
return endpointName;
}

public int getPartitionId() {
return partitionId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.translation.spark.source.partition.continuous.source.endpoint;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.translation.source.BaseSourceFunction;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

public class EndpointSource<T, SplitT extends SourceSplit, StateT extends Serializable>
implements BaseSourceFunction<T> {
private final SeaTunnelSource<T, SplitT, StateT> source;
protected final SourceSplitEnumerator<SplitT, StateT> splitEnumerator;
protected final Integer parallelism;
protected final String jobId;

public EndpointSource(
SeaTunnelSource<T, SplitT, StateT> source, Integer parallelism, String jobId) {
this.source = source;
this.parallelism = parallelism;
this.jobId = jobId;
SourceSplitEnumerator.Context<SplitT> enumeratorContext =
new EndpointSplitEnumeratorContext<>(parallelism, jobId);
try {
splitEnumerator = source.restoreEnumerator(enumeratorContext, null);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void open() throws Exception {}

@Override
public void run(Collector<T> collector) throws Exception {}

@Override
public Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws Exception {
return null;
}

@Override
public void close() throws Exception {}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,6 @@
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming.continuous.seatunnel.rpc;
package org.apache.seatunnel.translation.spark.source.partition.continuous.source.endpoint;

import org.apache.spark.rpc.RpcCallContext;
import org.apache.spark.rpc.RpcEnv;
import org.apache.spark.rpc.ThreadSafeRpcEndpoint;

import scala.PartialFunction;
import scala.runtime.BoxedUnit;

public class SplitEnumeratorEndPoint implements ThreadSafeRpcEndpoint {

@Override
public RpcEnv rpcEnv() {
return null;
}

@Override
public PartialFunction<Object, BoxedUnit> receiveAndReply(RpcCallContext context) {
return null;
}
}
public class EndpointSplitEnumerator {}
Loading

0 comments on commit b522c35

Please sign in to comment.