Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][dingo-executor] Fix pessimistic transaction update and resolve … #1320

Merged
merged 1 commit into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) {
LogUtils.debug(log,
"{}, repeat primary key :{} keyValue is null", txnId,
Arrays.toString(primaryLockKeyBytes));
context.getUpdateResidualDeleteKey().set(true);
@Nullable Object[] finalTuple1 = tuple;
vertex.getOutList().forEach(o -> o.transformToNext(context, finalTuple1));
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,21 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
Op.DELETE,
dataKey
);
vertex.getTask().getPartData().put(
new TxnPartData(tableId, partId),
(!isVector && !isDocument)
);
// first lock and kvGet is null
if (localStore.get(rollBackKey) != null) {
vertex.getTask().getPartData().put(
new TxnPartData(tableId, partId),
(!isVector && !isDocument)
);
profile.time(start);
return true;
} else {
rollBackKey = ByteUtils.getKeyByOp(
CommonId.CommonType.TXN_CACHE_RESIDUAL_LOCK, Op.PUT, dataKey
);
if (localStore.get(rollBackKey) != null) {
localStore.delete(rollBackKey);
}
KeyValue kv = wrap(codec::encode).apply(tuple);
if (kv == null) {
// Delete non-existent keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,12 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
LogUtils.warn(log, "{} updated is false key is {}", txnId, Arrays.toString(key));
// data is not exist local store
if (oldKeyValue == null) {
Op op = Op.PUT;
if (context.getUpdateResidualDeleteKey().get()) {
op = Op.DELETE;
}
byte[] rollBackKey = ByteUtils.getKeyByOp(
CommonId.CommonType.TXN_CACHE_RESIDUAL_LOCK, Op.DELETE, dataKey
CommonId.CommonType.TXN_CACHE_RESIDUAL_LOCK, op, dataKey
);
KeyValue rollBackKeyValue = new KeyValue(rollBackKey, null);
LogUtils.debug(log, "{}, updated is false residual key is:{}",
Expand Down Expand Up @@ -248,6 +252,12 @@ protected boolean pushTuple(Context context, Object[] tuple, Vertex vertex) {
);
return true;
} else {
rollBackKey = ByteUtils.getKeyByOp(
CommonId.CommonType.TXN_CACHE_RESIDUAL_LOCK, Op.PUT, dataKey
);
if (localStore.get(rollBackKey) != null) {
localStore.delete(rollBackKey);
}
KeyValue kv = wrap(codec::encode).apply(newTuple2);
CodecService.getDefault().setId(kv.getKey(), partId.domain);
// extraKeyValue [12_jobId_tableId_partId_a_none, oldValue]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

@Setter
@Getter
Expand All @@ -34,6 +35,8 @@ public class Context {
private CommonId indexId;
private RangeDistribution distribution;
private List<Boolean> keyState;
@Builder.Default
private AtomicBoolean updateResidualDeleteKey = new AtomicBoolean(false);

private boolean isDuplicateKey;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,17 +429,26 @@ public synchronized void commit(JobManager jobManager) {
crossNodePreWriteSeconds(jobManager, currentLocation, jobId);
} else {
if (twoPhaseCommitData.getUseAsyncCommit().get()) {
if (transactionConfig.isAsyncCommitSleep()) {
try {
Thread.sleep(transactionConfig.getAsyncCommitSleepTime());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// Async Commit PreWriteSecondKeys
LogUtils.info(log, "{} Async Commit PreWriteSecondKeys", transactionOf());
LogUtils.info(log, "{} Async Commit Start PreWriteSecondKeys", transactionOf());
parallelPreWriteSecondKeys(twoPhaseCommitData);
if (twoPhaseCommitData.getUseAsyncCommit().get()) {
commitTs = twoPhaseCommitData.getMinCommitTs().get();
// todo calculateMaxCommitTS and checkSchemaValid
}
LogUtils.info(log, "{} Async Commit PreWriteSecondKeys End", transactionOf());
} else {
LogUtils.info(log, "{} parallelPreWrite", transactionOf());
LogUtils.info(log, "{} start parallelPreWrite", transactionOf());
twoPhaseCommitData.getUseAsyncCommit().set(false);
parallelPreWriteSecondKeys(twoPhaseCommitData);
LogUtils.info(log, "{} parallelPreWrite end", transactionOf());
}
}
commitProfile.endPreWriteSecond();
Expand Down Expand Up @@ -497,25 +506,27 @@ public synchronized void commit(JobManager jobManager) {
}

if (twoPhaseCommitData.getUseAsyncCommit().get()) {
try {
CompletableFuture<Void> commit_future = CompletableFuture.runAsync(
() -> {
LogUtils.info(log, "{} asyncCommitJobRun", transactionOf());
asyncCommitJobRun(twoPhaseCommitData);
},
Executors.executor("exec-asyncTxnCommit")
).exceptionally(
ex -> {
LogUtils.error(log, ex.toString(), ex);
this.status = TransactionStatus.COMMIT_FAIL;
return null;
}
);
commitFuture = commit_future;
} finally {
LogUtils.info(log, "{} Async Commit End commit_ts:{}, Status:{}, Cost:{}ms", transactionOf(),
commitTs, status, (System.currentTimeMillis() - preWriteStart));
if (transactionConfig.isAsyncCommitSleep()) {
try {
Thread.sleep(transactionConfig.getAsyncCommitSleepTime());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
CompletableFuture<Void> commit_future = CompletableFuture.runAsync(
() -> {
// LogUtils.info(log, "{} start asyncCommitJobRun", transactionOf());
asyncCommitJobRun(twoPhaseCommitData, preWriteStart);
},
Executors.executor("exec-asyncTxnCommit")
).exceptionally(
ex -> {
LogUtils.error(log, ex.toString(), ex);
this.status = TransactionStatus.COMMIT_FAIL;
return null;
}
);
commitFuture = commit_future;
} else {
try {
if (cancel.get()) {
Expand Down Expand Up @@ -722,7 +733,7 @@ private void crossNodeCommitJobRun(JobManager jobManager, Location currentLocati
while (iterator.hasNext()) {
iterator.next();
}
LogUtils.info(log, "{} commitJobRun end", transactionOf());
LogUtils.info(log, "{} crossNode commitJobRun end", transactionOf());
} catch (Throwable throwable) {
LogUtils.error(log, throwable.getMessage(), throwable);
} finally {
Expand All @@ -731,7 +742,7 @@ private void crossNodeCommitJobRun(JobManager jobManager, Location currentLocati
}
}

private void asyncCommitJobRun(TwoPhaseCommitData twoPhaseCommitData) {
private void asyncCommitJobRun(TwoPhaseCommitData twoPhaseCommitData, long preWriteStart) {
try {
MdcUtils.setTxnId(txnId.toString());
LogUtils.info(log, "{} Start AsyncCommitPrimaryKey, commitTs:{}", transactionOf(), commitTs);
Expand All @@ -748,6 +759,14 @@ private void asyncCommitJobRun(TwoPhaseCommitData twoPhaseCommitData) {
this.status = TransactionStatus.COMMIT_PRIMARY_KEY;
twoPhaseCommitData.setPrimaryKey(primaryKey);
twoPhaseCommitData.setCommitTs(commitTs);
LogUtils.info(log, "{} AsyncCommitPrimaryKey end, commitTs:{}", transactionOf(), commitTs);
if (transactionConfig.isAsyncCommitSleep()) {
try {
Thread.sleep(transactionConfig.getAsyncCommitSleepTime());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
partDataMap.keySet().parallelStream()
.map($ -> TwoPhaseCommitUtils.commitSecondKeys($, twoPhaseCommitData))
.forEach(future -> future.join());
Expand All @@ -756,6 +775,8 @@ private void asyncCommitJobRun(TwoPhaseCommitData twoPhaseCommitData) {
} catch (Throwable throwable) {
LogUtils.error(log, throwable.getMessage(), throwable);
} finally {
LogUtils.info(log, "{} Async Commit End commit_ts:{}, Status:{}, Cost:{}ms", transactionOf(),
commitTs, status, (System.currentTimeMillis() - preWriteStart));
MdcUtils.removeTxnId();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,16 @@ public boolean isAsyncCommit() {
public boolean isCrossNodeCommit() {
return "on".equalsIgnoreCase(sessionVariables.getProperty("enable_use_cross_node_commit"));
}

public boolean isAsyncCommitSleep() {
return "on".equalsIgnoreCase(sessionVariables.getProperty("enable_async_commit_sleep"));
}

public long getAsyncCommitSleepTime() {
Optional<String> retryCountOpt = Optional.ofNullable(
sessionVariables.getProperty("async_commit_sleep_time"));
return (retryCountOpt
.map(Long::parseLong)
.orElse(5000L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,16 @@ public CacheToObject primaryLockTo() {
byte[] updateKey = Arrays.copyOf(insertKey, insertKey.length);
updateKey[updateKey.length - 2] = (byte) Op.PUT.getCode();
byte[] noneKey = ByteUtils.getKeyByOp(CommonId.CommonType.TXN_CACHE_RESIDUAL_LOCK, Op.DELETE, updateKey);
List<byte[]> bytes = new ArrayList<>(4);
byte[] nonePutKey = ByteUtils.getKeyByOp(CommonId.CommonType.TXN_CACHE_RESIDUAL_LOCK, Op.PUT, updateKey);
List<byte[]> bytes = new ArrayList<>(5);
bytes.add(insertKey);
bytes.add(deleteKey);
bytes.add(updateKey);
bytes.add(noneKey);
bytes.add(nonePutKey);
List<KeyValue> keyValues = cache.getKeys(bytes);
cache.deleteKey(noneKey);
cache.deleteKey(nonePutKey);
if (keyValues != null && keyValues.size() > 0) {
if (keyValues.size() > 1) {
throw new RuntimeException(txnId + " PrimaryKey is not existed than two in local store");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public boolean push(Context context, @Nullable Object[] tuple, Vertex vertex) {
lockKey[0] = (byte) CommonId.CommonType.TXN_CACHE_RESIDUAL_LOCK.getCode();
lockKey[lockKey.length - 2] = (byte) Op.DELETE.getCode();
store.delete(lockKey);
lockKey[lockKey.length - 2] = (byte) Op.PUT.getCode();
store.delete(lockKey);
LogUtils.info(log, "PessimisticRollBack key is {}, forUpdateTs:{}, jobId:{}", Arrays.toString(key), forUpdateTs, jobId);
CommonId partId = param.getPartId();
if (partId == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ public static List<Object[]> getGlobalVariablesList() {
values.add(new Object[]{"dingo_constraint_check_in_place", "off"});
values.add(new Object[]{"dingo_enable_async_commit", "on"});
values.add(new Object[]{"enable_use_cross_node_commit", "off"});
values.add(new Object[]{"enable_async_commit_sleep", "off"});
values.add(new Object[]{"async_commit_sleep_time", String.valueOf(5000)});
return values;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private static void checkSecondaries(int isolationLevel, long startTs, LockInfo
long asyncCommitTs = txnCheckSecondaryLocksResponse.getCommitTs();
List<LockInfo> locks = txnCheckSecondaryLocksResponse.getLocks();
// Check locks to see if any have been committed or rolled back.
if (locks.size() < secondKeys.size()) {
if (locks == null || locks.size() < secondKeys.size()) {
// A lock is missing - the transaction must either have been rolled back or committed.
if (!asyncResolveData.isMissingLock()) {
// commitTS == 0 => lock has been rolled back.
Expand Down
Loading