diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java index 88a6f9b24..923de2cfb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewer.java @@ -322,11 +322,13 @@ public boolean updateLease( long startTime = System.currentTimeMillis(); boolean success = false; + Lease authoritativeLeaseCopy = authoritativeLease.copy(); try { log.info("Updating lease from {} to {}", authoritativeLease, lease); synchronized (authoritativeLease) { authoritativeLease.update(lease); boolean updatedLease = leaseRefresher.updateLease(authoritativeLease); + log.info("lucienlu-test: after updateLease"); if (updatedLease) { // Updates increment the counter authoritativeLease.lastCounterIncrementNanos(System.nanoTime()); @@ -358,6 +360,10 @@ public boolean updateLease( success = true; return updatedLease; } + } catch (Exception e) { + // On failure, revert changes to in memory lease + authoritativeLease.update(authoritativeLeaseCopy); + throw e; } finally { MetricsUtil.addSuccessAndLatency(scope, "UpdateLease", success, startTime, MetricsLevel.DETAILED); MetricsUtil.endScope(scope); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java index 8a700c190..16a443c17 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRenewerTest.java @@ -34,13 +34,16 @@ import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.metrics.NullMetricsFactory; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -63,7 +66,7 @@ private static Lease newLease(String leaseKey) { System.nanoTime(), null, null, - null, + 1L, new HashSet<>(), new HashSet<>(), null, @@ -134,4 +137,35 @@ public void testLeaseRenewerDoesNotRenewExpiredLease() // Clear the list to avoid triggering expectation mismatch in after(). leasesToRenew.clear(); } + + @Test + public void testLeaseRenewerDoesNotUpdateInMemoryLeaseIfDDBFailsUpdate() + throws DependencyException, InvalidStateException, ProvisionedThroughputException { + String leaseKey = "leaseToUpdate"; + Lease lease = newLease(leaseKey); + lease.checkpoint(ExtendedSequenceNumber.LATEST); + leasesToRenew = new ArrayList<>(); + leasesToRenew.add(lease); + renewer.addLeasesToRenew(leasesToRenew); + + doReturn(true).when(leaseRefresher).renewLease(lease); + renewer.renewLeases(); + + Lease updatedLease = newLease(leaseKey); + updatedLease.checkpoint(ExtendedSequenceNumber.TRIM_HORIZON); + + doThrow(new DependencyException(new RuntimeException())) + .when(leaseRefresher) + .updateLease(updatedLease); + + try { + UUID concurrencyToken = renewer.getCurrentlyHeldLease(leaseKey).concurrencyToken(); + renewer.updateLease(updatedLease, concurrencyToken, "test", "dummyShardId"); + fail(); + } catch (DependencyException e) { + // expected + } + assertEquals(0L, (long) lease.leaseCounter()); // leaseCounter should not be incremented due to DDB failure + assertEquals(ExtendedSequenceNumber.LATEST, lease.checkpoint()); + } }