Skip to content
This repository has been archived by the owner on Jun 1, 2022. It is now read-only.

Commit

Permalink
compute a packet's expiresAt just before sending on the link (#433)
Browse files Browse the repository at this point in the history
* compute a packet's expiresAt just before sending on the link
fixes a bug where the expiresAt could already be expired since it was being computed before calling executorService.submit() and the executorService is queueing up tasks (due to being backed up)
* add sleep so that test more reliably passes on circle

Signed-off-by: nhartner <[email protected]>
  • Loading branch information
nhartner authored Feb 5, 2020
1 parent 9a94323 commit fec7025
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;

/**
Expand Down Expand Up @@ -565,7 +566,7 @@ private void sendMoneyPacketized() {
final InterledgerCondition executionCondition;
executionCondition = generatedFulfillableFulfillment(sharedSecret, streamPacketData).getCondition();

final InterledgerPreparePacket preparePacket = InterledgerPreparePacket.builder()
final Supplier<InterledgerPreparePacket> preparePacket = () -> InterledgerPreparePacket.builder()
.destination(destinationAddress)
.amount(amountToSend)
.executionCondition(executionCondition)
Expand All @@ -577,7 +578,7 @@ private void sendMoneyPacketized() {
// capture
// rollback

final PrepareAmounts prepareAmounts = PrepareAmounts.from(preparePacket, streamPacket);
final PrepareAmounts prepareAmounts = PrepareAmounts.from(preparePacket.get(), streamPacket);
if (!paymentTracker.auth(prepareAmounts)) {
// if we can't auth, just skip this iteration of the loop until everything else completes
tryingToSendTooMuch = true;
Expand Down Expand Up @@ -608,17 +609,18 @@ private void sendMoneyPacketized() {
@VisibleForTesting
void schedule(
final AtomicBoolean timeoutReached,
final InterledgerPreparePacket preparePacket,
final Supplier<InterledgerPreparePacket> preparePacketSupplier,
final StreamPacket streamPacket,
final PrepareAmounts prepareAmounts
) {
Objects.requireNonNull(timeoutReached);
Objects.requireNonNull(preparePacket);
Objects.requireNonNull(preparePacketSupplier);
Objects.requireNonNull(streamPacket);
Objects.requireNonNull(prepareAmounts);

try {
executorService.submit(() -> {
InterledgerPreparePacket preparePacket = preparePacketSupplier.get();
if (!timeoutReached.get()) {
try {
InterledgerResponsePacket responsePacket = sendPacketAndCheckForFailure(preparePacket);
Expand Down Expand Up @@ -649,10 +651,10 @@ void schedule(
} catch (RejectedExecutionException e) {
// If we get here, it means the task was unable to be scheduled, so we need to unwind the congestion
// controller to prevent deadlock.
congestionController.reject(preparePacket.getAmount(), InterledgerRejectPacket.builder()
congestionController.reject(preparePacketSupplier.get().getAmount(), InterledgerRejectPacket.builder()
.code(InterledgerErrorCode.F00_BAD_REQUEST)
.message(
String.format("Unable to schedule sendMoney task. preparePacket=%s error=%s", preparePacket,
String.format("Unable to schedule sendMoney task. preparePacket=%s error=%s", preparePacketSupplier.get(),
e.getMessage())
)
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.Timeout;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
Expand All @@ -52,6 +53,7 @@

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -96,7 +98,6 @@ public class SendMoneyAggregatorTest {
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);

when(congestionControllerMock.getMaxAmount()).thenReturn(UnsignedLong.ONE);
when(streamEncryptionServiceMock.encrypt(any(), any())).thenReturn(new byte[32]);
when(streamEncryptionServiceMock.decrypt(any(), any())).thenReturn(new byte[32]);
Expand Down Expand Up @@ -238,7 +239,7 @@ public void failureToSchedulePutsMoneyBack() {
.build();

expectedException.expect(RejectedExecutionException.class);
sendMoneyAggregator.schedule(new AtomicBoolean(false), prepare, sampleStreamPacket(),
sendMoneyAggregator.schedule(new AtomicBoolean(false), () -> prepare, sampleStreamPacket(),
PrepareAmounts.from(prepare, sampleStreamPacket()));
verify(congestionControllerMock, times(1)).reject(UnsignedLong.ONE, expectedReject);
}
Expand Down Expand Up @@ -541,6 +542,45 @@ public InterledgerResponsePacket answer(InvocationOnMock invocationOnMock) throw
verify(linkMock, atMost(11)).sendPacket(any());
}

/**
* Test that if there is a delay between when a packet is schedule and when the executor sends the packet,
* that the prepare packet expiresAt is calculated just before sending (not before scheduling).
*/
@Test
public void packetExpiryIsComputedJustBeforeSending() throws InterruptedException {
SendMoneyRequest request = SendMoneyRequest.builder()
.sharedSecret(sharedSecret)
.sourceAddress(sourceAddress)
.senderAmountMode(SenderAmountMode.SENDER_AMOUNT)
.destinationAddress(destinationAddress)
.amount(originalAmountToSend)
.timeout(Optional.of(Duration.ofSeconds(60)))
.denomination(Denominations.XRP)
.paymentTracker(new FixedSenderAmountPaymentTracker(UnsignedLong.valueOf(10l), new NoOpExchangeRateCalculator()))
.build();

// use a SleepyExecutorService with a non-trivial sleep so that we can verify that expiresAt is calculated
// AFTER the scheduler woke up to process the packet.
long scheduleDelayMillis = 100;
ExecutorService executor = new SleepyExecutorService(Executors.newFixedThreadPool(1), scheduleDelayMillis);
Instant minExpectedExpiresAt = DateUtils.now().plusMillis(scheduleDelayMillis);

this.sendMoneyAggregator = new SendMoneyAggregator(
executor, streamConnectionMock, streamCodecContextMock, linkMock, congestionControllerMock,
streamEncryptionServiceMock, request, Optional.empty());

sendMoneyAggregator.schedule(new AtomicBoolean(false),
() -> samplePreparePacket(), sampleStreamPacket(),
PrepareAmounts.builder().amountToSend(UnsignedLong.ONE).minimumAmountToAccept(UnsignedLong.ONE).build());
Thread.sleep(scheduleDelayMillis);
ArgumentCaptor<InterledgerPreparePacket> prepareCaptor = ArgumentCaptor.forClass(InterledgerPreparePacket.class);

verify(linkMock).sendPacket(prepareCaptor.capture());

Instant actualExpiresAt = prepareCaptor.getValue().getExpiresAt();
assertThat(actualExpiresAt).isAfter(minExpectedExpiresAt);
}

/**
* Helper method to set the soldierOn mock values for clearer test coverage.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@ class SleepyExecutorService implements ExecutorService {
private final ExecutorService delegate;
private final long sleep;

SleepyExecutorService(ExecutorService delegate, long sleep) {
/**
* Wraps an ExecutorService and sleeps before delegating submit/execute calls to the underlying delegate.
*
* @param delegate wrapped ExecutorService that is delegated to
* @param sleepMillis how long to sleep before execution
*/
SleepyExecutorService(ExecutorService delegate, long sleepMillis) {
this.delegate = delegate;
this.sleep = sleep;
this.sleep = sleepMillis;
}

@Override
Expand Down

0 comments on commit fec7025

Please sign in to comment.