Skip to content

Commit

Permalink
fix(udf): prevent continuous retries on connection error (#13804)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <[email protected]>
  • Loading branch information
wangrunji0408 authored Dec 5, 2023
1 parent 6bf5d47 commit 59d56c3
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 11 deletions.
44 changes: 39 additions & 5 deletions src/expr/core/src/expr/expr_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::{Arc, LazyLock, Mutex, Weak};

use arrow_schema::{Field, Fields, Schema};
Expand All @@ -38,8 +39,16 @@ pub struct UdfExpression {
client: Arc<ArrowFlightUdfClient>,
identifier: String,
span: await_tree::Span,
/// Number of remaining successful calls until retry is enabled.
/// If non-zero, we will not retry on connection errors to prevent blocking the stream.
/// On each connection error, the count will be reset to `INITIAL_RETRY_COUNT`.
/// On each successful call, the count will be decreased by 1.
/// See <https://github.com/risingwavelabs/risingwave/issues/13791>.
disable_retry_count: AtomicU8,
}

const INITIAL_RETRY_COUNT: u8 = 16;

#[async_trait::async_trait]
impl Expression for UdfExpression {
fn return_type(&self) -> DataType {
Expand Down Expand Up @@ -98,11 +107,35 @@ impl UdfExpression {
)
.expect("failed to build record batch");

let output = self
.client
.call_with_retry(&self.identifier, input)
.instrument_await(self.span.clone())
.await?;
let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed);
let result = if disable_retry_count != 0 {
self.client
.call(&self.identifier, input)
.instrument_await(self.span.clone())
.await
} else {
self.client
.call_with_retry(&self.identifier, input)
.instrument_await(self.span.clone())
.await
};
let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed);
let connection_error = matches!(&result, Err(e) if e.is_connection_error());
if connection_error && disable_retry_count != INITIAL_RETRY_COUNT {
// reset count on connection error
self.disable_retry_count
.store(INITIAL_RETRY_COUNT, Ordering::Relaxed);
} else if !connection_error && disable_retry_count != 0 {
// decrease count on success, ignore if exchange failed
_ = self.disable_retry_count.compare_exchange(
disable_retry_count,
disable_retry_count - 1,
Ordering::Relaxed,
Ordering::Relaxed,
);
}
let output = result?;

if output.num_rows() != vis.count_ones() {
bail!(
"UDF returned {} rows, but expected {}",
Expand Down Expand Up @@ -165,6 +198,7 @@ impl Build for UdfExpression {
client,
identifier: udf.identifier.clone(),
span: format!("expr_udf_call ({})", udf.identifier).into(),
disable_retry_count: AtomicU8::new(0),
})
}
}
Expand Down
6 changes: 0 additions & 6 deletions src/udf/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ impl Error {
/// Returns true if the error is caused by a connection error.
pub fn is_connection_error(&self) -> bool {
match self.inner() {
// stream closed because of a broken pipe
ErrorInner::Flight(FlightError::Tonic(status))
if status.code() == tonic::Code::Unknown =>
{
true
}
// Connection refused
ErrorInner::Tonic(status) if status.code() == tonic::Code::Unavailable => true,
_ => false,
Expand Down

0 comments on commit 59d56c3

Please sign in to comment.