Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jayehwhyehentee committed Sep 20, 2024
1 parent 8254a5e commit 6b38afd
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@
* <p>Note that actual separation between CreateWriteStream invocations across all writers will not
* ensure exact QPS of 3, because neither all writers are initialized at the same instant, nor do
* they all identify the need to create a write stream after some uniform fixed duration. Given
* these uncontrolled variations, this throttler aims to achieve ~3 QPS on a best effort basis.
* these uncontrollable factors, this throttler aims to achieve 3 QPS on a best effort basis.
*/
public class WriteStreamCreationThrottler implements Throttler {

// MAX_SINK_PARALLELISM is set as 128.
public static final int MAX_BUCKETS = BigQueryExactlyOnceSink.MAX_SINK_PARALLELISM / 3;
private static final Logger LOG = LoggerFactory.getLogger(WriteStreamCreationThrottler.class);
private final int writerId;
Expand All @@ -48,6 +49,7 @@ public WriteStreamCreationThrottler(int writerId) {

public void throttle() {
int waitSeconds = writerId % MAX_BUCKETS;
LOG.debug("Throttling writer {} for {} second", writerId, waitSeconds);
try {
// Sleep does nothing if input is 0 or less.
TimeUnit.SECONDS.sleep(waitSeconds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,32 +158,37 @@ void append() {
/** Creates a StreamWriter for appending to BigQuery table. */
void createStreamWriter(boolean enableConnectionPool) {
try {
logger.debug("Creating BigQuery StreamWriter in subtask {}", subtaskId);
if (writeClient == null) {
writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite();
}
logger.info(
"Creating BigQuery StreamWriter for write stream {} in subtask {}",
streamName,
subtaskId);
streamWriter =
writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool);
} catch (IOException e) {
logger.error("Unable to create StreamWriter for stream {}", streamName);
throw new BigQueryConnectorException("Unable to create StreamWriter", e);
logger.error(
String.format(
"Unable to create StreamWriter for stream %s in subtask %d",
streamName, subtaskId),
e);
throw new BigQueryConnectorException("Unable to connect to BigQuery", e);
}
}

/** Creates a write stream and StreamWriter for appending to BigQuery table. */
void createWriteStreamAndStreamWriter(
WriteStream.Type streamType, boolean enableConnectionPool) {
logger.info("Creating BigQuery write stream and StreamWriter in subtask {}", subtaskId);
/** Creates a write stream for appending to BigQuery table. */
void createWriteStream(WriteStream.Type streamType) {
try {
if (writeClient == null) {
writeClient = BigQueryServicesFactory.instance(connectOptions).storageWrite();
}
logger.info("Creating BigQuery write stream in subtask {}", subtaskId);
streamName = writeClient.createWriteStream(tablePath, streamType).getName();
streamWriter =
writeClient.createStreamWriter(streamName, protoSchema, enableConnectionPool);
} catch (IOException e) {
logger.error("Unable to connect to BigQuery in subtask {}", streamName);
throw new BigQueryConnectorException("Unable to create write stream", e);
logger.error(
String.format("Unable to create write stream in subtask %d", subtaskId), e);
throw new BigQueryConnectorException("Unable to connect to BigQuery", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ void sendAppendRequest(ProtoRows protoRows) {
// Throttle stream creation to ensure proper usage of BigQuery createWriteStream API.
logger.info("Throttling creation of BigQuery write stream in subtask {}", subtaskId);
writeStreamCreationThrottler.throttle();
createWriteStreamAndStreamWriter(WriteStream.Type.BUFFERED, false);
createWriteStream(WriteStream.Type.BUFFERED);
createStreamWriter(false);
}
ApiFuture<AppendRowsResponse> future = streamWriter.append(protoRows, streamOffset);
postAppendOps(future, rowCount);
Expand Down Expand Up @@ -274,7 +275,7 @@ private void discardStream(Exception e) {
subtaskId, streamName),
e);
finalizeStream();
// Empty streamName will prompt following sendAppendRequest method to create a new write
// Empty streamName will prompt following sendAppendRequest invocation to create anew write
// stream.
streamName = "";
// Also discard the offset.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,30 @@ public class WriteStreamCreationThrottlerTest {

@Test
public void testThrottle() {
WriteStreamCreationThrottler throttler = new WriteStreamCreationThrottler(3);
Duration duration = invokeThrottle(3);
assertTrue(duration.toMillis() >= 3000L);
}

@Test
public void testThrottle_withInterruptedException() {
// Force interruption
Thread.currentThread().interrupt();
Duration duration = invokeThrottle(3);
assertTrue(duration.toMillis() < 3000L);
}

@Test
public void testThrottle_withInvalidWriterId_expectNoThrottling() {
Duration duration = invokeThrottle(-1);
long waitSeconds = duration.toMillis() / 1000;
assertTrue(waitSeconds == 0);
}

private Duration invokeThrottle(int writerId) {
WriteStreamCreationThrottler throttler = new WriteStreamCreationThrottler(writerId);
Instant start = Instant.now();
throttler.throttle();
Instant end = Instant.now();
Duration duration = Duration.between(start, end);
assertTrue(duration.toMillis() >= 3000L);
return Duration.between(start, end);
}
}

0 comments on commit 6b38afd

Please sign in to comment.