Skip to content

Commit

Permalink
bugfix: revert update of in memory lease if DDB lease renewal throws …
Browse files Browse the repository at this point in the history
…error
  • Loading branch information
lucienlu-aws committed Jun 19, 2024
1 parent 78fb42e commit 5766e39
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,13 @@ public boolean updateLease(

long startTime = System.currentTimeMillis();
boolean success = false;
Lease authoritativeLeaseCopy = authoritativeLease.copy();
try {
log.info("Updating lease from {} to {}", authoritativeLease, lease);
synchronized (authoritativeLease) {
authoritativeLease.update(lease);
boolean updatedLease = leaseRefresher.updateLease(authoritativeLease);
log.info("lucienlu-test: after updateLease");
if (updatedLease) {
// Updates increment the counter
authoritativeLease.lastCounterIncrementNanos(System.nanoTime());
Expand Down Expand Up @@ -358,6 +360,10 @@ public boolean updateLease(
success = true;
return updatedLease;
}
} catch (Exception e) {
// On failure, revert changes to in memory lease
authoritativeLease.update(authoritativeLeaseCopy);
throw e;
} finally {
MetricsUtil.addSuccessAndLatency(scope, "UpdateLease", success, startTime, MetricsLevel.DETAILED);
MetricsUtil.endScope(scope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

Expand All @@ -63,7 +66,7 @@ private static Lease newLease(String leaseKey) {
System.nanoTime(),
null,
null,
null,
1L,
new HashSet<>(),
new HashSet<>(),
null,
Expand Down Expand Up @@ -134,4 +137,35 @@ public void testLeaseRenewerDoesNotRenewExpiredLease()
// Clear the list to avoid triggering expectation mismatch in after().
leasesToRenew.clear();
}

@Test
public void testLeaseRenewerDoesNotUpdateInMemoryLeaseIfDDBFailsUpdate()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
String leaseKey = "leaseToUpdate";
Lease lease = newLease(leaseKey);
lease.checkpoint(ExtendedSequenceNumber.LATEST);
leasesToRenew = new ArrayList<>();
leasesToRenew.add(lease);
renewer.addLeasesToRenew(leasesToRenew);

doReturn(true).when(leaseRefresher).renewLease(lease);
renewer.renewLeases();

Lease updatedLease = newLease(leaseKey);
updatedLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON);

doThrow(new DependencyException(new RuntimeException()))
.when(leaseRefresher)
.updateLease(updatedLease);

try {
UUID concurrencyToken = renewer.getCurrentlyHeldLease(leaseKey).concurrencyToken();
renewer.updateLease(updatedLease, concurrencyToken, "test", "dummyShardId");
fail();
} catch (DependencyException e) {
// expected
}
assertEquals(0L, (long) lease.leaseCounter()); // leaseCounter should not be incremented due to DDB failure
assertEquals(ExtendedSequenceNumber.LATEST, lease.checkpoint());
}
}

0 comments on commit 5766e39

Please sign in to comment.