-
Notifications
You must be signed in to change notification settings - Fork 465
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Get unassigned leases in leasesToTake #1320
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -171,6 +171,15 @@ public void update(final Lease lease) { | |
childShardIds(lease.childShardIds); | ||
} | ||
|
||
/** | ||
* @param leaseDurationNanos duration of lease in nanoseconds | ||
* @param asOfNanos time in nanoseconds to check expiration as-of | ||
* @return true if lease lease is ready te taken | ||
*/ | ||
public boolean isAvailable(long leaseDurationNanos, long asOfNanos) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prefer a Duration type instead of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because we have this all the way up I'm not going to change it right now. |
||
return isUnassigned() || isExpired(leaseDurationNanos, asOfNanos); | ||
} | ||
|
||
/** | ||
* @param leaseDurationNanos duration of lease in nanoseconds | ||
* @param asOfNanos time in nanoseconds to check expiration as-of | ||
|
@@ -190,6 +199,13 @@ public boolean isExpired(long leaseDurationNanos, long asOfNanos) { | |
} | ||
} | ||
|
||
/** | ||
* @return true if lease is not currently owned | ||
*/ | ||
private boolean isUnassigned() { | ||
return leaseOwner == null; | ||
} | ||
|
||
/** | ||
* Sets lastCounterIncrementNanos | ||
* | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -186,7 +186,7 @@ synchronized Map<String, Lease> takeLeases(Callable<Long> timeProvider) | |
updateAllLeases(timeProvider); | ||
success = true; | ||
} catch (ProvisionedThroughputException e) { | ||
log.info("Worker {} could not find expired leases on try {} out of {}", workerIdentifier, i, | ||
log.info("Worker {} could not find available leases on try {} out of {}", workerIdentifier, i, | ||
TAKE_RETRIES); | ||
lastException = e; | ||
} | ||
|
@@ -203,9 +203,9 @@ synchronized Map<String, Lease> takeLeases(Callable<Long> timeProvider) | |
return takenLeases; | ||
} | ||
|
||
List<Lease> expiredLeases = getExpiredLeases(); | ||
List<Lease> availableLeases = getAvailableLeases(); | ||
|
||
Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases, timeProvider); | ||
Set<Lease> leasesToTake = computeLeasesToTake(availableLeases, timeProvider); | ||
leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake); | ||
|
||
Set<String> untakenLeaseKeys = new HashSet<>(); | ||
|
@@ -309,7 +309,7 @@ static String stringJoin(Collection<String> strings, String delimiter) { | |
* | ||
* @param timeProvider callable that supplies the current time | ||
* | ||
* @return list of expired leases, possibly empty, never null. | ||
* @return list of available leases, possibly empty, never null. | ||
* | ||
* @throws ProvisionedThroughputException if listLeases fails due to lack of provisioned throughput | ||
* @throws InvalidStateException if the lease table does not exist | ||
|
@@ -370,45 +370,42 @@ private void updateAllLeases(Callable<Long> timeProvider) | |
} | ||
|
||
/** | ||
* @return list of leases that were expired as of our last scan. | ||
* @return list of leases that available as of our last scan. | ||
*/ | ||
private List<Lease> getExpiredLeases() { | ||
List<Lease> expiredLeases = new ArrayList<>(); | ||
private List<Lease> getAvailableLeases() { | ||
List<Lease> availableLeases = new ArrayList<>(); | ||
|
||
for (Lease lease : allLeases.values()) { | ||
if (lease.isExpired(leaseDurationNanos, lastScanTimeNanos)) { | ||
expiredLeases.add(lease); | ||
if (lease.isAvailable(leaseDurationNanos, lastScanTimeNanos)) { | ||
availableLeases.add(lease); | ||
} | ||
} | ||
|
||
return expiredLeases; | ||
return availableLeases; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in next commit |
||
} | ||
|
||
/** | ||
* Compute the number of leases I should try to take based on the state of the system. | ||
* | ||
* @param expiredLeases list of leases we determined to be expired | ||
* @param availableLeases list of leases we determined to be available | ||
* @param timeProvider callable which returns the current time in nanos | ||
* @return set of leases to take. | ||
*/ | ||
@VisibleForTesting | ||
Set<Lease> computeLeasesToTake(List<Lease> expiredLeases, Callable<Long> timeProvider) throws DependencyException { | ||
Map<String, Integer> leaseCounts = computeLeaseCounts(expiredLeases); | ||
Set<Lease> computeLeasesToTake(List<Lease> availableLeases, Callable<Long> timeProvider) throws DependencyException { | ||
Map<String, Integer> leaseCounts = computeLeaseCounts(availableLeases); | ||
Set<Lease> leasesToTake = new HashSet<>(); | ||
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION); | ||
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier); | ||
|
||
final int numAvailableLeases = expiredLeases.size(); | ||
int numLeases = 0; | ||
int numWorkers = 0; | ||
final int numAvailableLeases = availableLeases.size(); | ||
final int numLeases = allLeases.size(); | ||
final int numWorkers = leaseCounts.size(); | ||
int numLeasesToReachTarget = 0; | ||
int leaseSpillover = 0; | ||
int veryOldLeaseCount = 0; | ||
|
||
try { | ||
numLeases = allLeases.size(); | ||
numWorkers = leaseCounts.size(); | ||
|
||
if (numLeases == 0) { | ||
// If there are no leases, I shouldn't try to take any. | ||
return leasesToTake; | ||
|
@@ -475,19 +472,19 @@ Set<Lease> computeLeasesToTake(List<Lease> expiredLeases, Callable<Long> timePro | |
return leasesToTake; | ||
} | ||
|
||
// Shuffle expiredLeases so workers don't all try to contend for the same leases. | ||
Collections.shuffle(expiredLeases); | ||
// Shuffle availableLeases so workers don't all try to contend for the same leases. | ||
Collections.shuffle(availableLeases); | ||
|
||
if (expiredLeases.size() > 0) { | ||
// If we have expired leases, get up to <needed> leases from expiredLeases | ||
for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) { | ||
leasesToTake.add(expiredLeases.remove(0)); | ||
if (availableLeases.size() > 0) { | ||
// If we have available leases, get up to <needed> leases from availableLeases | ||
for (; numLeasesToReachTarget > 0 && availableLeases.size() > 0; numLeasesToReachTarget--) { | ||
leasesToTake.add(availableLeases.remove(0)); | ||
} | ||
} else { | ||
// If there are no expired leases and we need a lease, consider stealing. | ||
// If there are no available leases and we need a lease, consider stealing. | ||
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target); | ||
for (Lease leaseToSteal : leasesToSteal) { | ||
log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}", | ||
log.info("Worker {} needed {} leases but none were available, so it will steal lease {} from {}", | ||
workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(), | ||
leaseToSteal.leaseOwner()); | ||
leasesToTake.add(leaseToSteal); | ||
|
@@ -502,7 +499,7 @@ Set<Lease> computeLeasesToTake(List<Lease> expiredLeases, Callable<Long> timePro | |
leasesToTake.size()); | ||
} | ||
} finally { | ||
scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); | ||
scope.addData("ExpiredLeases", numAvailableLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY); | ||
scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY); | ||
scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED); | ||
scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED); | ||
|
@@ -598,19 +595,19 @@ private List<Lease> chooseLeasesToSteal(Map<String, Integer> leaseCounts, int ne | |
* Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding | ||
* leases. | ||
* | ||
* @param expiredLeases list of leases that are currently expired | ||
* @param availableLeases list of leases that are currently available | ||
* @return map of workerIdentifier to lease count | ||
*/ | ||
@VisibleForTesting | ||
Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) { | ||
Map<String, Integer> computeLeaseCounts(List<Lease> availableLeases) { | ||
Map<String, Integer> leaseCounts = new HashMap<>(); | ||
// The set will give much faster lookup than the original list, an | ||
// important optimization when the list is large | ||
Set<Lease> expiredLeasesSet = new HashSet<>(expiredLeases); | ||
Set<Lease> availableLeasesSet = new HashSet<>(availableLeases); | ||
|
||
// Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired. | ||
// Compute the number of leases per worker by looking through allLeases and ignoring leases that are available. | ||
for (Lease lease : allLeases.values()) { | ||
if (!expiredLeasesSet.contains(lease)) { | ||
if (!availableLeasesSet.contains(lease)) { | ||
String leaseOwner = lease.leaseOwner(); | ||
Integer oldCount = leaseCounts.get(leaseOwner); | ||
if (oldCount == null) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
package software.amazon.kinesis.leases; | ||
|
||
import org.junit.Assert; | ||
import org.junit.Test; | ||
import org.junit.runner.RunWith; | ||
import org.mockito.runners.MockitoJUnitRunner; | ||
|
||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; | ||
|
||
import java.util.Collections; | ||
import java.util.HashSet; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
@RunWith(MockitoJUnitRunner.class) | ||
public class LeaseTest { | ||
|
||
private static final long MOCK_CURRENT_TIME = 10000000000L; | ||
private static final long LEASE_DURATION_MILLIS = 1000L; | ||
|
||
private static final long LEASE_DURATION_NANOS = TimeUnit.MILLISECONDS.toNanos(LEASE_DURATION_MILLIS); | ||
|
||
//Write a unit test for software.amazon.kinesis.leases.Lease to test leaseOwner as null and epired | ||
@Test | ||
public void testLeaseOwnerNullAndExpired() { | ||
long expiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS - 1; | ||
Lease lease = createLease(null, "leaseKey", expiredTime); | ||
Assert.assertTrue(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME)); | ||
Assert.assertNull(lease.leaseOwner()); | ||
} | ||
|
||
@Test | ||
public void testLeaseOwnerNotNullAndExpired() { | ||
long expiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS - 1; | ||
Lease lease = createLease("leaseOwner", "leaseKey", expiredTime); | ||
Assert.assertTrue(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME)); | ||
Assert.assertEquals("leaseOwner", lease.leaseOwner()); | ||
} | ||
|
||
@Test | ||
public void testLeaseOwnerNotNullAndNotExpired() { | ||
long notExpiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS + 1; | ||
Lease lease = createLease("leaseOwner", "leaseKey", notExpiredTime); | ||
Assert.assertFalse(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME)); | ||
Assert.assertEquals("leaseOwner", lease.leaseOwner()); | ||
} | ||
|
||
@Test | ||
public void testLeaseOwnerNullAndNotExpired() { | ||
long notExpiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS + 1; | ||
Lease lease = createLease(null, "leaseKey", notExpiredTime); | ||
Assert.assertTrue(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME)); | ||
Assert.assertNull(lease.leaseOwner()); | ||
} | ||
|
||
private Lease createLease(String leaseOwner, String leaseKey, long lastCounterIncrementNanos) { | ||
final Lease lease = new Lease(); | ||
lease.checkpoint(new ExtendedSequenceNumber("checkpoint")); | ||
lease.ownerSwitchesSinceCheckpoint(0L); | ||
lease.leaseCounter(0L); | ||
lease.leaseOwner(leaseOwner); | ||
lease.parentShardIds(Collections.singleton("parentShardId")); | ||
lease.childShardIds(new HashSet<>()); | ||
lease.leaseKey(leaseKey); | ||
lease.lastCounterIncrementNanos(lastCounterIncrementNanos); | ||
return lease; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
typo in comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed in next commit