[FLINK-34458][checkpointing] Rename options for Generalized incremental checkpoints (changelog) (apache#24324)
Zakelly authored and masteryhx committed Feb 22, 2024
1 parent 9308e10 commit e7e973e
Showing 12 changed files with 88 additions and 69 deletions.
4 changes: 2 additions & 2 deletions docs/content.zh/docs/deployment/config.md
@@ -466,11 +466,11 @@ Advanced options to tune RocksDB and RocksDB checkpoints.
### State Changelog Options

Please refer to [State Backends]({{< ref "docs/ops/state/state_backends#enabling-changelog" >}}) for information on
-using State Changelog. {{< generated/state_backend_changelog_section >}}
+using State Changelog. {{< generated/state_changelog_section >}}

#### FileSystem-based Changelog options

-These settings take effect when the `state.backend.changelog.storage` is set to `filesystem` (see [above](#state-backend-changelog-storage)).
+These settings take effect when the `state.changelog.storage` is set to `filesystem` (see [above](#state-changelog-storage)).
{{< generated/fs_state_changelog_configuration >}}

**RocksDB Configurable Options**
2 changes: 1 addition & 1 deletion docs/content.zh/docs/ops/metrics.md
@@ -1734,7 +1734,7 @@ Note that the metrics are only available via reporters.
</tr>
<tr>
<td>changelogBusyTimeMsPerSecond</td>
-<td>The time (in milliseconds) taken by the Changelog state backend to do IO operations, only positive when Changelog state backend is enabled. Please check 'dstl.dfs.upload.max-in-flight' for more information.</td>
+<td>The time (in milliseconds) taken by the Changelog state backend to do IO operations, only positive when Changelog state backend is enabled. Please check 'state.changelog.dstl.dfs.upload.max-in-flight' for more information.</td>
<td>Gauge</td>
</tr>
<tr>
8 changes: 4 additions & 4 deletions docs/content.zh/docs/ops/state/state_backends.md
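@@ -383,7 +383,7 @@ Changelog is a feature intended to reduce checkpointing time, and can therefore also
It is worth noting that although Changelog adds a small amount of routine CPU and network bandwidth usage,
it lowers peak CPU and network bandwidth usage.

-Another thing to consider is recovery time. Depending on the `state.backend.changelog.periodic-materialize.interval` setting, the changelog can become lengthy, so replaying it takes more time. Even so, recovery time plus checkpoint duration may still be lower than without the changelog feature enabled, providing lower end-to-end latency even in the failover case. Of course, depending on the actual ratio of these times, the effective recovery time may also increase.
+Another thing to consider is recovery time. Depending on the `state.changelog.periodic-materialize.interval` setting, the changelog can become lengthy, so replaying it takes more time. Even so, recovery time plus checkpoint duration may still be lower than without the changelog feature enabled, providing lower end-to-end latency even in the failover case. Of course, depending on the actual ratio of these times, the effective recovery time may also increase.

For more details, see [FLIP-158](https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints).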

@@ -401,9 +401,9 @@ Changelog is a feature intended to reduce checkpointing time, and can therefore also

Here is an example configuration in YAML:
```yaml
-state.backend.changelog.enabled: true
-state.backend.changelog.storage: filesystem # currently, only filesystem and memory (for tests only) are supported
-dstl.dfs.base-path: s3://<bucket-name> # similar to state.checkpoints.dir
+state.changelog.enabled: true
+state.changelog.storage: filesystem # currently, only filesystem and memory (for tests only) are supported
+state.changelog.dstl.dfs.base-path: s3://<bucket-name> # similar to state.checkpoints.dir
```

Please keep the following settings at their default values (see [limitations](#limitations)):
4 changes: 2 additions & 2 deletions docs/content/docs/deployment/config.md
@@ -468,11 +468,11 @@ Advanced options to tune RocksDB and RocksDB checkpoints.
### State Changelog Options

Please refer to [State Backends]({{< ref "docs/ops/state/state_backends#enabling-changelog" >}}) for information on
-using State Changelog. {{< generated/state_backend_changelog_section >}}
+using State Changelog. {{< generated/state_changelog_section >}}

#### FileSystem-based Changelog options

-These settings take effect when the `state.backend.changelog.storage` is set to `filesystem` (see [above](#state-backend-changelog-storage)).
+These settings take effect when the `state.changelog.storage` is set to `filesystem` (see [above](#state-changelog-storage)).
{{< generated/fs_state_changelog_configuration >}}

**RocksDB Configurable Options**
2 changes: 1 addition & 1 deletion docs/content/docs/ops/metrics.md
@@ -1724,7 +1724,7 @@ Note that the metrics are only available via reporters.
</tr>
<tr>
<td>changelogBusyTimeMsPerSecond</td>
-<td>The time (in milliseconds) taken by the Changelog state backend to do IO operations, only positive when Changelog state backend is enabled. Please check 'dstl.dfs.upload.max-in-flight' for more information.</td>
+<td>The time (in milliseconds) taken by the Changelog state backend to do IO operations, only positive when Changelog state backend is enabled. Please check 'state.changelog.dstl.dfs.upload.max-in-flight' for more information.</td>
<td>Gauge</td>
</tr>
<tr>
8 changes: 4 additions & 4 deletions docs/content/docs/ops/state/state_backends.md
@@ -384,7 +384,7 @@ However, resource usage is higher:
It is worth noting that changelog adds a small amount of daily CPU and network bandwidth resources,
but reduces peak CPU and network bandwidth usage.

-Recovery time is another thing to consider. Depending on the `state.backend.changelog.periodic-materialize.interval`
+Recovery time is another thing to consider. Depending on the `state.changelog.periodic-materialize.interval`
setting, the changelog can become lengthy and replaying it may take more time. However, recovery time combined with
checkpoint duration will likely still be lower than in non-changelog setups, providing lower end-to-end latency even in
failover case. However, it's also possible that the effective recovery time will increase, depending on the actual ratio
@@ -402,9 +402,9 @@ Make sure to [add]({{< ref "docs/deployment/filesystems/overview" >}}) the neces

Here is an example configuration in YAML:
```yaml
-state.backend.changelog.enabled: true
-state.backend.changelog.storage: filesystem # currently, only filesystem and memory (for tests) are supported
-dstl.dfs.base-path: s3://<bucket-name> # similar to state.checkpoints.dir
+state.changelog.enabled: true
+state.changelog.storage: filesystem # currently, only filesystem and memory (for tests) are supported
+state.changelog.dstl.dfs.base-path: s3://<bucket-name> # similar to state.checkpoints.dir
```

Please keep the following defaults (see [limitations](#limitations)):
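For anyone updating an existing configuration by hand, the rename shown in the hunks above can be sketched as a small key-rewriting helper. `ChangelogKeyMigration` and its methods are hypothetical names used for illustration and are not part of Flink. The two prefix rules are inferred only from the keys visible in this commit, and the sketch makes no claim about whether the old keys are still accepted.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not a Flink API): rewrites pre-rename changelog keys to the new names. */
final class ChangelogKeyMigration {

    private static final String OLD_PREFIX = "state.backend.changelog.";
    private static final String NEW_PREFIX = "state.changelog.";
    private static final String OLD_DSTL_PREFIX = "dstl.dfs.";

    /** Returns the renamed key, or the key unchanged if the rename does not affect it. */
    static String migrateKey(String key) {
        if (key.startsWith(OLD_PREFIX)) {
            // state.backend.changelog.* becomes state.changelog.*
            return NEW_PREFIX + key.substring(OLD_PREFIX.length());
        }
        if (key.startsWith(OLD_DSTL_PREFIX)) {
            // dstl.dfs.* moves under the state.changelog namespace.
            return NEW_PREFIX + key;
        }
        return key;
    }

    /** Applies migrateKey to every entry, preserving insertion order and values. */
    static Map<String, String> migrate(Map<String, String> config) {
        Map<String, String> migrated = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            migrated.put(migrateKey(entry.getKey()), entry.getValue());
        }
        return migrated;
    }

    public static void main(String[] args) {
        Map<String, String> old = new LinkedHashMap<>();
        old.put("state.backend.changelog.enabled", "true");
        old.put("state.backend.changelog.storage", "filesystem");
        old.put("dstl.dfs.base-path", "s3://<bucket-name>");
        migrate(old).forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```

Running `main` prints the three keys from the YAML example under their new names; keys outside the two prefixes, such as `state.checkpoints.dir`, pass through untouched.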
@@ -9,88 +9,88 @@
</thead>
<tbody>
<tr>
-<td><h5>dstl.dfs.base-path</h5></td>
+<td><h5>state.changelog.dstl.dfs.base-path</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Base path to store changelog files.</td>
</tr>
<tr>
-<td><h5>dstl.dfs.batch.persist-delay</h5></td>
+<td><h5>state.changelog.dstl.dfs.batch.persist-delay</h5></td>
<td style="word-wrap: break-word;">10 ms</td>
<td>Duration</td>
<td>Delay before persisting changelog after receiving persist request (on checkpoint). Minimizes the number of files and requests if multiple operators (backends) or sub-tasks are using the same store. Correspondingly increases checkpoint time (async phase).</td>
</tr>
<tr>
-<td><h5>dstl.dfs.batch.persist-size-threshold</h5></td>
+<td><h5>state.changelog.dstl.dfs.batch.persist-size-threshold</h5></td>
<td style="word-wrap: break-word;">10 mb</td>
<td>MemorySize</td>
-<td>Size threshold for state changes that were requested to be persisted but are waiting for dstl.dfs.batch.persist-delay (from all operators). . Once reached, accumulated changes are persisted immediately. This is different from dstl.dfs.preemptive-persist-threshold as it happens AFTER the checkpoint and potentially for state changes of multiple operators. Must not exceed in-flight data limit (see below)</td>
+<td>Size threshold for state changes that were requested to be persisted but are waiting for state.changelog.dstl.dfs.batch.persist-delay (from all operators). . Once reached, accumulated changes are persisted immediately. This is different from state.changelog.dstl.dfs.preemptive-persist-threshold as it happens AFTER the checkpoint and potentially for state changes of multiple operators. Must not exceed in-flight data limit (see below)</td>
</tr>
<tr>
-<td><h5>dstl.dfs.compression.enabled</h5></td>
+<td><h5>state.changelog.dstl.dfs.compression.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable compression when serializing changelog.</td>
</tr>
<tr>
-<td><h5>dstl.dfs.discard.num-threads</h5></td>
+<td><h5>state.changelog.dstl.dfs.discard.num-threads</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>Number of threads to use to discard changelog (e.g. pre-emptively uploaded unused state).</td>
</tr>
<tr>
-<td><h5>dstl.dfs.download.local-cache.idle-timeout-ms</h5></td>
+<td><h5>state.changelog.dstl.dfs.download.local-cache.idle-timeout-ms</h5></td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
<td>Maximum idle time for cache files of distributed changelog file, after which the cache files will be deleted.</td>
</tr>
<tr>
-<td><h5>dstl.dfs.preemptive-persist-threshold</h5></td>
+<td><h5>state.changelog.dstl.dfs.preemptive-persist-threshold</h5></td>
<td style="word-wrap: break-word;">5 mb</td>
<td>MemorySize</td>
<td>Size threshold for state changes of a single operator beyond which they are persisted pre-emptively without waiting for a checkpoint. Improves checkpointing time by allowing quasi-continuous uploading of state changes (as opposed to uploading all accumulated changes on checkpoint).</td>
</tr>
<tr>
-<td><h5>dstl.dfs.upload.buffer-size</h5></td>
+<td><h5>state.changelog.dstl.dfs.upload.buffer-size</h5></td>
<td style="word-wrap: break-word;">1 mb</td>
<td>MemorySize</td>
<td>Buffer size used when uploading change sets</td>
</tr>
<tr>
-<td><h5>dstl.dfs.upload.max-attempts</h5></td>
+<td><h5>state.changelog.dstl.dfs.upload.max-attempts</h5></td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
-<td>Maximum number of attempts (including the initial one) to perform a particular upload. Only takes effect if dstl.dfs.upload.retry-policy is fixed.</td>
+<td>Maximum number of attempts (including the initial one) to perform a particular upload. Only takes effect if state.changelog.dstl.dfs.upload.retry-policy is fixed.</td>
</tr>
<tr>
-<td><h5>dstl.dfs.upload.max-in-flight</h5></td>
+<td><h5>state.changelog.dstl.dfs.upload.max-in-flight</h5></td>
<td style="word-wrap: break-word;">100 mb</td>
<td>MemorySize</td>
-<td>Max amount of data allowed to be in-flight. Upon reaching this limit the task will be back-pressured. I.e., snapshotting will block; normal processing will block if dstl.dfs.preemptive-persist-threshold is set and reached. The limit is applied to the total size of in-flight changes if multiple operators/backends are using the same changelog storage. Must be greater than or equal to dstl.dfs.batch.persist-size-threshold</td>
+<td>Max amount of data allowed to be in-flight. Upon reaching this limit the task will be back-pressured. I.e., snapshotting will block; normal processing will block if state.changelog.dstl.dfs.preemptive-persist-threshold is set and reached. The limit is applied to the total size of in-flight changes if multiple operators/backends are using the same changelog storage. Must be greater than or equal to state.changelog.dstl.dfs.batch.persist-size-threshold</td>
</tr>
<tr>
-<td><h5>dstl.dfs.upload.next-attempt-delay</h5></td>
+<td><h5>state.changelog.dstl.dfs.upload.next-attempt-delay</h5></td>
<td style="word-wrap: break-word;">500 ms</td>
<td>Duration</td>
<td>Delay before the next attempt (if the failure was not caused by a timeout).</td>
</tr>
<tr>
-<td><h5>dstl.dfs.upload.num-threads</h5></td>
+<td><h5>state.changelog.dstl.dfs.upload.num-threads</h5></td>
<td style="word-wrap: break-word;">5</td>
<td>Integer</td>
<td>Number of threads to use for upload.</td>
</tr>
<tr>
-<td><h5>dstl.dfs.upload.retry-policy</h5></td>
+<td><h5>state.changelog.dstl.dfs.upload.retry-policy</h5></td>
<td style="word-wrap: break-word;">"fixed"</td>
<td>String</td>
<td>Retry policy for the failed uploads (in particular, timed out). Valid values: none, fixed.</td>
</tr>
<tr>
-<td><h5>dstl.dfs.upload.timeout</h5></td>
+<td><h5>state.changelog.dstl.dfs.upload.timeout</h5></td>
<td style="word-wrap: break-word;">1 s</td>
<td>Duration</td>
-<td>Time threshold beyond which an upload is considered timed out. If a new attempt is made but this upload succeeds earlier then this upload result will be used. May improve upload times if tail latencies of upload requests are significantly high. Only takes effect if dstl.dfs.upload.retry-policy is fixed. Please note that timeout * max_attempts should be less than execution.checkpointing.timeout</td>
+<td>Time threshold beyond which an upload is considered timed out. If a new attempt is made but this upload succeeds earlier then this upload result will be used. May improve upload times if tail latencies of upload requests are significantly high. Only takes effect if state.changelog.dstl.dfs.upload.retry-policy is fixed. Please note that timeout * max_attempts should be less than execution.checkpointing.timeout</td>
</tr>
</tbody>
</table>
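The option descriptions in the table above state two cross-option constraints: the batch persist size threshold must not exceed the in-flight limit, and upload timeout multiplied by max attempts should stay below `execution.checkpointing.timeout`. A minimal sketch of checking them follows. `DstlOptionCheck` is a hypothetical class, not a Flink API, and the 10 minute checkpoint timeout used in `main` is an assumed value for illustration only.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sanity check (not a Flink API) for constraints stated in the option descriptions. */
final class DstlOptionCheck {

    /** Returns a human-readable message for each violated constraint; empty if none. */
    static List<String> check(
            long persistSizeThresholdBytes,
            long maxInFlightBytes,
            Duration uploadTimeout,
            int maxAttempts,
            Duration checkpointTimeout) {
        List<String> problems = new ArrayList<>();
        if (persistSizeThresholdBytes > maxInFlightBytes) {
            problems.add(
                    "state.changelog.dstl.dfs.batch.persist-size-threshold exceeds "
                            + "state.changelog.dstl.dfs.upload.max-in-flight");
        }
        // Worst case: every attempt runs into the upload timeout.
        Duration worstCaseUpload = uploadTimeout.multipliedBy(maxAttempts);
        if (worstCaseUpload.compareTo(checkpointTimeout) >= 0) {
            problems.add(
                    "upload.timeout * upload.max-attempts is not below "
                            + "execution.checkpointing.timeout");
        }
        return problems;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // Defaults from the table: 10 mb threshold, 100 mb in flight, 1 s timeout, 3 attempts.
        // The 10 min checkpoint timeout is an assumption made for this example.
        System.out.println(
                check(10 * mb, 100 * mb, Duration.ofSeconds(1), 3, Duration.ofMinutes(10)));
    }
}
```

With the table defaults and the assumed checkpoint timeout, both constraints hold and the returned list is empty.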
@@ -9,31 +9,31 @@
</thead>
<tbody>
<tr>
-<td><h5>state.backend.changelog.enabled</h5></td>
+<td><h5>state.changelog.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable state backend to write state changes to StateChangelog. If this config is not set explicitly, it means no preference for enabling the change log, and the value in lower config level will take effect. The default value 'false' here means if no value set (job or cluster), the change log will not be enabled.</td>
</tr>
<tr>
-<td><h5>state.backend.changelog.max-failures-allowed</h5></td>
+<td><h5>state.changelog.max-failures-allowed</h5></td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>Max number of consecutive materialization failures allowed.</td>
</tr>
<tr>
-<td><h5>state.backend.changelog.periodic-materialize.enabled</h5></td>
+<td><h5>state.changelog.periodic-materialize.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Defines whether to enable periodic materialization, all changelogs will not be truncated which may increase the space of checkpoint if disabled</td>
</tr>
<tr>
-<td><h5>state.backend.changelog.periodic-materialize.interval</h5></td>
+<td><h5>state.changelog.periodic-materialize.interval</h5></td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
-<td>Defines the interval in milliseconds to perform periodic materialization for state backend. It only takes effect when state.backend.changelog.periodic-materialize.enabled is true</td>
+<td>Defines the interval in milliseconds to perform periodic materialization for state backend. It only takes effect when state.changelog.periodic-materialize.enabled is true</td>
</tr>
<tr>
-<td><h5>state.backend.changelog.storage</h5></td>
+<td><h5>state.changelog.storage</h5></td>
<td style="word-wrap: break-word;">"memory"</td>
<td>String</td>
<td>The storage to be used to store state changelog.<br />The implementation can be specified via their shortcut name.<br />The list of recognized shortcut names currently includes 'memory' and 'filesystem'.</td>
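The description of `state.changelog.enabled` in the hunk above says that an unset value means "no preference" and that the lower config level then takes effect, with `false` as the final fallback. One way to read that is as a tri-state lookup, sketched below. `ChangelogEnabledResolution` is a hypothetical class, not Flink code, and the precedence of the job-level value over the cluster-level value is an assumption drawn from that description.

```java
import java.util.Optional;

/** Hypothetical sketch (not Flink code) of the "no preference" fallback described for the option. */
final class ChangelogEnabledResolution {

    /**
     * An explicit job-level value is used if present; otherwise the cluster-level value applies;
     * if neither is set, the documented default of false is used.
     */
    static boolean resolve(Optional<Boolean> jobLevel, Optional<Boolean> clusterLevel) {
        return jobLevel.orElseGet(() -> clusterLevel.orElse(false));
    }

    public static void main(String[] args) {
        // Job has no preference, cluster enables the changelog.
        System.out.println(resolve(Optional.empty(), Optional.of(true)));
        // Nothing set at either level.
        System.out.println(resolve(Optional.empty(), Optional.empty()));
    }
}
```

Modelling the unset state as `Optional.empty()` rather than `false` keeps "explicitly disabled" distinguishable from "no preference", which is the distinction the option description draws.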
@@ -9,31 +9,31 @@
</thead>
<tbody>
<tr>
-<td><h5>state.backend.changelog.enabled</h5></td>
+<td><h5>state.changelog.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable state backend to write state changes to StateChangelog. If this config is not set explicitly, it means no preference for enabling the change log, and the value in lower config level will take effect. The default value 'false' here means if no value set (job or cluster), the change log will not be enabled.</td>
</tr>
<tr>
-<td><h5>state.backend.changelog.max-failures-allowed</h5></td>
+<td><h5>state.changelog.max-failures-allowed</h5></td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>Max number of consecutive materialization failures allowed.</td>
</tr>
<tr>
-<td><h5>state.backend.changelog.periodic-materialize.enabled</h5></td>
+<td><h5>state.changelog.periodic-materialize.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Defines whether to enable periodic materialization, all changelogs will not be truncated which may increase the space of checkpoint if disabled</td>
</tr>
<tr>
-<td><h5>state.backend.changelog.periodic-materialize.interval</h5></td>
+<td><h5>state.changelog.periodic-materialize.interval</h5></td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
-<td>Defines the interval in milliseconds to perform periodic materialization for state backend. It only takes effect when state.backend.changelog.periodic-materialize.enabled is true</td>
+<td>Defines the interval in milliseconds to perform periodic materialization for state backend. It only takes effect when state.changelog.periodic-materialize.enabled is true</td>
</tr>
<tr>
-<td><h5>state.backend.changelog.storage</h5></td>
+<td><h5>state.changelog.storage</h5></td>
<td style="word-wrap: break-word;">"memory"</td>
<td>String</td>
<td>The storage to be used to store state changelog.<br />The implementation can be specified via their shortcut name.<br />The list of recognized shortcut names currently includes 'memory' and 'filesystem'.</td>
@@ -79,7 +79,7 @@ public static final class Sections {

public static final String STATE_LATENCY_TRACKING = "state_latency_tracking";

-public static final String STATE_BACKEND_CHANGELOG = "state_backend_changelog";
+public static final String STATE_CHANGELOG = "state_changelog";

public static final String EXPERT_CLASS_LOADING = "expert_class_loading";
public static final String EXPERT_DEBUGGING_AND_TUNING = "expert_debugging_and_tuning";