Skip to content

Commit

Permalink
[CDAP-21131] Delay in spanner table creation should not crash statele…
Browse files Browse the repository at this point in the history
…ss pods
  • Loading branch information
sidhdirenge committed Feb 5, 2025
1 parent c3bc2f2 commit 1939795
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 2 deletions.
17 changes: 17 additions & 0 deletions cdap-common/src/main/resources/cdap-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,23 @@
</description>
</property>

<property>
<name>data.storage.properties.gcp-spanner.tx.runner.max.retries</name>
<value>12</value>
<description>
The maximum number of times the Spanner transaction runner attempts a transaction
that keeps failing with a retryable error.
</description>
</property>

<property>
<name>data.storage.properties.gcp-spanner.tx.runner.initial.delay.ms</name>
<value>500</value>
<description>
The initial delay, in milliseconds, before a failed Spanner transaction is retried. The delay grows exponentially on subsequent retries.
</description>
</property>

<property>
<name>data.tx.enabled</name>
<value>true</value>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright © 2025 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.storage.spanner;

import com.google.api.client.util.ExponentialBackOff;
import com.google.common.base.Throwables;
import io.cdap.cdap.spi.data.TableNotFoundException;
import io.cdap.cdap.spi.data.transaction.TransactionException;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TxRunnable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TransactionRunner} that delegates to a {@link SpannerTransactionRunner} and re-runs the
 * transaction, with exponential backoff, when it fails with a retryable error.
 *
 * <p>A failure is considered retryable when a {@link TableNotFoundException} appears anywhere in
 * its causal chain. Any other failure, and the last retryable failure once the attempt budget is
 * used up, is propagated to the caller.
 */
public class RetryingSpannerTransactionRunner implements TransactionRunner {

  private static final Logger LOG = LoggerFactory.getLogger(RetryingSpannerTransactionRunner.class);
  static final String MAX_RETRIES = "tx.runner.max.retries";
  static final String INITIAL_DELAY_MILLIS = "tx.runner.initial.delay.ms";
  // Fallbacks used when a key is absent; they mirror the values declared in cdap-default.xml.
  private static final int DEFAULT_MAX_RETRIES = 12;
  private static final int DEFAULT_INITIAL_DELAY_MILLIS = 500;

  private final int maxRetries;
  private final int initialDelayMs;
  private final SpannerTransactionRunner transactionRunner;

  /**
   * Creates a retrying runner on top of a new {@link SpannerTransactionRunner}.
   *
   * @param conf configuration map read for {@link #MAX_RETRIES} and
   *     {@link #INITIAL_DELAY_MILLIS}; absent keys fall back to built-in defaults
   * @param admin the table admin handed to the underlying {@link SpannerTransactionRunner}
   * @throws NumberFormatException if a configured value is present but not a valid integer
   */
  RetryingSpannerTransactionRunner(Map<String, String> conf, SpannerStructuredTableAdmin admin) {
    this.maxRetries = getInt(conf, MAX_RETRIES, DEFAULT_MAX_RETRIES);
    this.initialDelayMs = getInt(conf, INITIAL_DELAY_MILLIS, DEFAULT_INITIAL_DELAY_MILLIS);
    this.transactionRunner = new SpannerTransactionRunner(admin);
  }

  /**
   * Runs the given transaction, retrying it while it fails with a retryable error.
   *
   * <p>The transaction is always attempted at least once and at most {@code maxRetries} times in
   * total. Between attempts the calling thread sleeps for an exponentially growing delay.
   *
   * @param runnable the transactional work to execute
   * @throws TransactionException if the transaction fails with a non-retryable error, if it still
   *     fails after the last allowed attempt, or if the wait between attempts is interrupted or
   *     cannot be computed (in which case the most recent transaction failure is thrown)
   */
  @Override
  public void run(TxRunnable runnable) throws TransactionException {
    ExponentialBackOff backOff = new ExponentialBackOff.Builder().setInitialIntervalMillis(
        initialDelayMs).build();
    int attempt = 0;
    while (true) {
      attempt++;
      try {
        transactionRunner.run(runnable);
        return;
      } catch (TransactionException e) {
        // Surface the failure instead of hiding it: non-retryable errors are never retried, and
        // an exhausted attempt budget must not look like a successful transaction.
        if (attempt >= maxRetries || !isRetryable(e)) {
          throw e;
        }
        LOG.debug("Transaction failed with retryable exception on attempt {} of {}",
            attempt, maxRetries, e);
        sleepBeforeRetry(backOff, e);
      }
    }
  }

  /**
   * Sleeps for the next backoff interval, or fails with the given transaction failure when waiting
   * is not possible.
   */
  private void sleepBeforeRetry(ExponentialBackOff backOff, TransactionException failure)
      throws TransactionException {
    long delayMs;
    try {
      delayMs = backOff.nextBackOffMillis();
    } catch (IOException ioe) {
      // Not an interruption, so the interrupt flag is left untouched.
      failure.addSuppressed(ioe);
      throw failure;
    }
    // A negative value is the backoff's "stop retrying" sentinel; Thread.sleep would reject it.
    if (delayMs < 0) {
      throw failure;
    }
    try {
      Thread.sleep(delayMs);
    } catch (InterruptedException ie) {
      // Reinstate the interrupt and fail with the original exception.
      Thread.currentThread().interrupt();
      throw failure;
    }
  }

  /**
   * Returns whether the failure was caused, directly or transitively, by a
   * {@link TableNotFoundException}.
   */
  private boolean isRetryable(TransactionException e) {
    for (Throwable cause : Throwables.getCausalChain(e)) {
      if (cause instanceof TableNotFoundException) {
        return true;
      }
    }
    return false;
  }

  /**
   * Reads an integer setting, returning {@code defaultValue} when the key is absent.
   */
  private static int getInt(Map<String, String> conf, String key, int defaultValue) {
    String value = conf.get(key);
    return value == null ? defaultValue : Integer.parseInt(value.trim());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class SpannerStorageProvider implements StorageProvider {

private Spanner spanner;
private SpannerStructuredTableAdmin admin;
private SpannerTransactionRunner txRunner;
private RetryingSpannerTransactionRunner txRunner;

@Override
public void initialize(StorageProviderContext context) throws Exception {
Expand Down Expand Up @@ -90,7 +90,7 @@ public void initialize(StorageProviderContext context) throws Exception {

this.spanner = options.getService();
this.admin = new SpannerStructuredTableAdmin(spanner, databaseId);
this.txRunner = new SpannerTransactionRunner(admin);
this.txRunner = new RetryingSpannerTransactionRunner(conf, admin);
}

@Override
Expand Down

0 comments on commit 1939795

Please sign in to comment.