fix: workaround BigtableIO.read() recreating the client for every split #32724

Draft · wants to merge 2 commits into base: master
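For context, here is a rough usage sketch (not part of the diff) of how a pipeline author could tune the new behavior, assuming the usual Beam convention of supplying experiments as "name=value" pairs; the class name and the 30-second delay below are made up for illustration:

import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class BigtableReadCloseModeExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

    // Keep each cached Bigtable client alive for 30s after its last reader closes, so the
    // next split can reuse it instead of recreating the client (30s is an example value).
    ExperimentalOptions.addExperiment(
        options.as(ExperimentalOptions.class), "bigtable_read_close_mode=DELAYED");
    ExperimentalOptions.addExperiment(
        options.as(ExperimentalOptions.class), "bigtable_read_close_delay_secs=30");

    // ... build and run the pipeline with these options ...
  }
}

Based on the code below, setting bigtable_read_close_mode=INLINE should restore the previous close-per-split behavior, and NONE keeps the client open for the lifetime of the worker.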
@@ -25,9 +25,16 @@
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableServiceFactory.BigtableServiceEntry.CloseMode;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
@@ -51,9 +58,14 @@ class BigtableServiceFactory implements Serializable {
private static final ConcurrentHashMap<UUID, AtomicInteger> refCounts = new ConcurrentHashMap<>();
private static final Object lock = new Object();

private static final SelfClosingExecutor ENTRY_CLOSER = new SelfClosingExecutor();

private static final String BIGTABLE_ENABLE_CLIENT_SIDE_METRICS =
"bigtable_enable_client_side_metrics";

private static final String BIGTABLE_READ_CLOSE_MODE = "bigtable_read_close_mode";
private static final String BIGTABLE_READ_CLOSE_DELAY_SECS = "bigtable_read_close_delay_secs";

@AutoValue
abstract static class ConfigId implements Serializable {

Expand All @@ -66,17 +78,46 @@ static ConfigId create() {

@AutoValue
abstract static class BigtableServiceEntry implements Serializable, AutoCloseable {
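// How close() releases this entry: INLINE decrements the refcount immediately (the original
// behavior), DELAYED defers the decrement via ENTRY_CLOSER so the next split can reuse the
// cached client, and NONE leaves the entry open for the lifetime of the worker.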
enum CloseMode {
NONE,
DELAYED,
INLINE
}

abstract ConfigId getConfigId();

abstract BigtableService getService();

abstract CloseMode getCloseMode();

@Nullable
abstract Duration getCloseDelay();

static BigtableServiceEntry create(ConfigId configId, BigtableService service) {
- return new AutoValue_BigtableServiceFactory_BigtableServiceEntry(configId, service);
+ return new AutoValue_BigtableServiceFactory_BigtableServiceEntry(
+ configId, service, CloseMode.INLINE, null);
}

static BigtableServiceEntry create(
ConfigId configId, BigtableService service, CloseMode closeMode, Duration closeDelay) {
return new AutoValue_BigtableServiceFactory_BigtableServiceEntry(
configId, service, closeMode, closeDelay);
}

@Override
public void close() {
switch (getCloseMode()) {
case NONE:
return;
case INLINE:
closeImpl();
return;
case DELAYED:
ENTRY_CLOSER.enqueue(this, getCloseDelay());
}
}

private void closeImpl() {
synchronized (lock) {
int refCount =
refCounts.getOrDefault(getConfigId().id(), new AtomicInteger(0)).decrementAndGet();
@@ -132,9 +173,22 @@ BigtableServiceEntry getServiceForReading(
LOG.info("Enabling client side metrics");
BigtableDataSettings.enableBuiltinMetrics();
}
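// Read the optional experiments controlling how read clients are released. By default the
// entry is closed with a 10 second delay so adjacent splits can reuse the same client.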
CloseMode closeMode =
Optional.ofNullable(
ExperimentalOptions.getExperimentValue(pipelineOptions, BIGTABLE_READ_CLOSE_MODE))
.map(BigtableServiceEntry.CloseMode::valueOf)
.orElse(CloseMode.DELAYED);

Duration closeDelaySecs =
Optional.ofNullable(
ExperimentalOptions.getExperimentValue(
pipelineOptions, BIGTABLE_READ_CLOSE_DELAY_SECS))
.map(Integer::parseInt)
.map(Duration::ofSeconds)
.orElse(Duration.ofSeconds(10));

BigtableService service = new BigtableServiceImpl(settings);
- entry = BigtableServiceEntry.create(configId, service);
+ entry = BigtableServiceEntry.create(configId, service, closeMode, closeDelaySecs);
entries.put(configId.id(), entry);
refCounts.put(configId.id(), new AtomicInteger(1));
LOG.debug("getServiceForReading() created a new service entry");
@@ -236,4 +290,43 @@ private BigtableOptions getEffectiveOptions(BigtableConfig config) {
}
return effectiveOptions;
}

// TODO: Remove this after migrating BigtableIO.read() to an SDF.
// This is only necessary because the Source API does not provide a hook into the worker
// teardown event. The workaround extends the lifetime of each service entry just long enough
// that its refcount does not reach 0 between splits.
/**
* Simple wrapper around a ScheduledThreadPoolExecutor that shuts itself down once all
* outstanding work completes. It's meant to behave like a ScheduledThreadPoolExecutor with 0
* core threads, which is unfortunately broken in JDK < 9.
*/
static class SelfClosingExecutor {
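// Lazily created single-thread scheduler; non-null only while close tasks are outstanding,
// so an idle worker holds no extra threads.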
private ScheduledExecutorService executor = null;
private int numOutstanding = 0;

@SuppressWarnings("FutureReturnValueIgnored")
synchronized void enqueue(BigtableServiceEntry entry, Duration closeDelay) {
if (numOutstanding == 0) {
executor = Executors.newScheduledThreadPool(1);
}
numOutstanding++;

executor.schedule(
() -> {
try {
// Call closeImpl() directly; going back through close() would just re-enqueue the entry.
entry.closeImpl();
} finally {
synchronized (SelfClosingExecutor.this) {
if (--numOutstanding == 0) {
executor.shutdown();
executor = null;
}
}
}
},
closeDelay.getSeconds(),
TimeUnit.SECONDS);
}
}
}