diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java index 0601f4a8c39..95ef99448f5 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java @@ -33,7 +33,9 @@ import java.util.Map.Entry; import java.util.Set; -import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.zookeeper.FateLock; import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath; import org.apache.accumulo.core.fate.zookeeper.ZooReader; @@ -210,14 +212,14 @@ public Map> getDanglingWaitingLocks() { /** * Returns a list of the FATE transactions, optionally filtered by transaction id and status. This * method does not process lock information, if lock information is desired, use - * {@link #getStatus(ReadOnlyTStore, ZooReader, ServiceLockPath, Set, EnumSet)} + * {@link #getStatus(ReadOnlyFateStore, ZooReader, ServiceLockPath, Set, EnumSet)} * * @param zs read-only zoostore * @param filterTxid filter results to include for provided transaction ids. * @param filterStatus filter results to include only provided status types * @return list of FATE transactions that match filter criteria */ - public List getTransactionStatus(ReadOnlyTStore zs, Set filterTxid, + public List getTransactionStatus(ReadOnlyFateStore zs, Set filterTxid, EnumSet filterStatus) { FateStatus status = getTransactionStatus(zs, filterTxid, filterStatus, @@ -239,7 +241,7 @@ public List getTransactionStatus(ReadOnlyTStore zs, Set zs, ZooReader zk, + public FateStatus getStatus(ReadOnlyFateStore zs, ZooReader zk, ServiceLock.ServiceLockPath lockPath, Set filterTxid, EnumSet filterStatus) throws KeeperException, InterruptedException { Map> heldLocks = new HashMap<>(); @@ -332,7 +334,7 @@ private void findLocks(ZooReader zk, final ServiceLock.ServiceLockPath lockPath, * @param waitingLocks populated list of locks held by transaction - or an empty map if none. * @return current fate and lock status */ - private FateStatus getTransactionStatus(ReadOnlyTStore zs, Set filterTxid, + private FateStatus getTransactionStatus(ReadOnlyFateStore zs, Set filterTxid, EnumSet filterStatus, Map> heldLocks, Map> waitingLocks) { @@ -341,9 +343,9 @@ private FateStatus getTransactionStatus(ReadOnlyTStore zs, Set filterTx for (Long tid : transactions) { - zs.reserve(tid); + ReadOnlyFateTxStore txStore = zs.read(tid); - String txName = (String) zs.getTransactionInfo(tid, Fate.TxInfo.TX_NAME); + String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME); List hlocks = heldLocks.remove(tid); @@ -358,16 +360,14 @@ private FateStatus getTransactionStatus(ReadOnlyTStore zs, Set filterTx } String top = null; - ReadOnlyRepo repo = zs.top(tid); + ReadOnlyRepo repo = txStore.top(); if (repo != null) { top = repo.getName(); } - TStatus status = zs.getStatus(tid); + TStatus status = txStore.getStatus(); - long timeCreated = zs.timeCreated(tid); - - zs.unreserve(tid, 0); + long timeCreated = txStore.timeCreated(); if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) { statuses.add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated)); @@ -386,14 +386,14 @@ private boolean includeByTxid(Long tid, Set filterTxid) { return (filterTxid == null) || filterTxid.isEmpty() || filterTxid.contains(tid); } - public void printAll(ReadOnlyTStore zs, ZooReader zk, + public void printAll(ReadOnlyFateStore zs, ZooReader zk, ServiceLock.ServiceLockPath tableLocksPath) throws KeeperException, InterruptedException { print(zs, zk, tableLocksPath, new Formatter(System.out), null, null); } - public void print(ReadOnlyTStore zs, ZooReader zk, ServiceLock.ServiceLockPath tableLocksPath, - Formatter fmt, Set filterTxid, EnumSet filterStatus) - throws KeeperException, InterruptedException { + public void print(ReadOnlyFateStore zs, ZooReader zk, + ServiceLock.ServiceLockPath tableLocksPath, Formatter fmt, Set filterTxid, + EnumSet filterStatus) throws KeeperException, InterruptedException { FateStatus fateStatus = getStatus(zs, zk, tableLocksPath, filterTxid, filterStatus); for (TransactionStatus txStatus : fateStatus.getTransactions()) { @@ -417,7 +417,7 @@ public void print(ReadOnlyTStore zs, ZooReader zk, ServiceLock.ServiceLockPat } } - public boolean prepDelete(TStore zs, ZooReaderWriter zk, ServiceLockPath path, + public boolean prepDelete(FateStore zs, ZooReaderWriter zk, ServiceLockPath path, String txidStr) { if (!checkGlobalLock(zk, path)) { return false; @@ -431,30 +431,32 @@ public boolean prepDelete(TStore zs, ZooReaderWriter zk, ServiceLockPath path return false; } boolean state = false; - zs.reserve(txid); - TStatus ts = zs.getStatus(txid); - switch (ts) { - case UNKNOWN: - System.out.printf("Invalid transaction ID: %016x%n", txid); - break; - - case SUBMITTED: - case IN_PROGRESS: - case NEW: - case FAILED: - case FAILED_IN_PROGRESS: - case SUCCESSFUL: - System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts); - zs.delete(txid); - state = true; - break; + FateTxStore txStore = zs.reserve(txid); + try { + TStatus ts = txStore.getStatus(); + switch (ts) { + case UNKNOWN: + System.out.printf("Invalid transaction ID: %016x%n", txid); + break; + + case SUBMITTED: + case IN_PROGRESS: + case NEW: + case FAILED: + case FAILED_IN_PROGRESS: + case SUCCESSFUL: + System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts); + txStore.delete(); + state = true; + break; + } + } finally { + txStore.unreserve(0); } - - zs.unreserve(txid, 0); return state; } - public boolean prepFail(TStore zs, ZooReaderWriter zk, ServiceLockPath zLockManagerPath, + public boolean prepFail(FateStore zs, ZooReaderWriter zk, ServiceLockPath zLockManagerPath, String txidStr) { if (!checkGlobalLock(zk, zLockManagerPath)) { return false; @@ -468,33 +470,36 @@ public boolean prepFail(TStore zs, ZooReaderWriter zk, ServiceLockPath zLockM return false; } boolean state = false; - zs.reserve(txid); - TStatus ts = zs.getStatus(txid); - switch (ts) { - case UNKNOWN: - System.out.printf("Invalid transaction ID: %016x%n", txid); - break; - - case SUBMITTED: - case IN_PROGRESS: - case NEW: - System.out.printf("Failing transaction: %016x (%s)%n", txid, ts); - zs.setStatus(txid, TStatus.FAILED_IN_PROGRESS); - state = true; - break; - - case SUCCESSFUL: - System.out.printf("Transaction already completed: %016x (%s)%n", txid, ts); - break; - - case FAILED: - case FAILED_IN_PROGRESS: - System.out.printf("Transaction already failed: %016x (%s)%n", txid, ts); - state = true; - break; + FateTxStore txStore = zs.reserve(txid); + try { + TStatus ts = txStore.getStatus(); + switch (ts) { + case UNKNOWN: + System.out.printf("Invalid transaction ID: %016x%n", txid); + break; + + case SUBMITTED: + case IN_PROGRESS: + case NEW: + System.out.printf("Failing transaction: %016x (%s)%n", txid, ts); + txStore.setStatus(TStatus.FAILED_IN_PROGRESS); + state = true; + break; + + case SUCCESSFUL: + System.out.printf("Transaction already completed: %016x (%s)%n", txid, ts); + break; + + case FAILED: + case FAILED_IN_PROGRESS: + System.out.printf("Transaction already failed: %016x (%s)%n", txid, ts); + state = true; + break; + } + } finally { + txStore.unreserve(0); } - zs.unreserve(txid, 0); return state; } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java index 5ed59f21fe5..c8be589aeff 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java @@ -18,13 +18,12 @@ */ package org.apache.accumulo.core.fate; -import java.io.Serializable; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +34,7 @@ * * No external time source is used. It starts tracking idle time when its created. */ -public class AgeOffStore implements TStore { +public class AgeOffStore implements FateStore { public interface TimeSource { long currentTimeMillis(); @@ -43,7 +42,7 @@ public interface TimeSource { private static final Logger log = LoggerFactory.getLogger(AgeOffStore.class); - private final ZooStore store; + private final FateStore store; private Map candidates; private long ageOffTime; private long minTime; @@ -93,13 +92,13 @@ public void ageOff() { for (Long txid : oldTxs) { try { - store.reserve(txid); + FateTxStore txStore = store.reserve(txid); try { - switch (store.getStatus(txid)) { + switch (txStore.getStatus()) { case NEW: case FAILED: case SUCCESSFUL: - store.delete(txid); + txStore.delete(); log.debug("Aged off FATE tx {}", FateTxId.formatTid(txid)); break; default: @@ -107,7 +106,7 @@ public void ageOff() { } } finally { - store.unreserve(txid, 0); + txStore.unreserve(0); } } catch (Exception e) { log.warn("Failed to age off FATE tx " + FateTxId.formatTid(txid), e); @@ -115,7 +114,7 @@ public void ageOff() { } } - public AgeOffStore(ZooStore store, long ageOffTime, TimeSource timeSource) { + public AgeOffStore(FateStore store, long ageOffTime, TimeSource timeSource) { this.store = store; this.ageOffTime = ageOffTime; this.timeSource = timeSource; @@ -125,9 +124,9 @@ public AgeOffStore(ZooStore store, long ageOffTime, TimeSource timeSource) { List txids = store.list(); for (Long txid : txids) { - store.reserve(txid); + FateTxStore txStore = store.reserve(txid); try { - switch (store.getStatus(txid)) { + switch (txStore.getStatus()) { case NEW: case FAILED: case SUCCESSFUL: @@ -137,7 +136,7 @@ public AgeOffStore(ZooStore store, long ageOffTime, TimeSource timeSource) { break; } } finally { - store.unreserve(txid, 0); + txStore.unreserve(0); } } } @@ -150,98 +149,59 @@ public long create() { } @Override - public long reserve() { - return store.reserve(); + public FateTxStore reserve() { + return new AgeOffFateTxStore(store.reserve()); } @Override - public void reserve(long tid) { - store.reserve(tid); + public FateTxStore reserve(long tid) { + return new AgeOffFateTxStore(store.reserve(tid)); } @Override - public boolean tryReserve(long tid) { - return store.tryReserve(tid); + public Optional> tryReserve(long tid) { + return store.tryReserve(tid).map(AgeOffFateTxStore::new); } - @Override - public void unreserve(long tid, long deferTime) { - store.unreserve(tid, deferTime); - } - - @Override - public Repo top(long tid) { - return store.top(tid); - } - - @Override - public void push(long tid, Repo repo) throws StackOverflowException { - store.push(tid, repo); - } + private class AgeOffFateTxStore extends WrappedFateTxStore { - @Override - public void pop(long tid) { - store.pop(tid); - } - - @Override - public org.apache.accumulo.core.fate.TStore.TStatus getStatus(long tid) { - return store.getStatus(tid); - } - - @Override - public void setStatus(long tid, org.apache.accumulo.core.fate.TStore.TStatus status) { - store.setStatus(tid, status); - - switch (status) { - case SUBMITTED: - case IN_PROGRESS: - case FAILED_IN_PROGRESS: - removeCandidate(tid); - break; - case FAILED: - case SUCCESSFUL: - addCandidate(tid); - break; - default: - break; + private AgeOffFateTxStore(FateTxStore wrapped) { + super(wrapped); } - } - @Override - public org.apache.accumulo.core.fate.TStore.TStatus waitForStatusChange(long tid, - EnumSet expected) { - return store.waitForStatusChange(tid, expected); - } - - @Override - public void setTransactionInfo(long tid, Fate.TxInfo txInfo, Serializable val) { - store.setTransactionInfo(tid, txInfo, val); - } + @Override + public void setStatus(FateStore.TStatus status) { + super.setStatus(status); + + switch (status) { + case SUBMITTED: + case IN_PROGRESS: + case FAILED_IN_PROGRESS: + removeCandidate(getID()); + break; + case FAILED: + case SUCCESSFUL: + addCandidate(getID()); + break; + default: + break; + } + } - @Override - public Serializable getTransactionInfo(long tid, Fate.TxInfo txInfo) { - return store.getTransactionInfo(tid, txInfo); + @Override + public void delete() { + super.delete(); + removeCandidate(getID()); + } } @Override - public void delete(long tid) { - store.delete(tid); - removeCandidate(tid); + public ReadOnlyFateTxStore read(long tid) { + return store.read(tid); } @Override public List list() { return store.list(); } - - @Override - public long timeCreated(long tid) { - return store.timeCreated(tid); - } - - @Override - public List> getStack(long tid) { - return store.getStack(tid); - } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 1f0ae0d7583..a7ad8ce2437 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -21,16 +21,17 @@ import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.FAILED; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.IN_PROGRESS; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.NEW; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.SUBMITTED; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.SUCCESSFUL; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.UNKNOWN; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN; import static org.apache.accumulo.core.util.ShutdownUtil.isIOException; import java.util.EnumSet; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -41,7 +42,8 @@ import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.logging.FateLogger; import org.apache.accumulo.core.util.ShutdownUtil; import org.apache.accumulo.core.util.UtilWaitThread; @@ -58,7 +60,7 @@ public class Fate { private static final Logger log = LoggerFactory.getLogger(Fate.class); private final Logger runnerLog = LoggerFactory.getLogger(TransactionRunner.class); - private final TStore store; + private final FateStore store; private final T environment; private final ScheduledThreadPoolExecutor fatePoolWatcher; private final ExecutorService executor; @@ -77,17 +79,17 @@ private class TransactionRunner implements Runnable { public void run() { while (keepRunning.get()) { long deferTime = 0; - Long tid = null; + FateTxStore txStore = null; try { - tid = store.reserve(); - TStatus status = store.getStatus(tid); - Repo op = store.top(tid); + txStore = store.reserve(); + TStatus status = txStore.getStatus(); + Repo op = txStore.top(); if (status == FAILED_IN_PROGRESS) { - processFailed(tid, op); + processFailed(txStore, op); } else { Repo prevOp = null; try { - deferTime = op.isReady(tid, environment); + deferTime = op.isReady(txStore.getID(), environment); // Here, deferTime is only used to determine success (zero) or failure (non-zero), // proceeding on success and returning to the while loop on failure. @@ -95,16 +97,16 @@ public void run() { if (deferTime == 0) { prevOp = op; if (status == SUBMITTED) { - store.setStatus(tid, IN_PROGRESS); + txStore.setStatus(IN_PROGRESS); } - op = op.call(tid, environment); + op = op.call(txStore.getID(), environment); } else { continue; } } catch (Exception e) { - blockIfHadoopShutdown(tid, e); - transitionToFailed(tid, e); + blockIfHadoopShutdown(txStore.getID(), e); + transitionToFailed(txStore, e); continue; } @@ -112,18 +114,18 @@ public void run() { // transaction is finished String ret = prevOp.getReturn(); if (ret != null) { - store.setTransactionInfo(tid, TxInfo.RETURN_VALUE, ret); + txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret); } - store.setStatus(tid, SUCCESSFUL); - doCleanUp(tid); + txStore.setStatus(SUCCESSFUL); + doCleanUp(txStore); } else { try { - store.push(tid, op); + txStore.push(op); } catch (StackOverflowException e) { // the op that failed to push onto the stack was never executed, so no need to undo // it // just transition to failed and undo the ops that executed - transitionToFailed(tid, e); + transitionToFailed(txStore, e); continue; } } @@ -131,8 +133,8 @@ public void run() { } catch (Exception e) { runnerLog.error("Uncaught exception in FATE runner thread.", e); } finally { - if (tid != null) { - store.unreserve(tid, deferTime); + if (txStore != null) { + txStore.unreserve(deferTime); } } } @@ -166,8 +168,8 @@ private void blockIfHadoopShutdown(long tid, Exception e) { } } - private void transitionToFailed(long tid, Exception e) { - String tidStr = FateTxId.formatTid(tid); + private void transitionToFailed(FateTxStore txStore, Exception e) { + String tidStr = FateTxId.formatTid(txStore.getID()); final String msg = "Failed to execute Repo " + tidStr; // Certain FATE ops that throw exceptions don't need to be propagated up to the Monitor // as a warning. They're a normal, handled failure condition. @@ -178,32 +180,32 @@ private void transitionToFailed(long tid, Exception e) { } else { log.warn(msg, e); } - store.setTransactionInfo(tid, TxInfo.EXCEPTION, e); - store.setStatus(tid, FAILED_IN_PROGRESS); + txStore.setTransactionInfo(TxInfo.EXCEPTION, e); + txStore.setStatus(FAILED_IN_PROGRESS); log.info("Updated status for Repo with {} to FAILED_IN_PROGRESS", tidStr); } - private void processFailed(long tid, Repo op) { + private void processFailed(FateTxStore txStore, Repo op) { while (op != null) { - undo(tid, op); + undo(txStore.getID(), op); - store.pop(tid); - op = store.top(tid); + txStore.pop(); + op = txStore.top(); } - store.setStatus(tid, FAILED); - doCleanUp(tid); + txStore.setStatus(FAILED); + doCleanUp(txStore); } - private void doCleanUp(long tid) { - Boolean autoClean = (Boolean) store.getTransactionInfo(tid, TxInfo.AUTO_CLEAN); + private void doCleanUp(FateTxStore txStore) { + Boolean autoClean = (Boolean) txStore.getTransactionInfo(TxInfo.AUTO_CLEAN); if (autoClean != null && autoClean) { - store.delete(tid); + txStore.delete(); } else { // no longer need persisted operations, so delete them to save space in case // TX is never cleaned up... - while (store.top(tid) != null) { - store.pop(tid); + while (txStore.top() != null) { + txStore.pop(); } } } @@ -223,7 +225,7 @@ private void undo(long tid, Repo op) { * * @param toLogStrFunc A function that converts Repo to Strings that are suitable for logging */ - public Fate(T environment, TStore store, Function,String> toLogStrFunc, + public Fate(T environment, FateStore store, Function,String> toLogStrFunc, AccumuloConfiguration conf) { this.store = FateLogger.wrap(store, toLogStrFunc); this.environment = environment; @@ -266,13 +268,13 @@ public long startTransaction() { // multiple times for a transaction... but it will only seed once public void seedTransaction(String txName, long tid, Repo repo, boolean autoCleanUp, String goalMessage) { - store.reserve(tid); + FateTxStore txStore = store.reserve(tid); try { - if (store.getStatus(tid) == NEW) { - if (store.top(tid) == null) { + if (txStore.getStatus() == NEW) { + if (txStore.top() == null) { try { log.info("Seeding {} {}", FateTxId.formatTid(tid), goalMessage); - store.push(tid, repo); + txStore.push(repo); } catch (StackOverflowException e) { // this should not happen throw new IllegalStateException(e); @@ -280,22 +282,22 @@ public void seedTransaction(String txName, long tid, Repo repo, boolean autoC } if (autoCleanUp) { - store.setTransactionInfo(tid, TxInfo.AUTO_CLEAN, autoCleanUp); + txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp); } - store.setTransactionInfo(tid, TxInfo.TX_NAME, txName); + txStore.setTransactionInfo(TxInfo.TX_NAME, txName); - store.setStatus(tid, SUBMITTED); + txStore.setStatus(SUBMITTED); } } finally { - store.unreserve(tid, 0); + txStore.unreserve(0); } } // check on the transaction public TStatus waitForCompletion(long tid) { - return store.waitForStatusChange(tid, FINISHED_STATES); + return store.read(tid).waitForStatusChange(FINISHED_STATES); } /** @@ -308,14 +310,16 @@ public TStatus waitForCompletion(long tid) { public boolean cancel(long tid) { String tidStr = FateTxId.formatTid(tid); for (int retries = 0; retries < 5; retries++) { - if (store.tryReserve(tid)) { + Optional> optionalTxStore = store.tryReserve(tid); + if (optionalTxStore.isPresent()) { + var txStore = optionalTxStore.orElseThrow(); try { - TStatus status = store.getStatus(tid); + TStatus status = txStore.getStatus(); log.info("status is: {}", status); if (status == NEW || status == SUBMITTED) { - store.setTransactionInfo(tid, TxInfo.EXCEPTION, new TApplicationException( + txStore.setTransactionInfo(TxInfo.EXCEPTION, new TApplicationException( TApplicationException.INTERNAL_ERROR, "Fate transaction cancelled by user")); - store.setStatus(tid, FAILED_IN_PROGRESS); + txStore.setStatus(FAILED_IN_PROGRESS); log.info("Updated status for {} to FAILED_IN_PROGRESS because it was cancelled by user", tidStr); return true; @@ -324,7 +328,7 @@ public boolean cancel(long tid) { return false; } } finally { - store.unreserve(tid, 0); + txStore.unreserve(0); } } else { // reserved, lets retry. @@ -337,14 +341,14 @@ public boolean cancel(long tid) { // resource cleanup public void delete(long tid) { - store.reserve(tid); + FateTxStore txStore = store.reserve(tid); try { - switch (store.getStatus(tid)) { + switch (txStore.getStatus()) { case NEW: case SUBMITTED: case FAILED: case SUCCESSFUL: - store.delete(tid); + txStore.delete(); break; case FAILED_IN_PROGRESS: case IN_PROGRESS: @@ -355,34 +359,34 @@ public void delete(long tid) { break; } } finally { - store.unreserve(tid, 0); + txStore.unreserve(0); } } public String getReturn(long tid) { - store.reserve(tid); + FateTxStore txStore = store.reserve(tid); try { - if (store.getStatus(tid) != SUCCESSFUL) { + if (txStore.getStatus() != SUCCESSFUL) { throw new IllegalStateException("Tried to get exception when transaction " + FateTxId.formatTid(tid) + " not in successful state"); } - return (String) store.getTransactionInfo(tid, TxInfo.RETURN_VALUE); + return (String) txStore.getTransactionInfo(TxInfo.RETURN_VALUE); } finally { - store.unreserve(tid, 0); + txStore.unreserve(0); } } // get reportable failures public Exception getException(long tid) { - store.reserve(tid); + FateTxStore txStore = store.reserve(tid); try { - if (store.getStatus(tid) != FAILED) { + if (txStore.getStatus() != FAILED) { throw new IllegalStateException("Tried to get exception when transaction " + FateTxId.formatTid(tid) + " not in failed state"); } - return (Exception) store.getTransactionInfo(tid, TxInfo.EXCEPTION); + return (Exception) txStore.getTransactionInfo(TxInfo.EXCEPTION); } finally { - store.unreserve(tid, 0); + txStore.unreserve(0); } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java new file mode 100644 index 00000000000..834a2fa6e5b --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.io.Serializable; +import java.util.Optional; + +/** + * Transaction Store: a place to save transactions + * + * A transaction consists of a number of operations. To use, first create a transaction id, and then + * seed the transaction with an initial operation. An executor service can then execute the + * transaction's operation, possibly pushing more operations onto the transaction as each step + * successfully completes. If a step fails, the stack can be unwound, undoing each operation. + */ +public interface FateStore extends ReadOnlyFateStore { + + /** + * Create a new transaction id + * + * @return a transaction id + */ + long create(); + + interface FateTxStore extends ReadOnlyFateTxStore { + @Override + Repo top(); + + /** + * Update the given transaction with the next operation + * + * @param repo the operation + */ + void push(Repo repo) throws StackOverflowException; + + /** + * Remove the last pushed operation from the given transaction. + */ + void pop(); + + /** + * Update the state of a given transaction + * + * @param status execution status + */ + void setStatus(TStatus status); + + /** + * Set transaction-specific information. + * + * @param txInfo name of attribute of a transaction to set. + * @param val transaction data to store + */ + void setTransactionInfo(Fate.TxInfo txInfo, Serializable val); + + /** + * Remove the transaction from the store. + * + */ + void delete(); + + /** + * Return the given transaction to the store. + * + * upon successful return the store now controls the referenced transaction id. caller should no + * longer interact with it. + * + * @param deferTime time in millis to keep this transaction out of the pool used in the + * {@link #reserve() reserve} method. must be non-negative. + */ + void unreserve(long deferTime); + } + + /** + * Attempt to reserve transaction + * + * @param tid transaction id + * @return true if reserved by this call, false if already reserved + */ + Optional> tryReserve(long tid); + + /** + * Reserve the specific tid. + * + * Reserving a transaction id ensures that nothing else in-process interacting via the same + * instance will be operating on that transaction id. + * + */ + FateTxStore reserve(long tid); + + /** + * Reserve a transaction that is IN_PROGRESS or FAILED_IN_PROGRESS. + * + * Reserving a transaction id ensures that nothing else in-process interacting via the same + * instance will be operating on that transaction id. + * + * @return a transaction id that is safe to interact with, chosen by the store. + */ + FateTxStore reserve(); + +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java new file mode 100644 index 00000000000..4e06ab0f9e4 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.io.Serializable; +import java.util.EnumSet; +import java.util.List; + +/** + * Read only access to a Transaction Store. + * + * A transaction consists of a number of operations. Instances of this class may check on the queue + * of outstanding transactions but may neither modify them nor create new ones. + */ +public interface ReadOnlyFateStore { + + /** + * Possible operational status codes. Serialized by name within stores. + */ + enum TStatus { + /** Unseeded transaction */ + NEW, + /** Transaction that is executing */ + IN_PROGRESS, + /** Transaction has failed, and is in the process of being rolled back */ + FAILED_IN_PROGRESS, + /** Transaction has failed and has been fully rolled back */ + FAILED, + /** Transaction has succeeded */ + SUCCESSFUL, + /** Unrecognized or unknown transaction state */ + UNKNOWN, + /** Transaction that is eligible to be executed */ + SUBMITTED + } + + /** + * Reads the data related to fate transaction without reserving it. + */ + ReadOnlyFateTxStore read(long tid); + + /** + * Storage for an individual fate transaction + */ + interface ReadOnlyFateTxStore { + + /** + * Get the current operation for the given transaction id. + * + * Caller must have already reserved tid. + * + * @return a read-only view of the operation + */ + ReadOnlyRepo top(); + + /** + * Get all operations on a transactions stack. Element 0 contains the most recent operation + * pushed or the top. + */ + List> getStack(); + + /** + * Get the state of a given transaction. + * + * Caller must have already reserved tid. + * + * @return execution status + */ + TStatus getStatus(); + + /** + * Wait for the status of a transaction to change + * + * @param expected a set of possible statuses we are interested in being notified about. may not + * be null. + * @return execution status. + */ + TStatus waitForStatusChange(EnumSet expected); + + /** + * Retrieve transaction-specific information. + * + * Caller must have already reserved tid. + * + * @param txInfo name of attribute of a transaction to retrieve. + */ + Serializable getTransactionInfo(Fate.TxInfo txInfo); + + /** + * Retrieve the creation time of a FaTE transaction. + * + * @return creation time of transaction. + */ + long timeCreated(); + + /** + * @return the id of the FATE transaction + */ + long getID(); + } + + /** + * list all transaction ids in store. + * + * @return all outstanding transactions, including those reserved by others. + */ + List list(); +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java deleted file mode 100644 index e4f55e4b16b..00000000000 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.fate; - -import java.io.Serializable; -import java.util.EnumSet; -import java.util.List; - -/** - * Read only access to a Transaction Store. - * - * A transaction consists of a number of operations. Instances of this class may check on the queue - * of outstanding transactions but may neither modify them nor create new ones. - */ -public interface ReadOnlyTStore { - - /** - * Possible operational status codes. Serialized by name within stores. - */ - enum TStatus { - /** Unseeded transaction */ - NEW, - /** Transaction that is executing */ - IN_PROGRESS, - /** Transaction has failed, and is in the process of being rolled back */ - FAILED_IN_PROGRESS, - /** Transaction has failed and has been fully rolled back */ - FAILED, - /** Transaction has succeeded */ - SUCCESSFUL, - /** Unrecognized or unknown transaction state */ - UNKNOWN, - /** Transaction that is eligible to be executed */ - SUBMITTED - } - - /** - * Reserve a transaction that is IN_PROGRESS or FAILED_IN_PROGRESS. - * - * Reserving a transaction id ensures that nothing else in-process interacting via the same - * instance will be operating on that transaction id. - * - * @return a transaction id that is safe to interact with, chosen by the store. - */ - long reserve(); - - /** - * Reserve the specific tid. - * - * Reserving a transaction id ensures that nothing else in-process interacting via the same - * instance will be operating on that transaction id. - * - */ - void reserve(long tid); - - /** - * Return the given transaction to the store. - * - * upon successful return the store now controls the referenced transaction id. caller should no - * longer interact with it. - * - * @param tid transaction id, previously reserved. - * @param deferTime time in millis to keep this transaction out of the pool used in the - * {@link #reserve() reserve} method. must be non-negative. - */ - void unreserve(long tid, long deferTime); - - /** - * Get the current operation for the given transaction id. - * - * Caller must have already reserved tid. - * - * @param tid transaction id, previously reserved. - * @return a read-only view of the operation - */ - ReadOnlyRepo top(long tid); - - /** - * Get all operations on a transactions stack. Element 0 contains the most recent operation pushed - * or the top. - */ - List> getStack(long tid); - - /** - * Get the state of a given transaction. - * - * Caller must have already reserved tid. - * - * @param tid transaction id, previously reserved. - * @return execution status - */ - TStatus getStatus(long tid); - - /** - * Wait for the status of a transaction to change - * - * @param tid transaction id, need not have been reserved. - * @param expected a set of possible statuses we are interested in being notified about. may not - * be null. - * @return execution status. - */ - TStatus waitForStatusChange(long tid, EnumSet expected); - - /** - * Retrieve transaction-specific information. - * - * Caller must have already reserved tid. - * - * @param tid transaction id, previously reserved. - * @param txInfo name of attribute of a transaction to retrieve. - */ - Serializable getTransactionInfo(long tid, Fate.TxInfo txInfo); - - /** - * list all transaction ids in store. - * - * @return all outstanding transactions, including those reserved by others. - */ - List list(); - - /** - * Retrieve the creation time of a FaTE transaction. - * - * @param tid Transaction id, previously reserved. - * @return creation time of transaction. - */ - long timeCreated(long tid); -} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/TStore.java b/core/src/main/java/org/apache/accumulo/core/fate/TStore.java deleted file mode 100644 index 8958628b7db..00000000000 --- a/core/src/main/java/org/apache/accumulo/core/fate/TStore.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.core.fate; - -import java.io.Serializable; - -/** - * Transaction Store: a place to save transactions - * - * A transaction consists of a number of operations. To use, first create a transaction id, and then - * seed the transaction with an initial operation. An executor service can then execute the - * transaction's operation, possibly pushing more operations onto the transaction as each step - * successfully completes. If a step fails, the stack can be unwound, undoing each operation. - */ -public interface TStore extends ReadOnlyTStore { - - /** - * Create a new transaction id - * - * @return a transaction id - */ - long create(); - - @Override - Repo top(long tid); - - /** - * Update the given transaction with the next operation - * - * @param tid the transaction id - * @param repo the operation - */ - void push(long tid, Repo repo) throws StackOverflowException; - - /** - * Remove the last pushed operation from the given transaction. - */ - void pop(long tid); - - /** - * Update the state of a given transaction - * - * @param tid transaction id - * @param status execution status - */ - void setStatus(long tid, TStatus status); - - /** - * Set transaction-specific information. - * - * @param tid transaction id - * @param txInfo name of attribute of a transaction to set. - * @param val transaction data to store - */ - void setTransactionInfo(long tid, Fate.TxInfo txInfo, Serializable val); - - /** - * Remove the transaction from the store. - * - * @param tid the transaction id - */ - void delete(long tid); - - /** - * Attempt to reserve transaction - * - * @param tid transaction id - * @return true if reserved by this call, false if already reserved - */ - boolean tryReserve(long tid); - -} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java new file mode 100644 index 00000000000..238f981a22a --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate; + +import java.io.Serializable; +import java.util.EnumSet; +import java.util.List; + +public class WrappedFateTxStore implements FateStore.FateTxStore { + protected final FateStore.FateTxStore wrapped; + + public WrappedFateTxStore(FateStore.FateTxStore wrapped) { + this.wrapped = wrapped; + } + + @Override + public void unreserve(long deferTime) { + wrapped.unreserve(deferTime); + } + + @Override + public Repo top() { + return wrapped.top(); + } + + @Override + public void push(Repo repo) throws StackOverflowException { + wrapped.push(repo); + } + + @Override + public void pop() { + wrapped.pop(); + } + + @Override + public FateStore.TStatus getStatus() { + return wrapped.getStatus(); + } + + @Override + public void setStatus(FateStore.TStatus status) { + wrapped.setStatus(status); + } + + @Override + public FateStore.TStatus waitForStatusChange(EnumSet expected) { + return wrapped.waitForStatusChange(expected); + } + + @Override + public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { + wrapped.setTransactionInfo(txInfo, val); + } + + @Override + public Serializable getTransactionInfo(Fate.TxInfo txInfo) { + return wrapped.getTransactionInfo(txInfo); + } + + @Override + public void delete() { + wrapped.delete(); + } + + @Override + public long timeCreated() { + return wrapped.timeCreated(); + } + + @Override + public long getID() { + return wrapped.getID(); + } + + @Override + public List> getStack() { + return wrapped.getStack(); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index 728cabcf7ee..683f17d9585 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -37,6 +37,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; @@ -55,7 +56,7 @@ //TODO use zoocache? - ACCUMULO-1297 //TODO handle zookeeper being down gracefully - ACCUMULO-1297 -public class ZooStore implements TStore { +public class ZooStore implements FateStore { private static final Logger log = LoggerFactory.getLogger(ZooStore.class); private String path; @@ -136,7 +137,7 @@ public long create() { } @Override - public long reserve() { + public FateTxStore reserve() { try { while (true) { @@ -186,7 +187,7 @@ public long reserve() { TStatus status = TStatus.valueOf(new String(zk.getData(path + "/" + txdir), UTF_8)); if (status == TStatus.SUBMITTED || status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS) { - return tid; + return new FateTxStoreImpl(tid, true); } else { unreserve(tid); } @@ -220,7 +221,7 @@ public long reserve() { } @Override - public void reserve(long tid) { + public FateTxStore reserve(long tid) { synchronized (this) { reservationsWaiting++; try { @@ -233,6 +234,7 @@ public void reserve(long tid) { } reserved.add(tid); + return new FateTxStoreImpl(tid, true); } finally { reservationsWaiting--; } @@ -246,13 +248,12 @@ public void reserve(long tid) { * @return true if reserved by this call, false if already reserved */ @Override - public boolean tryReserve(long tid) { + public Optional> tryReserve(long tid) { synchronized (this) { if (!reserved.contains(tid)) { - reserve(tid); - return true; + return Optional.of(reserve(tid)); } - return false; + return Optional.empty(); } } @@ -272,238 +273,318 @@ private void unreserve(long tid) { } } - @Override - public void unreserve(long tid, long deferTime) { + private class FateTxStoreImpl implements FateTxStore { + + private final long tid; + private final boolean isReserved; - if (deferTime < 0) { - throw new IllegalArgumentException("deferTime < 0 : " + deferTime); + private FateTxStoreImpl(long tid, boolean isReserved) { + this.tid = tid; + this.isReserved = isReserved; } - synchronized (this) { - if (!reserved.remove(tid)) { - throw new IllegalStateException( - "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid)); + @Override + public void unreserve(long deferTime) { + + if (deferTime < 0) { + throw new IllegalArgumentException("deferTime < 0 : " + deferTime); } - if (deferTime > 0) { - defered.put(tid, System.currentTimeMillis() + deferTime); + synchronized (this) { + if (!reserved.remove(tid)) { + throw new IllegalStateException( + "Tried to unreserve id that was not reserved " + FateTxId.formatTid(tid)); + } + + if (deferTime > 0) { + defered.put(tid, System.currentTimeMillis() + deferTime); + } + + this.notifyAll(); } - this.notifyAll(); } - } + private void verifyReserved(boolean isWrite) { + if (!isReserved && isWrite) { + throw new IllegalStateException("Attempted write on unreserved FATE transaction."); + } - private void verifyReserved(long tid) { - synchronized (this) { - if (!reserved.contains(tid)) { - throw new IllegalStateException( - "Tried to operate on unreserved transaction " + FateTxId.formatTid(tid)); + if (isReserved) { + synchronized (this) { + if (!reserved.contains(tid)) { + throw new IllegalStateException( + "Tried to operate on unreserved transaction " + FateTxId.formatTid(tid)); + } + } } } - } - private static final int RETRIES = 10; + private static final int RETRIES = 10; - @Override - public Repo top(long tid) { - verifyReserved(tid); + @Override + public Repo top() { + verifyReserved(false); - for (int i = 0; i < RETRIES; i++) { - String txpath = getTXPath(tid); - try { - String top; + for (int i = 0; i < RETRIES; i++) { + String txpath = getTXPath(tid); try { - top = findTop(txpath); - if (top == null) { - return null; + String top; + try { + top = findTop(txpath); + if (top == null) { + return null; + } + } catch (KeeperException.NoNodeException ex) { + throw new IllegalStateException(ex); } + + byte[] ser = zk.getData(txpath + "/" + top); + @SuppressWarnings("unchecked") + var deserialized = (Repo) deserialize(ser); + return deserialized; } catch (KeeperException.NoNodeException ex) { - throw new IllegalStateException(ex); + log.debug("zookeeper error reading " + txpath + ": " + ex, ex); + sleepUninterruptibly(100, MILLISECONDS); + continue; + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); } - - byte[] ser = zk.getData(txpath + "/" + top); - @SuppressWarnings("unchecked") - var deserialized = (Repo) deserialize(ser); - return deserialized; - } catch (KeeperException.NoNodeException ex) { - log.debug("zookeeper error reading " + txpath + ": " + ex, ex); - sleepUninterruptibly(100, MILLISECONDS); - continue; - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); } + return null; } - return null; - } - private String findTop(String txpath) throws KeeperException, InterruptedException { - List ops = zk.getChildren(txpath); + private String findTop(String txpath) throws KeeperException, InterruptedException { + List ops = zk.getChildren(txpath); - ops = new ArrayList<>(ops); + ops = new ArrayList<>(ops); + + String max = ""; - String max = ""; + for (String child : ops) { + if (child.startsWith("repo_") && child.compareTo(max) > 0) { + max = child; + } + } - for (String child : ops) { - if (child.startsWith("repo_") && child.compareTo(max) > 0) { - max = child; + if (max.equals("")) { + return null; } + + return max; } - if (max.equals("")) { - return null; + @Override + public void push(Repo repo) throws StackOverflowException { + verifyReserved(true); + + String txpath = getTXPath(tid); + try { + String top = findTop(txpath); + if (top != null && Long.parseLong(top.split("_")[1]) > 100) { + throw new StackOverflowException("Repo stack size too large"); + } + + zk.putPersistentSequential(txpath + "/repo_", serialize(repo)); + } catch (StackOverflowException soe) { + throw soe; + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } } - return max; - } + @Override + public void pop() { + verifyReserved(true); - @Override - public void push(long tid, Repo repo) throws StackOverflowException { - verifyReserved(tid); + try { + String txpath = getTXPath(tid); + String top = findTop(txpath); + if (top == null) { + throw new IllegalStateException("Tried to pop when empty " + FateTxId.formatTid(tid)); + } + zk.recursiveDelete(txpath + "/" + top, NodeMissingPolicy.SKIP); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } + } - String txpath = getTXPath(tid); - try { - String top = findTop(txpath); - if (top != null && Long.parseLong(top.split("_")[1]) > 100) { - throw new StackOverflowException("Repo stack size too large"); + private TStatus _getStatus(long tid) { + try { + return TStatus.valueOf(new String(zk.getData(getTXPath(tid)), UTF_8)); + } catch (NoNodeException nne) { + return TStatus.UNKNOWN; + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); } + } - zk.putPersistentSequential(txpath + "/repo_", serialize(repo)); - } catch (StackOverflowException soe) { - throw soe; - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); + @Override + public TStatus getStatus() { + verifyReserved(false); + return _getStatus(tid); } - } - @Override - public void pop(long tid) { - verifyReserved(tid); + @Override + public TStatus waitForStatusChange(EnumSet expected) { + while (true) { + long events; + synchronized (this) { + events = statusChangeEvents; + } - try { - String txpath = getTXPath(tid); - String top = findTop(txpath); - if (top == null) { - throw new IllegalStateException("Tried to pop when empty " + FateTxId.formatTid(tid)); + TStatus status = _getStatus(tid); + if (expected.contains(status)) { + return status; + } + + synchronized (this) { + // suppress lgtm alert - synchronized variable is not always true + if (events == statusChangeEvents) { // lgtm [java/constant-comparison] + try { + this.wait(5000); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + } } - zk.recursiveDelete(txpath + "/" + top, NodeMissingPolicy.SKIP); - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); } - } - private TStatus _getStatus(long tid) { - try { - return TStatus.valueOf(new String(zk.getData(getTXPath(tid)), UTF_8)); - } catch (NoNodeException nne) { - return TStatus.UNKNOWN; - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); - } - } + @Override + public void setStatus(TStatus status) { + verifyReserved(true); - @Override - public TStatus getStatus(long tid) { - verifyReserved(tid); - return _getStatus(tid); - } + try { + zk.putPersistentData(getTXPath(tid), status.name().getBytes(UTF_8), + NodeExistsPolicy.OVERWRITE); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } - @Override - public TStatus waitForStatusChange(long tid, EnumSet expected) { - while (true) { - long events; synchronized (this) { - events = statusChangeEvents; + statusChangeEvents++; } - TStatus status = _getStatus(tid); - if (expected.contains(status)) { - return status; + } + + @Override + public void delete() { + verifyReserved(true); + + try { + zk.recursiveDelete(getTXPath(tid), NodeMissingPolicy.SKIP); + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); } + } - synchronized (this) { - // suppress lgtm alert - synchronized variable is not always true - if (events == statusChangeEvents) { // lgtm [java/constant-comparison] - try { - this.wait(5000); - } catch (InterruptedException e) { - throw new IllegalStateException(e); - } + @Override + public void setTransactionInfo(Fate.TxInfo txInfo, Serializable so) { + verifyReserved(true); + + try { + if (so instanceof String) { + zk.putPersistentData(getTXPath(tid) + "/" + txInfo, ("S " + so).getBytes(UTF_8), + NodeExistsPolicy.OVERWRITE); + } else { + byte[] sera = serialize(so); + byte[] data = new byte[sera.length + 2]; + System.arraycopy(sera, 0, data, 2, sera.length); + data[0] = 'O'; + data[1] = ' '; + zk.putPersistentData(getTXPath(tid) + "/" + txInfo, data, NodeExistsPolicy.OVERWRITE); } + } catch (KeeperException | InterruptedException e2) { + throw new IllegalStateException(e2); } } - } - @Override - public void setStatus(long tid, TStatus status) { - verifyReserved(tid); + @Override + public Serializable getTransactionInfo(Fate.TxInfo txInfo) { + verifyReserved(false); - try { - zk.putPersistentData(getTXPath(tid), status.name().getBytes(UTF_8), - NodeExistsPolicy.OVERWRITE); - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); + try { + byte[] data = zk.getData(getTXPath(tid) + "/" + txInfo); + + if (data[0] == 'O') { + byte[] sera = new byte[data.length - 2]; + System.arraycopy(data, 2, sera, 0, sera.length); + return (Serializable) deserialize(sera); + } else if (data[0] == 'S') { + return new String(data, 2, data.length - 2, UTF_8); + } else { + throw new IllegalStateException("Bad node data " + txInfo); + } + } catch (NoNodeException nne) { + return null; + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } } - synchronized (this) { - statusChangeEvents++; + @Override + public long timeCreated() { + verifyReserved(false); + + try { + Stat stat = zk.getZooKeeper().exists(getTXPath(tid), false); + return stat.getCtime(); + } catch (Exception e) { + return 0; + } } - } + @Override + public long getID() { + return tid; + } - @Override - public void delete(long tid) { - verifyReserved(tid); + @Override + public List> getStack() { + verifyReserved(false); + String txpath = getTXPath(tid); - try { - zk.recursiveDelete(getTXPath(tid), NodeMissingPolicy.SKIP); - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); - } - } + outer: while (true) { + List ops; + try { + ops = zk.getChildren(txpath); + } catch (KeeperException.NoNodeException e) { + return Collections.emptyList(); + } catch (KeeperException | InterruptedException e1) { + throw new IllegalStateException(e1); + } - @Override - public void setTransactionInfo(long tid, Fate.TxInfo txInfo, Serializable so) { - verifyReserved(tid); + ops = new ArrayList<>(ops); + ops.sort(Collections.reverseOrder()); + + ArrayList> dops = new ArrayList<>(); + + for (String child : ops) { + if (child.startsWith("repo_")) { + byte[] ser; + try { + ser = zk.getData(txpath + "/" + child); + @SuppressWarnings("unchecked") + var repo = (ReadOnlyRepo) deserialize(ser); + dops.add(repo); + } catch (KeeperException.NoNodeException e) { + // children changed so start over + continue outer; + } catch (KeeperException | InterruptedException e) { + throw new IllegalStateException(e); + } + } + } - try { - if (so instanceof String) { - zk.putPersistentData(getTXPath(tid) + "/" + txInfo, ("S " + so).getBytes(UTF_8), - NodeExistsPolicy.OVERWRITE); - } else { - byte[] sera = serialize(so); - byte[] data = new byte[sera.length + 2]; - System.arraycopy(sera, 0, data, 2, sera.length); - data[0] = 'O'; - data[1] = ' '; - zk.putPersistentData(getTXPath(tid) + "/" + txInfo, data, NodeExistsPolicy.OVERWRITE); + return dops; } - } catch (KeeperException | InterruptedException e2) { - throw new IllegalStateException(e2); } } @Override - public Serializable getTransactionInfo(long tid, Fate.TxInfo txInfo) { - verifyReserved(tid); - - try { - byte[] data = zk.getData(getTXPath(tid) + "/" + txInfo); - - if (data[0] == 'O') { - byte[] sera = new byte[data.length - 2]; - System.arraycopy(data, 2, sera, 0, sera.length); - return (Serializable) deserialize(sera); - } else if (data[0] == 'S') { - return new String(data, 2, data.length - 2, UTF_8); - } else { - throw new IllegalStateException("Bad node data " + txInfo); - } - } catch (NoNodeException nne) { - return null; - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); - } + public ReadOnlyFateTxStore read(long tid) { + return new FateTxStoreImpl(tid, false); } @Override @@ -519,56 +600,4 @@ public List list() { throw new IllegalStateException(e); } } - - @Override - public long timeCreated(long tid) { - verifyReserved(tid); - - try { - Stat stat = zk.getZooKeeper().exists(getTXPath(tid), false); - return stat.getCtime(); - } catch (Exception e) { - return 0; - } - } - - @Override - public List> getStack(long tid) { - String txpath = getTXPath(tid); - - outer: while (true) { - List ops; - try { - ops = zk.getChildren(txpath); - } catch (KeeperException.NoNodeException e) { - return Collections.emptyList(); - } catch (KeeperException | InterruptedException e1) { - throw new IllegalStateException(e1); - } - - ops = new ArrayList<>(ops); - ops.sort(Collections.reverseOrder()); - - ArrayList> dops = new ArrayList<>(); - - for (String child : ops) { - if (child.startsWith("repo_")) { - byte[] ser; - try { - ser = zk.getData(txpath + "/" + child); - @SuppressWarnings("unchecked") - var repo = (ReadOnlyRepo) deserialize(ser); - dops.add(repo); - } catch (KeeperException.NoNodeException e) { - // children changed so start over - continue outer; - } catch (KeeperException | InterruptedException e) { - throw new IllegalStateException(e); - } - } - } - - return dops; - } - } } diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index fd31a95e6c8..ce8dda313b5 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -21,15 +21,17 @@ import static org.apache.accumulo.core.fate.FateTxId.formatTid; import java.io.Serializable; -import java.util.EnumSet; import java.util.List; +import java.util.Optional; import java.util.function.Function; import org.apache.accumulo.core.fate.Fate; -import org.apache.accumulo.core.fate.ReadOnlyRepo; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.StackOverflowException; -import org.apache.accumulo.core.fate.TStore; +import org.apache.accumulo.core.fate.WrappedFateTxStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,59 +42,84 @@ public class FateLogger { // reproducible problems with FATE transactions. private static final Logger storeLog = LoggerFactory.getLogger(PREFIX + "store"); - public static TStore wrap(TStore store, Function,String> toLogString) { + private static class LoggingFateTxStore extends WrappedFateTxStore { - // only logging operations that change the persisted data, not operations that only read data - return new TStore<>() { + private final Function,String> toLogString; - @Override - public long reserve() { - return store.reserve(); + private LoggingFateTxStore(FateTxStore wrapped, Function,String> toLogString) { + super(wrapped); + this.toLogString = toLogString; + } + + @Override + public void push(Repo repo) throws StackOverflowException { + super.push(repo); + if (storeLog.isTraceEnabled()) { + storeLog.trace("{} pushed {}", formatTid(getID()), toLogString.apply(repo)); } + } - @Override - public void reserve(long tid) { - store.reserve(tid); + @Override + public void pop() { + super.pop(); + if (storeLog.isTraceEnabled()) { + storeLog.trace("{} popped", formatTid(getID())); } + } - @Override - public boolean tryReserve(long tid) { - return store.tryReserve(tid); + @Override + public void setStatus(ReadOnlyFateStore.TStatus status) { + super.setStatus(status); + if (storeLog.isTraceEnabled()) { + storeLog.trace("{} setStatus to {}", formatTid(getID()), status); } + } - @Override - public void unreserve(long tid, long deferTime) { - store.unreserve(tid, deferTime); + @Override + public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { + super.setTransactionInfo(txInfo, val); + if (storeLog.isTraceEnabled()) { + storeLog.trace("{} setting {} to {}", formatTid(getID()), txInfo, val); } + } - @Override - public List> getStack(long tid) { - return store.getStack(tid); + @Override + public void delete() { + super.delete(); + if (storeLog.isTraceEnabled()) { + storeLog.trace("{} deleted fate transaction", formatTid(getID())); } + } + } + + public static FateStore wrap(FateStore store, Function,String> toLogString) { + + // only logging operations that change the persisted data, not operations that only read data + return new FateStore<>() { @Override - public TStatus getStatus(long tid) { - return store.getStatus(tid); + public FateTxStore reserve() { + return new LoggingFateTxStore<>(store.reserve(), toLogString); } @Override - public TStatus waitForStatusChange(long tid, EnumSet expected) { - return store.waitForStatusChange(tid, expected); + public FateTxStore reserve(long tid) { + return new LoggingFateTxStore<>(store.reserve(tid), toLogString); } @Override - public Serializable getTransactionInfo(long tid, Fate.TxInfo txInfo) { - return store.getTransactionInfo(tid, txInfo); + public Optional> tryReserve(long tid) { + return store.tryReserve(tid).map(ftxs -> new LoggingFateTxStore<>(ftxs, toLogString)); } @Override - public List list() { - return store.list(); + public ReadOnlyFateTxStore read(long tid) { + return store.read(tid); } @Override - public long timeCreated(long tid) { - return store.timeCreated(tid); + public List list() { + return store.list(); } @Override @@ -103,51 +130,6 @@ public long create() { } return tid; } - - @Override - public Repo top(long tid) { - return store.top(tid); - } - - @Override - public void push(long tid, Repo repo) throws StackOverflowException { - store.push(tid, repo); - if (storeLog.isTraceEnabled()) { - storeLog.trace("{} pushed {}", formatTid(tid), toLogString.apply(repo)); - } - } - - @Override - public void pop(long tid) { - store.pop(tid); - if (storeLog.isTraceEnabled()) { - storeLog.trace("{} popped", formatTid(tid)); - } - } - - @Override - public void setStatus(long tid, TStatus status) { - store.setStatus(tid, status); - if (storeLog.isTraceEnabled()) { - storeLog.trace("{} setStatus to {}", formatTid(tid), status); - } - } - - @Override - public void setTransactionInfo(long tid, Fate.TxInfo txInfo, Serializable val) { - store.setTransactionInfo(tid, txInfo, val); - if (storeLog.isTraceEnabled()) { - storeLog.trace("{} setting {} to {}", formatTid(tid), txInfo, val); - } - } - - @Override - public void delete(long tid) { - store.delete(tid); - if (storeLog.isTraceEnabled()) { - storeLog.trace("{} deleted fate transaction", formatTid(tid)); - } - } }; } } diff --git a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java index f36d7494b43..d2530ce1f3a 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java @@ -24,7 +24,7 @@ import java.util.Set; import org.apache.accumulo.core.fate.AgeOffStore.TimeSource; -import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.Test; @@ -50,25 +50,25 @@ public void testBasic() throws InterruptedException, KeeperException { aoStore.ageOff(); long txid1 = aoStore.create(); - aoStore.reserve(txid1); - aoStore.setStatus(txid1, TStatus.IN_PROGRESS); - aoStore.unreserve(txid1, 0); + var txStore1 = aoStore.reserve(txid1); + txStore1.setStatus(TStatus.IN_PROGRESS); + txStore1.unreserve(0); aoStore.ageOff(); long txid2 = aoStore.create(); - aoStore.reserve(txid2); - aoStore.setStatus(txid2, TStatus.IN_PROGRESS); - aoStore.setStatus(txid2, TStatus.FAILED); - aoStore.unreserve(txid2, 0); + var txStore2 = aoStore.reserve(txid2); + txStore2.setStatus(TStatus.IN_PROGRESS); + txStore2.setStatus(TStatus.FAILED); + txStore2.unreserve(0); tts.time = 6; long txid3 = aoStore.create(); - aoStore.reserve(txid3); - aoStore.setStatus(txid3, TStatus.IN_PROGRESS); - aoStore.setStatus(txid3, TStatus.SUCCESSFUL); - aoStore.unreserve(txid3, 0); + var txStore3 = aoStore.reserve(txid3); + txStore3.setStatus(TStatus.IN_PROGRESS); + txStore3.setStatus(TStatus.SUCCESSFUL); + txStore3.unreserve(0); Long txid4 = aoStore.create(); @@ -99,21 +99,21 @@ public void testNonEmpty() throws InterruptedException, KeeperException { TestTimeSource tts = new TestTimeSource(); TestStore testStore = new TestStore(); long txid1 = testStore.create(); - testStore.reserve(txid1); - testStore.setStatus(txid1, TStatus.IN_PROGRESS); - testStore.unreserve(txid1, 0); + var txStore1 = testStore.reserve(txid1); + txStore1.setStatus(TStatus.IN_PROGRESS); + txStore1.unreserve(0); long txid2 = testStore.create(); - testStore.reserve(txid2); - testStore.setStatus(txid2, TStatus.IN_PROGRESS); - testStore.setStatus(txid2, TStatus.FAILED); - testStore.unreserve(txid2, 0); + var txStore2 = testStore.reserve(txid2); + txStore2.setStatus(TStatus.IN_PROGRESS); + txStore2.setStatus(TStatus.FAILED); + txStore2.unreserve(0); long txid3 = testStore.create(); - testStore.reserve(txid3); - testStore.setStatus(txid3, TStatus.IN_PROGRESS); - testStore.setStatus(txid3, TStatus.SUCCESSFUL); - testStore.unreserve(txid3, 0); + var txStore3 = testStore.reserve(txid3); + txStore3.setStatus(TStatus.IN_PROGRESS); + txStore3.setStatus(TStatus.SUCCESSFUL); + txStore3.unreserve(0); Long txid4 = testStore.create(); @@ -134,9 +134,9 @@ public void testNonEmpty() throws InterruptedException, KeeperException { assertEquals(Set.of(txid1), new HashSet<>(aoStore.list())); assertEquals(1, new HashSet<>(aoStore.list()).size()); - aoStore.reserve(txid1); - aoStore.setStatus(txid1, TStatus.FAILED_IN_PROGRESS); - aoStore.unreserve(txid1, 0); + txStore1 = aoStore.reserve(txid1); + txStore1.setStatus(TStatus.FAILED_IN_PROGRESS); + txStore1.unreserve(0); tts.time = 30; @@ -145,9 +145,9 @@ public void testNonEmpty() throws InterruptedException, KeeperException { assertEquals(Set.of(txid1), new HashSet<>(aoStore.list())); assertEquals(1, new HashSet<>(aoStore.list()).size()); - aoStore.reserve(txid1); - aoStore.setStatus(txid1, TStatus.FAILED); - aoStore.unreserve(txid1, 0); + txStore1 = aoStore.reserve(txid1); + txStore1.setStatus(TStatus.FAILED); + txStore1.unreserve(0); aoStore.ageOff(); diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 9f6d44b27ca..5bfd60d2bd8 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -18,17 +18,20 @@ */ package org.apache.accumulo.core.fate; +import java.io.Serializable; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; /** * Transient in memory store for transactions. */ -public class TestStore extends ZooStore { +public class TestStore implements FateStore { private long nextId = 1; private Map statuses = new HashMap<>(); @@ -41,62 +44,127 @@ public long create() { } @Override - public void reserve(long tid) { + public FateTxStore reserve(long tid) { if (reserved.contains(tid)) { throw new IllegalStateException(); // zoo store would wait, but do not expect test to reserve } // twice... if test change, then change this reserved.add(tid); + return new TestFateTxStore(tid); } @Override - public boolean tryReserve(long tid) { + public FateTxStore reserve() { + throw new UnsupportedOperationException(); + } + + @Override + public Optional> tryReserve(long tid) { synchronized (this) { if (!reserved.contains(tid)) { reserve(tid); - return true; + return Optional.of(new TestFateTxStore(tid)); } - return false; + return Optional.empty(); } } - @Override - public void unreserve(long tid, long deferTime) { - if (!reserved.remove(tid)) { - throw new IllegalStateException(); + private class TestFateTxStore implements FateTxStore { + + private final long tid; + + TestFateTxStore(long tid) { + this.tid = tid; } - } - @Override - public org.apache.accumulo.core.fate.TStore.TStatus getStatus(long tid) { - if (!reserved.contains(tid)) { - throw new IllegalStateException(); + @Override + public Repo top() { + throw new UnsupportedOperationException(); } - TStatus status = statuses.get(tid); - if (status == null) { - return TStatus.UNKNOWN; + @Override + public List> getStack() { + throw new UnsupportedOperationException(); } - return status; - } - @Override - public void setStatus(long tid, org.apache.accumulo.core.fate.TStore.TStatus status) { - if (!reserved.contains(tid)) { - throw new IllegalStateException(); + @Override + public TStatus getStatus() { + if (!reserved.contains(tid)) { + throw new IllegalStateException(); + } + + TStatus status = statuses.get(tid); + if (status == null) { + return TStatus.UNKNOWN; + } + return status; + } + + @Override + public TStatus waitForStatusChange(EnumSet expected) { + throw new UnsupportedOperationException(); + } + + @Override + public Serializable getTransactionInfo(Fate.TxInfo txInfo) { + throw new UnsupportedOperationException(); + } + + @Override + public long timeCreated() { + throw new UnsupportedOperationException(); } - if (!statuses.containsKey(tid)) { - throw new IllegalStateException(); + + @Override + public long getID() { + return tid; + } + + @Override + public void push(Repo repo) throws StackOverflowException { + throw new UnsupportedOperationException(); + } + + @Override + public void pop() { + throw new UnsupportedOperationException(); + } + + @Override + public void setStatus(TStatus status) { + if (!reserved.contains(tid)) { + throw new IllegalStateException(); + } + if (!statuses.containsKey(tid)) { + throw new IllegalStateException(); + } + statuses.put(tid, status); + } + + @Override + public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) { + throw new UnsupportedOperationException(); + } + + @Override + public void delete() { + if (!reserved.contains(tid)) { + throw new IllegalStateException(); + } + statuses.remove(tid); + } + + @Override + public void unreserve(long deferTime) { + if (!reserved.remove(tid)) { + throw new IllegalStateException(); + } } - statuses.put(tid, status); } @Override - public void delete(long tid) { - if (!reserved.contains(tid)) { - throw new IllegalStateException(); - } - statuses.remove(tid); + public ReadOnlyFateTxStore read(long tid) { + throw new UnsupportedOperationException(); } @Override diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java index 8dc2981a03c..144013dc1eb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java @@ -55,7 +55,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.AdminUtil; import org.apache.accumulo.core.fate.FateTxId; -import org.apache.accumulo.core.fate.ReadOnlyTStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.fate.zookeeper.ZooCache; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; @@ -783,7 +783,8 @@ private void executeFateOpsCommand(ServerContext context, FateOpsCommand fateOps if (fateOpsCommand.print) { final Set sortedTxs = new TreeSet<>(); fateOpsCommand.txList.forEach(s -> sortedTxs.add(parseTidFromUserInput(s))); - EnumSet statusFilter = getCmdLineStatusFilters(fateOpsCommand.states); + EnumSet statusFilter = + getCmdLineStatusFilters(fateOpsCommand.states); admin.print(zs, zk, zTableLocksPath, new Formatter(System.out), sortedTxs, statusFilter); // print line break at the end System.out.println(); @@ -835,7 +836,7 @@ private boolean cancelFateOperation(ClientContext context, long txid) throws Acc } private void summarizeFateTx(ServerContext context, FateOpsCommand cmd, AdminUtil admin, - ReadOnlyTStore zs, ServiceLock.ServiceLockPath tableLocksPath) + ReadOnlyFateStore zs, ServiceLock.ServiceLockPath tableLocksPath) throws InterruptedException, AccumuloException, AccumuloSecurityException, KeeperException { ZooReaderWriter zk = context.getZooReaderWriter(); @@ -854,7 +855,7 @@ private void summarizeFateTx(ServerContext context, FateOpsCommand cmd, AdminUti } }); - EnumSet statusFilter = getCmdLineStatusFilters(cmd.states); + EnumSet statusFilter = getCmdLineStatusFilters(cmd.states); FateSummaryReport report = new FateSummaryReport(idsToNameMap, statusFilter); @@ -881,12 +882,12 @@ private void printLines(List lines) { * * @return a set of status filters, or an empty set if none provides */ - private EnumSet getCmdLineStatusFilters(List states) { - EnumSet statusFilter = null; + private EnumSet getCmdLineStatusFilters(List states) { + EnumSet statusFilter = null; if (!states.isEmpty()) { - statusFilter = EnumSet.noneOf(ReadOnlyTStore.TStatus.class); + statusFilter = EnumSet.noneOf(ReadOnlyFateStore.TStatus.class); for (String element : states) { - statusFilter.add(ReadOnlyTStore.TStatus.valueOf(element)); + statusFilter.add(ReadOnlyFateStore.TStatus.valueOf(element)); } } return statusFilter; diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java index d74c5ac6694..f99e36d704f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java @@ -33,7 +33,7 @@ import java.util.TreeSet; import org.apache.accumulo.core.fate.AdminUtil; -import org.apache.accumulo.core.fate.ReadOnlyTStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -55,7 +55,7 @@ public class FateSummaryReport { private final transient Map idsToNameMap; public FateSummaryReport(Map idsToNameMap, - EnumSet statusFilter) { + EnumSet statusFilter) { this.idsToNameMap = idsToNameMap; if (statusFilter != null) { statusFilter.forEach(f -> this.statusFilterNames.add(f.name())); diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/SummaryReportTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/SummaryReportTest.java index 40cc9553058..bed17e92c40 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/SummaryReportTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/SummaryReportTest.java @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.AdminUtil; -import org.apache.accumulo.core.fate.ReadOnlyTStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,7 +67,7 @@ public void noTablenameReport() { AdminUtil.TransactionStatus status1 = createMock(AdminUtil.TransactionStatus.class); expect(status1.getTimeCreated()).andReturn(now - TimeUnit.DAYS.toMillis(1)).anyTimes(); - expect(status1.getStatus()).andReturn(ReadOnlyTStore.TStatus.IN_PROGRESS).anyTimes(); + expect(status1.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes(); expect(status1.getTop()).andReturn(null).anyTimes(); expect(status1.getTxName()).andReturn(null).anyTimes(); expect(status1.getTxid()).andReturn("abcdabcd").anyTimes(); diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java index f171a7e3e8d..48e5c598453 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java @@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.fate.AdminUtil; -import org.apache.accumulo.core.fate.ReadOnlyTStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +50,7 @@ void orderingByDuration() { AdminUtil.TransactionStatus status1 = createMock(AdminUtil.TransactionStatus.class); expect(status1.getTimeCreated()).andReturn(now - TimeUnit.DAYS.toMillis(1)).anyTimes(); - expect(status1.getStatus()).andReturn(ReadOnlyTStore.TStatus.IN_PROGRESS).anyTimes(); + expect(status1.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes(); expect(status1.getTop()).andReturn("step1").anyTimes(); expect(status1.getTxName()).andReturn("runningTx1").anyTimes(); expect(status1.getTxid()).andReturn("abcdabcd").anyTimes(); @@ -59,7 +59,7 @@ void orderingByDuration() { AdminUtil.TransactionStatus status2 = createMock(AdminUtil.TransactionStatus.class); expect(status2.getTimeCreated()).andReturn(now - TimeUnit.DAYS.toMillis(7)).anyTimes(); - expect(status2.getStatus()).andReturn(ReadOnlyTStore.TStatus.IN_PROGRESS).anyTimes(); + expect(status2.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes(); expect(status2.getTop()).andReturn("step2").anyTimes(); expect(status2.getTxName()).andReturn("runningTx2").anyTimes(); expect(status2.getTxid()).andReturn("123456789").anyTimes(); @@ -93,7 +93,7 @@ public void lockTest() { AdminUtil.TransactionStatus status1 = createMock(AdminUtil.TransactionStatus.class); expect(status1.getTimeCreated()).andReturn(now - TimeUnit.DAYS.toMillis(1)).anyTimes(); - expect(status1.getStatus()).andReturn(ReadOnlyTStore.TStatus.IN_PROGRESS).anyTimes(); + expect(status1.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes(); expect(status1.getTop()).andReturn("step1").anyTimes(); expect(status1.getTxName()).andReturn("runningTx").anyTimes(); expect(status1.getTxid()).andReturn("abcdabcd").anyTimes(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java index e94e4907bc9..d04fc9455c1 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java @@ -68,7 +68,7 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; -import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.manager.thrift.BulkImportState; import org.apache.accumulo.core.manager.thrift.FateOperation; import org.apache.accumulo.core.manager.thrift.FateService; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java index 57a561aa7b4..e31ff65398e 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java @@ -24,7 +24,7 @@ import java.util.TreeMap; import org.apache.accumulo.core.fate.AdminUtil; -import org.apache.accumulo.core.fate.ReadOnlyTStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.server.ServerContext; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -101,7 +101,7 @@ Map getOpTypeCounters() { * @return the current FATE metric values. */ public static FateMetricValues getFromZooKeeper(final ServerContext context, - final String fateRootPath, final ReadOnlyTStore zooStore) { + final String fateRootPath, final ReadOnlyFateStore zooStore) { FateMetricValues.Builder builder = FateMetricValues.builder(); @@ -116,7 +116,7 @@ public static FateMetricValues getFromZooKeeper(final ServerContext context, // states are enumerated - create new map with counts initialized to 0. Map states = new TreeMap<>(); - for (ReadOnlyTStore.TStatus t : ReadOnlyTStore.TStatus.values()) { + for (ReadOnlyFateStore.TStatus t : ReadOnlyFateStore.TStatus.values()) { states.put(t.name(), 0L); } @@ -132,7 +132,7 @@ public static FateMetricValues getFromZooKeeper(final ServerContext context, states.merge(stateName, 1L, Long::sum); // incr count for op type for for in_progress transactions. - if (ReadOnlyTStore.TStatus.IN_PROGRESS.equals(tx.getStatus())) { + if (ReadOnlyFateStore.TStatus.IN_PROGRESS.equals(tx.getStatus())) { String opType = tx.getTxName(); if (opType == null || opType.isEmpty()) { opType = "UNKNOWN"; @@ -189,7 +189,7 @@ static class Builder { // states are enumerated - create new map with counts initialized to 0. txStateCounters = new TreeMap<>(); - for (ReadOnlyTStore.TStatus t : ReadOnlyTStore.TStatus.values()) { + for (ReadOnlyFateStore.TStatus t : ReadOnlyFateStore.TStatus.values()) { txStateCounters.put(t.name(), 0L); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java index 758884d5b46..1fc7dcf4752 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetrics.java @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.fate.ReadOnlyTStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.metrics.MetricsProducer; import org.apache.accumulo.core.metrics.MetricsUtil; @@ -49,7 +49,7 @@ public class FateMetrics implements MetricsProducer { private static final String OP_TYPE_TAG = "op.type"; private final ServerContext context; - private final ReadOnlyTStore zooStore; + private final ReadOnlyFateStore zooStore; private final String fateRootPath; private final long refreshDelay; @@ -93,7 +93,7 @@ private void update() { fateErrorsGauge.set(metricValues.getZkConnectionErrors()); for (Entry vals : metricValues.getTxStateCounters().entrySet()) { - switch (ReadOnlyTStore.TStatus.valueOf(vals.getKey())) { + switch (ReadOnlyFateStore.TStatus.valueOf(vals.getKey())) { case NEW: newTxGauge.set(vals.getValue()); break; @@ -134,20 +134,22 @@ public void registerMetrics(final MeterRegistry registry) { fateErrorsGauge = registry.gauge(METRICS_FATE_ERRORS, Tags.concat(MetricsUtil.getCommonTags(), "type", "zk.connection"), new AtomicLong(0)); newTxGauge = registry.gauge(METRICS_FATE_TX, Tags.concat(MetricsUtil.getCommonTags(), "state", - ReadOnlyTStore.TStatus.NEW.name().toLowerCase()), new AtomicLong(0)); + ReadOnlyFateStore.TStatus.NEW.name().toLowerCase()), new AtomicLong(0)); submittedTxGauge = registry.gauge(METRICS_FATE_TX, Tags.concat(MetricsUtil.getCommonTags(), - "state", ReadOnlyTStore.TStatus.SUBMITTED.name().toLowerCase()), new AtomicLong(0)); + "state", ReadOnlyFateStore.TStatus.SUBMITTED.name().toLowerCase()), new AtomicLong(0)); inProgressTxGauge = registry.gauge(METRICS_FATE_TX, Tags.concat(MetricsUtil.getCommonTags(), - "state", ReadOnlyTStore.TStatus.IN_PROGRESS.name().toLowerCase()), new AtomicLong(0)); + "state", ReadOnlyFateStore.TStatus.IN_PROGRESS.name().toLowerCase()), new AtomicLong(0)); failedInProgressTxGauge = - registry.gauge(METRICS_FATE_TX, Tags.concat(MetricsUtil.getCommonTags(), "state", - ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS.name().toLowerCase()), new AtomicLong(0)); + registry.gauge(METRICS_FATE_TX, + Tags.concat(MetricsUtil.getCommonTags(), "state", + ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS.name().toLowerCase()), + new AtomicLong(0)); failedTxGauge = registry.gauge(METRICS_FATE_TX, Tags.concat(MetricsUtil.getCommonTags(), - "state", ReadOnlyTStore.TStatus.FAILED.name().toLowerCase()), new AtomicLong(0)); + "state", ReadOnlyFateStore.TStatus.FAILED.name().toLowerCase()), new AtomicLong(0)); successfulTxGauge = registry.gauge(METRICS_FATE_TX, Tags.concat(MetricsUtil.getCommonTags(), - "state", ReadOnlyTStore.TStatus.SUCCESSFUL.name().toLowerCase()), new AtomicLong(0)); + "state", ReadOnlyFateStore.TStatus.SUCCESSFUL.name().toLowerCase()), new AtomicLong(0)); unknownTxGauge = registry.gauge(METRICS_FATE_TX, Tags.concat(MetricsUtil.getCommonTags(), - "state", ReadOnlyTStore.TStatus.UNKNOWN.name().toLowerCase()), new AtomicLong(0)); + "state", ReadOnlyFateStore.TStatus.UNKNOWN.name().toLowerCase()), new AtomicLong(0)); update(); diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java index ffd6c4fba1f..2bcaad0e1bb 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java @@ -34,7 +34,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.fate.ReadOnlyTStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.util.threads.ThreadPools; @@ -272,7 +272,7 @@ public UpgradeStatus getStatus() { justification = "Want to immediately stop all manager threads on upgrade error") private void abortIfFateTransactions(ServerContext context) { try { - final ReadOnlyTStore fate = new ZooStore<>( + final ReadOnlyFateStore fate = new ZooStore<>( context.getZooKeeperRoot() + Constants.ZFATE, context.getZooReaderWriter()); if (!fate.list().isEmpty()) { throw new AccumuloException("Aborting upgrade because there are" diff --git a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java index 5834179fc09..8efb614e7d9 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/zookeeper/FateIT.java @@ -19,12 +19,12 @@ package org.apache.accumulo.test.fate.zookeeper; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.FAILED; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.IN_PROGRESS; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.NEW; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.SUBMITTED; -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.SUCCESSFUL; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED; +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL; import static org.apache.accumulo.harness.AccumuloITBase.ZOOKEEPER_TESTING_SERVER; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; @@ -47,7 +47,7 @@ import org.apache.accumulo.core.fate.AgeOffStore; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateTxId; -import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; import org.apache.accumulo.core.fate.Repo; import org.apache.accumulo.core.fate.ZooStore; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; @@ -57,6 +57,7 @@ import org.apache.accumulo.manager.tableOps.TraceRepo; import org.apache.accumulo.manager.tableOps.Utils; import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.util.Wait; import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer; import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.AfterAll; @@ -237,7 +238,7 @@ public void testCancelWhileNew() throws Exception { assertTrue(fate.cancel(txid)); assertTrue(FAILED_IN_PROGRESS == getTxStatus(zk, txid) || FAILED == getTxStatus(zk, txid)); fate.seedTransaction("TestOperation", txid, new TestOperation(NS, TID), true, "Test Op"); - assertTrue(FAILED_IN_PROGRESS == getTxStatus(zk, txid) || FAILED == getTxStatus(zk, txid)); + Wait.waitFor(() -> FAILED == getTxStatus(zk, txid)); fate.delete(txid); } finally { fate.shutdown(); @@ -276,6 +277,7 @@ public void testCancelWhileSubmittedAndRunning() throws Exception { assertEquals(SUBMITTED, getTxStatus(zk, txid)); // This is false because the transaction runner has reserved the FaTe // transaction. + Wait.waitFor(() -> IN_PROGRESS == getTxStatus(zk, txid)); assertFalse(fate.cancel(txid)); callStarted.await(); finishCall.countDown();