Commit 381be9c

[FLINK-36439][docs] Documents for ForSt State Backend
1 parent 4f61d85 commit 381be9c

18 files changed

+917
-96
lines changed


Diff for: docs/content.zh/docs/deployment/config.md

+28-2
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ These values are configured as memory sizes, for example *1536m* or *2g*.
160160

161161
You can configure checkpointing directly in code within your Flink job or application. Putting these values here in the configuration defines them as defaults in case the application does not configure anything.
162162

163-
- `state.backend.type`: The state backend to use. This defines the data structure mechanism for taking snapshots. Common values are `hashmap` or `rocksdb`.
163+
- `state.backend.type`: The state backend to use. This defines the data structure mechanism for taking snapshots. Common values are `hashmap`, `rocksdb` or `forst`.
164164
- `execution.checkpointing.dir`: The directory to write checkpoints to. This takes a path URI like *s3://mybucket/flink-app/checkpoints* or *hdfs://namenode:port/flink/checkpoints*.
165165
- `execution.checkpointing.savepoint-dir`: The default directory for savepoints. Takes a path URI, similar to `execution.checkpointing.dir`.
166166
- `execution.checkpointing.interval`: The base interval setting. To enable checkpointing, you need to set this value larger than 0.
@@ -352,6 +352,12 @@ These are the options commonly needed to configure the RocksDB state backend. Se
352352

353353
{{< generated/state_backend_rocksdb_section >}}
354354

355+
### ForSt State Backend
356+
357+
These are the options commonly needed to configure the ForSt state backend. See the [Advanced ForSt Backend Section](#advanced-forst-state-backends-options) for options needed for advanced low-level configuration and troubleshooting.
358+
359+
{{< generated/state_backend_forst_section >}}
360+
355361
----
356362
----
357363

@@ -374,6 +380,16 @@ Enabling RocksDB's native metrics may cause degraded performance and should be s
374380

375381
{{< generated/rocksdb_native_metric_configuration >}}
376382

383+
### ForSt Native Metrics
384+
385+
ForSt has a native metrics mechanism similar to that of RocksDB.
386+
387+
{{< hint warning >}}
388+
Enabling ForSt's native metrics may degrade performance, so enable them with care.
389+
{{< /hint >}}
390+
391+
{{< generated/forst_native_metric_configuration >}}
392+
377393
----
378394
----
379395

@@ -474,6 +490,12 @@ Advanced options to tune RocksDB and RocksDB checkpoints.
474490

475491
{{< generated/expert_rocksdb_section >}}
476492

493+
### Advanced ForSt State Backends Options
494+
495+
Advanced options to tune ForSt and ForSt checkpoints.
496+
497+
{{< generated/expert_forst_section >}}
498+
477499
### State Changelog Options
478500

479501
Please refer to [State Backends]({{< ref "docs/ops/state/state_backends#enabling-changelog" >}}) for information on
@@ -484,13 +506,17 @@ using State Changelog. {{< generated/state_changelog_section >}}
484506
These settings take effect when the `state.changelog.storage` is set to `filesystem` (see [above](#state-changelog-storage)).
485507
{{< generated/fs_state_changelog_configuration >}}
486508

487-
**RocksDB Configurable Options**
509+
### RocksDB Configurable Options
488510

489511
These options give fine-grained control over the behavior and resources of ColumnFamilies.
490512
With the introduction of `state.backend.rocksdb.memory.managed` and `state.backend.rocksdb.memory.fixed-per-slot` (Apache Flink 1.10), it should be only necessary to use the options here for advanced performance tuning. These options here can also be specified in the application program via `RocksDBStateBackend.setRocksDBOptions(RocksDBOptionsFactory)`.
491513

492514
{{< generated/rocksdb_configurable_configuration >}}
493515

516+
### ForSt State Backend Configurable Options
517+
518+
{{< generated/forst_configurable_configuration >}}
519+
494520
### Advanced Fault Tolerance Options
495521

496522
*These parameters can help with problems related to failover and to components erroneously considering each other as failed.*

Diff for: docs/content.zh/docs/ops/metrics.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -1541,7 +1541,12 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an
15411541
### RocksDB
15421542
Certain RocksDB native metrics are available but disabled by default, you can find full documentation [here]({{< ref "docs/deployment/config" >}}#rocksdb-native-metrics)
15431543

1544-
### ForStDB
1544+
### ForSt
1545+
1546+
Certain ForSt native metrics are available but disabled by default. You can find the full documentation [here]({{< ref "docs/deployment/config" >}}#forst-native-metrics).
1547+
1548+
In addition, the following metrics are supported:
1549+
15451550
<table class="table table-bordered">
15461551
<thead>
15471552
<tr>

Diff for: docs/content/docs/deployment/config.md

+28-2
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ These values are configured as memory sizes, for example *1536m* or *2g*.
160160

161161
You can configure checkpointing directly in code within your Flink job or application. Putting these values here in the configuration defines them as defaults in case the application does not configure anything.
162162

163-
- `state.backend.type`: The state backend to use. This defines the data structure mechanism for taking snapshots. Common values are `hashmap` or `rocksdb`.
163+
- `state.backend.type`: The state backend to use. This defines the data structure mechanism for taking snapshots. Common values are `hashmap`, `rocksdb` or `forst`.
164164
- `execution.checkpointing.dir`: The directory to write checkpoints to. This takes a path URI like *s3://mybucket/flink-app/checkpoints* or *hdfs://namenode:port/flink/checkpoints*.
165165
- `execution.checkpointing.savepoint-dir`: The default directory for savepoints. Takes a path URI, similar to `execution.checkpointing.dir`.
166166
- `execution.checkpointing.interval`: The base interval setting. To enable checkpointing, you need to set this value larger than 0.
@@ -354,6 +354,12 @@ These are the options commonly needed to configure the RocksDB state backend. Se
354354

355355
{{< generated/state_backend_rocksdb_section >}}
356356

357+
### ForSt State Backend
358+
359+
These are the options commonly needed to configure the ForSt state backend. See the [Advanced ForSt Backend Section](#advanced-forst-state-backends-options) for options needed for advanced low-level configuration and troubleshooting.
360+
361+
{{< generated/state_backend_forst_section >}}
362+
357363
----
358364
----
359365

@@ -376,6 +382,16 @@ Enabling RocksDB's native metrics may cause degraded performance and should be s
376382

377383
{{< generated/rocksdb_native_metric_configuration >}}
378384

385+
### ForSt Native Metrics
386+
387+
ForSt has a native metrics mechanism similar to that of RocksDB.
388+
389+
{{< hint warning >}}
390+
Enabling ForSt's native metrics may degrade performance, so enable them with care.
391+
{{< /hint >}}
392+
393+
{{< generated/forst_native_metric_configuration >}}
394+
379395
----
380396
----
381397

@@ -476,6 +492,12 @@ Advanced options to tune RocksDB and RocksDB checkpoints.
476492

477493
{{< generated/expert_rocksdb_section >}}
478494

495+
### Advanced ForSt State Backends Options
496+
497+
Advanced options to tune ForSt and ForSt checkpoints.
498+
499+
{{< generated/expert_forst_section >}}
500+
479501
### State Changelog Options
480502

481503
Please refer to [State Backends]({{< ref "docs/ops/state/state_backends#enabling-changelog" >}}) for information on
@@ -486,13 +508,17 @@ using State Changelog. {{< generated/state_changelog_section >}}
486508
These settings take effect when the `state.changelog.storage` is set to `filesystem` (see [above](#state-changelog-storage)).
487509
{{< generated/fs_state_changelog_configuration >}}
488510

489-
**RocksDB Configurable Options**
511+
### RocksDB Configurable Options
490512

491513
These options give fine-grained control over the behavior and resources of ColumnFamilies.
492514
With the introduction of `state.backend.rocksdb.memory.managed` and `state.backend.rocksdb.memory.fixed-per-slot` (Apache Flink 1.10), it should be only necessary to use the options here for advanced performance tuning. These options here can also be specified in the application program via `RocksDBStateBackend.setRocksDBOptions(RocksDBOptionsFactory)`.
493515

494516
{{< generated/rocksdb_configurable_configuration >}}
495517

518+
### ForSt State Backend Configurable Options
519+
520+
{{< generated/forst_configurable_configuration >}}
521+
496522
### Advanced Fault Tolerance Options
497523

498524
*These parameters can help with problems related to failover and to components erroneously considering each other as failed.*
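
The checkpointing defaults touched in the `config.md` hunks above can be sketched as a configuration fragment. This is only an illustration, assuming a `config.yaml` that uses flat dotted keys; the bucket path is the placeholder from the documentation text, and the interval value is an arbitrary example rather than a recommendation.

```yaml
# Select the state backend; per the updated docs, common values are
# hashmap, rocksdb, or forst.
state.backend.type: forst

# Directory that checkpoints are written to (placeholder URI from the docs).
execution.checkpointing.dir: s3://mybucket/flink-app/checkpoints

# Checkpointing is only enabled when the interval is larger than 0.
# 60s is an illustrative value.
execution.checkpointing.interval: 60s
```

Values set here act as defaults only; a job that configures checkpointing in code takes precedence, as the documentation text notes.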

Diff for: docs/content/docs/ops/metrics.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -1531,7 +1531,12 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an
15311531
### RocksDB
15321532
Certain RocksDB native metrics are available but disabled by default, you can find full documentation [here]({{< ref "docs/deployment/config" >}}#rocksdb-native-metrics)
15331533

1534-
### ForStDB
1534+
### ForSt
1535+
1536+
Certain ForSt native metrics are available but disabled by default. You can find the full documentation [here]({{< ref "docs/deployment/config" >}}#forst-native-metrics).
1537+
1538+
In addition, the following metrics are supported:
1539+
15351540
<table class="table table-bordered">
15361541
<thead>
15371542
<tr>

Diff for: docs/content/docs/ops/state/large_state_tuning.md

+15-1
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,27 @@ the same time. For applications with large state in Flink, this often ties up to
9797
When a savepoint is manually triggered, it may be in process concurrently with an ongoing checkpoint.
9898

9999

100-
## Tuning RocksDB
100+
## Tuning RocksDB or ForSt
101101

102102
The state storage workhorse of many large scale Flink streaming applications is the *RocksDB State Backend*.
103103
The backend scales well beyond main memory and reliably stores large [keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}).
104104

105+
If you are handling very large state, even exceeding the local disk space of the TaskManagers, you may want to consider
106+
using the disaggregated state store [ForStStateBackend]({{< ref "docs/deployment/config" >}}#forst-state-backend).
107+
This backend stores the state in a separate storage system, such as HDFS or S3, and only keeps the
108+
state metadata and cache in the TaskManagers. Using the [State API V2]({{< ref "docs/dev/datastream/fault-tolerance/state_v2" >}})
109+
together with ForStStateBackend is also recommended for large state applications.
110+
105111
RocksDB's performance can vary with configuration, this section outlines some best-practices for tuning jobs that use the RocksDB State Backend.
106112

113+
{{< hint info >}}
114+
The design of ForSt is very similar to that of RocksDB, and the configurable options are almost the same,
115+
so you can refer to the following sections to configure ForSt.
116+
117+
The following sections are written from the perspective of RocksDB. If you want to configure ForSt
118+
in a similar way, you need to use the corresponding ForSt configuration options.
119+
{{< /hint >}}
120+
107121
### Incremental Checkpoints
108122

109123
When it comes to reducing the time that checkpoints take, activating incremental checkpoints should be one of the first considerations.

Diff for: docs/content/docs/ops/state/state_backends.md

+53-4
Original file line numberDiff line numberDiff line change
@@ -95,12 +95,50 @@ Certain RocksDB native metrics are available but disabled by default, you can fi
9595

9696
The total memory amount of RocksDB instance(s) per slot can also be bounded, please refer to documentation [here]({{< ref "docs/ops/state/large_state_tuning" >}}#bounding-rocksdb-memory-usage) for details.
9797

98+
## The ForStStateBackend
99+
100+
The *ForStStateBackend* is a state backend based on the [ForSt project](https://github.com/ververica/ForSt),
101+
which is also an LSM-tree structured key-value store and is built on top of RocksDB.
102+
It is designed to provide a more efficient way to store and access state in Flink applications.
103+
Most importantly, it can hold its SST files on remote file systems that Flink supports, such as HDFS or S3.
104+
This allows Flink to scale the state size beyond the local disk capacity of the TaskManager.
105+
Moreover, by putting the SST files on remote file systems, it can also provide a more lightweight
106+
way to perform checkpointing and recovery.
107+
108+
The ForStStateBackend is still in the experimental stage and is not fully available for production.
109+
It always performs asynchronous incremental snapshots.
110+
111+
The ForStStateBackend is encouraged for:
112+
113+
- Jobs with very large state, long windows, or large key/value states, where the local disk may not be enough to
114+
store the state.
115+
- All high-availability setups.
116+
- Jobs that prefer asynchronous state access, since the ForStStateBackend is the only state backend that supports
117+
asynchronous state access.
118+
- Jobs that require lightweight checkpoint and recovery, such as cloud-native applications.
119+
120+
Limitations of the ForStStateBackend (for now):
121+
122+
- As with EmbeddedRocksDBStateBackend, the maximum supported size per key and per value is 2^31 bytes each.
123+
- Does not support canonical savepoints, full snapshots, changelog, or file-merging checkpoints.
124+
It always performs incremental snapshots.
125+
126+
Compared with EmbeddedRocksDBStateBackend, ForStStateBackend stores data on a remote file system, so
127+
the amount of state that you can keep is not bounded by local disk capacity. The local disk of the TaskManager is only used to
128+
cache files for better performance. Note that when most of the active state is on
129+
the remote file system, the performance of state access may be affected by network latency. Flink
130+
introduces asynchronous state access to mitigate this issue. If you use the asynchronous state
131+
methods in State API V2, you can benefit from asynchronous state access. To get familiar with the
132+
State API V2, please refer to the [State API V2 documentation]({{< ref "docs/dev/datastream/fault-tolerance/state_v2" >}}).
133+
98134
## Choose The Right State Backend
99135

100136
When deciding between `HashMapStateBackend` and `RocksDB`, it is a choice between performance and scalability.
101137
`HashMapStateBackend` is very fast as each state access and update operates on objects on the Java heap; however, state size is limited by available memory within the cluster.
102-
On the other hand, `RocksDB` can scale based on available disk space and is the only state backend to support incremental snapshots.
138+
On the other hand, `RocksDB` can scale based on available disk space.
103139
However, each state access and update requires (de-)serialization and potentially reading from disk which leads to average performance that is an order of magnitude slower than the memory state backends.
140+
If you are handling very large state that may even exceed the available disk space,
141+
or you need fast rescaling in a cloud-native setup, you should consider using `ForStStateBackend`.
104142

105143
{{< hint info >}}
106144
In Flink 1.13 we unified the binary format of Flink's savepoints. That means you can take a savepoint and then restore from it using a different state backend.
@@ -150,18 +188,29 @@ If you want to use the `EmbeddedRocksDBStateBackend` in your IDE or configure it
150188
</dependency>
151189
```
152190

191+
The same applies to `ForStStateBackend`:
192+
```xml
193+
<dependency>
194+
<groupId>org.apache.flink</groupId>
195+
<artifactId>flink-statebackend-forst</artifactId>
196+
<version>{{< version >}}</version>
197+
<scope>provided</scope>
198+
</dependency>
199+
```
200+
153201
{{< hint info >}}
154-
Since RocksDB is part of the default Flink distribution, you do not need this dependency if you are not using any RocksDB code in your job and configure the state backend via `state.backend.type` and further [checkpointing]({{< ref "docs/deployment/config" >}}#checkpointing) and [RocksDB-specific]({{< ref "docs/deployment/config" >}}#rocksdb-state-backend) parameters in your [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}).
202+
Since RocksDB and ForSt are part of the default Flink distribution, you do not need these dependencies if you are not using any RocksDB or ForSt code in your job and configure the state backend via `state.backend.type` and further [checkpointing]({{< ref "docs/deployment/config" >}}#checkpointing) and [RocksDB-specific]({{< ref "docs/deployment/config" >}}#rocksdb-state-backend) or [ForSt-specific]({{< ref "docs/deployment/config" >}}#forst-state-backend) parameters in your [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}).
155203
{{< /hint >}}
156204

157205

158206
### Setting Default State Backend
159207

160208
A default state backend can be configured in the [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}), using the configuration key `state.backend.type`.
161209

162-
Possible values for the config entry are *hashmap* (HashMapStateBackend), *rocksdb* (EmbeddedRocksDBStateBackend), or the fully qualified class
210+
Possible values for the config entry are *hashmap* (HashMapStateBackend), *rocksdb* (EmbeddedRocksDBStateBackend), *forst* (ForStStateBackend) or the fully qualified class
163211
name of the class that implements the state backend factory {{< gh_link file="flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java" name="StateBackendFactory" >}},
164-
such as `org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory` for EmbeddedRocksDBStateBackend.
212+
such as `org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackendFactory` for EmbeddedRocksDBStateBackend
213+
and `org.apache.flink.state.forst.ForStStateBackendFactory` for ForStStateBackend.
165214

166215
The `execution.checkpointing.dir` option defines the directory to which all backends write checkpoint data and meta data files.
167216
You can find more details about the checkpoint directory structure [here]({{< ref "docs/ops/state/checkpoints" >}}#directory-structure).
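
The two ways of naming the default state backend described in this hunk can be sketched as follows. This is a minimal illustration, assuming a `config.yaml` with flat dotted keys; only one `state.backend.type` entry should be active at a time, and the checkpoint path is a placeholder.

```yaml
# Option 1: the short name added by this change.
state.backend.type: forst

# Option 2: the fully qualified StateBackendFactory class name, which the
# text above lists as an accepted alternative. Left commented out so that
# the key is not defined twice.
# state.backend.type: org.apache.flink.state.forst.ForStStateBackendFactory

# All backends write checkpoint data and metadata files under this directory.
execution.checkpointing.dir: s3://mybucket/flink-app/checkpoints
```

The short name is usually easier to read; the factory class form mainly matters for custom `StateBackendFactory` implementations.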
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<table class="configuration table table-bordered">
2+
<thead>
3+
<tr>
4+
<th class="text-left" style="width: 20%">Key</th>
5+
<th class="text-left" style="width: 15%">Default</th>
6+
<th class="text-left" style="width: 10%">Type</th>
7+
<th class="text-left" style="width: 55%">Description</th>
8+
</tr>
9+
</thead>
10+
<tbody>
11+
<tr>
12+
<td><h5>state.backend.forst.executor.inline-coordinator</h5></td>
13+
<td style="word-wrap: break-word;">false</td>
14+
<td>Boolean</td>
15+
<td>Whether to let the task thread be the coordinator thread responsible for distributing requests. If set to 'true', the task thread will be responsible for distributing requests, otherwise, a dedicated coordinator thread will be used. The default value is 'false'.</td>
16+
</tr>
17+
<tr>
18+
<td><h5>state.backend.forst.executor.inline-write</h5></td>
19+
<td style="word-wrap: break-word;">true</td>
20+
<td>Boolean</td>
21+
<td>Whether to let write requests be executed within the coordinator thread. If set to 'true', write requests will be executed within the coordinator thread, otherwise, a dedicated write thread will be used. The default value is 'true'.</td>
22+
</tr>
23+
<tr>
24+
<td><h5>state.backend.forst.local-dir</h5></td>
25+
<td style="word-wrap: break-word;">(none)</td>
26+
<td>String</td>
27+
<td>The local directory (on the TaskManager) where ForSt puts some metadata files. By default, it will be &lt;WORKING_DIR&gt;/tmp. See <code class="highlighter-rouge">process.taskmanager.working-dir</code> for more details.</td>
28+
</tr>
29+
<tr>
30+
<td><h5>state.backend.forst.memory.fixed-per-slot</h5></td>
31+
<td style="word-wrap: break-word;">(none)</td>
32+
<td>MemorySize</td>
33+
<td>The fixed total amount of memory per slot, shared among all ForSt instances. This option overrides the 'state.backend.forst.memory.managed' option.</td>
34+
</tr>
35+
<tr>
36+
<td><h5>state.backend.forst.memory.fixed-per-tm</h5></td>
37+
<td style="word-wrap: break-word;">(none)</td>
38+
<td>MemorySize</td>
39+
<td>The fixed total amount of memory per Task Manager, shared among all ForSt instances. This is a cluster-level option. This option only takes effect if 'state.backend.forst.memory.managed' is set to false and 'state.backend.forst.memory.fixed-per-slot' is not configured. If so, then each ForSt column family state has its own memory caches (as controlled by the column family options). The relevant options for the shared resources (e.g. write-buffer-ratio) can be set on the same level (config.yaml). Note that this feature breaks resource isolation between the slots.</td>
40+
</tr>
41+
<tr>
42+
<td><h5>state.backend.forst.memory.managed</h5></td>
43+
<td style="word-wrap: break-word;">true</td>
44+
<td>Boolean</td>
45+
<td>If set to true, the ForSt state backend will automatically configure itself to use the managed memory budget of the task slot, and divide the memory over write buffers, indexes, block caches, etc.</td>
46+
</tr>
47+
<tr>
48+
<td><h5>state.backend.forst.options-factory</h5></td>
49+
<td style="word-wrap: break-word;">(none)</td>
50+
<td>String</td>
51+
<td>The options factory class for users to add customized options in DBOptions and ColumnFamilyOptions for ForSt. If set, the ForSt state backend will load the class and apply configs to DBOptions and ColumnFamilyOptions after loading ones from 'ForStConfigurableOptions' and pre-defined options.</td>
52+
</tr>
53+
</tbody>
54+
</table>
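
To show how the memory-related keys in the table above relate to each other, here is a hedged configuration sketch. It assumes a `config.yaml` with flat dotted keys; the memory size and the local directory are hypothetical values chosen for illustration, not recommendations.

```yaml
state.backend.type: forst

# Default behaviour: ForSt sizes itself from the managed memory budget
# of the task slot.
state.backend.forst.memory.managed: true

# Alternative: a fixed budget per slot. According to the table, this
# overrides 'state.backend.forst.memory.managed' when set.
# 512mb is an illustrative value.
# state.backend.forst.memory.fixed-per-slot: 512mb

# Local directory on the TaskManager for ForSt metadata files.
# Hypothetical path; when unset, the table says <WORKING_DIR>/tmp is used.
state.backend.forst.local-dir: /data/flink/forst
```

Per the table, `state.backend.forst.memory.fixed-per-tm` only takes effect when managed memory is disabled and no per-slot budget is configured, so it is left out of this sketch.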
