From 9d65a87c8657d17d9f0f42976bdba21cfbe8df1a Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Thu, 17 Aug 2023 09:03:02 +0800 Subject: [PATCH 01/13] fix --- .../pulsar/client/impl/RawReaderTest.java | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index a201ef104e7b3..658421ed1ee04 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -116,7 +116,7 @@ public static String extractKey(RawMessage m) { @Test public void testHasMessageAvailableWithoutBatch() throws Exception { int numKeys = 10; - String topic = "persistent://my-property/my-ns/my-raw-topic"; + String topic = "persistent://my-property/my-ns/testHasMessageAvailableWithoutBatch"; Set keys = publishMessages(topic, numKeys); RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); while (true) { @@ -133,12 +133,13 @@ public void testHasMessageAvailableWithoutBatch() throws Exception { } } Assert.assertTrue(keys.isEmpty()); + reader.closeAsync().get(3, TimeUnit.SECONDS); } @Test public void testHasMessageAvailableWithBatch() throws Exception { int numKeys = 20; - String topic = "persistent://my-property/my-ns/my-raw-topic"; + String topic = "persistent://my-property/my-ns/testHasMessageAvailableWithBatch"; Set keys = publishMessages(topic, numKeys, true); RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); int messageCount = 0; @@ -163,6 +164,7 @@ public void testHasMessageAvailableWithBatch() throws Exception { } Assert.assertEquals(messageCount, numKeys); Assert.assertTrue(keys.isEmpty()); + reader.closeAsync().get(3, TimeUnit.SECONDS); } @Test @@ -185,12 +187,13 @@ public void testRawReader() throws Exception { } } Assert.assertTrue(keys.isEmpty()); + reader.closeAsync().get(3, TimeUnit.SECONDS); } @Test public void testSeekToStart() throws Exception { int numKeys = 10; - String topic = "persistent://my-property/my-ns/my-raw-topic"; + String topic = "persistent://my-property/my-ns/testSeekToStart"; publishMessages(topic, numKeys); @@ -219,12 +222,13 @@ public void testSeekToStart() throws Exception { } } Assert.assertTrue(readKeys.isEmpty()); + reader.closeAsync().get(3, TimeUnit.SECONDS); } @Test public void testSeekToMiddle() throws Exception { int numKeys = 10; - String topic = "persistent://my-property/my-ns/my-raw-topic"; + String topic = "persistent://my-property/my-ns/testSeekToMiddle"; publishMessages(topic, numKeys); @@ -262,6 +266,7 @@ public void testSeekToMiddle() throws Exception { } } Assert.assertTrue(readKeys.isEmpty()); + reader.closeAsync().get(3, TimeUnit.SECONDS); } /** @@ -270,7 +275,7 @@ public void testSeekToMiddle() throws Exception { @Test public void testFlowControl() throws Exception { int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5; - String topic = "persistent://my-property/my-ns/my-raw-topic"; + String topic = "persistent://my-property/my-ns/testFlowControl"; publishMessages(topic, numMessages); @@ -296,6 +301,7 @@ public void testFlowControl() throws Exception { } Assert.assertEquals(timeouts, 1); Assert.assertEquals(keys.size(), numMessages); + reader.closeAsync().get(3, TimeUnit.SECONDS); } @Test @@ -324,6 +330,7 @@ public void testFlowControlBatch() throws Exception { } } Assert.assertEquals(keys.size(), numMessages); + reader.closeAsync().get(3, TimeUnit.SECONDS); } @Test @@ -459,7 +466,7 @@ public void testAcknowledgeWithProperties() throws Exception { Assert.assertEquals( ledger.openCursor(subscription).getProperties().get("foobar"), Long.valueOf(0xdeadbeefdecaL))); - + reader.closeAsync().get(3, TimeUnit.SECONDS); } @Test From 13fafe19fbf55658a7327a361ba86cf88f662911 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Thu, 17 Aug 2023 14:42:49 +0800 Subject: [PATCH 02/13] address comment --- .../pulsar/client/impl/RawReaderTest.java | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index 658421ed1ee04..4be3be5c0ec3b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeoutException; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.commons.lang3.tuple.ImmutableTriple; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.MessageId; @@ -116,7 +117,7 @@ public static String extractKey(RawMessage m) { @Test public void testHasMessageAvailableWithoutBatch() throws Exception { int numKeys = 10; - String topic = "persistent://my-property/my-ns/testHasMessageAvailableWithoutBatch"; + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); Set keys = publishMessages(topic, numKeys); RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); while (true) { @@ -139,7 +140,7 @@ public void testHasMessageAvailableWithoutBatch() throws Exception { @Test public void testHasMessageAvailableWithBatch() throws Exception { int numKeys = 20; - String topic = "persistent://my-property/my-ns/testHasMessageAvailableWithBatch"; + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); Set keys = publishMessages(topic, numKeys, true); RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); int messageCount = 0; @@ -171,7 +172,7 @@ public void testHasMessageAvailableWithBatch() throws Exception { public void testRawReader() throws Exception { int numKeys = 10; - String topic = "persistent://my-property/my-ns/my-raw-topic"; + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); Set keys = publishMessages(topic, numKeys); @@ -193,7 +194,7 @@ public void testRawReader() throws Exception { @Test public void testSeekToStart() throws Exception { int numKeys = 10; - String topic = "persistent://my-property/my-ns/testSeekToStart"; + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); publishMessages(topic, numKeys); @@ -228,7 +229,7 @@ public void testSeekToStart() throws Exception { @Test public void testSeekToMiddle() throws Exception { int numKeys = 10; - String topic = "persistent://my-property/my-ns/testSeekToMiddle"; + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); publishMessages(topic, numKeys); @@ -275,7 +276,7 @@ public void testSeekToMiddle() throws Exception { @Test public void testFlowControl() throws Exception { int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5; - String topic = "persistent://my-property/my-ns/testFlowControl"; + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); publishMessages(topic, numMessages); @@ -307,7 +308,7 @@ public void testFlowControl() throws Exception { @Test public void testFlowControlBatch() throws Exception { int numMessages = RawReaderImpl.DEFAULT_RECEIVER_QUEUE_SIZE * 5; - String topic = "persistent://my-property/my-ns/my-raw-topic"; + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); publishMessages(topic, numMessages, true); @@ -335,7 +336,7 @@ public void testFlowControlBatch() throws Exception { @Test public void testBatchingExtractKeysAndIds() throws Exception { - String topic = "persistent://my-property/my-ns/my-raw-topic"; + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); try (Producer producer = pulsarClient.newProducer().topic(topic) .maxPendingMessages(3) @@ -370,7 +371,7 @@ public void testBatchingExtractKeysAndIds() throws Exception { @Test public void testBatchingRebatch() throws Exception { - String topic = "persistent://my-property/my-ns/my-raw-topic"; + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); try (Producer producer = pulsarClient.newProducer().topic(topic) .maxPendingMessages(3) @@ -399,7 +400,7 @@ public void testBatchingRebatch() throws Exception { @Test public void testBatchingRebatchWithBrokerEntryMetadata() throws Exception { - String topic = "persistent://my-property/my-ns/my-raw-topic"; + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); try (Producer producer = pulsarClient.newProducer().topic(topic) .maxPendingMessages(3) @@ -435,7 +436,7 @@ public void testBatchingRebatchWithBrokerEntryMetadata() throws Exception { public void testAcknowledgeWithProperties() throws Exception { int numKeys = 10; - String topic = "persistent://my-property/my-ns/my-raw-topic"; + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); Set keys = publishMessages(topic, numKeys); @@ -473,7 +474,7 @@ public void testAcknowledgeWithProperties() throws Exception { public void testReadCancellationOnClose() throws Exception { int numKeys = 10; - String topic = "persistent://my-property/my-ns/my-raw-topic"; + String topic = "persistent://my-property/my-ns/" + BrokerTestUtil.newUniqueName("reader"); publishMessages(topic, numKeys/2); RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); From 84beb1fda13afe765c9fe9c06281e20105064001 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Thu, 17 Aug 2023 21:51:38 +0800 Subject: [PATCH 03/13] add log --- .../apache/pulsar/client/impl/RawReaderTest.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index 4be3be5c0ec3b..7fb0ee50cb37b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.pulsar.broker.BrokerTestUtil; @@ -144,10 +145,13 @@ public void testHasMessageAvailableWithBatch() throws Exception { Set keys = publishMessages(topic, numKeys, true); RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); int messageCount = 0; + AtomicBoolean error = new AtomicBoolean(false); + List ids = new ArrayList<>(); + List keys2 = new ArrayList<>(); while (true) { boolean hasMsg = reader.hasMessageAvailableAsync().get(); if (hasMsg && (messageCount == numKeys)) { - Assert.fail("HasMessageAvailable shows still has message when there is no message"); + error.set(true); } if (hasMsg) { try (RawMessage m = reader.readNextAsync().get()) { @@ -155,7 +159,11 @@ public void testHasMessageAvailableWithBatch() throws Exception { messageCount += meta.getNumMessagesInBatch(); RawBatchConverter.extractIdsAndKeysAndSize(m).forEach(batchInfo -> { String key = batchInfo.getMiddle(); - Assert.assertTrue(keys.remove(key)); + keys2.add(key); + ids.add(batchInfo.getLeft()); + if (!error.get()) { + Assert.assertTrue(keys.remove(key)); + } }); } @@ -163,6 +171,9 @@ public void testHasMessageAvailableWithBatch() throws Exception { break; } } + if (error.get()) { + Assert.assertEquals(ids, keys2); + } Assert.assertEquals(messageCount, numKeys); Assert.assertTrue(keys.isEmpty()); reader.closeAsync().get(3, TimeUnit.SECONDS); From 6b2d1b96cbb972b56c3cd709809a935df6c91b00 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Fri, 18 Aug 2023 12:24:31 +0800 Subject: [PATCH 04/13] add log --- .../java/org/apache/pulsar/client/impl/RawReaderTest.java | 4 ++++ .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index 7fb0ee50cb37b..0f5f060956386 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.pulsar.broker.BrokerTestUtil; @@ -53,6 +54,7 @@ import org.testng.annotations.Test; @Test(groups = "broker-impl") +@Slf4j public class RawReaderTest extends MockedPulsarServiceBaseTest { private static final String subscription = "foobar-sub"; @@ -64,6 +66,7 @@ public void setup() throws Exception { "org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor", "org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor" )); + conf.setSystemTopicEnabled(false); conf.setExposingBrokerEntryMetadataToClientEnabled(true); super.internalSetup(); @@ -161,6 +164,7 @@ public void testHasMessageAvailableWithBatch() throws Exception { String key = batchInfo.getMiddle(); keys2.add(key); ids.add(batchInfo.getLeft()); + log.info("ids : {}, keys2 : {}", ids, keys2); if (!error.get()) { Assert.assertTrue(keys.remove(key)); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index a929fe9aa6bb2..418202a64e25b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2319,12 +2319,16 @@ public CompletableFuture hasMessageAvailableAsync() { } else { // read before, use lastDequeueMessage for comparison if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) { + log.info("hasMoreMessages, lastMessageIdInBroker: {}, lastDequeuedMessageId: {}", + lastMessageIdInBroker, lastDequeuedMessageId); completehasMessageAvailableWithValue(booleanFuture, true); return booleanFuture; } getLastMessageIdAsync().thenAccept(messageId -> { lastMessageIdInBroker = messageId; + log.info("getLastMessageIdAsync, lastMessageIdInBroker : {}, lastDequeuedMessageId : {}", + lastMessageIdInBroker, lastDequeuedMessageId); completehasMessageAvailableWithValue(booleanFuture, hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)); }).exceptionally(e -> { From 75ece2537b17a301f683db26a5d728a57971f4fb Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Fri, 18 Aug 2023 14:18:11 +0800 Subject: [PATCH 05/13] remove timeout --- .../java/org/apache/pulsar/client/impl/NegativeAcksTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 876fa98bce414..a6b77a1c72775 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -321,7 +321,7 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception { negativeAcksTracker.close(); } - @Test(timeOut = 10000) + @Test public void testNegativeAcksWithBatchAckEnabled() throws Exception { cleanup(); conf.setAcknowledgmentAtBatchIndexLevelEnabled(true); From d31bed6275846adf10b1990c1e2f8f786420ada7 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Fri, 18 Aug 2023 14:56:11 +0800 Subject: [PATCH 06/13] fix test --- .../org/apache/pulsar/client/impl/TransactionEndToEndTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 34cc3bc1ca526..ebd58616b064d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -741,6 +741,7 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri for (int i = 0; i < messageCnt; i++){ producer.newMessage().value("hello".getBytes()).sendAsync(); } + producer.flush(); Message message = null; Thread.sleep(1000L); for (int i = 0; i < messageCnt; i++) { From 33681e65a3aa8da5e59e5615a34ddbdddde246f5 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Fri, 18 Aug 2023 16:33:46 +0800 Subject: [PATCH 07/13] fix test --- .../org/apache/pulsar/client/impl/TransactionEndToEndTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index ebd58616b064d..b51874a251248 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -529,7 +529,7 @@ protected void txnAckTest(boolean batchEnable, int maxBatchSize, // after transaction abort, the messages could be received Transaction commitTxn = getTxn(); for (int i = 0; i < messageCnt; i++) { - message = consumer.receive(2, TimeUnit.SECONDS); + message = consumer.receive(); Assert.assertNotNull(message); consumer.acknowledgeAsync(message.getMessageId(), commitTxn).get(); log.info("receive msgId: {}, count: {}", message.getMessageId(), i); From b64ee2223ac0557522ed75e59cde8e190c7b0fcf Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Fri, 18 Aug 2023 21:10:06 +0800 Subject: [PATCH 08/13] add log --- .../main/java/org/apache/pulsar/client/impl/RawReaderImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 9a1c972b2cc98..1fe2e538a87e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -158,6 +158,7 @@ void tryCompletePending() { MessageIdData messageId = messageAndCnx.msg.getMessageIdData(); lastDequeuedMessageId = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), messageId.getPartition(), numMsg - 1); + log.info("update lastDequeuedMessageId: {}, lastDequeuedMessageId"); ClientCnx currentCnx = cnx(); if (currentCnx == messageAndCnx.cnx) { From 71403af88601b76c4a293c35118f10e91f2fbab8 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Sat, 19 Aug 2023 07:52:48 +0800 Subject: [PATCH 09/13] update --- .../apache/pulsar/client/impl/RawReaderImpl.java | 1 - .../apache/pulsar/client/impl/RawReaderTest.java | 15 ++------------- .../apache/pulsar/client/impl/ConsumerImpl.java | 4 ---- 3 files changed, 2 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 1fe2e538a87e9..9a1c972b2cc98 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -158,7 +158,6 @@ void tryCompletePending() { MessageIdData messageId = messageAndCnx.msg.getMessageIdData(); lastDequeuedMessageId = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), messageId.getPartition(), numMsg - 1); - log.info("update lastDequeuedMessageId: {}, lastDequeuedMessageId"); ClientCnx currentCnx = cnx(); if (currentCnx == messageAndCnx.cnx) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index 0f5f060956386..cba20b5743e22 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -148,13 +148,10 @@ public void testHasMessageAvailableWithBatch() throws Exception { Set keys = publishMessages(topic, numKeys, true); RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); int messageCount = 0; - AtomicBoolean error = new AtomicBoolean(false); - List ids = new ArrayList<>(); - List keys2 = new ArrayList<>(); while (true) { boolean hasMsg = reader.hasMessageAvailableAsync().get(); if (hasMsg && (messageCount == numKeys)) { - error.set(true); + Assert.fail("HasMessageAvailable shows still has message when there is no message"); } if (hasMsg) { try (RawMessage m = reader.readNextAsync().get()) { @@ -162,12 +159,7 @@ public void testHasMessageAvailableWithBatch() throws Exception { messageCount += meta.getNumMessagesInBatch(); RawBatchConverter.extractIdsAndKeysAndSize(m).forEach(batchInfo -> { String key = batchInfo.getMiddle(); - keys2.add(key); - ids.add(batchInfo.getLeft()); - log.info("ids : {}, keys2 : {}", ids, keys2); - if (!error.get()) { - Assert.assertTrue(keys.remove(key)); - } + Assert.assertTrue(keys.remove(key)); }); } @@ -175,9 +167,6 @@ public void testHasMessageAvailableWithBatch() throws Exception { break; } } - if (error.get()) { - Assert.assertEquals(ids, keys2); - } Assert.assertEquals(messageCount, numKeys); Assert.assertTrue(keys.isEmpty()); reader.closeAsync().get(3, TimeUnit.SECONDS); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 418202a64e25b..a929fe9aa6bb2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2319,16 +2319,12 @@ public CompletableFuture hasMessageAvailableAsync() { } else { // read before, use lastDequeueMessage for comparison if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) { - log.info("hasMoreMessages, lastMessageIdInBroker: {}, lastDequeuedMessageId: {}", - lastMessageIdInBroker, lastDequeuedMessageId); completehasMessageAvailableWithValue(booleanFuture, true); return booleanFuture; } getLastMessageIdAsync().thenAccept(messageId -> { lastMessageIdInBroker = messageId; - log.info("getLastMessageIdAsync, lastMessageIdInBroker : {}, lastDequeuedMessageId : {}", - lastMessageIdInBroker, lastDequeuedMessageId); completehasMessageAvailableWithValue(booleanFuture, hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)); }).exceptionally(e -> { From 06f9c56d9af5edee5cd491b2e4e8c898607d5fcb Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Sat, 19 Aug 2023 08:00:59 +0800 Subject: [PATCH 10/13] fix checkstyle --- .../test/java/org/apache/pulsar/client/impl/RawReaderTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index cba20b5743e22..4af61f0192359 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -30,7 +30,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.commons.lang3.tuple.ImmutableTriple; From 28ad51803a34612a7920650e44b0415f2322d9df Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Sat, 19 Aug 2023 11:24:15 +0800 Subject: [PATCH 11/13] fix checkstyle --- .../org/apache/pulsar/client/impl/MultiTopicsReaderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java index a41aac9bd457f..d9bbc6a9d742a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java @@ -211,7 +211,7 @@ public void testReadMessageWithBatchingWithMessageInclusive() throws Exception { reader.close(); } - @Test(timeOut = 10000) + @Test public void testReaderWithTimeLong() throws Exception { String ns = "my-property/my-ns"; String topic = "persistent://" + ns + "/testReadFromPartition" + UUID.randomUUID(); From 2618d2c783d8c60f164f2a6f5cb3b0dadd75f992 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Sat, 19 Aug 2023 14:44:07 +0800 Subject: [PATCH 12/13] add log --- .../java/org/apache/pulsar/client/impl/RawReaderImpl.java | 2 +- .../java/org/apache/pulsar/client/impl/RawReaderTest.java | 8 +++++--- .../java/org/apache/pulsar/client/impl/ConsumerImpl.java | 4 ++++ 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 9a1c972b2cc98..e42880fa68ab7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -158,7 +158,7 @@ void tryCompletePending() { MessageIdData messageId = messageAndCnx.msg.getMessageIdData(); lastDequeuedMessageId = new BatchMessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), messageId.getPartition(), numMsg - 1); - + log.info("update lastDequeuedMessageId: {}, lastDequeuedMessageId"); ClientCnx currentCnx = cnx(); if (currentCnx == messageAndCnx.cnx) { increaseAvailablePermits(currentCnx, numMsg); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index 4af61f0192359..82859dc9b8ded 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -147,17 +147,19 @@ public void testHasMessageAvailableWithBatch() throws Exception { Set keys = publishMessages(topic, numKeys, true); RawReader reader = RawReader.create(pulsarClient, topic, subscription).get(); int messageCount = 0; + List ids = new ArrayList<>(); + List keys2 = new ArrayList<>(); while (true) { boolean hasMsg = reader.hasMessageAvailableAsync().get(); - if (hasMsg && (messageCount == numKeys)) { - Assert.fail("HasMessageAvailable shows still has message when there is no message"); - } if (hasMsg) { try (RawMessage m = reader.readNextAsync().get()) { MessageMetadata meta = Commands.parseMessageMetadata(m.getHeadersAndPayload()); messageCount += meta.getNumMessagesInBatch(); RawBatchConverter.extractIdsAndKeysAndSize(m).forEach(batchInfo -> { String key = batchInfo.getMiddle(); + keys2.add(key); + ids.add(batchInfo.getLeft()); + log.info("ids : {}, keys2 : {}", ids, keys2); Assert.assertTrue(keys.remove(key)); }); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index a929fe9aa6bb2..09f383dbe7b98 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -2319,11 +2319,15 @@ public CompletableFuture hasMessageAvailableAsync() { } else { // read before, use lastDequeueMessage for comparison if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) { + log.info("hasMoreMessages, lastMessageIdInBroker: {}, lastDequeuedMessageId: {}", + lastMessageIdInBroker, lastDequeuedMessageId); completehasMessageAvailableWithValue(booleanFuture, true); return booleanFuture; } getLastMessageIdAsync().thenAccept(messageId -> { + log.info("getLastMessageIdAsync, lastMessageIdInBroker : {}, lastDequeuedMessageId : {}", + lastMessageIdInBroker, lastDequeuedMessageId); lastMessageIdInBroker = messageId; completehasMessageAvailableWithValue(booleanFuture, hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)); From 207fba57cfb17ea4043b24d473435db349cac92f Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Sat, 19 Aug 2023 15:15:10 +0800 Subject: [PATCH 13/13] remove some ci --- .github/workflows/pulsar-ci.yaml | 847 +------------------------------ 1 file changed, 1 insertion(+), 846 deletions(-) diff --git a/.github/workflows/pulsar-ci.yaml b/.github/workflows/pulsar-ci.yaml index 64b85cb14c580..314193d7ce108 100644 --- a/.github/workflows/pulsar-ci.yaml +++ b/.github/workflows/pulsar-ci.yaml @@ -90,80 +90,7 @@ jobs: run: | build/pulsar_ci_tool.sh check_ready_to_test - build-and-license-check: - needs: preconditions - name: Build and License check - env: - JOB_NAME: Build and License check - GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} - runs-on: ubuntu-22.04 - timeout-minutes: 60 - if: ${{ needs.preconditions.outputs.docs_only != 'true' }} - steps: - - name: checkout - uses: actions/checkout@v3 - - - name: Tune Runner VM - uses: ./.github/actions/tune-runner-vm - - - name: Setup ssh access to build runner VM - # ssh access is enabled for builds in own forks - if: ${{ github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - uses: ./.github/actions/ssh-access - continue-on-error: true - with: - limit-access-to-actor: true - - - name: Cache local Maven repository - uses: actions/cache@v3 - timeout-minutes: 5 - with: - path: | - ~/.m2/repository/*/*/* - !~/.m2/repository/org/apache/pulsar - key: ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }} - restore-keys: | - ${{ runner.os }}-m2-dependencies-core-modules- - - - name: Set up JDK 17 - uses: actions/setup-java@v3 - with: - distribution: 'temurin' - java-version: 17 - - - name: Check source code license headers - run: mvn -B -T 8 -ntp initialize apache-rat:check license:check - - - name: Check source code style - run: mvn -B -T 8 -ntp initialize checkstyle:check - - - name: Build core-modules - run: | - mvn -B -T 1C -ntp -Pcore-modules,-main clean install -DskipTests -Dlicense.skip=true -Drat.skip=true -Dcheckstyle.skip=true - - - name: Check binary licenses - run: src/check-binary-license.sh ./distribution/server/target/apache-pulsar-*-bin.tar.gz - - - name: Install gh-actions-artifact-client.js - uses: apache/pulsar-test-infra/gh-actions-artifact-client/dist@master - - - name: Save maven build results to Github artifact cache so that the results can be reused - run: | - cd $HOME - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh store_tar_to_github_actions_artifacts pulsar-maven-repository-binaries \ - tar --exclude '.m2/repository/org/apache/pulsar/pulsar-*-distribution' \ - -I zstd -cf - .m2/repository/org/apache/pulsar - cd $GITHUB_WORKSPACE - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh store_tar_to_github_actions_artifacts pulsar-server-distribution \ - tar -I zstd -cf - distribution/server/target/apache-pulsar-*-bin.tar.gz - - - name: Wait for ssh connection when build fails - # ssh access is enabled for builds in own forks - uses: ./.github/actions/ssh-access - if: ${{ failure() && github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - continue-on-error: true - with: - action: wait + unit-tests: @@ -453,779 +380,7 @@ jobs: with: action: wait - integration-tests: - name: CI - Integration - ${{ matrix.name }} - runs-on: ubuntu-22.04 - timeout-minutes: ${{ matrix.timeout || 60 }} - needs: ['preconditions', 'pulsar-java-test-image'] - if: ${{ needs.preconditions.outputs.docs_only != 'true' }} - env: - JOB_NAME: CI - Integration - ${{ matrix.name }} - PULSAR_TEST_IMAGE_NAME: apachepulsar/java-test-image:latest - GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} - strategy: - fail-fast: false - matrix: - include: - - name: Backwards Compatibility - group: BACKWARDS_COMPAT - no_coverage: true - - - name: Cli - group: CLI - - - name: Messaging - group: MESSAGING - - - name: LoadBalance - group: LOADBALANCE - no_coverage: true - - - name: Shade on Java 8 - group: SHADE_RUN - runtime_jdk: 8 - setup: ./build/run_integration_group.sh SHADE_BUILD - no_coverage: true - - - name: Shade on Java 11 - group: SHADE_RUN - runtime_jdk: 11 - setup: ./build/run_integration_group.sh SHADE_BUILD - no_coverage: true - - - name: Shade on Java 17 - group: SHADE_RUN - setup: ./build/run_integration_group.sh SHADE_BUILD - no_coverage: true - - - name: Standalone - group: STANDALONE - - - name: Transaction - group: TRANSACTION - - steps: - - name: checkout - uses: actions/checkout@v3 - - - name: Tune Runner VM - uses: ./.github/actions/tune-runner-vm - - - name: Setup ssh access to build runner VM - # ssh access is enabled for builds in own forks - if: ${{ github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - uses: ./.github/actions/ssh-access - continue-on-error: true - with: - limit-access-to-actor: true - - - name: Cache Maven dependencies - uses: actions/cache@v3 - timeout-minutes: 5 - with: - path: | - ~/.m2/repository/*/*/* - !~/.m2/repository/org/apache/pulsar - key: ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }} - restore-keys: | - ${{ runner.os }}-m2-dependencies-core-modules- - - - name: Set up JDK 17 - uses: actions/setup-java@v3 - with: - distribution: 'temurin' - java-version: 17 - - - name: Install gh-actions-artifact-client.js - uses: apache/pulsar-test-infra/gh-actions-artifact-client/dist@master - - - name: Restore maven build results from Github artifact cache - run: | - cd $HOME - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh restore_tar_from_github_actions_artifacts pulsar-maven-repository-binaries - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh snapshot_pulsar_maven_artifacts - - - name: Load docker image apachepulsar/java-test-image:latest from Github artifact cache - run: | - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh docker_load_image_from_github_actions_artifacts pulsar-java-test-image - - - name: Run setup commands - if: ${{ matrix.setup }} - run: | - ${{ matrix.setup }} - - - name: Set up runtime JDK ${{ matrix.runtime_jdk }} - uses: actions/setup-java@v3 - if: ${{ matrix.runtime_jdk }} - with: - distribution: 'temurin' - java-version: ${{ matrix.runtime_jdk }} - - - name: Run integration test group '${{ matrix.group }}' - run: | - if [[ "${{ matrix.no_coverage }}" != "true" && "${{ needs.preconditions.outputs.collect_coverage }}" == "true" ]]; then - coverage_args="--coverage" - fi - ./build/run_integration_group.sh ${{ matrix.group }} $coverage_args - - - name: Upload coverage to build artifacts - if: ${{ !matrix.no_coverage && needs.preconditions.outputs.collect_coverage == 'true' }} - run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh upload_inttest_coverage_files ${{ matrix.group }} - - - name: print JVM thread dumps when cancelled - if: cancelled() - run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh print_thread_dumps - - - name: Aggregates all test reports to ./test-reports and ./surefire-reports directories - if: ${{ always() }} - uses: ./.github/actions/copy-test-reports - - - name: Publish Test Report - uses: apache/pulsar-test-infra/action-junit-report@master - if: ${{ always() }} - with: - report_paths: 'test-reports/TEST-*.xml' - annotate_only: 'true' - - - name: Upload Surefire reports - uses: actions/upload-artifact@v3 - if: ${{ !success() }} - with: - name: Integration-${{ matrix.group }}-surefire-reports - path: surefire-reports - retention-days: 7 - - - name: Upload container logs - uses: actions/upload-artifact@v3 - if: ${{ !success() }} - continue-on-error: true - with: - name: Integration-${{ matrix.group }}-container-logs - path: tests/integration/target/container-logs - retention-days: 7 - - - name: Wait for ssh connection when build fails - # ssh access is enabled for builds in own forks - uses: ./.github/actions/ssh-access - if: ${{ failure() && github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - continue-on-error: true - with: - action: wait - - integration-tests-upload-coverage: - name: CI - Integration - Upload Coverage - runs-on: ubuntu-22.04 - timeout-minutes: 30 - needs: ['preconditions', 'integration-tests'] - if: ${{ needs.preconditions.outputs.collect_coverage == 'true' }} - env: - PULSAR_TEST_IMAGE_NAME: apachepulsar/java-test-image:latest - steps: - - name: checkout - uses: actions/checkout@v3 - - - name: Tune Runner VM - uses: ./.github/actions/tune-runner-vm - - - name: Setup ssh access to build runner VM - # ssh access is enabled for builds in own forks - if: ${{ github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - uses: ./.github/actions/ssh-access - continue-on-error: true - with: - limit-access-to-actor: true - - - name: Cache Maven dependencies - uses: actions/cache@v3 - timeout-minutes: 5 - with: - path: | - ~/.m2/repository/*/*/* - !~/.m2/repository/org/apache/pulsar - key: ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }} - restore-keys: | - ${{ runner.os }}-m2-dependencies-core-modules- - - - name: Set up JDK 17 - uses: actions/setup-java@v3 - with: - distribution: 'temurin' - java-version: 17 - - - name: Install gh-actions-artifact-client.js - uses: apache/pulsar-test-infra/gh-actions-artifact-client/dist@master - - - name: Restore maven build results from Github artifact cache - run: | - cd $HOME - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh restore_tar_from_github_actions_artifacts pulsar-maven-repository-binaries - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh snapshot_pulsar_maven_artifacts - - - name: Load docker image apachepulsar/java-test-image:latest from Github artifact cache - run: | - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh docker_load_image_from_github_actions_artifacts pulsar-java-test-image - - - name: Restore coverage files from build artifacts and create Jacoco reports - run: | - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh restore_inttest_coverage_files - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh create_inttest_coverage_report - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh create_test_coverage_report - cd $GITHUB_WORKSPACE/target - zip -qr jacoco_test_coverage_report_inttests.zip jacoco_test_coverage_report jacoco_inttest_coverage_report || true - - - name: Upload Jacoco report files to build artifacts - uses: actions/upload-artifact@v3 - with: - name: Jacoco-coverage-report-inttests - path: target/jacoco_test_coverage_report_inttests.zip - retention-days: 3 - - - name: Upload to Codecov - uses: ./.github/actions/upload-coverage - with: - flags: inttests - - - name: Delete coverage files from build artifacts - run: | - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh delete_inttest_coverage_files - - - name: Wait for ssh connection when build fails - # ssh access is enabled for builds in own forks - uses: ./.github/actions/ssh-access - if: ${{ failure() && github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - continue-on-error: true - with: - action: wait - - delete-integration-test-docker-image-artifact: - name: "Delete integration test docker image artifact" - runs-on: ubuntu-22.04 - timeout-minutes: 10 - needs: [ - 'preconditions', - 'integration-tests', - 'integration-tests-upload-coverage' - ] - if: ${{ needs.preconditions.outputs.docs_only != 'true' }} - steps: - - name: checkout - uses: actions/checkout@v3 - - - name: Tune Runner VM - uses: ./.github/actions/tune-runner-vm - - - name: Install gh-actions-artifact-client.js - uses: apache/pulsar-test-infra/gh-actions-artifact-client/dist@master - - - name: Delete docker image from GitHub Actions Artifacts - run: | - gh-actions-artifact-client.js delete pulsar-java-test-image.zst - - pulsar-test-latest-version-image: - name: Build Pulsar docker image - runs-on: ubuntu-22.04 - timeout-minutes: 60 - needs: ['preconditions', 'build-and-license-check'] - if: ${{ needs.preconditions.outputs.docs_only != 'true' }} - env: - GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} - steps: - - name: checkout - uses: actions/checkout@v3 - - - name: Tune Runner VM - uses: ./.github/actions/tune-runner-vm - - - name: Setup ssh access to build runner VM - # ssh access is enabled for builds in own forks - if: ${{ github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - uses: ./.github/actions/ssh-access - continue-on-error: true - with: - limit-access-to-actor: true - - - name: Clean Disk - uses: ./.github/actions/clean-disk - - - name: Cache local Maven repository - uses: actions/cache@v3 - timeout-minutes: 5 - with: - path: | - ~/.m2/repository/*/*/* - !~/.m2/repository/org/apache/pulsar - key: ${{ runner.os }}-m2-dependencies-all-${{ hashFiles('**/pom.xml') }} - restore-keys: | - ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }} - ${{ runner.os }}-m2-dependencies-core-modules- - - - name: Set up JDK 17 - uses: actions/setup-java@v3 - with: - distribution: 'temurin' - java-version: 17 - - - name: Install gh-actions-artifact-client.js - uses: apache/pulsar-test-infra/gh-actions-artifact-client/dist@master - - - name: restore maven build results from Github artifact cache - run: | - cd $HOME - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh restore_tar_from_github_actions_artifacts pulsar-maven-repository-binaries - - - name: Pick ubuntu mirror for the docker image build - run: | - # pick the closest ubuntu mirror and set it to UBUNTU_MIRROR environment variable - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh pick_ubuntu_mirror - - - name: Build latest-version-image docker image - run: | - # build docker image - # include building of Pulsar SQL, Connectors, Offloaders and server distros - mvn -B -am -pl pulsar-sql/presto-distribution,distribution/io,distribution/offloaders,distribution/server,distribution/shell,tests/docker-images/latest-version-image install \ - -DUBUNTU_MIRROR="${UBUNTU_MIRROR}" -DUBUNTU_SECURITY_MIRROR="${UBUNTU_SECURITY_MIRROR}" \ - -Pmain,docker -Dmaven.test.skip=true -Ddocker.squash=true \ - -Dspotbugs.skip=true -Dlicense.skip=true -Dcheckstyle.skip=true -Drat.skip=true - - # check full build artifacts licenses - - name: Check binary licenses - run: src/check-binary-license.sh ./distribution/server/target/apache-pulsar-*-bin.tar.gz && src/check-binary-license.sh ./distribution/shell/target/apache-pulsar-shell-*-bin.tar.gz - - - name: Clean up disk space - run: | - # release disk space since saving docker image consumes local disk space - # - echo "::group::Available diskspace before cleaning" - time df -BM / /mnt - echo "::endgroup::" - echo "::group::Clean build directory" - # docker build changes some files to root ownership, fix this before deleting files - sudo chown -R $USER:$GROUP . - # clean build directories - time git clean -fdx - echo "::endgroup::" - echo "::group::Available diskspace after cleaning build directory" - time df -BM / /mnt - echo "::endgroup::" - echo "::group::Delete maven repository" - # delete maven repository - time rm -rf ~/.m2/repository - echo "::endgroup::" - echo "::group::Available diskspace after cleaning maven repository" - time df -BM / /mnt - echo "::endgroup::" - - - name: save docker image apachepulsar/pulsar-test-latest-version:latest to Github artifact cache - run: | - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh docker_save_image_to_github_actions_artifacts apachepulsar/pulsar-test-latest-version:latest pulsar-test-latest-version-image - - - name: Wait for ssh connection when build fails - # ssh access is enabled for builds in own forks - uses: ./.github/actions/ssh-access - if: ${{ failure() && github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - continue-on-error: true - with: - action: wait - - system-tests: - name: CI - System - ${{ matrix.name }} - runs-on: ubuntu-22.04 - timeout-minutes: 60 - needs: ['preconditions', 'pulsar-test-latest-version-image'] - if: ${{ needs.preconditions.outputs.docs_only != 'true' }} - env: - JOB_NAME: CI - System - ${{ matrix.name }} - PULSAR_TEST_IMAGE_NAME: apachepulsar/pulsar-test-latest-version:latest - GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} - strategy: - fail-fast: false - matrix: - include: - - name: Tiered FileSystem - group: TIERED_FILESYSTEM - - - name: Tiered JCloud - group: TIERED_JCLOUD - - - name: Function - group: FUNCTION - - - name: Schema - group: SCHEMA - - - name: Pulsar Connectors - Thread - group: PULSAR_CONNECTORS_THREAD - - - name: Pulsar Connectors - Process - group: PULSAR_CONNECTORS_PROCESS - - - name: Pulsar IO - group: PULSAR_IO - - - name: Sql - group: SQL - - steps: - - name: checkout - uses: actions/checkout@v3 - - - name: Tune Runner VM - uses: ./.github/actions/tune-runner-vm - - - name: Setup ssh access to build runner VM - # ssh access is enabled for builds in own forks - if: ${{ github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - uses: ./.github/actions/ssh-access - continue-on-error: true - with: - limit-access-to-actor: true - - - name: Cache local Maven repository - uses: actions/cache@v3 - timeout-minutes: 5 - with: - path: | - ~/.m2/repository/*/*/* - !~/.m2/repository/org/apache/pulsar - key: ${{ runner.os }}-m2-dependencies-all-${{ hashFiles('**/pom.xml') }} - restore-keys: | - ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }} - ${{ runner.os }}-m2-dependencies-core-modules- - - - name: Set up JDK 17 - uses: actions/setup-java@v3 - with: - distribution: 'temurin' - java-version: 17 - - - name: Install gh-actions-artifact-client.js - uses: apache/pulsar-test-infra/gh-actions-artifact-client/dist@master - - - name: Restore maven build results from Github artifact cache - run: | - cd $HOME - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh restore_tar_from_github_actions_artifacts pulsar-maven-repository-binaries - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh snapshot_pulsar_maven_artifacts - - - name: Load docker image apachepulsar/pulsar-test-latest-version:latest from Github artifact cache - run: | - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh docker_load_image_from_github_actions_artifacts pulsar-test-latest-version-image - - - name: Run setup commands - if: ${{ matrix.setup }} - run: | - ${{ matrix.setup }} - - - name: Run system test group '${{ matrix.group }}' - run: | - if [[ "${{ matrix.no_coverage }}" != "true" && "${{ needs.preconditions.outputs.collect_coverage }}" == "true" ]]; then - coverage_args="--coverage" - fi - ./build/run_integration_group.sh ${{ matrix.group }} $coverage_args - - - name: Upload coverage to build artifacts - if: ${{ !matrix.no_coverage && needs.preconditions.outputs.collect_coverage == 'true' }} - run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh upload_systest_coverage_files ${{ matrix.group }} - - - name: print JVM thread dumps when cancelled - if: cancelled() - run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh print_thread_dumps - - - name: Aggregates all test reports to ./test-reports and ./surefire-reports directories - if: ${{ always() }} - uses: ./.github/actions/copy-test-reports - - - name: Publish Test Report - uses: apache/pulsar-test-infra/action-junit-report@master - if: ${{ always() }} - with: - report_paths: 'test-reports/TEST-*.xml' - annotate_only: 'true' - - - name: Upload container logs - uses: actions/upload-artifact@v3 - if: ${{ !success() }} - continue-on-error: true - with: - name: System-${{ matrix.group }}-container-logs - path: tests/integration/target/container-logs - retention-days: 7 - - - name: Upload Surefire reports - uses: actions/upload-artifact@v3 - if: ${{ !success() }} - with: - name: System-${{ matrix.name }}-surefire-reports - path: surefire-reports - retention-days: 7 - - - name: Wait for ssh connection when build fails - # ssh access is enabled for builds in own forks - uses: ./.github/actions/ssh-access - if: ${{ failure() && github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - continue-on-error: true - with: - action: wait - - system-tests-upload-coverage: - name: CI - System - Upload Coverage - runs-on: ubuntu-22.04 - timeout-minutes: 30 - needs: ['preconditions', 'system-tests'] - if: ${{ needs.preconditions.outputs.collect_coverage == 'true' }} - env: - PULSAR_TEST_IMAGE_NAME: apachepulsar/pulsar-test-latest-version:latest - - steps: - - name: checkout - uses: actions/checkout@v3 - - - name: Tune Runner VM - uses: ./.github/actions/tune-runner-vm - - - name: Setup ssh access to build runner VM - # ssh access is enabled for builds in own forks - if: ${{ github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - uses: ./.github/actions/ssh-access - continue-on-error: true - with: - limit-access-to-actor: true - - - name: Cache local Maven repository - uses: actions/cache@v3 - timeout-minutes: 5 - with: - path: | - ~/.m2/repository/*/*/* - !~/.m2/repository/org/apache/pulsar - key: ${{ runner.os }}-m2-dependencies-all-${{ hashFiles('**/pom.xml') }} - restore-keys: | - ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }} - ${{ runner.os }}-m2-dependencies-core-modules- - - - name: Set up JDK 17 - uses: actions/setup-java@v3 - with: - distribution: 'temurin' - java-version: 17 - - - name: Install gh-actions-artifact-client.js - uses: apache/pulsar-test-infra/gh-actions-artifact-client/dist@master - - - name: Restore maven build results from Github artifact cache - run: | - cd $HOME - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh restore_tar_from_github_actions_artifacts pulsar-maven-repository-binaries - - - name: Load docker image apachepulsar/pulsar-test-latest-version:latest from Github artifact cache - run: | - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh docker_load_image_from_github_actions_artifacts pulsar-test-latest-version-image - - - name: Restore coverage files from build artifacts and create Jacoco reports - run: | - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh restore_systest_coverage_files - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh create_inttest_coverage_report - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh create_test_coverage_report - cd $GITHUB_WORKSPACE/target - zip -qr jacoco_test_coverage_report_systests.zip jacoco_test_coverage_report jacoco_inttest_coverage_report || true - - - name: Upload Jacoco report files to build artifacts - uses: actions/upload-artifact@v3 - with: - name: Jacoco-coverage-report-systests - path: target/jacoco_test_coverage_report_systests.zip - retention-days: 3 - - - name: Upload to Codecov - uses: ./.github/actions/upload-coverage - with: - flags: systests - - - name: Delete coverage files from build artifacts - run: | - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh delete_systest_coverage_files - - - name: Wait for ssh connection when build fails - # ssh access is enabled for builds in own forks - uses: ./.github/actions/ssh-access - if: ${{ failure() && github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - continue-on-error: true - with: - action: wait - - flaky-system-tests: - name: CI Flaky - System - ${{ matrix.name }} - runs-on: ubuntu-22.04 - timeout-minutes: 60 - needs: [ 'preconditions', 'pulsar-test-latest-version-image' ] - if: ${{ needs.preconditions.outputs.docs_only != 'true' }} - env: - JOB_NAME: CI Flaky - System - ${{ matrix.name }} - PULSAR_TEST_IMAGE_NAME: apachepulsar/pulsar-test-latest-version:latest - GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} - strategy: - fail-fast: false - matrix: - include: - - name: Plugin - group: PLUGIN - - - name: Pulsar IO - Oracle - group: PULSAR_IO_ORA - - steps: - - name: checkout - uses: actions/checkout@v3 - - - name: Tune Runner VM - uses: ./.github/actions/tune-runner-vm - - - name: Setup ssh access to build runner VM - # ssh access is enabled for builds in own forks - if: ${{ github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - uses: ./.github/actions/ssh-access - continue-on-error: true - with: - limit-access-to-actor: true - - - name: Cache local Maven repository - uses: actions/cache@v3 - timeout-minutes: 5 - with: - path: | - ~/.m2/repository/*/*/* - !~/.m2/repository/org/apache/pulsar - key: ${{ runner.os }}-m2-dependencies-all-${{ hashFiles('**/pom.xml') }} - restore-keys: | - ${{ runner.os }}-m2-dependencies-core-modules-${{ hashFiles('**/pom.xml') }} - ${{ runner.os }}-m2-dependencies-core-modules- - - - name: Set up JDK 17 - uses: actions/setup-java@v3 - with: - distribution: 'temurin' - java-version: 17 - - - name: Install gh-actions-artifact-client.js - uses: apache/pulsar-test-infra/gh-actions-artifact-client/dist@master - - - name: Restore maven build results from Github artifact cache - run: | - cd $HOME - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh restore_tar_from_github_actions_artifacts pulsar-maven-repository-binaries - - - name: Load docker image apachepulsar/pulsar-test-latest-version:latest from Github artifact cache - run: | - $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh docker_load_image_from_github_actions_artifacts pulsar-test-latest-version-image - - - name: Run setup commands - if: ${{ matrix.setup }} - run: | - ${{ matrix.setup }} - - - name: Run system test group '${{ matrix.group }}' - run: | - ./build/run_integration_group.sh ${{ matrix.group }} - - - name: print JVM thread dumps when cancelled - if: cancelled() - run: $GITHUB_WORKSPACE/build/pulsar_ci_tool.sh print_thread_dumps - - - name: Aggregates all test reports to ./test-reports and ./surefire-reports directories - if: ${{ always() }} - uses: ./.github/actions/copy-test-reports - - - name: Publish Test Report - uses: apache/pulsar-test-infra/action-junit-report@master - if: ${{ always() }} - with: - report_paths: 'test-reports/TEST-*.xml' - annotate_only: 'true' - - - name: Upload container logs - uses: actions/upload-artifact@v3 - if: ${{ !success() }} - continue-on-error: true - with: - name: System-${{ matrix.group }}-container-logs - path: tests/integration/target/container-logs - retention-days: 7 - - - name: Upload Surefire reports - uses: actions/upload-artifact@v3 - if: ${{ !success() }} - with: - name: System-${{ matrix.name }}-surefire-reports - path: surefire-reports - retention-days: 7 - - - name: Wait for ssh connection when build fails - # ssh access is enabled for builds in own forks - uses: ./.github/actions/ssh-access - if: ${{ failure() && github.repository != 'apache/pulsar' && github.event_name == 'pull_request' }} - continue-on-error: true - with: - action: wait - - delete-system-test-docker-image-artifact: - name: "Delete system test docker image artifact" - runs-on: ubuntu-22.04 - timeout-minutes: 10 - needs: [ - 'preconditions', - 'system-tests', - 'system-tests-upload-coverage', - 'flaky-system-tests' - ] - if: ${{ needs.preconditions.outputs.docs_only != 'true' }} - steps: - - name: checkout - uses: actions/checkout@v3 - - - name: Tune Runner VM - uses: ./.github/actions/tune-runner-vm - - - name: Install gh-actions-artifact-client.js - uses: apache/pulsar-test-infra/gh-actions-artifact-client/dist@master - - - name: Delete docker image from GitHub Actions Artifacts - run: | - gh-actions-artifact-client.js delete pulsar-test-latest-version-image.zst - - macos-build: - name: Build Pulsar on MacOS - runs-on: macos-11 - timeout-minutes: 120 - needs: ['preconditions', 'integration-tests'] - if: ${{ needs.preconditions.outputs.docs_only != 'true' }} - env: - GRADLE_ENTERPRISE_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} - steps: - - name: checkout - uses: actions/checkout@v3 - - - name: Tune Runner VM - uses: ./.github/actions/tune-runner-vm - - - name: Cache Maven dependencies - uses: actions/cache@v3 - timeout-minutes: 5 - with: - path: | - ~/.m2/repository/*/*/* - !~/.m2/repository/org/apache/pulsar - key: ${{ runner.os }}-m2-dependencies-all-${{ hashFiles('**/pom.xml') }} - restore-keys: | - ${{ runner.os }}-m2-dependencies-all- - - - name: Set up JDK 17 - uses: actions/setup-java@v3 - with: - distribution: 'temurin' - java-version: 17 - - name: build package - run: mvn -B clean package -DskipTests -T 1C -ntp owasp-dep-check: name: OWASP dependency check