Skip to content

Commit

Permalink
[improve][io] Support update subscription position for sink connector (
Browse files Browse the repository at this point in the history
…apache#23538)

(cherry picked from commit 3f12269)
  • Loading branch information
shibd authored and nodece committed Nov 6, 2024
1 parent 2527aa4 commit 5c027aa
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,9 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne
if (newConfig.getTransformFunctionConfig() != null) {
mergedConfig.setTransformFunctionConfig(newConfig.getTransformFunctionConfig());
}

if (newConfig.getSourceSubscriptionPosition() != null) {
mergedConfig.setSourceSubscriptionPosition(newConfig.getSourceSubscriptionPosition());
}
return mergedConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
Expand Down Expand Up @@ -223,6 +224,18 @@ public void testCleanSubscriptionField() throws IOException {
}
}

@Test
public void testUpdateSubscriptionPosition() {
SinkConfig sinkConfig = createSinkConfig();
SinkConfig newSinkConfig = createSinkConfig();
newSinkConfig.setSourceSubscriptionPosition(SubscriptionInitialPosition.Earliest);
SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
assertEquals(
new Gson().toJson(newSinkConfig),
new Gson().toJson(mergedConfig)
);
}

@Test
public void testMergeEqual() {
SinkConfig sinkConfig = createSinkConfig();
Expand Down Expand Up @@ -548,6 +561,7 @@ private SinkConfig createSinkConfig() {
inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
sinkConfig.setInputSpecs(inputSpecs);
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
sinkConfig.setSourceSubscriptionPosition(SubscriptionInitialPosition.Latest);
sinkConfig.setRetainOrdering(false);
sinkConfig.setRetainKeyOrdering(false);
sinkConfig.setConfigs(new HashMap<>());
Expand Down

0 comments on commit 5c027aa

Please sign in to comment.