Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get unassigned leases in leasesToTake #1320

Merged
merged 2 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,15 @@ public void update(final Lease lease) {
childShardIds(lease.childShardIds);
}

/**
* @param leaseDurationNanos duration of lease in nanoseconds
* @param asOfNanos time in nanoseconds to check expiration as-of
* @return true if lease lease is ready to be taken
*/
public boolean isAvailable(long leaseDurationNanos, long asOfNanos) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer a Duration type instead of long nanos.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we have this all the way up I'm not going to change it right now.

return isUnassigned() || isExpired(leaseDurationNanos, asOfNanos);
}

/**
* @param leaseDurationNanos duration of lease in nanoseconds
* @param asOfNanos time in nanoseconds to check expiration as-of
Expand All @@ -190,6 +199,13 @@ public boolean isExpired(long leaseDurationNanos, long asOfNanos) {
}
}

/**
* @return true if lease is not currently owned
*/
private boolean isUnassigned() {
return leaseOwner == null;
}

/**
* Sets lastCounterIncrementNanos
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ synchronized Map<String, Lease> takeLeases(Callable<Long> timeProvider)
updateAllLeases(timeProvider);
success = true;
} catch (ProvisionedThroughputException e) {
log.info("Worker {} could not find expired leases on try {} out of {}", workerIdentifier, i,
log.info("Worker {} could not find available leases on try {} out of {}", workerIdentifier, i,
TAKE_RETRIES);
lastException = e;
}
Expand All @@ -203,9 +203,9 @@ synchronized Map<String, Lease> takeLeases(Callable<Long> timeProvider)
return takenLeases;
}

List<Lease> expiredLeases = getExpiredLeases();
List<Lease> availableLeases = getAvailableLeases();

Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases, timeProvider);
Set<Lease> leasesToTake = computeLeasesToTake(availableLeases, timeProvider);
leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake);

Set<String> untakenLeaseKeys = new HashSet<>();
Expand Down Expand Up @@ -309,7 +309,7 @@ static String stringJoin(Collection<String> strings, String delimiter) {
*
* @param timeProvider callable that supplies the current time
*
* @return list of expired leases, possibly empty, never null.
* @return list of available leases, possibly empty, never null.
*
* @throws ProvisionedThroughputException if listLeases fails due to lack of provisioned throughput
* @throws InvalidStateException if the lease table does not exist
Expand Down Expand Up @@ -370,45 +370,36 @@ private void updateAllLeases(Callable<Long> timeProvider)
}

/**
* @return list of leases that were expired as of our last scan.
* @return list of leases that available as of our last scan.
*/
private List<Lease> getExpiredLeases() {
List<Lease> expiredLeases = new ArrayList<>();

for (Lease lease : allLeases.values()) {
if (lease.isExpired(leaseDurationNanos, lastScanTimeNanos)) {
expiredLeases.add(lease);
}
}

return expiredLeases;
private List<Lease> getAvailableLeases() {
return allLeases.values().stream()
.filter(lease->lease.isAvailable(leaseDurationNanos, lastScanTimeNanos))
.collect(Collectors.toList());
}

/**
* Compute the number of leases I should try to take based on the state of the system.
*
* @param expiredLeases list of leases we determined to be expired
* @param availableLeases list of leases we determined to be available
* @param timeProvider callable which returns the current time in nanos
* @return set of leases to take.
*/
@VisibleForTesting
Set<Lease> computeLeasesToTake(List<Lease> expiredLeases, Callable<Long> timeProvider) throws DependencyException {
Map<String, Integer> leaseCounts = computeLeaseCounts(expiredLeases);
Set<Lease> computeLeasesToTake(List<Lease> availableLeases, Callable<Long> timeProvider) throws DependencyException {
Map<String, Integer> leaseCounts = computeLeaseCounts(availableLeases);
Set<Lease> leasesToTake = new HashSet<>();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);

final int numAvailableLeases = expiredLeases.size();
int numLeases = 0;
int numWorkers = 0;
final int numAvailableLeases = availableLeases.size();
final int numLeases = allLeases.size();
final int numWorkers = leaseCounts.size();
int numLeasesToReachTarget = 0;
int leaseSpillover = 0;
int veryOldLeaseCount = 0;

try {
numLeases = allLeases.size();
numWorkers = leaseCounts.size();

if (numLeases == 0) {
// If there are no leases, I shouldn't try to take any.
return leasesToTake;
Expand Down Expand Up @@ -475,19 +466,19 @@ Set<Lease> computeLeasesToTake(List<Lease> expiredLeases, Callable<Long> timePro
return leasesToTake;
}

// Shuffle expiredLeases so workers don't all try to contend for the same leases.
Collections.shuffle(expiredLeases);
// Shuffle availableLeases so workers don't all try to contend for the same leases.
Collections.shuffle(availableLeases);

if (expiredLeases.size() > 0) {
// If we have expired leases, get up to <needed> leases from expiredLeases
for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(expiredLeases.remove(0));
if (availableLeases.size() > 0) {
// If we have available leases, get up to <needed> leases from availableLeases
for (; numLeasesToReachTarget > 0 && availableLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(availableLeases.remove(0));
}
} else {
// If there are no expired leases and we need a lease, consider stealing.
// If there are no available leases and we need a lease, consider stealing.
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target);
for (Lease leaseToSteal : leasesToSteal) {
log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}",
log.info("Worker {} needed {} leases but none were available, so it will steal lease {} from {}",
workerIdentifier, numLeasesToReachTarget, leaseToSteal.leaseKey(),
leaseToSteal.leaseOwner());
leasesToTake.add(leaseToSteal);
Expand All @@ -502,7 +493,7 @@ Set<Lease> computeLeasesToTake(List<Lease> expiredLeases, Callable<Long> timePro
leasesToTake.size());
}
} finally {
scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("ExpiredLeases", numAvailableLeases, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);
Expand Down Expand Up @@ -598,19 +589,19 @@ private List<Lease> chooseLeasesToSteal(Map<String, Integer> leaseCounts, int ne
* Count leases by host. Always includes myself, but otherwise only includes hosts that are currently holding
* leases.
*
* @param expiredLeases list of leases that are currently expired
* @param availableLeases list of leases that are currently available
* @return map of workerIdentifier to lease count
*/
@VisibleForTesting
Map<String, Integer> computeLeaseCounts(List<Lease> expiredLeases) {
Map<String, Integer> computeLeaseCounts(List<Lease> availableLeases) {
Map<String, Integer> leaseCounts = new HashMap<>();
// The set will give much faster lookup than the original list, an
// important optimization when the list is large
Set<Lease> expiredLeasesSet = new HashSet<>(expiredLeases);
Set<Lease> availableLeasesSet = new HashSet<>(availableLeases);

// Compute the number of leases per worker by looking through allLeases and ignoring leases that have expired.
// Compute the number of leases per worker by looking through allLeases and ignoring leases that are available.
for (Lease lease : allLeases.values()) {
if (!expiredLeasesSet.contains(lease)) {
if (!availableLeasesSet.contains(lease)) {
String leaseOwner = lease.leaseOwner();
Integer oldCount = leaseCounts.get(leaseOwner);
if (oldCount == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package software.amazon.kinesis.leases;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;

import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;

@RunWith(MockitoJUnitRunner.class)
public class LeaseTest {

private static final long MOCK_CURRENT_TIME = 10000000000L;
private static final long LEASE_DURATION_MILLIS = 1000L;

private static final long LEASE_DURATION_NANOS = TimeUnit.MILLISECONDS.toNanos(LEASE_DURATION_MILLIS);

//Write a unit test for software.amazon.kinesis.leases.Lease to test leaseOwner as null and epired
@Test
public void testLeaseOwnerNullAndExpired() {
long expiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS - 1;
Lease lease = createLease(null, "leaseKey", expiredTime);
Assert.assertTrue(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME));
Assert.assertNull(lease.leaseOwner());
}

@Test
public void testLeaseOwnerNotNullAndExpired() {
long expiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS - 1;
Lease lease = createLease("leaseOwner", "leaseKey", expiredTime);
Assert.assertTrue(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME));
Assert.assertEquals("leaseOwner", lease.leaseOwner());
}

@Test
public void testLeaseOwnerNotNullAndNotExpired() {
long notExpiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS + 1;
Lease lease = createLease("leaseOwner", "leaseKey", notExpiredTime);
Assert.assertFalse(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME));
Assert.assertEquals("leaseOwner", lease.leaseOwner());
}

@Test
public void testLeaseOwnerNullAndNotExpired() {
long notExpiredTime = MOCK_CURRENT_TIME - LEASE_DURATION_NANOS + 1;
Lease lease = createLease(null, "leaseKey", notExpiredTime);
Assert.assertTrue(lease.isAvailable(LEASE_DURATION_NANOS, MOCK_CURRENT_TIME));
Assert.assertNull(lease.leaseOwner());
}

private Lease createLease(String leaseOwner, String leaseKey, long lastCounterIncrementNanos) {
final Lease lease = new Lease();
lease.checkpoint(new ExtendedSequenceNumber("checkpoint"));
lease.ownerSwitchesSinceCheckpoint(0L);
lease.leaseCounter(0L);
lease.leaseOwner(leaseOwner);
lease.parentShardIds(Collections.singleton("parentShardId"));
lease.childShardIds(new HashSet<>());
lease.leaseKey(leaseKey);
lease.lastCounterIncrementNanos(lastCounterIncrementNanos);
return lease;
}
}
Loading