Add a parameter that controls the number of StreamLoad tasks committed per partition #92 #99

Open: wants to merge 10 commits into base: master

baishaoisde

Add a parameter that controls the number of StreamLoad tasks committed per partition

Issue Number: close #92

  1. Does it affect the original behavior: (I don't know)
  2. Have unit tests been added: (No)
  3. Has documentation been added or modified: (Yes)
  4. Does it need to update dependencies: (Yes)
  5. Are there any changes that cannot be rolled back: (No)

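…partition can run StreamLoad only once, that is, all the data of a partition is written to Doris through a single StreamLoad task. This avoids submitting multiple StreamLoad tasks for the same data when a failed task is retried.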
@baishaoisde changed the title from "Baishaode pr" to "Add a parameter that controls the number of StreamLoad tasks committed per partition #92" on May 17, 2023
@JNSimba
Member

JNSimba commented May 26, 2023

Thank you for your contribution. Can you resolve the conflict?

@@ -85,4 +85,10 @@ public interface ConfigurationOptions {
String DORIS_SINK_TASK_USE_REPARTITION = "doris.sink.task.use.repartition";

boolean DORIS_SINK_TASK_USE_REPARTITION_DEFAULT = false;

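//设置每个分区仅提交一个StreamLoad任务,以保证任务失败时task重试不会导致对同一批数据重复提交StreamLoad任务。
// (English: submit only one StreamLoad task per partition, so that a task retry after a failure does not submit duplicate StreamLoad tasks for the same batch of data.)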
Member

Please write this comment in English.

Author

Ok, I will change it to English.

flush
}
})
flush
Member

What is this flush for?

Author

This flush is meant to let each partition commit only one StreamLoad task when the partitionTaskAtomicity parameter is true, but it seems redundant because the code already performs the "if (!rowArray.isEmpty) { flush }" operation.
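
A minimal sketch of the buffering logic being discussed, written in Java for illustration only. The class `PartitionWriter`, the `batchSize` field, and the `flushCount` counter are hypothetical names; only `partitionTaskAtomicity` and `rowArray` come from the comment above, and the connector's real writer may be structured differently. It suggests why an extra unconditional flush adds nothing once the non-empty guard runs at the end of the partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the per-partition write loop; not the connector's actual code.
class PartitionWriter {
    private final int batchSize;
    private final boolean partitionTaskAtomicity;
    private final List<String> rowArray = new ArrayList<>();

    // Number of simulated StreamLoad submissions.
    int flushCount = 0;

    PartitionWriter(int batchSize, boolean partitionTaskAtomicity) {
        this.batchSize = batchSize;
        this.partitionTaskAtomicity = partitionTaskAtomicity;
    }

    void writePartition(List<String> rows) {
        for (String row : rows) {
            rowArray.add(row);
            // When atomicity is enabled, never flush in the middle of a partition.
            if (!partitionTaskAtomicity && rowArray.size() >= batchSize) {
                flush();
            }
        }
        // This guard already submits whatever is still buffered,
        // so a second unconditional flush here would be redundant.
        if (!rowArray.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        flushCount++;      // stands in for one StreamLoad request
        rowArray.clear();
    }
}
```

With five rows and a batch size of two, this sketch flushes three times when the flag is false and once when it is true.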

@JNSimba
Member

JNSimba commented May 26, 2023

If the data is processed per partition, could a single StreamLoad run into problems when one partition is particularly large?

@baishaoisde
Author

> If the data is processed per partition, could a single StreamLoad run into problems when one partition is particularly large?

Yes. To address this, we should tell the user in the parameter description that "repartitioning must be used to keep the data volume of each partition within a reasonable range once this parameter is enabled". Do you think that is appropriate?
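
As a rough illustration of "a reasonable range", the following Java sketch computes how many partitions would keep each partition at or below a chosen row limit. The helper `RepartitionPlan.partitionsFor` and the idea of a per-partition row limit are assumptions made for this example; the PR does not define such a threshold, and the estimate only holds if rows are spread roughly evenly across partitions.

```java
// Hypothetical helper; the row limit is an assumption, not a connector setting.
final class RepartitionPlan {
    private RepartitionPlan() {}

    // Smallest partition count such that an even split stays within maxRowsPerPartition.
    static int partitionsFor(long totalRows, long maxRowsPerPartition) {
        if (totalRows < 0 || maxRowsPerPartition <= 0) {
            throw new IllegalArgumentException("totalRows must be >= 0 and maxRowsPerPartition > 0");
        }
        if (totalRows == 0) {
            return 1; // keep at least one partition
        }
        // Ceiling division written to avoid overflow for very large inputs.
        long count = totalRows / maxRowsPerPartition
                + (totalRows % maxRowsPerPartition == 0 ? 0 : 1);
        return Math.toIntExact(count);
    }
}
```

The result could then be passed to Spark's `Dataset.repartition(int)` before the write, so that each single StreamLoad carries a bounded amount of data.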

gnehil and others added 7 commits May 30, 2023 22:55
* use writer to write data
* resolve conflicts
* unify jackson version
* remove useless code
…ults to false. If false, StreamLoad can be performed multiple times per partition. If true, StreamLoad is limited to once per partition, that is, all the data of a partition is written to Doris via a single StreamLoad task. This avoids submitting multiple StreamLoad tasks for the same data when a failed task is retried.
# Conflicts:
#	spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
#	spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
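
The retry scenario described in the commit message above can be illustrated with a toy model in Java. Everything here is hypothetical (`RetrySimulation`, `run`, `failAfterRows`, and the in-memory list standing in for a Doris table); it only models a task that fails partway through a partition and is then retried from the start, not a failure that happens after the single load has already been committed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a task retry; hypothetical, not connector code.
class RetrySimulation {
    // Returns the rows received by the simulated table, duplicates included.
    // failAfterRows must be smaller than rows.size() for the first attempt to fail.
    static List<String> run(List<String> rows, int batchSize,
                            boolean onePerPartition, int failAfterRows) {
        List<String> table = new ArrayList<>();
        // First attempt fails after reading failAfterRows rows.
        attempt(rows, batchSize, onePerPartition, failAfterRows, table);
        // The retry reprocesses the whole partition and succeeds.
        attempt(rows, batchSize, onePerPartition, Integer.MAX_VALUE, table);
        return table;
    }

    private static void attempt(List<String> rows, int batchSize, boolean onePerPartition,
                                int failAfterRows, List<String> table) {
        List<String> buffer = new ArrayList<>();
        int read = 0;
        for (String row : rows) {
            if (read == failAfterRows) {
                return; // simulated task failure: buffered rows are lost
            }
            buffer.add(row);
            read++;
            if (!onePerPartition && buffer.size() >= batchSize) {
                table.addAll(buffer); // one StreamLoad committed mid-partition
                buffer.clear();
            }
        }
        table.addAll(buffer); // final StreamLoad for the remaining rows
    }
}
```

For four rows, a batch size of two, and a failure after the third row, the multi-load mode leaves six rows in the simulated table (the first batch is loaded twice), while the one-load-per-partition mode leaves exactly four.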
Successfully merging this pull request may close these issues.

[Enhancement] Add a parameter that controls the number of StreamLoad tasks committed per partition
4 participants