From 9c2bebacceb7a5935ed3a662ced5c00f650271d7 Mon Sep 17 00:00:00 2001 From: Igor Berntein Date: Wed, 9 Oct 2024 15:18:34 -0400 Subject: [PATCH 1/2] fix: workaround BigtableIO.read() recreating the client for every split --- .../gcp/bigtable/BigtableServiceFactory.java | 96 ++++++++++++++++++- 1 file changed, 94 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java index 4c7805f655893..6b619ba9a772f 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java @@ -25,9 +25,16 @@ import com.google.cloud.bigtable.data.v2.BigtableDataSettings; import java.io.IOException; import java.io.Serializable; +import java.time.Duration; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.gcp.bigtable.BigtableServiceFactory.BigtableServiceEntry.CloseMode; import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; @@ -51,9 +58,14 @@ class BigtableServiceFactory implements Serializable { private static final ConcurrentHashMap refCounts = new ConcurrentHashMap<>(); private static final Object lock = new Object(); + private static final SelfClosingExecutor ENTRY_CLOSER = new SelfClosingExecutor(); + private static final String 
BIGTABLE_ENABLE_CLIENT_SIDE_METRICS = "bigtable_enable_client_side_metrics"; + private static final String BIGTABLE_READ_CLOSE_MODE = "bigtable_read_close_mode"; + private static final String BIGTABLE_READ_CLOSE_DELAY_SECS = "bigtable_read_close_delay_secs"; + @AutoValue abstract static class ConfigId implements Serializable { @@ -66,17 +78,46 @@ static ConfigId create() { @AutoValue abstract static class BigtableServiceEntry implements Serializable, AutoCloseable { + enum CloseMode { + NONE, + DELAYED, + INLINE + } abstract ConfigId getConfigId(); abstract BigtableService getService(); + abstract CloseMode getCloseMode(); + + @Nullable + abstract Duration getCloseDelay(); + static BigtableServiceEntry create(ConfigId configId, BigtableService service) { - return new AutoValue_BigtableServiceFactory_BigtableServiceEntry(configId, service); + return new AutoValue_BigtableServiceFactory_BigtableServiceEntry( + configId, service, CloseMode.INLINE); + } + + static BigtableServiceEntry create( + ConfigId configId, BigtableService service, CloseMode closeMode, Duration closeDelay) { + return new AutoValue_BigtableServiceFactory_BigtableServiceEntry( + configId, service, closeMode, closeDelay); } @Override public void close() { + switch (getCloseMode()) { + case NONE: + return; + case INLINE: + closeImpl(); + return; + case DELAYED: + ENTRY_CLOSER.enqueue(this, getCloseDelay()); + } + } + + private void closeImpl() { synchronized (lock) { int refCount = refCounts.getOrDefault(getConfigId().id(), new AtomicInteger(0)).decrementAndGet(); @@ -132,9 +173,22 @@ BigtableServiceEntry getServiceForReading( LOG.info("Enabling client side metrics"); BigtableDataSettings.enableBuiltinMetrics(); } + CloseMode closeMode = + Optional.ofNullable( + ExperimentalOptions.getExperimentValue(pipelineOptions, BIGTABLE_READ_CLOSE_MODE)) + .map(BigtableServiceEntry.CloseMode::valueOf) + .orElse(CloseMode.DELAYED); + + Duration closeDelaySecs = + Optional.ofNullable( + 
ExperimentalOptions.getExperimentValue( + pipelineOptions, BIGTABLE_READ_CLOSE_DELAY_SECS)) + .map(Integer::parseInt) + .map(Duration::ofSeconds) + .orElse(Duration.ofSeconds(10)); BigtableService service = new BigtableServiceImpl(settings); - entry = BigtableServiceEntry.create(configId, service); + entry = BigtableServiceEntry.create(configId, service, closeMode, closeDelaySecs); entries.put(configId.id(), entry); refCounts.put(configId.id(), new AtomicInteger(1)); LOG.debug("getServiceForReading() created a new service entry"); @@ -236,4 +290,42 @@ private BigtableOptions getEffectiveOptions(BigtableConfig config) { } return effectiveOptions; } + + // TODO: Remove this after migrating to an SDF for BigtableIO.read() + // This is only necessary because the Source api does not provide a hook into worker teardown + // event. This workaround will extend the lifetime of each service entry just long enough so that + // the refcount does not reach 0. + /** + * Simple wrapper around ScheduledThreadPoolExecutor that auto closes itself. It's meant to have a + * similar behavior as ScheduledThreadPoolExecutor with 0 core threads, which is unfortunately + * broken in jdk < 9. 
+   */
+  static class SelfClosingExecutor {
+    private ScheduledExecutorService executor = null;
+    private int numOutstanding = 0;
+
+    synchronized void enqueue(BigtableServiceEntry entry, Duration closeDelay) {
+      if (numOutstanding == 0) {
+        executor = Executors.newScheduledThreadPool(1);
+      }
+      numOutstanding++;
+
+      executor.schedule(
+          () -> {
+            try {
+              entry.closeImpl(); // not close(): a DELAYED entry would just re-enqueue itself
+            } finally {
+              synchronized (SelfClosingExecutor.this) {
+                if (--numOutstanding == 0) {
+                  executor.shutdown();
+                  executor = null;
+                }
+                ;
+              }
+            }
+          },
+          closeDelay.getSeconds(),
+          TimeUnit.SECONDS);
+    }
+  }
 }

From de55c3bcb70fe2035a4a96d826f8dbb719419d2a Mon Sep 17 00:00:00 2001
From: Igor Berntein
Date: Wed, 9 Oct 2024 15:44:03 -0400
Subject: [PATCH 2/2] fixes

---
 .../beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
index 6b619ba9a772f..19562e15a5541 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableServiceFactory.java
@@ -95,7 +95,7 @@ enum CloseMode {
 
     static BigtableServiceEntry create(ConfigId configId, BigtableService service) {
       return new AutoValue_BigtableServiceFactory_BigtableServiceEntry(
-          configId, service, CloseMode.INLINE);
+          configId, service, CloseMode.INLINE, null);
     }
 
     static BigtableServiceEntry create(
@@ -304,6 +304,7 @@ static class SelfClosingExecutor {
     private ScheduledExecutorService executor = null;
     private int numOutstanding = 0;
 
+    @SuppressWarnings("FutureReturnValueIgnored")
     synchronized void enqueue(BigtableServiceEntry entry, Duration closeDelay) {
       if (numOutstanding == 0) {
         executor =
Executors.newScheduledThreadPool(1);