Committer for exactly once sink (#157)
Invoked BQ storage flush API for commit operation
jayehwhyehentee authored Sep 30, 2024
1 parent 6aceee8 commit 0f79fba
Showing 6 changed files with 214 additions and 13 deletions.
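At its core, this change calls the BigQuery Storage Write API's FlushRows RPC to make rows buffered in a BUFFERED-type write stream visible to readers. As a minimal, self-contained sketch of that RPC outside the connector (the stream name and offset below are illustrative, not the connector's own code path):

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.protobuf.Int64Value;

public class FlushRowsSketch {

    public static void main(String[] args) throws Exception {
        // Illustrative stream name; a real one is returned by CreateWriteStream
        // with Type.BUFFERED.
        String streamName =
                "projects/my-project/datasets/my_dataset/tables/my_table/streams/my-stream";
        try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
            FlushRowsRequest request =
                    FlushRowsRequest.newBuilder()
                            .setWriteStream(streamName)
                            // Make previously appended rows visible up to and
                            // including offset 2 (i.e. rows 0, 1 and 2).
                            .setOffset(Int64Value.of(2L))
                            .build();
            FlushRowsResponse response = client.flushRows(request);
            // On success, the returned offset equals the requested offset.
            System.out.println("Flushed through offset " + response.getOffset());
        }
    }
}

Rows appended to a BUFFERED stream stay invisible until flushed, which is what gives the sink its exactly-once commit point.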
@@ -18,6 +18,15 @@

import org.apache.flink.api.connector.sink2.Committer;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.services.BigQueryServices;
import com.google.cloud.flink.bigquery.services.BigQueryServicesFactory;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQueryConnectorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
@@ -30,10 +39,49 @@
*/
public class BigQueryCommitter implements Committer<BigQueryCommittable>, Closeable {

private static final Logger LOG = LoggerFactory.getLogger(BigQueryCommitter.class);

private final BigQueryConnectOptions connectOptions;

public BigQueryCommitter(BigQueryConnectOptions connectOptions) {
this.connectOptions = connectOptions;
}

@Override
- public void commit(Collection<CommitRequest<BigQueryCommittable>> commitRequests)
- throws IOException, InterruptedException {}
+ public void commit(Collection<CommitRequest<BigQueryCommittable>> commitRequests) {
if (commitRequests.isEmpty()) {
LOG.info("No committable found. Nothing to commit!");
return;
}
try (BigQueryServices.StorageWriteClient writeClient =
BigQueryServicesFactory.instance(connectOptions).storageWrite()) {
for (CommitRequest<BigQueryCommittable> commitRequest : commitRequests) {
BigQueryCommittable committable = commitRequest.getCommittable();
long producerId = committable.getProducerId();
String streamName = committable.getStreamName();
long streamOffset = committable.getStreamOffset();
LOG.info("Committing records appended by producer {}", producerId);
LOG.debug(
"Invoking flushRows API on stream {} till offset {}",
streamName,
streamOffset);
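// FlushRows on a BUFFERED write stream makes appended rows visible for reads,
// up to and including the requested offset.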
FlushRowsResponse response = writeClient.flushRows(streamName, streamOffset);
if (response.getOffset() != streamOffset) {
LOG.error(
"BigQuery FlushRows API failed. Returned offset {}, expected {}",
response.getOffset(),
streamOffset);
throw new BigQueryConnectorException(
String.format("Commit operation failed for producer %d", producerId));
}
}
} catch (IOException | ApiException e) {
throw new BigQueryConnectorException("Commit operation failed", e);
}
}

@Override
- public void close() {}
+ public void close() {
+ // No op.
+ }
}

@@ -206,8 +206,11 @@ public Collection<BigQueryCommittable> prepareCommit()
logger.info("No new data appended in subtask {}. Nothing to commit.", subtaskId);
return Collections.EMPTY_LIST;
}
// The value of streamOffset in writer represents the next available offset where append
// should be performed. However, committer needs to know the offset up to which data can be
// committed. That latest committable offset is `streamOffset - 1`.
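// For example, after appends at offsets 0, 1 and 2, streamOffset is 3, and the
// committer flushes the stream at offset 2.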
return Collections.singletonList(
- new BigQueryCommittable(subtaskId, streamName, streamOffset));
+ new BigQueryCommittable(subtaskId, streamName, streamOffset - 1));
}

@Override

@@ -21,6 +21,8 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatistics2;
@@ -370,7 +372,10 @@ public WriteStream createWriteStream(String tablePath, WriteStream.Type streamType
@Override
public FlushRowsResponse flushRows(String streamName, long offset) {
if (flushResponse == null) {
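// Throwing ApiException (rather than a bare RuntimeException, as before this
// change) mirrors what a real Storage Write API client surfaces, and exercises
// the committer's `catch (IOException | ApiException e)` path.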
- throw new RuntimeException("testing error scenario");
+ throw new ApiException(
+ new RuntimeException("testing error scenario"),
+ new TestStatusCode(),
+ false);
}
return flushResponse;
}
@@ -402,6 +407,19 @@ public void verifytAppendWithOffsetInvocations(int expectedInvocations) {
.append(Mockito.any(), Mockito.anyLong());
}
}

static class TestStatusCode implements StatusCode {

@Override
public Code getCode() {
return null;
}

@Override
public Object getTransportCode() {
return null;
}
}
}

public static final String SIMPLE_AVRO_SCHEMA_FIELDS_STRING =

@@ -0,0 +1,121 @@
/*
* Copyright 2024 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.flink.bigquery.sink.committer;

import org.apache.flink.api.connector.sink2.Committer.CommitRequest;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.flink.bigquery.fakes.StorageClientFaker;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQueryConnectorException;
import org.junit.Test;

import java.io.IOException;
import java.util.Collections;

/** Tests for {@link BigQueryCommitter}. */
public class BigQueryCommitterTest {

@Test
public void testCommit_withEmptyCommitRequest() throws IOException {
BigQueryCommitter committer = createCommitter(null);
// BQ write client used in this test throws a RuntimeException if flushRows is invoked with
// flushRowsResponse set as null. Since flush should not be called, this commit should not
// throw any exception.
committer.commit(Collections.EMPTY_LIST);
}

@Test
public void testCommit() throws IOException {
BigQueryCommitter committer =
createCommitter(FlushRowsResponse.newBuilder().setOffset(10L).build());
committer.commit(
Collections.singletonList(
new TestCommitRequest(new BigQueryCommittable(1L, "foo", 10L))));
}

@Test(expected = BigQueryConnectorException.class)
public void testCommit_withOffsetMismatch() throws IOException {
BigQueryCommitter committer =
createCommitter(FlushRowsResponse.newBuilder().setOffset(5L).build());
committer.commit(
Collections.singletonList(
new TestCommitRequest(new BigQueryCommittable(1L, "foo", 10L))));
}

@Test(expected = BigQueryConnectorException.class)
public void testCommit_withFlushRowsApiFailure() throws IOException {
// BQ write client used in this test throws a RuntimeException if flushRows is invoked with
// flushRowsResponse set as null. The committer should wrap client errors in a
// BigQueryConnectorException.
BigQueryCommitter committer = createCommitter(null);
committer.commit(
Collections.singletonList(
new TestCommitRequest(new BigQueryCommittable(1L, "foo", 10L))));
}

private BigQueryCommitter createCommitter(FlushRowsResponse flushRowsResponse)
throws IOException {
return new BigQueryCommitter(
StorageClientFaker.createConnectOptionsForWrite(
new ApiFuture[] {null}, null, flushRowsResponse, null));
}

static class TestCommitRequest implements CommitRequest<BigQueryCommittable> {

private final BigQueryCommittable committable;

TestCommitRequest(BigQueryCommittable committable) {
this.committable = committable;
}

@Override
public BigQueryCommittable getCommittable() {
return committable;
}

@Override
public int getNumberOfRetries() {
return 0;
}

@Override
public void signalFailedWithKnownReason(Throwable t) {
// Do nothing.
}

@Override
public void signalFailedWithUnknownReason(Throwable t) {
// Do nothing.
}

@Override
public void retryLater() {
// Do nothing.
}

@Override
public void updateAndRetryLater(BigQueryCommittable committable) {
// Do nothing.
}

@Override
public void signalAlreadyCommitted() {
// Do nothing.
}
}
}

@@ -520,7 +520,7 @@ public void testPrepareCommit_withAppends() throws IOException, InterruptedExcep
BigQueryCommittable committable = (BigQueryCommittable) committables.toArray()[0];
assertEquals(1, committable.getProducerId());
assertEquals("new_stream", committable.getStreamName());
- assertEquals(3, committable.getStreamOffset());
+ assertEquals(2, committable.getStreamOffset());
}

@Test

@@ -207,26 +207,38 @@ private StorageWriteClientImpl(CredentialsOptions options) throws IOException {
createWriteStreamSettings
.getRetrySettings()
.toBuilder()
- .setMaxAttempts(10)
+ .setMaxAttempts(5)
.setInitialRpcTimeout(Duration.ofSeconds(60))
.setRpcTimeoutMultiplier(1.2)
.setInitialRetryDelay(Duration.ofSeconds(2))
.setRetryDelayMultiplier(1.2)
.build());

UnaryCallSettings.Builder<FlushRowsRequest, FlushRowsResponse> flushRowsSettings =
settingsBuilder.getStubSettingsBuilder().flushRowsSettings();
flushRowsSettings.setRetrySettings(
- createWriteStreamSettings
+ flushRowsSettings
.getRetrySettings()
.toBuilder()
- .setMaxAttempts(10)
+ .setMaxAttempts(5)
.setInitialRpcTimeout(Duration.ofSeconds(30))
.setRpcTimeoutMultiplier(1)
.setInitialRetryDelay(Duration.ofSeconds(1))
.setRetryDelayMultiplier(1.2)
.build());

UnaryCallSettings.Builder<FinalizeWriteStreamRequest, FinalizeWriteStreamResponse>
finalizeWriteStreamSettings =
settingsBuilder.getStubSettingsBuilder().finalizeWriteStreamSettings();
finalizeWriteStreamSettings.setRetrySettings(
- createWriteStreamSettings
+ finalizeWriteStreamSettings
.getRetrySettings()
.toBuilder()
- .setMaxAttempts(10)
+ .setMaxAttempts(5)
.setInitialRpcTimeout(Duration.ofSeconds(30))
.setRpcTimeoutMultiplier(1)
.setInitialRetryDelay(Duration.ofSeconds(1))
.setRetryDelayMultiplier(1.2)
.build());

this.client = BigQueryWriteClient.create(settingsBuilder.build());
@@ -249,8 +261,7 @@ public StreamWriter createStreamWriter(
.setMaxAttempts(5) // maximum number of retries
.setTotalTimeout(
Duration.ofMinutes(5)) // total duration of retry process
- .setInitialRpcTimeout(
- Duration.ofSeconds(30)) // delay before first retry
+ .setInitialRpcTimeout(Duration.ofSeconds(30)) // initial timeout
.setMaxRpcTimeout(Duration.ofMinutes(2)) // maximum RPC timeout
.setRpcTimeoutMultiplier(1.6) // change in RPC timeout
.setRetryDelayMultiplier(1.6) // change in delay before next retry
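
For intuition about the retry tuning above, here is a small illustrative sketch (assuming gax-style exponential backoff and ignoring the jitter gax actually applies) that prints the nominal schedule implied by the flushRows settings: maxAttempts 5, initial RPC timeout 30s with multiplier 1, initial retry delay 1s with multiplier 1.2.

/** Illustrative only: prints the nominal (pre-jitter) retry schedule for flushRows. */
public class RetryScheduleSketch {

    public static void main(String[] args) {
        int maxAttempts = 5;            // setMaxAttempts(5)
        double delaySeconds = 1.0;      // setInitialRetryDelay(Duration.ofSeconds(1))
        double delayMultiplier = 1.2;   // setRetryDelayMultiplier(1.2)
        double timeoutSeconds = 30.0;   // setInitialRpcTimeout(Duration.ofSeconds(30))
        double timeoutMultiplier = 1.0; // setRpcTimeoutMultiplier(1)

        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            System.out.printf(
                    "attempt %d: rpc timeout %.1fs%s%n",
                    attempt,
                    timeoutSeconds,
                    attempt < maxAttempts
                            ? String.format(", then wait ~%.2fs before retrying", delaySeconds)
                            : "");
            delaySeconds *= delayMultiplier;
            timeoutSeconds *= timeoutMultiplier;
        }
    }
}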
