Skip to content

Commit

Permalink
Merge pull request #5 from ThisaruGuruge/fix-subscriber-not-updating
Browse files Browse the repository at this point in the history
Fix Previously Subscribed Subscribers Cannot Receive Events from New Publishers
  • Loading branch information
ThisaruGuruge authored Oct 27, 2022
2 parents a259911 + 19e92e7 commit 81516ec
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 13 deletions.
4 changes: 2 additions & 2 deletions ballerina/Ballerina.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
org = "xlibb"
name = "pubsub"
version = "1.1.0"
version = "1.1.1"
authors = ["xlibb"]
keywords = ["pubsub"]
repository = "https://github.com/xlibb/module-pubsub"
Expand All @@ -10,4 +10,4 @@ distribution = "2201.2.0"
icon = "icon.png"

[[platform.java11.dependency]]
path = "../native/build/libs/pubsub-native-1.1.0.jar"
path = "../native/build/libs/pubsub-native-1.1.1-SNAPSHOT.jar"
4 changes: 2 additions & 2 deletions ballerina/Dependencies.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ modules = [
[[package]]
org = "xlibb"
name = "pipe"
version = "1.1.1"
version = "1.1.2"
dependencies = [
{org = "ballerina", name = "jballerina.java"}
]
Expand All @@ -51,7 +51,7 @@ modules = [
[[package]]
org = "xlibb"
name = "pubsub"
version = "1.1.0"
version = "1.1.1"
dependencies = [
{org = "ballerina", name = "jballerina.java"},
{org = "ballerina", name = "lang.runtime"},
Expand Down
14 changes: 10 additions & 4 deletions ballerina/tests/errors_test.bal
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ function testSubscribingToClosedPubSub() returns error? {
function testClosingStreams() returns error? {
PubSub pubsub = new();
string topic = "topic";
check pubsub.publish(topic, "hello", 5);
stream<string, error?> newStream1 = check pubsub.subscribe(topic);
stream<string, error?> newStream2 = check pubsub.subscribe(topic);
check newStream2.close();
Expand Down Expand Up @@ -133,7 +132,15 @@ function testClosingPubSubWithClosedStream() returns error? {
PubSub pubsub = new();
stream<string, error?> subscriberStream = check pubsub.subscribe("topic");
check subscriberStream.close();
check pubsub.forceShutdown();
Error? result = pubsub.forceShutdown();
test:assertTrue(result is Error);
string expectedMessage = "Failed to shut down the pubsub";
Error pubsubError = <Error>result;
test:assertEquals(pubsubError.message(), expectedMessage);
error? cause = pubsubError.cause();
test:assertTrue(cause is error);
expectedMessage = "Closing of a closed pipe is not allowed.";
test:assertEquals((<error>cause).message(), expectedMessage);
}

@test:Config {
Expand All @@ -143,10 +150,9 @@ function testTimeoutErrorsInPubSub() returns error? {
PubSub pubsub = new();
string topicName = "topic";
string expectedValue = "event";
Error? publish = pubsub.publish(topicName, expectedValue, 1);
stream<string, error?> subscriber_1 = check pubsub.subscribe("topic", 0);
stream<string, error?> subscriber_2 = check pubsub.subscribe("topic", 5);
publish = pubsub.publish(topicName, expectedValue, 1);
Error? publish = pubsub.publish(topicName, expectedValue, 1);
test:assertTrue(publish is Error);
string expectedError = "Failed to publish events to some subscribers.";
test:assertEquals((<Error>publish).message(), expectedError);
Expand Down
3 changes: 0 additions & 3 deletions ballerina/tests/pubsub_test.bal
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ function testPubSub() returns error? {
json expectedValue = {
hello: "World"
};
check pubsub.publish(topicName, expectedValue, 5);
stream<json, error?> subscribe = check pubsub.subscribe(topicName, 10);
check pubsub.publish(topicName, expectedValue, 5);
record {|json value;|}? msg = check subscribe.next();
Expand All @@ -39,7 +38,6 @@ function testPubSub() returns error? {
}
function testGracefulShutdown() returns error? {
PubSub pubsub = new();
check pubsub.publish("topic", "hello", 5);
stream<string, error?> subscribe = check pubsub.subscribe("topic");
check pubsub.gracefulShutdown();
record {|string value;|}|error? msg = subscribe.next();
Expand All @@ -63,7 +61,6 @@ function testGracefulShutdown() returns error? {
}
function testForceShutdown() returns error? {
PubSub pubsub = new();
check pubsub.publish("topic", "hello", 5);
stream<string, error?> subscribe = check pubsub.subscribe("topic");
check pubsub.forceShutdown();
record {|string value;|}|error? msg = subscribe.next();
Expand Down
6 changes: 5 additions & 1 deletion changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ This file contains all the notable changes done to the Ballerina Pub/Sub package

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Fixed
- [[#4] Fix Previous Subscribers not Receiving Events from New Publishers](https://github.com/xlibb/module-pubsub/issues/4)

## [1.1.0] - 2022-10-27

### Changed

- Moved to the xlibb organization in Ballerina central. Version 1 was released in the `nuvindu` organization in Ballerina central.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ stdlibTimeVersion=2.2.2
observeVersion=1.0.5
observeInternalVersion=1.0.4

pipeVersion=1.1.1
pipeVersion=1.1.2
2 changes: 2 additions & 0 deletions native/src/main/java/io/xlibb/pubsub/PubSub.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ public static void addSubscriber(BObject pubsub, BString topicName, BObject pipe
}
BArray pipes = ValueCreator.createArrayValue(TypeCreator.createArrayType(pipe.getType()));
pipes.append(pipe);
topics.put(topicName, pipes);
} else {
BArray pipes = (BArray) topics.get(topicName);
pipes.append(pipe);
topics.put(topicName, pipes);
}
}
}

0 comments on commit 81516ec

Please sign in to comment.