[WIP][feature][spark] Support streaming #7476

Draft

CheneyYin wants to merge 2 commits into dev from support-spark-streaming

Conversation

@CheneyYin (Contributor) commented Aug 23, 2024

Purpose of this pull request

Support streaming for the Spark engine.
Related:

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@CheneyYin CheneyYin marked this pull request as draft August 23, 2024 12:10
@github-actions github-actions bot added core SeaTunnel core module Spark labels Aug 23, 2024
@hailin0 (Member) commented Aug 23, 2024

cc @Carl-Zhou-CN

@CheneyYin (Contributor, Author) commented:

public class SeaTunnelMicroBatchPartitionReader implements PartitionReader<InternalRow> {

    private final ParallelBatchPartitionReader partitionReader;

    public SeaTunnelMicroBatchPartitionReader(ParallelBatchPartitionReader partitionReader) {
        this.partitionReader = partitionReader;
    }

    @Override
    public boolean next() throws IOException {
        return partitionReader.next();
    }

    @Override
    public InternalRow get() {
        return partitionReader.get();
    }

    @Override
    public void close() throws IOException {
        partitionReader.close();
    }
}

// Blocks until data arrives or the reader is stopped; returns false only when
// `running` is false and the handover buffer has been drained.
public boolean next() throws IOException {
    prepare();
    while (running && handover.isEmpty()) {
        try {
            Thread.sleep(INTERVAL);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    return running || !handover.isEmpty();
}

The PartitionReader is never closed in streaming mode.

@CheneyYin CheneyYin force-pushed the support-spark-streaming branch 8 times, most recently from 1b1d744 to 6392a37 Compare August 27, 2024 11:30
@github-actions github-actions bot added the api label Aug 27, 2024
@Carl-Zhou-CN (Member) commented:

> The PartitionReader is never closed in streaming mode.

Hi @CheneyYin, it seems that after a checkpoint it will be closed.

@CheneyYin (Contributor, Author) commented:

> Hi @CheneyYin, it seems that after a checkpoint it will be closed.

Yes. If the reader does not receive new data for a long time, Spark will end the current micro-batch. Spark's micro-batch mechanism does not fully meet the requirements of long-term streaming computation. First, creating a new reader for the next batch incurs some overhead. Second, the granularity of fault recovery is too coarse: the Spark micro-batch mechanism cannot restore the reader from the latest snapshot of the SeaTunnel reader.
I am looking for strategies to alleviate these problems while still ensuring fault recovery. Currently, I add metadata to the SeaTunnel row and use a special identifier to represent a checkpoint event. After the source completes a checkpoint, it creates a checkpoint record and sends it downstream. After receiving the checkpoint record, the sink saves its snapshot and confirms the checkpoint prepared by the source. These checkpoint operations are performed in a directory space on the file system.
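
For illustration, here is a minimal sketch of how such a checkpoint marker could be carried in row metadata. The CHECKPOINT_MARKER key and the CheckpointRecords helper are hypothetical names for this sketch, not code from this PR.

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a checkpoint event travels downstream as metadata on the
// row stream instead of as ordinary data. All names are illustrative only.
public final class CheckpointRecords {

    // Special metadata key that marks a record as a checkpoint event, not data.
    public static final String CHECKPOINT_MARKER = "seatunnel.checkpoint.id";

    // The source emits this after it completes a checkpoint.
    public static Map<String, String> checkpointRecord(long checkpointId) {
        Map<String, String> metadata = new HashMap<>();
        metadata.put(CHECKPOINT_MARKER, Long.toString(checkpointId));
        return metadata;
    }

    // The sink inspects each record; on a marker it snapshots its state and
    // acknowledges the checkpoint that the source prepared.
    public static boolean isCheckpoint(Map<String, String> metadata) {
        return metadata != null && metadata.containsKey(CHECKPOINT_MARKER);
    }
}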

@CheneyYin (Contributor, Author) commented:

The checkpoint space looks like this:

./
├── commits
│   ├── ...
│   ├── 10
│   ├── 11
│   ├── 12
│   ├── 13
│   ├── 14
│   ├── 15
│   └── ...
├── metadata
├── offsets
│   ├── ...
│   ├── 10
│   ├── 11
│   ├── 12
│   ├── 13
│   ├── 14
│   ├── 15
│   ├── 16
│   └── ...
└── sources
    └── 0
        ├── ...
        ├── 11
        │   ├── 0
        │   │   └── 0.committed
        │   └── 1
        │       └── 0.committed
        ├── 12
        │   ├── 0
        │   │   └── 0.committed
        │   └── 1
        │       └── 0.committed
        ├── 13
        │   ├── 0
        │   │   └── 0.committed
        │   └── 1
        │       └── 0.committed
        ├── 14
        │   ├── 0
        │   │   └── 0.committed
        │   └── 1
        │       └── 0.committed
        ├── 15
        │   ├── 0
        │   │   └── 0.committed
        │   └── 1
        │       └── 0.committed
        └── ...
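
For illustration, a minimal sketch of how a source subtask could publish one of the .committed markers shown above. The path layout sources/<sourceId>/<checkpointId>/<subtaskId>/<attempt>.committed follows the tree; the CheckpointSpace class and markCommitted method are hypothetical, not code from this PR.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch: commit markers are empty files whose existence confirms
// a checkpoint in the file-system directory space. Names are illustrative only.
public final class CheckpointSpace {

    private final Path root; // e.g. the streaming query's checkpoint location

    public CheckpointSpace(Path root) {
        this.root = root;
    }

    // Writes sources/<sourceId>/<checkpointId>/<subtaskId>/<attempt>.committed.
    public void markCommitted(int sourceId, long checkpointId, int subtaskId, int attempt)
            throws IOException {
        Path dir = root.resolve(Paths.get(
                "sources",
                String.valueOf(sourceId),
                String.valueOf(checkpointId),
                String.valueOf(subtaskId)));
        Files.createDirectories(dir);
        // An empty marker is enough; createFile fails if it already exists,
        // which surfaces a duplicate commit attempt.
        Files.createFile(dir.resolve(attempt + ".committed"));
    }
}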

@CheneyYin (Contributor, Author) commented:

> Hi @CheneyYin, it seems that after a checkpoint it will be closed.

next() never returns false unless close() is called. However, Spark's MicroBatchExecution calls close() only after next() returns false, so the reader never stops and never commits the batch.
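
One possible way out, sketched under assumptions: give the reader a boundary signal so that next() can return false on its own, letting Spark reach close(). The checkpointReached flag and whatever sets it are hypothetical, not part of this PR.

// Hypothetical sketch: next() also terminates at a checkpoint boundary instead
// of waiting for close() to flip `running`. `checkpointReached` is illustrative.
public boolean next() throws IOException {
    prepare();
    while (running && !checkpointReached && handover.isEmpty()) {
        try {
            Thread.sleep(INTERVAL);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
    // Drain buffered rows first; once the buffer is empty and either the boundary
    // was reached or the reader was stopped, return false so Spark calls close().
    return !handover.isEmpty() || (running && !checkpointReached);
}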

@Carl-Zhou-CN (Member) commented:

> Yes. If the reader does not receive new data for a long time, Spark will end the current micro-batch. [...]

Yes, the micro-batch process cannot meet the requirements of streaming.

@CheneyYin CheneyYin force-pushed the support-spark-streaming branch 5 times, most recently from 2f11b11 to d3d78b3 Compare August 29, 2024 12:47
@Carl-Zhou-CN (Member) commented:

> next() never returns false unless close() is called. However, Spark's MicroBatchExecution calls close() only after next() returns false, so the reader never stops and never commits the batch.

Yes, you're right.

@Carl-Zhou-CN (Member) commented:

> Yes. If the reader does not receive new data for a long time, Spark will end the current micro-batch. [...]

I think this pattern would be more like Spark's continuous streaming mode, but it seems to completely lack fault tolerance.

@CheneyYin (Contributor, Author) commented Aug 30, 2024

> I think this pattern would be more like Spark's continuous streaming mode, but it seems to completely lack fault tolerance.

It can ensure end-to-end at-least-once semantics. If the sink is idempotent when handling reprocessed data, it can ensure exactly-once. At present, Spark's continuous streaming execution mode is still experimental and itself guarantees only at-least-once fault tolerance.
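
As a minimal illustration of the idempotence point: a sink that writes each record under a deterministic key derived from the batch and partition makes replayed records overwrite themselves instead of duplicating. The table, columns, and PostgreSQL-style upsert below are hypothetical, not part of this PR.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical sketch: an upsert keyed by (batch_id, partition_id, row_index)
// turns at-least-once replay into an exactly-once effect. Names are illustrative.
public final class IdempotentSinkWriter {

    private static final String UPSERT_SQL =
            "INSERT INTO sink_table (batch_id, partition_id, row_index, payload) "
                    + "VALUES (?, ?, ?, ?) "
                    + "ON CONFLICT (batch_id, partition_id, row_index) "
                    + "DO UPDATE SET payload = EXCLUDED.payload";

    public void write(Connection conn, long batchId, int partitionId, long rowIndex,
                      String payload) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(UPSERT_SQL)) {
            ps.setLong(1, batchId);
            ps.setInt(2, partitionId);
            ps.setLong(3, rowIndex);
            ps.setString(4, payload);
            // Replaying the same (batchId, partitionId, rowIndex) rewrites the row.
            ps.executeUpdate();
        }
    }
}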

@CheneyYin CheneyYin force-pushed the support-spark-streaming branch 2 times, most recently from 52bb0ae to f0eced2 Compare September 3, 2024 11:24
@hailin0 (Member) commented Sep 10, 2024

❤️

@CheneyYin CheneyYin force-pushed the support-spark-streaming branch 5 times, most recently from 9630cd0 to 37b6533 Compare September 10, 2024 06:00