Addition of Flink Metrics in At-Least-Once Implementation #154

Open · wants to merge 9 commits into base: main
@@ -38,6 +38,11 @@ class BigQueryDefaultSink extends BigQueryBaseSink {
public SinkWriter createWriter(InitContext context) {
checkParallelism(context.getNumberOfParallelSubtasks());
return new BigQueryDefaultWriter(
context.getSubtaskId(), tablePath, connectOptions, schemaProvider, serializer);
context.getSubtaskId(),
tablePath,
connectOptions,
schemaProvider,
serializer,
context);
}
}
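The only functional change above is that createWriter now hands the Sink.InitContext to the writer; that context is what exposes the SinkWriterMetricGroup on which the new counters are registered. A minimal, self-contained sketch of the same pattern (the class MetricAwareWriter is hypothetical and not part of the connector):

```java
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.metrics.Counter;

// Hypothetical writer illustrating why the InitContext is threaded through:
// the constructor uses it to register a custom counter on the sink's metric group.
class MetricAwareWriter<IN> implements SinkWriter<IN> {

    private final Counter recordsSeen;

    MetricAwareWriter(Sink.InitContext context) {
        this.recordsSeen = context.metricGroup().counter("recordsSeen");
    }

    @Override
    public void write(IN element, Context context) {
        recordsSeen.inc();
    }

    @Override
    public void flush(boolean endOfInput) {
        // No buffering in this sketch, so nothing to flush.
    }

    @Override
    public void close() {
        // No resources to release in this sketch.
    }
}
```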
@@ -18,6 +18,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.metrics.Counter;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
@@ -56,15 +57,15 @@
* validated lazily.
*
* <p>Serializer's "init" method is called in the writer's constructor because the resulting {@link
* Descriptor} is not serializable and cannot be propagated to machines hosting writer instances.
* Hence, this derivation of descriptors must be performed during writer initialization.
* com.google.protobuf.Descriptors.Descriptor} is not serializable and cannot be propagated to
* machines hosting writer instances. Hence, this derivation of descriptors must be performed during
* writer initialization.
*
* @param <IN> Type of records to be written to BigQuery.
*/
abstract class BaseWriter<IN> implements SinkWriter<IN> {

protected final Logger logger = LoggerFactory.getLogger(getClass());

protected final Logger logger = LoggerFactory.getLogger(BaseWriter.class);
// Multiply 0.95 to keep a buffer from exceeding payload limits.
private static final long MAX_APPEND_REQUEST_BYTES =
(long) (StreamWriter.getApiMaxRequestBytes() * 0.95);
@@ -90,6 +91,10 @@ abstract class BaseWriter<IN> implements SinkWriter<IN> {
// the records in a stream get committed to the table. Hence, records written to BigQuery
// table is equal to this "totalRecordsWritten" only upon checkpoint completion.
long totalRecordsWritten;
// Counters for metric reporting
Counter successfullyAppendedRecordsCounter;
Counter successfullyAppendedRecordsSinceChkptCounter;
Counter numRecordsInSinceChkptCounter;

BaseWriter(
int subtaskId,
@@ -116,6 +121,11 @@ public void flush(boolean endOfInput) {
}
logger.info("Validating all pending append responses in subtask {}", subtaskId);
validateAppendResponses(true);
// flush() is called at checkpoint; once all pending append responses are validated,
// reset the per-checkpoint counters to 0.
numRecordsInSinceChkptCounter.dec(numRecordsInSinceChkptCounter.getCount());
successfullyAppendedRecordsSinceChkptCounter.dec(
successfullyAppendedRecordsSinceChkptCounter.getCount());
}

/** Close resources maintained by this writer. */
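A note on the reset in flush(): Flink's Counter interface exposes only inc(), dec() and getCount(), with no reset method, so the per-checkpoint counters are returned to zero by decrementing them by their own current count. A standalone sketch of that pattern:

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;

public class CounterResetSketch {
    public static void main(String[] args) {
        Counter sinceCheckpoint = new SimpleCounter();
        sinceCheckpoint.inc(42);

        // Counter has no reset(); decrementing by the current count brings it back to 0,
        // which is what BaseWriter.flush() does at checkpoint time.
        sinceCheckpoint.dec(sinceCheckpoint.getCount());

        System.out.println(sinceCheckpoint.getCount()); // prints 0
    }
}
```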
@@ -17,6 +17,7 @@
package com.google.cloud.flink.bigquery.sink.writer;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.util.StringUtils;

import com.google.api.core.ApiFuture;
@@ -31,6 +32,7 @@
import com.google.cloud.flink.bigquery.common.config.BigQueryConnectOptions;
import com.google.cloud.flink.bigquery.sink.TwoPhaseCommittingStatefulSink;
import com.google.cloud.flink.bigquery.sink.committer.BigQueryCommittable;
import com.google.cloud.flink.bigquery.sink.committer.BigQueryCommitter;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQueryConnectorException;
import com.google.cloud.flink.bigquery.sink.exceptions.BigQuerySerializationException;
import com.google.cloud.flink.bigquery.sink.serializer.BigQueryProtoSerializer;
@@ -98,7 +100,8 @@ public BigQueryBufferedWriter(
long totalRecordsWritten,
BigQueryConnectOptions connectOptions,
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer) {
BigQueryProtoSerializer serializer,
Sink.InitContext context) {
super(subtaskId, tablePath, connectOptions, schemaProvider, serializer);
this.streamNameInState = StringUtils.isNullOrWhitespaceOnly(streamName) ? "" : streamName;
this.streamName = this.streamNameInState;
@@ -108,6 +111,16 @@ public BigQueryBufferedWriter(
this.totalRecordsWritten = totalRecordsWritten;
writeStreamCreationThrottler = new WriteStreamCreationThrottler(subtaskId);
appendRequestRowCount = 0L;
// Initialize the metric counters.
successfullyAppendedRecordsCounter =
context.metricGroup().counter("successfullyAppendedRecords");
// Restore the metrics to the values saved in the last checkpoint.
successfullyAppendedRecordsCounter.inc(totalRecordsWritten);
context.metricGroup().getIOMetricGroup().getNumRecordsInCounter().inc(totalRecordsSeen);
// The ...SinceChkpt counters restart at 0.
numRecordsInSinceChkptCounter = context.metricGroup().counter("numRecordsInSinceChkpt");
successfullyAppendedRecordsSinceChkptCounter =
context.metricGroup().counter("successfullyAppendedRecordsSinceChkpt");
}

/**
@@ -119,6 +132,7 @@ public BigQueryBufferedWriter(
@Override
public void write(IN element, Context context) {
totalRecordsSeen++;
numRecordsInSinceChkptCounter.inc();
try {
ByteString protoRow = getProtoRow(element);
if (!fitsInAppendRequest(protoRow)) {
@@ -186,6 +200,8 @@ void validateAppendResponse(AppendInfo appendInfo) {
offset, expectedOffset));
}
totalRecordsWritten += recordsAppended;
successfullyAppendedRecordsCounter.inc(recordsAppended);
successfullyAppendedRecordsSinceChkptCounter.inc(recordsAppended);
} catch (ExecutionException | InterruptedException e) {
if (e.getCause().getClass() == OffsetAlreadyExists.class) {
logger.info(
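The buffered writer's constructor above also restores metric values after a failure: the cumulative counters are re-incremented to the totals carried in the writer's checkpointed state, while the ...SinceChkpt counters start again from zero. A minimal sketch of that restore-time behavior, reusing the UnregisteredMetricsGroup helper the tests use (the restored totals are hypothetical values):

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

public class RestoreCountersSketch {
    public static void main(String[] args) {
        // Hypothetical totals that would normally come from the writer's checkpointed state.
        long restoredTotalRecordsSeen = 120L;
        long restoredTotalRecordsWritten = 100L;

        SinkWriterMetricGroup metrics = UnregisteredMetricsGroup.createSinkWriterMetricGroup();

        // Cumulative counters are bumped back up to the checkpointed totals.
        Counter appended = metrics.counter("successfullyAppendedRecords");
        appended.inc(restoredTotalRecordsWritten);
        metrics.getIOMetricGroup().getNumRecordsInCounter().inc(restoredTotalRecordsSeen);

        // Per-checkpoint counters simply restart from zero.
        Counter appendedSinceChkpt = metrics.counter("successfullyAppendedRecordsSinceChkpt");

        System.out.println(appended.getCount());            // 100
        System.out.println(appendedSinceChkpt.getCount());  // 0
    }
}
```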
@@ -16,6 +16,9 @@

package com.google.cloud.flink.bigquery.sink.writer;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
@@ -53,11 +56,20 @@ public BigQueryDefaultWriter(
String tablePath,
BigQueryConnectOptions connectOptions,
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer) {
BigQueryProtoSerializer serializer,
Sink.InitContext context) {
super(subtaskId, tablePath, connectOptions, schemaProvider, serializer);
streamName = String.format("%s/streams/_default", tablePath);
totalRecordsSeen = 0L;
totalRecordsWritten = 0L;

SinkWriterMetricGroup sinkWriterMetricGroup = context.metricGroup();
// Count of records successfully appended to BigQuery.
successfullyAppendedRecordsCounter =
sinkWriterMetricGroup.counter("successfullyAppendedRecords");
numRecordsInSinceChkptCounter = sinkWriterMetricGroup.counter("numRecordsInSinceChkpt");
successfullyAppendedRecordsSinceChkptCounter =
sinkWriterMetricGroup.counter("successfullyAppendedRecordsSinceChkpt");
}

/**
@@ -69,6 +81,7 @@ public BigQueryDefaultWriter(
@Override
public void write(IN element, Context context) {
totalRecordsSeen++;
numRecordsInSinceChkptCounter.inc();
try {
ByteString protoRow = getProtoRow(element);
if (!fitsInAppendRequest(protoRow)) {
@@ -105,6 +118,9 @@ void validateAppendResponse(AppendInfo appendInfo) {
logAndThrowFatalException(response.getError().getMessage());
}
totalRecordsWritten += recordsAppended;
// The append request succeeded without errors (the records are now in BigQuery).
this.successfullyAppendedRecordsCounter.inc(recordsAppended);
this.successfullyAppendedRecordsSinceChkptCounter.inc(recordsAppended);
} catch (ExecutionException | InterruptedException e) {
logAndThrowFatalException(e);
}
@@ -18,6 +18,7 @@

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

import com.google.cloud.flink.bigquery.fakes.StorageClientFaker;
import com.google.cloud.flink.bigquery.sink.serializer.FakeBigQuerySerializer;
@@ -93,6 +94,8 @@ public void testCreateWriter() throws IOException {
InitContext mockedContext = Mockito.mock(InitContext.class);
Mockito.when(mockedContext.getSubtaskId()).thenReturn(1);
Mockito.when(mockedContext.getNumberOfParallelSubtasks()).thenReturn(50);
Mockito.when(mockedContext.metricGroup())
.thenReturn(UnregisteredMetricsGroup.createSinkWriterMetricGroup());
BigQuerySinkConfig sinkConfig =
BigQuerySinkConfig.newBuilder()
.connectOptions(StorageClientFaker.createConnectOptionsForWrite(null))
@@ -16,6 +16,9 @@

package com.google.cloud.flink.bigquery.sink.writer;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
@@ -786,6 +789,9 @@ private BigQueryBufferedWriter createBufferedWriter(
long totalRecordsWritten,
BigQueryProtoSerializer mockSerializer)
throws IOException {
Sink.InitContext mockSinkContext = Mockito.mock(Sink.InitContext.class);
Mockito.when(mockSinkContext.metricGroup())
.thenReturn(UnregisteredMetricsGroup.createSinkWriterMetricGroup());
return new BigQueryBufferedWriter(
1,
streamName,
@@ -795,7 +801,8 @@ private BigQueryBufferedWriter createBufferedWriter(
totalRecordsWritten,
StorageClientFaker.createConnectOptionsForWrite(null),
TestBigQuerySchemas.getSimpleRecordSchema(),
mockSerializer);
mockSerializer,
mockSinkContext);
}

private BigQueryBufferedWriter createBufferedWriter(
@@ -808,6 +815,9 @@ private BigQueryBufferedWriter createBufferedWriter(
WriteStream writeStream,
FinalizeWriteStreamResponse finalizeResponse)
throws IOException {
Sink.InitContext mockSinkContext = Mockito.mock(Sink.InitContext.class);
Mockito.when(mockSinkContext.metricGroup())
.thenReturn(UnregisteredMetricsGroup.createSinkWriterMetricGroup());
return new BigQueryBufferedWriter(
1,
streamName,
@@ -818,7 +828,8 @@ private BigQueryBufferedWriter createBufferedWriter(
StorageClientFaker.createConnectOptionsForWrite(
appendResponseFutures, writeStream, null, finalizeResponse),
TestBigQuerySchemas.getSimpleRecordSchema(),
mockSerializer);
mockSerializer,
mockSinkContext);
}

private void checkStreamlessWriterAttributes(BigQueryBufferedWriter bufferedWriter) {
@@ -16,6 +16,9 @@

package com.google.cloud.flink.bigquery.sink.writer;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;

import com.google.api.core.ApiFutures;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
@@ -249,11 +252,15 @@ public void testValidateAppendResponse_withExecutionException() throws IOException
private BigQueryDefaultWriter createDefaultWriter(
BigQueryProtoSerializer mockSerializer, AppendRowsResponse appendResponse)
throws IOException {
Sink.InitContext mockInitContext = Mockito.mock(Sink.InitContext.class);
Mockito.when(mockInitContext.metricGroup())
.thenReturn(UnregisteredMetricsGroup.createSinkWriterMetricGroup());
return new BigQueryDefaultWriter(
0,
"/projects/project/datasets/dataset/tables/table",
StorageClientFaker.createConnectOptionsForWrite(appendResponse),
TestBigQuerySchemas.getSimpleRecordSchema(),
mockSerializer);
mockSerializer,
mockInitContext);
}
}