Skip to content

Commit

Permalink
Bump pulsar to 2.7.1.4-rc-202104151209 (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower authored Apr 19, 2021
1 parent c735b1a commit f79a0ee
Show file tree
Hide file tree
Showing 7 changed files with 25 additions and 10 deletions.
2 changes: 1 addition & 1 deletion mqtt-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-protocol-handler-mqtt-parent</artifactId>
<groupId>io.streamnative.pulsar.handlers</groupId>
<version>2.7.1.3</version>
<version>2.7.1.4-rc-202104151209</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>io.streamnative.pulsar.handlers</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.streamnative.pulsar.handlers.mqtt.utils.NettyUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.zookeeper.KeeperException;

/**
* Publish handler implementation for Qos 1.
Expand All @@ -51,6 +53,13 @@ public void receivePublish(Channel channel, MqttPublishMessage msg) {
sendPubAck(topic, clientId, msg.variableHeader().packetId());
} else {
log.error("[{}] Write {} to Pulsar topic failed.", topic, msg, e);
Throwable cause = e.getCause();
if (cause instanceof BrokerServiceException.ServerMetadataException) {
cause = cause.getCause();
if (cause instanceof KeeperException.NoNodeException) {
channel.close();
}
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import io.streamnative.pulsar.handlers.mqtt.AbstractQosPublishHandler;
import io.streamnative.pulsar.handlers.mqtt.ConnectionDescriptorStore;
import io.streamnative.pulsar.handlers.mqtt.MQTTServerConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;


/**
* Publish handler implementation for Qos 2.
*/
@Slf4j
public class Qos2PublishHandler extends AbstractQosPublishHandler {

public Qos2PublishHandler(PulsarService pulsarService, MQTTServerConfiguration configuration,
Expand All @@ -33,6 +35,7 @@ public Qos2PublishHandler(PulsarService pulsarService, MQTTServerConfiguration c

@Override
public void receivePublish(Channel channel, MqttPublishMessage msg) {

log.error("[{}] Failed to write data due to QoS2 does not support.", msg.variableHeader().topicName());
channel.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public static CompletableFuture<Subscription> getOrCreateSubscription(PulsarServ
promise.complete(subscription);
}
}
}).exceptionally(ex -> {
promise.completeExceptionally(ex);
return null;
});
return promise;
}
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>pulsar-protocol-handler-mqtt-parent</artifactId>
<version>2.7.1.3</version>
<version>2.7.1.4-rc-202104151209</version>
<name>StreamNative :: Pulsar Protocol Handler :: MoP Parent</name>
<description>Parent for MQTT on Pulsar implemented using Pulsar Protocol Handler.</description>

Expand Down Expand Up @@ -48,7 +48,7 @@
<lombok.version>1.18.4</lombok.version>
<mockito.version>2.22.0</mockito.version>
<testng.version>6.14.3</testng.version>
<pulsar.version>2.7.1.3</pulsar.version>
<pulsar.version>2.7.1.4-rc-202104151209</pulsar.version>
<awaitility.version>4.0.2</awaitility.version>
<mqtt.codec.version>4.1.49.Final</mqtt.codec.version>
<commons-lang3.version>3.4</commons-lang3.version>
Expand Down
2 changes: 1 addition & 1 deletion tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-protocol-handler-mqtt-parent</artifactId>
<groupId>io.streamnative.pulsar.handlers</groupId>
<version>2.7.1.3</version>
<version>2.7.1.4-rc-202104151209</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand All @@ -31,13 +31,13 @@
*/
@Slf4j
public class ProxyTest extends MQTTTestBase {
@BeforeClass
@BeforeMethod
@Override
protected void setup() throws Exception {
super.setup();
}

@AfterClass
@AfterMethod
@Override
protected void cleanup() throws Exception {
super.cleanup();
Expand All @@ -53,7 +53,7 @@ public Object[][] mqttTopicNames() {
};
}

@Test(dataProvider = "mqttTopicNames")
@Test(dataProvider = "mqttTopicNames", timeOut = 60000)
public void mqttProxyTest(String topicName) throws Exception {
setBrokerCount(3);
int proxyPort = getProxyPort();
Expand Down

0 comments on commit f79a0ee

Please sign in to comment.