diff --git a/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/SimpleStreamSender.java b/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/SimpleStreamSender.java index cffde820..c1481814 100644 --- a/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/SimpleStreamSender.java +++ b/stream-parent/stream-client/src/main/java/org/interledger/stream/sender/SimpleStreamSender.java @@ -63,6 +63,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.function.Supplier; import javax.annotation.concurrent.ThreadSafe; /** @@ -565,7 +566,7 @@ private void sendMoneyPacketized() { final InterledgerCondition executionCondition; executionCondition = generatedFulfillableFulfillment(sharedSecret, streamPacketData).getCondition(); - final InterledgerPreparePacket preparePacket = InterledgerPreparePacket.builder() + final Supplier preparePacket = () -> InterledgerPreparePacket.builder() .destination(destinationAddress) .amount(amountToSend) .executionCondition(executionCondition) @@ -577,7 +578,7 @@ private void sendMoneyPacketized() { // capture // rollback - final PrepareAmounts prepareAmounts = PrepareAmounts.from(preparePacket, streamPacket); + final PrepareAmounts prepareAmounts = PrepareAmounts.from(preparePacket.get(), streamPacket); if (!paymentTracker.auth(prepareAmounts)) { // if we can't auth, just skip this iteration of the loop until everything else completes tryingToSendTooMuch = true; @@ -608,17 +609,18 @@ private void sendMoneyPacketized() { @VisibleForTesting void schedule( final AtomicBoolean timeoutReached, - final InterledgerPreparePacket preparePacket, + final Supplier preparePacketSupplier, final StreamPacket streamPacket, final PrepareAmounts prepareAmounts ) { Objects.requireNonNull(timeoutReached); - Objects.requireNonNull(preparePacket); + Objects.requireNonNull(preparePacketSupplier); Objects.requireNonNull(streamPacket); Objects.requireNonNull(prepareAmounts); try { executorService.submit(() -> { + InterledgerPreparePacket preparePacket = preparePacketSupplier.get(); if (!timeoutReached.get()) { try { InterledgerResponsePacket responsePacket = sendPacketAndCheckForFailure(preparePacket); @@ -649,10 +651,10 @@ void schedule( } catch (RejectedExecutionException e) { // If we get here, it means the task was unable to be scheduled, so we need to unwind the congestion // controller to prevent deadlock. - congestionController.reject(preparePacket.getAmount(), InterledgerRejectPacket.builder() + congestionController.reject(preparePacketSupplier.get().getAmount(), InterledgerRejectPacket.builder() .code(InterledgerErrorCode.F00_BAD_REQUEST) .message( - String.format("Unable to schedule sendMoney task. preparePacket=%s error=%s", preparePacket, + String.format("Unable to schedule sendMoney task. preparePacket=%s error=%s", preparePacketSupplier.get(), e.getMessage()) ) .build()); diff --git a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SendMoneyAggregatorTest.java b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SendMoneyAggregatorTest.java index 649794f1..40a2ffa4 100644 --- a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SendMoneyAggregatorTest.java +++ b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SendMoneyAggregatorTest.java @@ -44,6 +44,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.rules.Timeout; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -52,6 +53,7 @@ import java.io.IOException; import java.time.Duration; +import java.time.Instant; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -96,7 +98,6 @@ public class SendMoneyAggregatorTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - when(congestionControllerMock.getMaxAmount()).thenReturn(UnsignedLong.ONE); when(streamEncryptionServiceMock.encrypt(any(), any())).thenReturn(new byte[32]); when(streamEncryptionServiceMock.decrypt(any(), any())).thenReturn(new byte[32]); @@ -238,7 +239,7 @@ public void failureToSchedulePutsMoneyBack() { .build(); expectedException.expect(RejectedExecutionException.class); - sendMoneyAggregator.schedule(new AtomicBoolean(false), prepare, sampleStreamPacket(), + sendMoneyAggregator.schedule(new AtomicBoolean(false), () -> prepare, sampleStreamPacket(), PrepareAmounts.from(prepare, sampleStreamPacket())); verify(congestionControllerMock, times(1)).reject(UnsignedLong.ONE, expectedReject); } @@ -541,6 +542,45 @@ public InterledgerResponsePacket answer(InvocationOnMock invocationOnMock) throw verify(linkMock, atMost(11)).sendPacket(any()); } + /** + * Test that if there is a delay between when a packet is schedule and when the executor sends the packet, + * that the prepare packet expiresAt is calculated just before sending (not before scheduling). + */ + @Test + public void packetExpiryIsComputedJustBeforeSending() throws InterruptedException { + SendMoneyRequest request = SendMoneyRequest.builder() + .sharedSecret(sharedSecret) + .sourceAddress(sourceAddress) + .senderAmountMode(SenderAmountMode.SENDER_AMOUNT) + .destinationAddress(destinationAddress) + .amount(originalAmountToSend) + .timeout(Optional.of(Duration.ofSeconds(60))) + .denomination(Denominations.XRP) + .paymentTracker(new FixedSenderAmountPaymentTracker(UnsignedLong.valueOf(10l), new NoOpExchangeRateCalculator())) + .build(); + + // use a SleepyExecutorService with a non-trivial sleep so that we can verify that expiresAt is calculated + // AFTER the scheduler woke up to process the packet. + long scheduleDelayMillis = 100; + ExecutorService executor = new SleepyExecutorService(Executors.newFixedThreadPool(1), scheduleDelayMillis); + Instant minExpectedExpiresAt = DateUtils.now().plusMillis(scheduleDelayMillis); + + this.sendMoneyAggregator = new SendMoneyAggregator( + executor, streamConnectionMock, streamCodecContextMock, linkMock, congestionControllerMock, + streamEncryptionServiceMock, request, Optional.empty()); + + sendMoneyAggregator.schedule(new AtomicBoolean(false), + () -> samplePreparePacket(), sampleStreamPacket(), + PrepareAmounts.builder().amountToSend(UnsignedLong.ONE).minimumAmountToAccept(UnsignedLong.ONE).build()); + Thread.sleep(scheduleDelayMillis); + ArgumentCaptor prepareCaptor = ArgumentCaptor.forClass(InterledgerPreparePacket.class); + + verify(linkMock).sendPacket(prepareCaptor.capture()); + + Instant actualExpiresAt = prepareCaptor.getValue().getExpiresAt(); + assertThat(actualExpiresAt).isAfter(minExpectedExpiresAt); + } + /** * Helper method to set the soldierOn mock values for clearer test coverage. */ diff --git a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SleepyExecutorService.java b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SleepyExecutorService.java index 608a099e..9566cf31 100644 --- a/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SleepyExecutorService.java +++ b/stream-parent/stream-client/src/test/java/org/interledger/stream/sender/SleepyExecutorService.java @@ -20,9 +20,15 @@ class SleepyExecutorService implements ExecutorService { private final ExecutorService delegate; private final long sleep; - SleepyExecutorService(ExecutorService delegate, long sleep) { + /** + * Wraps an ExecutorService and sleeps before delegating submit/execute calls to the underlying delegate. + * + * @param delegate wrapped ExecutorService that is delegated to + * @param sleepMillis how long to sleep before execution + */ + SleepyExecutorService(ExecutorService delegate, long sleepMillis) { this.delegate = delegate; - this.sleep = sleep; + this.sleep = sleepMillis; } @Override