Skip to content

Commit

Permalink
Fix typos
Browse files Browse the repository at this point in the history
  • Loading branch information
jayehwhyehentee committed Sep 19, 2024
1 parent 6fcfc5a commit 8254a5e
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ public BigQueryBufferedWriter(
BigQuerySchemaProvider schemaProvider,
BigQueryProtoSerializer serializer) {
super(subtaskId, tablePath, connectOptions, schemaProvider, serializer);
this.streamName = streamName;
this.streamNameInState = StringUtils.isNullOrWhitespaceOnly(streamName) ? "" : streamName;
this.streamOffset = streamOffset;
this.streamName = this.streamNameInState;
this.streamOffsetInState = streamOffset;
this.streamOffset = streamOffset;
this.totalRecordsSeen = totalRecordsSeen;
this.totalRecordsWritten = totalRecordsWritten;
writeStreamCreationThrottler = new WriteStreamCreationThrottler(subtaskId);
Expand Down Expand Up @@ -147,13 +147,15 @@ public void write(IN element, Context context) {
@Override
void sendAppendRequest(ProtoRows protoRows) {
long rowCount = protoRows.getSerializedRowsCount();
if (streamOffset == streamOffsetInState && streamName != null && !streamName.isEmpty()) {
if (streamOffset == streamOffsetInState
&& streamName.equals(streamNameInState)
&& !StringUtils.isNullOrWhitespaceOnly(streamName)) {
// Writer has an associated write stream and is invoking append for the first
// time since re-initialization.
performFirstAppendOnRestoredStream(protoRows, rowCount);
return;
}
if (streamName == null || streamName.isEmpty()) {
if (StringUtils.isNullOrWhitespaceOnly(streamName)) {
// Throttle stream creation to ensure proper usage of BigQuery createWriteStream API.
logger.info("Throttling creation of BigQuery write stream in subtask {}", subtaskId);
writeStreamCreationThrottler.throttle();
Expand All @@ -179,7 +181,7 @@ void validateAppendResponse(AppendInfo appendInfo) {
if (offset != expectedOffset) {
logAndThrowFatalException(
String.format(
"Inconsistent offset in BigQuery API response. Found %d, expected %s",
"Inconsistent offset in BigQuery API response. Found %d, expected %d",
offset, expectedOffset));
}
totalRecordsWritten += recordsAppended;
Expand Down Expand Up @@ -221,18 +223,18 @@ public List<BigQueryWriterState> snapshotState(long checkpointId) throws IOExcep

@Override
public void close() {
super.close();
if (!streamNameInState.equals(streamName) || streamOffsetInState != streamOffset) {
// Either new stream was created which will not be stored in any state, or something was
// appended to the existing stream which will not be committed. In both scenarios, the
// stream is not usable and must be finalized, i.e. "closed".
finalizeStream();
}
super.close();
}

private void performFirstAppendOnRestoredStream(ProtoRows protoRows, long rowCount) {
try {
// Connectrion pool can be enabled only for default stream.
// Connection pool (method parameter below) can be enabled only for default stream.
createStreamWriter(false);
} catch (BigQueryConnectorException e) {
// If StreamWriter could not be created for this write stream, then discard it.
Expand Down Expand Up @@ -268,7 +270,7 @@ private void discardStreamAndResendAppendRequest(Exception e, ProtoRows protoRow
private void discardStream(Exception e) {
logger.info(
String.format(
"Writer %d cannot use stream %s. Discarding this stream and creating new one.",
"Writer %d cannot use stream %s. Discarding this stream.",
subtaskId, streamName),
e);
finalizeStream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ private BigQueryBufferedWriter createBufferedWriter(

private void checkStreamlessWriterAttributes(BigQueryBufferedWriter bufferedWriter) {
assertNull(bufferedWriter.streamWriter);
assertNull(bufferedWriter.streamName);
assertEquals("", bufferedWriter.streamName);
assertEquals("", bufferedWriter.getStreamNameInState());
assertEquals(0, bufferedWriter.getStreamOffset());
assertEquals(0, bufferedWriter.getStreamOffsetInState());
Expand Down

0 comments on commit 8254a5e

Please sign in to comment.