docs: update fallback sink docs (#2303)
Signed-off-by: Keran Yang <[email protected]>
KeranYang authored Dec 24, 2024
1 parent 8128476 commit faa27a6
Showing 9 changed files with 58 additions and 27 deletions.
2 changes: 1 addition & 1 deletion api/json-schema/schema.json
@@ -21764,7 +21764,7 @@
"description": "BackOff specifies the parameters for the backoff strategy, controlling how delays between retries should increase."
},
"onFailure": {
"description": "OnFailure specifies the action to take when a retry fails. The default action is to retry.",
"description": "OnFailure specifies the action to take when the specified retry strategy fails. The possible values are: 1. \"retry\": start another round of retrying the operation, 2. \"fallback\": re-route the operation to a fallback sink and 3. \"drop\": drop the operation and perform no further action. The default action is to retry.",
"type": "string"
}
},
2 changes: 1 addition & 1 deletion api/openapi-spec/swagger.json
@@ -21751,7 +21751,7 @@
"$ref": "#/definitions/io.numaproj.numaflow.v1alpha1.Backoff"
},
"onFailure": {
"description": "OnFailure specifies the action to take when a retry fails. The default action is to retry.",
"description": "OnFailure specifies the action to take when the specified retry strategy fails. The possible values are: 1. \"retry\": start another round of retrying the operation, 2. \"fallback\": re-route the operation to a fallback sink and 3. \"drop\": drop the operation and perform no further action. The default action is to retry.",
"type": "string"
}
}
7 changes: 5 additions & 2 deletions docs/APIs.md
@@ -9077,8 +9077,11 @@ OnFailureRetryStrategy </a> </em>
<em>(Optional)</em>
<p>

OnFailure specifies the action to take when a retry fails. The default
action is to retry.
OnFailure specifies the action to take when the specified retry strategy
fails. The possible values are: 1. “retry”: start another round of
retrying the operation, 2. “fallback”: re-route the operation to a
fallback sink and 3. “drop”: drop the operation and perform no further
action. The default action is to retry.
</p>

</td>
48 changes: 32 additions & 16 deletions docs/user-guide/sinks/fallback.md
@@ -1,25 +1,32 @@
# Fallback Sink

A `Fallback` Sink functions as a `Dead Letter Queue (DLQ)` Sink and can be configured to serve as a backup when the primary sink is down,
unavailable, or under maintenance. This is particularly useful when multiple sinks are in a pipeline; if a sink fails, the resulting
back-pressure will back-propagate and stop the source vertex from reading more data. A `Fallback` Sink can be set up to prevent this from happening.
This backup sink stores data while the primary sink is offline. The stored data can be replayed once the primary sink is back online.
A `Fallback` Sink functions as a `Dead Letter Queue (DLQ)` Sink.
It can be configured to serve as a backup sink when the primary sink fails to process messages.

Note: The `fallback` field is optional.
## The Use Case

Users are required to return a fallback response from the [user-defined sink](https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/) when the primary sink fails; only
then the messages will be directed to the fallback sink.
A fallback sink is useful for preventing back pressure caused by failed messages in the primary sink.

Example of a fallback response in a user-defined sink: [here](https://github.com/numaproj/numaflow-go/blob/main/pkg/sinker/examples/fallback/main.go)
In a pipeline without fallback sinks, if a sink fails to process certain messages,
the failed messages can, by default, be retried indefinitely,
and the resulting back pressure propagates all the way back to the source vertex.
Eventually, the pipeline is blocked and no new messages are processed.
A fallback sink can be set up to prevent this by storing the failed messages in a separate sink.

## CAVEATs
The `fallback` field can only be utilized when the primary sink is a `User Defined Sink.`
## Caveats

A fallback sink can only be configured when the primary sink is a user-defined sink.

## Example
## How to use

### Builtin Kafka
An example using builtin kafka as fallback sink:
To configure a fallback sink,
changes are needed in both the pipeline specification and the user-defined sink implementation.

### Step 1 - update the specification

Add a `fallback` field to the sink configuration in the pipeline specification file.

The following example uses the built-in Kafka sink as a fallback sink.

```yaml
- name: out
@@ -34,10 +41,9 @@ An example using builtin kafka as fallback sink:
- my-broker2:19700
topic: my-topic
```
### UD Sink
An example using custom user-defined sink as fallback sink.
User Defined Sink as a fallback sink:
A fallback sink can also be a user-defined sink.
```yaml
- name: out
sink:
@@ -49,3 +55,13 @@ User Defined Sink as a fallback sink:
container:
image: my-sink:latest
```
### Step 2 - update the user-defined sink implementation
Code changes have to be made in the primary sink to generate either a **failed** response or a **fallback** response,
depending on the use case.
* A **failed** response is processed according to the [retry strategy](https://numaflow.numaproj.io/user-guide/sinks/retry-strategy/). If the retry strategy's `onFailure` is set to `fallback`, the message is directed to the fallback sink after the retries are exhausted.
* A **fallback** response bypasses the sink retry strategy. The message is directed to the fallback sink immediately, without being retried.

SDK methods to generate either a fallback or a failed response in a primary user-defined sink can be found here:
[Golang](https://github.com/numaproj/numaflow-go/blob/main/pkg/sinker/types.go), [Java](https://github.com/numaproj/numaflow-java/blob/main/src/main/java/io/numaproj/numaflow/sinker/Response.java), [Python](https://github.com/numaproj/numaflow-python/blob/main/pynumaflow/sinker/_dtypes.py)
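
The difference between the two response kinds can be sketched as a small, self-contained Go model. This is an illustration only, not Numaflow's implementation and not the SDK API: `ResponseKind`, `Outcome`, and `Route` are hypothetical names introduced here, and the retry loop assumes a fixed number of attempts (`steps`) with no delay between them. The `onFailure` values are the ones documented for the retry strategy.

```go
package main

import "fmt"

// ResponseKind is a hypothetical stand-in for the per-message response that a
// primary user-defined sink returns. It is not an SDK type.
type ResponseKind int

const (
	OK       ResponseKind = iota // message was written successfully
	Failed                       // message is subject to the retry strategy
	Fallback                     // message goes to the fallback sink, no retry
)

// Outcome names where a message ends up in this model.
type Outcome string

const (
	Written        Outcome = "written"
	SentToFallback Outcome = "fallback"
	Dropped        Outcome = "drop"
	RetryAgain     Outcome = "retry"
)

// Route calls write up to steps times and returns the outcome together with
// the number of attempts made. onFailure is one of "retry", "fallback", or
// "drop"; any other value is treated as the default, "retry".
func Route(write func(attempt int) ResponseKind, steps int, onFailure string) (Outcome, int) {
	attempts := 0
	for attempts < steps {
		attempts++
		switch write(attempts) {
		case OK:
			return Written, attempts
		case Fallback:
			// A fallback response skips the remaining retries.
			return SentToFallback, attempts
		}
		// Failed: fall through to the next attempt.
	}
	// All attempts failed: apply the onFailure action.
	switch onFailure {
	case "fallback":
		return SentToFallback, attempts
	case "drop":
		return Dropped, attempts
	default:
		return RetryAgain, attempts
	}
}

func main() {
	alwaysFail := func(int) ResponseKind { return Failed }
	askFallback := func(int) ResponseKind { return Fallback }

	outcome, attempts := Route(alwaysFail, 3, "fallback")
	fmt.Println(outcome, attempts) // fallback 3

	outcome, attempts = Route(askFallback, 3, "retry")
	fmt.Println(outcome, attempts) // fallback 1
}
```

In this model both messages reach the fallback sink, but the failed one gets there only after three attempts, while the fallback response is routed on the first attempt regardless of the `onFailure` setting.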
8 changes: 5 additions & 3 deletions docs/user-guide/sinks/retry-strategy.md
@@ -1,18 +1,20 @@
# Retry Strategy

### Overview

The `RetryStrategy` is used to configure the behavior for a sink after encountering failures during a write operation.
This structure allows the user to specify how Numaflow should respond to different fail-over scenarios for Sinks, ensuring that the writing can be resilient and handle
unexpected issues efficiently.

`RetryStrategy` is applied ONLY to failed messages. To return a failed message, use the methods provided by the SDKs:
- `ResponseFailure` for [Golang](https://github.com/numaproj/numaflow-go/blob/main/pkg/sinker/types.go)
- `responseFailure` for [Java](https://github.com/numaproj/numaflow-java/blob/main/src/main/java/io/numaproj/numaflow/sinker/Response.java#L40)
- `as_failure` for [Python](https://github.com/numaproj/numaflow-python/blob/main/pynumaflow/sinker/_dtypes.py)

### Struct Explanation


`retryStrategy` is optional, and can be added to the Sink spec configurations where retry logic is necessary.



```yaml
sink:
retryStrategy:
7 changes: 6 additions & 1 deletion pkg/apis/numaflow/v1alpha1/generated.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion pkg/apis/numaflow/v1alpha1/retry_strategy.go
@@ -36,7 +36,12 @@ type RetryStrategy struct {
// BackOff specifies the parameters for the backoff strategy, controlling how delays between retries should increase.
// +optional
BackOff *Backoff `json:"backoff,omitempty" protobuf:"bytes,1,opt,name=backoff"`
// OnFailure specifies the action to take when a retry fails. The default action is to retry.
// OnFailure specifies the action to take when the specified retry strategy fails.
// The possible values are:
// 1. "retry": start another round of retrying the operation,
// 2. "fallback": re-route the operation to a fallback sink and
// 3. "drop": drop the operation and perform no further action.
// The default action is to retry.
// +optional
// +kubebuilder:default="retry"
OnFailure *OnFailureRetryStrategy `json:"onFailure,omitempty" protobuf:"bytes,2,opt,name=onFailure"`
2 changes: 1 addition & 1 deletion pkg/apis/numaflow/v1alpha1/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/numaflow-models/src/models/retry_strategy.rs
@@ -22,7 +22,7 @@ limitations under the License.
pub struct RetryStrategy {
#[serde(rename = "backoff", skip_serializing_if = "Option::is_none")]
pub backoff: Option<Box<crate::models::Backoff>>,
/// OnFailure specifies the action to take when a retry fails. The default action is to retry.
/// OnFailure specifies the action to take when the specified retry strategy fails. The possible values are: 1. \"retry\": start another round of retrying the operation, 2. \"fallback\": re-route the operation to a fallback sink and 3. \"drop\": drop the operation and perform no further action. The default action is to retry.
#[serde(rename = "onFailure", skip_serializing_if = "Option::is_none")]
pub on_failure: Option<String>,
}