Skip to content

Commit

Permalink
add a retry interval config
Browse files Browse the repository at this point in the history
  • Loading branch information
fornaix committed Sep 15, 2023
1 parent 3d49195 commit b85b927
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public interface ConfigurationOptions {

int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;

String DORIS_SINK_RETRY_INTERVAL_MS = "doris.sink.retry.interval.ms";

int DORIS_SINK_RETRY_INTERVAL_MS_DEFAULT = 50;

/**
* set types to ignore, split by comma
* e.g.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import java.io.IOException
import java.time.Duration
import java.util
import java.util.Objects
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.LockSupport
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Success}
Expand All @@ -46,8 +48,10 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
private val sinkTaskPartitionSize: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
private val sinkTaskUseRepartition: Boolean = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
private val batchInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
private val batchIntervalMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
private val retryIntervalMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_RETRY_INTERVAL_MS,
ConfigurationOptions.DORIS_SINK_RETRY_INTERVAL_MS_DEFAULT)

private val enable2PC: Boolean = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC,
ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT);
Expand Down Expand Up @@ -84,10 +88,12 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
resultRdd.foreachPartition(iterator => {
while (iterator.hasNext) {
// do load batch with retries
Utils.retry[Int, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
Utils.retry[Int, Exception](maxRetryTimes, Duration.ofMillis(retryIntervalMs.toLong), logger) {
loadFunc(iterator.asJava, schema)
} match {
case Success(txnId) => if (enable2PC) handleLoadSuccess(txnId, preCommittedTxnAcc)
case Success(txnId) =>
if (enable2PC) handleLoadSuccess(txnId, preCommittedTxnAcc)
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(batchIntervalMs.toLong))
case Failure(e) =>
if (enable2PC) handleLoadFailure(preCommittedTxnAcc)
throw new IOException(
Expand Down

0 comments on commit b85b927

Please sign in to comment.