[WIP][feature][spark] Support streaming
CheneyYin committed Aug 24, 2024
1 parent 62b4f16 commit 2969453
Showing 13 changed files with 309 additions and 29 deletions.
@@ -40,6 +40,8 @@

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

import java.net.URL;
import java.util.ArrayList;
@@ -132,9 +134,22 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
sparkRuntimeEnvironment.getSparkSession().sparkContext().applicationId();
CatalogTable[] catalogTables =
datasetTableInfo.getCatalogTables().toArray(new CatalogTable[0]);
SparkSinkInjector.inject(dataset.write(), sink, catalogTables, applicationId)
.option("checkpointLocation", "/tmp")
.save();
if (isStreaming()) {
StreamingQuery streamingQuery =
SparkSinkInjector.inject(
dataset.writeStream(), sink, catalogTables, applicationId)
.option("checkpointLocation", "/tmp/test-spark-seatunnel")
.start();
try {
streamingQuery.awaitTermination();
} catch (StreamingQueryException e) {
throw new RuntimeException(e);
}
} else {
SparkSinkInjector.inject(dataset.write(), sink, catalogTables, applicationId)
.option("checkpointLocation", "/tmp/test-spark-seatunnel")
.save();
}
}
// the sink is the last stream
return null;
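Note: both sink executors in this commit pick the write path at runtime from the job mode: streaming jobs go through dataset.writeStream() and block on StreamingQuery.awaitTermination(), while batch jobs keep the existing dataset.write().save() path. A minimal standalone sketch of that dispatch using plain Spark APIs (the class name, format, and checkpoint path below are illustrative, not SeaTunnel code):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

public final class SinkDispatchSketch {

    // Write the same logical sink either as a streaming query or as a one-shot batch write.
    public static void write(Dataset<Row> ds, boolean streaming, String format) throws Exception {
        if (streaming) {
            StreamingQuery query =
                    ds.writeStream()
                            .format(format)
                            // illustrative location; the commit hard-codes /tmp/test-spark-seatunnel as a WIP placeholder
                            .option("checkpointLocation", "/tmp/checkpoints/example")
                            .start();
            // blocks the caller until the query stops or fails with StreamingQueryException
            query.awaitTermination();
        } else {
            ds.write().format(format).save();
        }
    }
}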
@@ -41,11 +41,16 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;

import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
@@ -132,10 +137,30 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
sparkRuntimeEnvironment.getStreamingContext().sparkContext().applicationId();
CatalogTable[] catalogTables =
datasetTableInfo.getCatalogTables().toArray(new CatalogTable[0]);
SparkSinkInjector.inject(dataset.write(), sink, catalogTables, applicationId)
.option("checkpointLocation", "/tmp")
.mode(SaveMode.Append)
.save();
if (isStreaming()) {
try {
StreamingQuery streamingQuery =
SparkSinkInjector.inject(
dataset.writeStream(),
sink,
catalogTables,
applicationId)
.option("checkpointLocation", "/tmp/test-spark-seatunnel")
.outputMode(OutputMode.Append())
.trigger(Trigger.ProcessingTime(1000))
.start();
streamingQuery.awaitTermination();
} catch (TimeoutException e) {
throw new RuntimeException(e);
} catch (StreamingQueryException e) {
throw new RuntimeException(e);
}
} else {
SparkSinkInjector.inject(dataset.write(), sink, catalogTables, applicationId)
.option("checkpointLocation", "/tmp/test-spark-seatunnel")
.mode(SaveMode.Append)
.save();
}
}
// the sink is the last stream
return null;
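Note: this second executor additionally pins the streaming write to Append output mode and a fixed 1000 ms processing-time trigger. A short sketch isolating those two knobs (the configurable interval parameter is an assumption here; the commit hard-codes 1000):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.Trigger;

public final class TriggerSketch {

    public static DataStreamWriter<Row> configure(Dataset<Row> ds, long intervalMs) {
        return ds.writeStream()
                // only rows appended since the previous micro-batch are emitted to the sink
                .outputMode(OutputMode.Append())
                // start a new micro-batch roughly every intervalMs milliseconds
                .trigger(Trigger.ProcessingTime(intervalMs));
    }
}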
@@ -35,6 +35,7 @@

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.google.common.collect.Lists;

@@ -86,17 +87,31 @@ public List<DatasetTableInfo> execute(List<DatasetTableInfo> upstreamDataStreams
CommonOptions.PARALLELISM.key(),
CommonOptions.PARALLELISM.defaultValue());
}
Dataset<Row> dataset =
sparkRuntimeEnvironment
.getSparkSession()
.read()
.format(SeaTunnelSource.class.getSimpleName())
.option(CommonOptions.PARALLELISM.key(), parallelism)
.option(
Constants.SOURCE_SERIALIZATION,
SerializationUtils.objectToString(source))
.options(envOption)
.load();
SparkSession sparkSession = sparkRuntimeEnvironment.getSparkSession();
Dataset<Row> dataset = null;
if (isStreaming()) {
dataset =
sparkSession
.readStream()
.format(SeaTunnelSource.class.getSimpleName())
.option(CommonOptions.PARALLELISM.key(), parallelism)
.option(
Constants.SOURCE_SERIALIZATION,
SerializationUtils.objectToString(source))
.options(envOption)
.load();
} else {
dataset =
sparkSession
.read()
.format(SeaTunnelSource.class.getSimpleName())
.option(CommonOptions.PARALLELISM.key(), parallelism)
.option(
Constants.SOURCE_SERIALIZATION,
SerializationUtils.objectToString(source))
.options(envOption)
.load();
}
sources.add(
new DatasetTableInfo(
dataset,
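Note: the source executor mirrors the sink change: a streaming job loads the SeaTunnelSource through sparkSession.readStream(), a batch job through sparkSession.read(), with the same format name and options in both branches. A sketch of that dispatch with an illustrative option map (DataFrameReader and DataStreamReader both accept options(Map)):

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public final class SourceDispatchSketch {

    public static Dataset<Row> load(SparkSession spark, boolean streaming, String format) {
        Map<String, String> options = new HashMap<>();
        options.put("parallelism", "2"); // illustrative option, not the real SeaTunnel key set
        return streaming
                ? spark.readStream().format(format).options(options).load()
                : spark.read().format(format).options(options).load();
    }
}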
@@ -51,6 +51,13 @@ protected SparkAbstractPluginExecuteProcessor(
this.plugins = initializePlugins(pluginConfigs);
}

public boolean isStreaming() {
return sparkRuntimeEnvironment
.getConfig()
.getString("job.mode")
.equalsIgnoreCase("STREAMING");
}

@Override
public void setRuntimeEnvironment(SparkRuntimeEnvironment sparkRuntimeEnvironment) {
this.sparkRuntimeEnvironment = sparkRuntimeEnvironment;
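Note: isStreaming() is the switch both executors use; it compares the job.mode value from the runtime environment's config against "STREAMING". A sketch of the equivalent check against a Typesafe Config, assuming the value comes from the env block of a SeaTunnel job file; the hasPath guard is an addition in this sketch, the commit calls getString("job.mode") directly:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public final class JobModeSketch {

    public static boolean isStreaming(Config envConfig) {
        return envConfig.hasPath("job.mode")
                && "STREAMING".equalsIgnoreCase(envConfig.getString("job.mode"));
    }

    public static void main(String[] args) {
        Config env = ConfigFactory.parseString("env { job.mode = STREAMING }").getConfig("env");
        System.out.println(isStreaming(env)); // true
    }
}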
@@ -102,6 +102,7 @@ public Offset deserializeOffset(String microBatchState) {
@Override
public void commit(Offset end) {
// nothing
System.out.println(end);
}

@Override
@@ -69,6 +69,20 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {

@Override
public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long epochId) {
return createWriter(partitionId, taskId);
SinkWriter.Context context = new DefaultSinkWriterContext(jobId, (int) taskId);
SinkWriter<SeaTunnelRow, CommitInfoT, StateT> writer;
SinkCommitter<CommitInfoT> committer;
try {
writer = sink.createWriter(context);
} catch (IOException e) {
throw new RuntimeException("Failed to create SinkWriter.", e);
}
try {
committer = sink.createCommitter().orElse(null);
} catch (IOException e) {
throw new RuntimeException("Failed to create SinkCommitter.", e);
}
return new SeaTunnelSparkDataWriter<>(
writer, committer, new MultiTableManager(catalogTables), epochId);
}
}
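Note: the three-argument createWriter overload is the one Spark's streaming write path calls, once per partition and epoch (micro-batch). Instead of delegating to the batch overload, which drops epochId, the commit now builds the SeaTunnelSparkDataWriter with the epochId so the sink side can tie its commit to the micro-batch. A minimal, SeaTunnel-independent illustration of that writer lifecycle (all names below are made up):

import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

// Commit message carrying the epoch so a sink can deduplicate replayed epochs.
final class EpochCommit implements WriterCommitMessage {
    final long epochId;
    final long rowCount;

    EpochCommit(long epochId, long rowCount) {
        this.epochId = epochId;
        this.rowCount = rowCount;
    }
}

// Counts rows per epoch; a real writer would buffer rows and flush them in commit().
final class CountingWriter implements DataWriter<InternalRow> {
    private final long epochId;
    private long rowCount;

    CountingWriter(long epochId) {
        this.epochId = epochId;
    }

    @Override
    public void write(InternalRow record) throws IOException {
        rowCount++;
    }

    @Override
    public WriterCommitMessage commit() throws IOException {
        return new EpochCommit(epochId, rowCount);
    }

    @Override
    public void abort() throws IOException {
        rowCount = 0; // discard anything accumulated for this epoch
    }

    @Override
    public void close() throws IOException {}
}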
@@ -85,7 +85,12 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap caseInsensitiveString
Integer.parseInt(properties.getOrDefault(CommonOptions.PARALLELISM.key(), "1"));
String applicationId = SparkSession.getActiveSession().get().sparkContext().applicationId();
return new SeaTunnelScanBuilder(
source, parallelism, applicationId, caseInsensitiveStringMap, multiTableManager);
source,
parallelism,
applicationId,
new org.apache.seatunnel.translation.spark.utils.CaseInsensitiveStringMap(
caseInsensitiveStringMap.asCaseSensitiveMap()),
multiTableManager);
}

/** A name to identify this table */
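Note: together with the import swaps further down, the scan builder now wraps the options in a SeaTunnel-local org.apache.seatunnel.translation.spark.utils.CaseInsensitiveStringMap built from asCaseSensitiveMap(), instead of passing Spark's org.apache.spark.sql.util.CaseInsensitiveStringMap through, presumably so the options can be carried and serialized independently of Spark's own class. The new class itself is not part of this diff; what follows is only a guess at the kind of wrapper it could be:

import java.io.Serializable;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public final class CaseInsensitiveOptions implements Serializable {

    private final Map<String, String> delegate = new HashMap<>();

    public CaseInsensitiveOptions(Map<String, String> original) {
        // keys are folded to lower case once, so lookups ignore case
        original.forEach((key, value) -> delegate.put(key.toLowerCase(Locale.ROOT), value));
    }

    public String get(String key) {
        return delegate.get(key.toLowerCase(Locale.ROOT));
    }

    public String getOrDefault(String key, String defaultValue) {
        return delegate.getOrDefault(key.toLowerCase(Locale.ROOT), defaultValue);
    }
}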
@@ -96,6 +96,7 @@ protected FileSystem getFileSystem()
throws URISyntaxException, IOException, InterruptedException {
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS", hdfsRoot);
configuration.set("dfs.support.append", "true");
if (StringUtils.isNotBlank(hdfsUser)) {
return FileSystem.get(new URI(hdfsRoot), configuration, hdfsUser);
} else {
@@ -114,9 +115,11 @@ protected ReaderState snapshotState() {
}

public void prepareCheckpoint() {
executor =
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(
1, String.format("parallel-reader-checkpoint-executor-%s", subtaskId));
if (executor == null) {
executor =
ThreadPoolExecutorFactory.createScheduledThreadPoolExecutor(
1, String.format("parallel-reader-checkpoint-executor-%s", subtaskId));
}
executor.schedule(this::virtualCheckpoint, checkpointInterval, TimeUnit.MILLISECONDS);
}

@@ -175,10 +178,15 @@ protected void saveState(ReaderState readerState, int checkpointId) throws IOExc
byte[] bytes = SerializationUtils.serialize(readerState);
Path hdfsPath = getCheckpointPathWithId(checkpointId);
if (!fileSystem.exists(hdfsPath)) {
fileSystem.createNewFile(hdfsPath);
try (FSDataOutputStream outputStream = fileSystem.createFile(hdfsPath).build()) {
outputStream.write(bytes);
} catch (Exception e) {
throw new RuntimeException(e);
}
return;
}

try (FSDataOutputStream outputStream = fileSystem.append(hdfsPath)) {
try (FSDataOutputStream outputStream = fileSystem.appendFile(hdfsPath).build()) {
outputStream.write(bytes);
} catch (Exception e) {
throw new RuntimeException(e);
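Note: saveState() now writes the first state file through the builder API (createFile(path).build()) instead of createNewFile(), appends later states with appendFile(path).build(), and the HDFS configuration enables dfs.support.append so those appends are allowed. A small sketch of that create-or-append pattern (whether append actually works depends on the underlying FileSystem; HDFS with append enabled does, the local filesystem may not):

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class CreateOrAppendSketch {

    public static void writeOrAppend(FileSystem fs, Path path, byte[] bytes) throws Exception {
        if (!fs.exists(path)) {
            // first checkpoint for this id: create the file and write the serialized state
            try (FSDataOutputStream out = fs.createFile(path).build()) {
                out.write(bytes);
            }
            return;
        }
        // later checkpoints: append to the existing state file
        try (FSDataOutputStream out = fs.appendFile(path).build()) {
            out.write(bytes);
        }
    }
}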
@@ -24,6 +24,7 @@
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.translation.spark.execution.MultiTableManager;
import org.apache.seatunnel.translation.spark.utils.CaseInsensitiveStringMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -32,7 +33,6 @@
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

import lombok.Getter;

@@ -22,12 +22,12 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.spark.execution.MultiTableManager;
import org.apache.seatunnel.translation.spark.source.partition.batch.ParallelBatchPartitionReader;
import org.apache.seatunnel.translation.spark.utils.CaseInsensitiveStringMap;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

import java.util.Map;

@@ -22,12 +22,12 @@
import org.apache.seatunnel.translation.spark.execution.MultiTableManager;
import org.apache.seatunnel.translation.spark.source.partition.batch.SeaTunnelBatch;
import org.apache.seatunnel.translation.spark.source.partition.micro.SeaTunnelMicroBatch;
import org.apache.seatunnel.translation.spark.utils.CaseInsensitiveStringMap;

import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

import java.util.Map;

@@ -20,10 +20,10 @@
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.spark.execution.MultiTableManager;
import org.apache.seatunnel.translation.spark.utils.CaseInsensitiveStringMap;

import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/** The builder for {@link SeaTunnelScan} used to build {@link SeaTunnelScan} */
public class SeaTunnelScanBuilder implements ScanBuilder {