Skip to content

Commit

Permalink
Ignore missing transactions during recovery (#407)
Browse files Browse the repository at this point in the history
  • Loading branch information
diegosalvi authored and eolivelli committed Jul 8, 2019
1 parent 31a8a32 commit 2b7e6e4
Show file tree
Hide file tree
Showing 2 changed files with 329 additions and 49 deletions.
111 changes: 69 additions & 42 deletions herddb-core/src/main/java/herddb/core/TableManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,6 @@

import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;

import herddb.codec.RecordSerializer;
import herddb.core.PageSet.DataPageMetaData;
import herddb.core.stats.TableManagerStats;
Expand Down Expand Up @@ -114,6 +79,39 @@
import herddb.utils.LocalLockManager;
import herddb.utils.LockHandle;
import herddb.utils.SystemProperties;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;

/**
* Handles Data of a Table
Expand Down Expand Up @@ -146,6 +144,15 @@ public final class TableManager implements AbstractTableManager, Page.Owner {
private static final boolean USE_LEGACY_LOCK_MANAGER = SystemProperties
.getBooleanSystemProperty("herddb.tablemanager.legacylocks", false);

/**
* Ignores insert/update/delete failures due to missing transactions during recovery. The operation in
* recovery will be ignored.
*
* Mutable and visible for tests.
*/
static boolean IGNORE_MISSING_TRANSACTIONS_ON_RECOVERY = SystemProperties
.getBooleanSystemProperty("herddb.tablemanager.ignoreMissingTransactionsOnRecovery", false);

private final ConcurrentMap<Long, DataPage> newPages;

private final ConcurrentMap<Long, DataPage> pages;
Expand Down Expand Up @@ -1473,9 +1480,16 @@ public void apply(CommitLogResult writeResult, LogEntry entry, boolean recovery)
if (entry.transactionId > 0) {
Transaction transaction = tableSpaceManager.getTransaction(entry.transactionId);
if (transaction == null) {
throw new DataStorageManagerException("no such transaction " + entry.transactionId);
/* Ignore missing transaction only if during recovery and ignore property is active */
if (recovery && IGNORE_MISSING_TRANSACTIONS_ON_RECOVERY) {
LOGGER.log(Level.WARNING, "Ignoring delete of {0} due to missing transaction {1}",
new Object[] { entry.key, entry.transactionId });
} else {
throw new DataStorageManagerException("no such transaction " + entry.transactionId);
}
} else {
transaction.registerDeleteOnTable(this.table.name, key, writeResult);
}
transaction.registerDeleteOnTable(this.table.name, key, writeResult);
} else {
applyDelete(key);
}
Expand All @@ -1487,9 +1501,16 @@ public void apply(CommitLogResult writeResult, LogEntry entry, boolean recovery)
if (entry.transactionId > 0) {
Transaction transaction = tableSpaceManager.getTransaction(entry.transactionId);
if (transaction == null) {
throw new DataStorageManagerException("no such transaction " + entry.transactionId);
/* Ignore missing transaction only if during recovery and ignore property is active */
if (recovery && IGNORE_MISSING_TRANSACTIONS_ON_RECOVERY) {
LOGGER.log(Level.WARNING, "Ignoring update of {0} due to missing transaction {1}",
new Object[] { entry.key, entry.transactionId });
} else {
throw new DataStorageManagerException("no such transaction " + entry.transactionId);
}
} else {
transaction.registerRecordUpdate(this.table.name, key, value, writeResult);
}
transaction.registerRecordUpdate(this.table.name, key, value, writeResult);
} else {
applyUpdate(key, value);
}
Expand All @@ -1501,9 +1522,16 @@ public void apply(CommitLogResult writeResult, LogEntry entry, boolean recovery)
if (entry.transactionId > 0) {
Transaction transaction = tableSpaceManager.getTransaction(entry.transactionId);
if (transaction == null) {
throw new DataStorageManagerException("no such transaction " + entry.transactionId);
/* Ignore missing transaction only if during recovery and ignore property is active */
if (recovery && IGNORE_MISSING_TRANSACTIONS_ON_RECOVERY) {
LOGGER.log(Level.WARNING, "Ignoring insert of {0} due to missing transaction {1}",
new Object[] { entry.key, entry.transactionId });
} else {
throw new DataStorageManagerException("no such transaction " + entry.transactionId);
}
} else {
transaction.registerInsertOnTable(table.name, key, value, writeResult);
}
transaction.registerInsertOnTable(table.name, key, value, writeResult);
} else {
applyInsert(key, value, false);
}
Expand All @@ -1512,7 +1540,6 @@ public void apply(CommitLogResult writeResult, LogEntry entry, boolean recovery)
case LogEntryType.TRUNCATE_TABLE: {
applyTruncate();
}
;
break;
default:
throw new IllegalArgumentException("unhandled entry type " + entry.type);
Expand Down
Loading

0 comments on commit 2b7e6e4

Please sign in to comment.