Skip to content

Commit

Permalink
refactor fates storage layer (apache#4017)
Browse files Browse the repository at this point in the history
Refactored the storage layer of FATE to return an object when reserving
a fate transaction.  This object allows mutating the storage related to
that FATE transaction.  This replaces methods where the fate transaction
id had to to always be passed.

These changes are a subset of the changes in apache#3964, but only focusing on
the storage layer refactoring and nothing else.

Co-authored-by: Christopher L. Shannon (cshannon) <[email protected]>
  • Loading branch information
keith-turner and cshannon authored Dec 6, 2023
1 parent c340c7a commit 73f6586
Show file tree
Hide file tree
Showing 21 changed files with 1,011 additions and 853 deletions.
129 changes: 67 additions & 62 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@
import java.util.Map.Entry;
import java.util.Set;

import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.zookeeper.FateLock;
import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
Expand Down Expand Up @@ -210,14 +212,14 @@ public Map<String,List<String>> getDanglingWaitingLocks() {
/**
* Returns a list of the FATE transactions, optionally filtered by transaction id and status. This
* method does not process lock information, if lock information is desired, use
* {@link #getStatus(ReadOnlyTStore, ZooReader, ServiceLockPath, Set, EnumSet)}
* {@link #getStatus(ReadOnlyFateStore, ZooReader, ServiceLockPath, Set, EnumSet)}
*
* @param zs read-only zoostore
* @param filterTxid filter results to include for provided transaction ids.
* @param filterStatus filter results to include only provided status types
* @return list of FATE transactions that match filter criteria
*/
public List<TransactionStatus> getTransactionStatus(ReadOnlyTStore<T> zs, Set<Long> filterTxid,
public List<TransactionStatus> getTransactionStatus(ReadOnlyFateStore<T> zs, Set<Long> filterTxid,
EnumSet<TStatus> filterStatus) {

FateStatus status = getTransactionStatus(zs, filterTxid, filterStatus,
Expand All @@ -239,7 +241,7 @@ public List<TransactionStatus> getTransactionStatus(ReadOnlyTStore<T> zs, Set<Lo
* @throws KeeperException if zookeeper exception occurs
* @throws InterruptedException if process is interrupted.
*/
public FateStatus getStatus(ReadOnlyTStore<T> zs, ZooReader zk,
public FateStatus getStatus(ReadOnlyFateStore<T> zs, ZooReader zk,
ServiceLock.ServiceLockPath lockPath, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
throws KeeperException, InterruptedException {
Map<Long,List<String>> heldLocks = new HashMap<>();
Expand Down Expand Up @@ -332,7 +334,7 @@ private void findLocks(ZooReader zk, final ServiceLock.ServiceLockPath lockPath,
* @param waitingLocks populated list of locks held by transaction - or an empty map if none.
* @return current fate and lock status
*/
private FateStatus getTransactionStatus(ReadOnlyTStore<T> zs, Set<Long> filterTxid,
private FateStatus getTransactionStatus(ReadOnlyFateStore<T> zs, Set<Long> filterTxid,
EnumSet<TStatus> filterStatus, Map<Long,List<String>> heldLocks,
Map<Long,List<String>> waitingLocks) {

Expand All @@ -341,9 +343,9 @@ private FateStatus getTransactionStatus(ReadOnlyTStore<T> zs, Set<Long> filterTx

for (Long tid : transactions) {

zs.reserve(tid);
ReadOnlyFateTxStore<T> txStore = zs.read(tid);

String txName = (String) zs.getTransactionInfo(tid, Fate.TxInfo.TX_NAME);
String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);

List<String> hlocks = heldLocks.remove(tid);

Expand All @@ -358,16 +360,14 @@ private FateStatus getTransactionStatus(ReadOnlyTStore<T> zs, Set<Long> filterTx
}

String top = null;
ReadOnlyRepo<T> repo = zs.top(tid);
ReadOnlyRepo<T> repo = txStore.top();
if (repo != null) {
top = repo.getName();
}

TStatus status = zs.getStatus(tid);
TStatus status = txStore.getStatus();

long timeCreated = zs.timeCreated(tid);

zs.unreserve(tid, 0);
long timeCreated = txStore.timeCreated();

if (includeByStatus(status, filterStatus) && includeByTxid(tid, filterTxid)) {
statuses.add(new TransactionStatus(tid, status, txName, hlocks, wlocks, top, timeCreated));
Expand All @@ -386,14 +386,14 @@ private boolean includeByTxid(Long tid, Set<Long> filterTxid) {
return (filterTxid == null) || filterTxid.isEmpty() || filterTxid.contains(tid);
}

public void printAll(ReadOnlyTStore<T> zs, ZooReader zk,
public void printAll(ReadOnlyFateStore<T> zs, ZooReader zk,
ServiceLock.ServiceLockPath tableLocksPath) throws KeeperException, InterruptedException {
print(zs, zk, tableLocksPath, new Formatter(System.out), null, null);
}

public void print(ReadOnlyTStore<T> zs, ZooReader zk, ServiceLock.ServiceLockPath tableLocksPath,
Formatter fmt, Set<Long> filterTxid, EnumSet<TStatus> filterStatus)
throws KeeperException, InterruptedException {
public void print(ReadOnlyFateStore<T> zs, ZooReader zk,
ServiceLock.ServiceLockPath tableLocksPath, Formatter fmt, Set<Long> filterTxid,
EnumSet<TStatus> filterStatus) throws KeeperException, InterruptedException {
FateStatus fateStatus = getStatus(zs, zk, tableLocksPath, filterTxid, filterStatus);

for (TransactionStatus txStatus : fateStatus.getTransactions()) {
Expand All @@ -417,7 +417,7 @@ public void print(ReadOnlyTStore<T> zs, ZooReader zk, ServiceLock.ServiceLockPat
}
}

public boolean prepDelete(TStore<T> zs, ZooReaderWriter zk, ServiceLockPath path,
public boolean prepDelete(FateStore<T> zs, ZooReaderWriter zk, ServiceLockPath path,
String txidStr) {
if (!checkGlobalLock(zk, path)) {
return false;
Expand All @@ -431,30 +431,32 @@ public boolean prepDelete(TStore<T> zs, ZooReaderWriter zk, ServiceLockPath path
return false;
}
boolean state = false;
zs.reserve(txid);
TStatus ts = zs.getStatus(txid);
switch (ts) {
case UNKNOWN:
System.out.printf("Invalid transaction ID: %016x%n", txid);
break;

case SUBMITTED:
case IN_PROGRESS:
case NEW:
case FAILED:
case FAILED_IN_PROGRESS:
case SUCCESSFUL:
System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts);
zs.delete(txid);
state = true;
break;
FateTxStore<T> txStore = zs.reserve(txid);
try {
TStatus ts = txStore.getStatus();
switch (ts) {
case UNKNOWN:
System.out.printf("Invalid transaction ID: %016x%n", txid);
break;

case SUBMITTED:
case IN_PROGRESS:
case NEW:
case FAILED:
case FAILED_IN_PROGRESS:
case SUCCESSFUL:
System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts);
txStore.delete();
state = true;
break;
}
} finally {
txStore.unreserve(0);
}

zs.unreserve(txid, 0);
return state;
}

public boolean prepFail(TStore<T> zs, ZooReaderWriter zk, ServiceLockPath zLockManagerPath,
public boolean prepFail(FateStore<T> zs, ZooReaderWriter zk, ServiceLockPath zLockManagerPath,
String txidStr) {
if (!checkGlobalLock(zk, zLockManagerPath)) {
return false;
Expand All @@ -468,33 +470,36 @@ public boolean prepFail(TStore<T> zs, ZooReaderWriter zk, ServiceLockPath zLockM
return false;
}
boolean state = false;
zs.reserve(txid);
TStatus ts = zs.getStatus(txid);
switch (ts) {
case UNKNOWN:
System.out.printf("Invalid transaction ID: %016x%n", txid);
break;

case SUBMITTED:
case IN_PROGRESS:
case NEW:
System.out.printf("Failing transaction: %016x (%s)%n", txid, ts);
zs.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
state = true;
break;

case SUCCESSFUL:
System.out.printf("Transaction already completed: %016x (%s)%n", txid, ts);
break;

case FAILED:
case FAILED_IN_PROGRESS:
System.out.printf("Transaction already failed: %016x (%s)%n", txid, ts);
state = true;
break;
FateTxStore<T> txStore = zs.reserve(txid);
try {
TStatus ts = txStore.getStatus();
switch (ts) {
case UNKNOWN:
System.out.printf("Invalid transaction ID: %016x%n", txid);
break;

case SUBMITTED:
case IN_PROGRESS:
case NEW:
System.out.printf("Failing transaction: %016x (%s)%n", txid, ts);
txStore.setStatus(TStatus.FAILED_IN_PROGRESS);
state = true;
break;

case SUCCESSFUL:
System.out.printf("Transaction already completed: %016x (%s)%n", txid, ts);
break;

case FAILED:
case FAILED_IN_PROGRESS:
System.out.printf("Transaction already failed: %016x (%s)%n", txid, ts);
state = true;
break;
}
} finally {
txStore.unreserve(0);
}

zs.unreserve(txid, 0);
return state;
}

Expand Down
Loading

0 comments on commit 73f6586

Please sign in to comment.