From faa27a6f2ec33ec93b3a542debef4eb6cead88ea Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Mon, 23 Dec 2024 23:00:07 -0500 Subject: [PATCH] docs: update fallback sink docs (#2303) Signed-off-by: Keran Yang --- api/json-schema/schema.json | 2 +- api/openapi-spec/swagger.json | 2 +- docs/APIs.md | 7 ++- docs/user-guide/sinks/fallback.md | 48 ++++++++++++------- docs/user-guide/sinks/retry-strategy.md | 8 ++-- pkg/apis/numaflow/v1alpha1/generated.proto | 7 ++- pkg/apis/numaflow/v1alpha1/retry_strategy.go | 7 ++- .../numaflow/v1alpha1/zz_generated.openapi.go | 2 +- .../src/models/retry_strategy.rs | 2 +- 9 files changed, 58 insertions(+), 27 deletions(-) diff --git a/api/json-schema/schema.json b/api/json-schema/schema.json index 84ed58ed90..5255c9b514 100644 --- a/api/json-schema/schema.json +++ b/api/json-schema/schema.json @@ -21764,7 +21764,7 @@ "description": "BackOff specifies the parameters for the backoff strategy, controlling how delays between retries should increase." }, "onFailure": { - "description": "OnFailure specifies the action to take when a retry fails. The default action is to retry.", + "description": "OnFailure specifies the action to take when the specified retry strategy fails. The possible values are: 1. \"retry\": start another round of retrying the operation, 2. \"fallback\": re-route the operation to a fallback sink and 3. \"drop\": drop the operation and perform no further action. The default action is to retry.", "type": "string" } }, diff --git a/api/openapi-spec/swagger.json b/api/openapi-spec/swagger.json index 19c65fe5cf..f85fb66f1e 100644 --- a/api/openapi-spec/swagger.json +++ b/api/openapi-spec/swagger.json @@ -21751,7 +21751,7 @@ "$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Backoff" }, "onFailure": { - "description": "OnFailure specifies the action to take when a retry fails. The default action is to retry.", + "description": "OnFailure specifies the action to take when the specified retry strategy fails. The possible values are: 1. \"retry\": start another round of retrying the operation, 2. \"fallback\": re-route the operation to a fallback sink and 3. \"drop\": drop the operation and perform no further action. The default action is to retry.", "type": "string" } } diff --git a/docs/APIs.md b/docs/APIs.md index bcda2eb8d4..fb07d8291d 100644 --- a/docs/APIs.md +++ b/docs/APIs.md @@ -9077,8 +9077,11 @@ OnFailureRetryStrategy (Optional)

-OnFailure specifies the action to take when a retry fails. The default -action is to retry. +OnFailure specifies the action to take when the specified retry strategy +fails. The possible values are: 1. “retry”: start another round of +retrying the operation, 2. “fallback”: re-route the operation to a +fallback sink and 3. “drop”: drop the operation and perform no further +action. The default action is to retry.

diff --git a/docs/user-guide/sinks/fallback.md b/docs/user-guide/sinks/fallback.md index 576206f1b2..af0da98f5a 100644 --- a/docs/user-guide/sinks/fallback.md +++ b/docs/user-guide/sinks/fallback.md @@ -1,25 +1,32 @@ # Fallback Sink -A `Fallback` Sink functions as a `Dead Letter Queue (DLQ)` Sink and can be configured to serve as a backup when the primary sink is down, -unavailable, or under maintenance. This is particularly useful when multiple sinks are in a pipeline; if a sink fails, the resulting -back-pressure will back-propagate and stop the source vertex from reading more data. A `Fallback` Sink can beset up to prevent this from happening. -This backup sink stores data while the primary sink is offline. The stored data can be replayed once the primary sink is back online. +A `Fallback` Sink functions as a `Dead Letter Queue (DLQ)` Sink. +It can be configured to serve as a backup sink when the primary sink fails processing messages. -Note: The `fallback` field is optional. +## The Use Case -Users are required to return a fallback response from the [user-defined sink](https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/) when the primary sink fails; only -then the messages will be directed to the fallback sink. +Fallback Sink is useful to prevent back pressures caused by failed messages in the primary sink. -Example of a fallback response in a user-defined sink: [here](https://github.com/numaproj/numaflow-go/blob/main/pkg/sinker/examples/fallback/main.go) +In a pipeline without fallback sinks, if a sink fails to process certain messages, +the failed messages, by default, can get retried indefinitely, +causing back pressures propagated all the way back to the source vertex. +Eventually, the pipeline will be blocked, and no new messages will be processed. +A fallback sink can be set up to prevent this from happening, by storing the failed messages in a separate sink. -## CAVEATs -The `fallback` field can only be utilized when the primary sink is a `User Defined Sink.` +## Caveats +A fallback sink can only be configured when the primary sink is a user-defined sink. -## Example +## How to use -### Builtin Kafka -An example using builtin kafka as fallback sink: +To configure a fallback sink, +changes need to be made on both the pipeline specification and the user-defined sink implementation. + +### Step 1 - update the specification + +Add a `fallback` field to the sink configuration in the pipeline specification file. + +The following example uses the builtin kafka as a fallback sink. ```yaml - name: out @@ -34,10 +41,9 @@ An example using builtin kafka as fallback sink: - my-broker2:19700 topic: my-topic ``` -### UD Sink -An example using custom user-defined sink as fallback sink. -User Defined Sink as a fallback sink: +A fallback sink can also be a user-defined sink. + ```yaml - name: out sink: @@ -49,3 +55,13 @@ User Defined Sink as a fallback sink: container: image: my-sink:latest ``` +### Step 2 - update the user-defined sink implementation + +Code changes have to be made in the primary sink to generate either a **failed** response or a **fallback** response, +based on the use case. + +* a **failed** response gets processed following the [retry strategy](https://numaflow.numaproj.io/user-guide/sinks/retry-strategy/), and if the retry strategy is set to `fallback`, the message will be directed to the fallback sink after the retries are exhausted. +* a **fallback** response doesn't respect the sink retry strategy. It gets immediately directed to the fallback sink without getting retried. + +SDK methods to generate either a fallback or a failed response in a primary user-defined sink can be found here: +[Golang](https://github.com/numaproj/numaflow-go/blob/main/pkg/sinker/types.go), [Java](https://github.com/numaproj/numaflow-java/blob/main/src/main/java/io/numaproj/numaflow/sinker/Response.java), [Python](https://github.com/numaproj/numaflow-python/blob/main/pynumaflow/sinker/_dtypes.py) diff --git a/docs/user-guide/sinks/retry-strategy.md b/docs/user-guide/sinks/retry-strategy.md index a5b2a7264b..486f623461 100644 --- a/docs/user-guide/sinks/retry-strategy.md +++ b/docs/user-guide/sinks/retry-strategy.md @@ -1,18 +1,20 @@ # Retry Strategy ### Overview + The `RetryStrategy` is used to configure the behavior for a sink after encountering failures during a write operation. This structure allows the user to specify how Numaflow should respond to different fail-over scenarios for Sinks, ensuring that the writing can be resilient and handle unexpected issues efficiently. +`RetryStrategy` ONLY gets applied to failed messages. To return a failed messages, use the methods provided by the SDKs. +- `ResponseFailure`for [Golang](https://github.com/numaproj/numaflow-go/blob/main/pkg/sinker/types.go) +- `responseFailure` for [Java](https://github.com/numaproj/numaflow-java/blob/main/src/main/java/io/numaproj/numaflow/sinker/Response.java#L40) +- `as_fallback` for [Python](https://github.com/numaproj/numaflow-python/blob/main/pynumaflow/sinker/_dtypes.py) ### Struct Explanation - `retryStrategy` is optional, and can be added to the Sink spec configurations where retry logic is necessary. - - ```yaml sink: retryStrategy: diff --git a/pkg/apis/numaflow/v1alpha1/generated.proto b/pkg/apis/numaflow/v1alpha1/generated.proto index 7b81e2235e..1a677e81af 100644 --- a/pkg/apis/numaflow/v1alpha1/generated.proto +++ b/pkg/apis/numaflow/v1alpha1/generated.proto @@ -1397,7 +1397,12 @@ message RetryStrategy { // +optional optional Backoff backoff = 1; - // OnFailure specifies the action to take when a retry fails. The default action is to retry. + // OnFailure specifies the action to take when the specified retry strategy fails. + // The possible values are: + // 1. "retry": start another round of retrying the operation, + // 2. "fallback": re-route the operation to a fallback sink and + // 3. "drop": drop the operation and perform no further action. + // The default action is to retry. // +optional // +kubebuilder:default="retry" optional string onFailure = 2; diff --git a/pkg/apis/numaflow/v1alpha1/retry_strategy.go b/pkg/apis/numaflow/v1alpha1/retry_strategy.go index 12c9daab4b..c21be28a57 100644 --- a/pkg/apis/numaflow/v1alpha1/retry_strategy.go +++ b/pkg/apis/numaflow/v1alpha1/retry_strategy.go @@ -36,7 +36,12 @@ type RetryStrategy struct { // BackOff specifies the parameters for the backoff strategy, controlling how delays between retries should increase. // +optional BackOff *Backoff `json:"backoff,omitempty" protobuf:"bytes,1,opt,name=backoff"` - // OnFailure specifies the action to take when a retry fails. The default action is to retry. + // OnFailure specifies the action to take when the specified retry strategy fails. + // The possible values are: + // 1. "retry": start another round of retrying the operation, + // 2. "fallback": re-route the operation to a fallback sink and + // 3. "drop": drop the operation and perform no further action. + // The default action is to retry. // +optional // +kubebuilder:default="retry" OnFailure *OnFailureRetryStrategy `json:"onFailure,omitempty" protobuf:"bytes,2,opt,name=onFailure"` diff --git a/pkg/apis/numaflow/v1alpha1/zz_generated.openapi.go b/pkg/apis/numaflow/v1alpha1/zz_generated.openapi.go index b5cff624f8..3b20e1948c 100644 --- a/pkg/apis/numaflow/v1alpha1/zz_generated.openapi.go +++ b/pkg/apis/numaflow/v1alpha1/zz_generated.openapi.go @@ -4524,7 +4524,7 @@ func schema_pkg_apis_numaflow_v1alpha1_RetryStrategy(ref common.ReferenceCallbac }, "onFailure": { SchemaProps: spec.SchemaProps{ - Description: "OnFailure specifies the action to take when a retry fails. The default action is to retry.", + Description: "OnFailure specifies the action to take when the specified retry strategy fails. The possible values are: 1. \"retry\": start another round of retrying the operation, 2. \"fallback\": re-route the operation to a fallback sink and 3. \"drop\": drop the operation and perform no further action. The default action is to retry.", Type: []string{"string"}, Format: "", }, diff --git a/rust/numaflow-models/src/models/retry_strategy.rs b/rust/numaflow-models/src/models/retry_strategy.rs index 0b1a52a654..22cfc4809a 100644 --- a/rust/numaflow-models/src/models/retry_strategy.rs +++ b/rust/numaflow-models/src/models/retry_strategy.rs @@ -22,7 +22,7 @@ limitations under the License. pub struct RetryStrategy { #[serde(rename = "backoff", skip_serializing_if = "Option::is_none")] pub backoff: Option>, - /// OnFailure specifies the action to take when a retry fails. The default action is to retry. + /// OnFailure specifies the action to take when the specified retry strategy fails. The possible values are: 1. \"retry\": start another round of retrying the operation, 2. \"fallback\": re-route the operation to a fallback sink and 3. \"drop\": drop the operation and perform no further action. The default action is to retry. #[serde(rename = "onFailure", skip_serializing_if = "Option::is_none")] pub on_failure: Option, }