|
41 | 41 | import java.util.concurrent.CountDownLatch;
|
42 | 42 | import java.util.concurrent.ExecutorService;
|
43 | 43 | import java.util.concurrent.Executors;
|
| 44 | +import java.util.concurrent.Semaphore; |
44 | 45 | import java.util.concurrent.TimeUnit;
|
45 | 46 | import java.util.concurrent.atomic.AtomicBoolean;
|
46 | 47 | import java.util.concurrent.atomic.AtomicInteger;
|
|
57 | 58 | import org.junit.jupiter.api.extension.ExtendWith;
|
58 | 59 | import org.junit.jupiter.params.ParameterizedTest;
|
59 | 60 | import org.junit.jupiter.params.provider.ValueSource;
|
| 61 | +import wiremock.org.checkerframework.checker.units.qual.A; |
60 | 62 |
|
61 | 63 | @ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
|
62 | 64 | public class StreamProducerTest {
|
@@ -88,6 +90,67 @@ void tearDown() {
|
88 | 90 | environment.close();
|
89 | 91 | }
|
90 | 92 |
|
| 93 | + private static AtomicLong rate() { |
| 94 | + AtomicLong count = new AtomicLong(); |
| 95 | + AtomicLong tick = new AtomicLong(System.nanoTime()); |
| 96 | + |
| 97 | + Executors.newSingleThreadScheduledExecutor() |
| 98 | + .scheduleAtFixedRate( |
| 99 | + () -> { |
| 100 | + long now = System.nanoTime(); |
| 101 | + long before = tick.getAndSet(now); |
| 102 | + long elapsed = now - before; |
| 103 | + long sent = count.getAndSet(0); |
| 104 | + System.out.println("Rate " + (sent * 1_000_000_000L / elapsed) + " msg/s"); |
| 105 | + }, |
| 106 | + 1, |
| 107 | + 1, |
| 108 | + TimeUnit.SECONDS); |
| 109 | + return count; |
| 110 | + } |
| 111 | + |
| 112 | + @Test |
| 113 | + void test() { |
| 114 | + AtomicLong count = rate(); |
| 115 | + Producer producer = environment.producerBuilder().stream(stream) |
| 116 | + .maxUnconfirmedMessages(10) |
| 117 | + .build(); |
| 118 | + |
| 119 | + while(true) { |
| 120 | + producer.send(producer.messageBuilder().build(), s -> { }); |
| 121 | + count.incrementAndGet(); |
| 122 | + } |
| 123 | + |
| 124 | + } |
| 125 | + |
| 126 | + @Test |
| 127 | + void client() throws Exception { |
| 128 | + int permits = 10; |
| 129 | + Semaphore semaphore = new Semaphore(permits); |
| 130 | + Client client = cf.get(new Client.ClientParameters().publishConfirmListener(new Client.PublishConfirmListener() { |
| 131 | + @Override |
| 132 | + public void handle(byte publisherId, long publishingId) { |
| 133 | + semaphore.release(); |
| 134 | + } |
| 135 | + })); |
| 136 | + |
| 137 | + byte pubId = (byte) 0; |
| 138 | + client.declarePublisher(pubId, null, stream); |
| 139 | + |
| 140 | + AtomicLong count = rate(); |
| 141 | + |
| 142 | + List<Message> messages = IntStream.range(0, permits).mapToObj(ignored -> client |
| 143 | + .messageBuilder() |
| 144 | + .addData("hello".getBytes(StandardCharsets.UTF_8)) |
| 145 | + .build()).collect(Collectors.toList()); |
| 146 | + while (true) { |
| 147 | + semaphore.acquire(permits); |
| 148 | + client.publish(pubId, messages); |
| 149 | + count.addAndGet(permits); |
| 150 | + } |
| 151 | + |
| 152 | + } |
| 153 | + |
91 | 154 | @Test
|
92 | 155 | void send() throws Exception {
|
93 | 156 | int batchSize = 10;
|
|
0 commit comments