diff --git a/CHANGES.md b/CHANGES.md index cf2478e02358f..5d821172da195 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -71,6 +71,7 @@ * Go SDK Minimum Go Version updated to 1.21 ([#32092](https://github.com/apache/beam/pull/32092)). * [BigQueryIO] Added support for withFormatRecordOnFailureFunction() for STORAGE_WRITE_API and STORAGE_API_AT_LEAST_ONCE methods (Java) ([#31354](https://github.com/apache/beam/issues/31354)). * Updated Go protobuf package to new version (Go) ([#21515](https://github.com/apache/beam/issues/21515)). +* Adds OrderedListState support for Java SDK via FnApi. ## Breaking Changes diff --git a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto index 422c2e1a5f7c9..10434e514eae1 100644 --- a/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto @@ -1681,6 +1681,10 @@ message StandardProtocols { // the ProcessBundleProgressResponse messages. SDK_CONSUMING_RECEIVED_DATA = 9 [(beam_urn) = "beam:protocol:sdk_consuming_received_data:v1"]; + + // Indicates whether the SDK supports ordered list state. + ORDERED_LIST_STATE = 10 + [(beam_urn) = "beam:protocol:ordered_list_state:v1"]; } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java index 9bdb45fe32eb6..52120b396e1f7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Environments.java @@ -488,6 +488,7 @@ public static Set getJavaCapabilities() { capabilities.add(BeamUrns.getUrn(Primitives.TO_STRING)); capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.DATA_SAMPLING)); capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.SDK_CONSUMING_RECEIVED_DATA)); + capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.ORDERED_LIST_STATE)); return capabilities.build(); } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/EnvironmentsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/EnvironmentsTest.java index 93ec4f7e8f7f9..3c6862bd3b3ba 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/EnvironmentsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/EnvironmentsTest.java @@ -216,6 +216,9 @@ public void testCapabilities() { assertThat( Environments.getJavaCapabilities(), hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.DATA_SAMPLING))); + assertThat( + Environments.getJavaCapabilities(), + hasItem(BeamUrns.getUrn(RunnerApi.StandardProtocols.Enum.ORDERED_LIST_STATE))); // Check that SDF truncation is supported assertThat( Environments.getJavaCapabilities(),