Skip to content

Commit cf53b40

Browse files
authored
[NEW FEATURE] Add on_cancel callback handler for the reliable_listener [API-2183] (#1215)
* Added the new reliable_listener.h on_cancel callback method. * Updated the reliable topic listener example to also use the `on_cancel` callback.
1 parent 1ef184b commit cf53b40

File tree

5 files changed

+69
-14
lines changed

5 files changed

+69
-14
lines changed

examples/distributed-topic/reliabletopic/Publisher.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ publish_with_default_config()
2121
auto client = hazelcast::new_client().get();
2222

2323
auto topic = client.get_reliable_topic("MyReliableTopic").get();
24-
topic->publish(std::string("My first message")).get();
24+
topic->publish(std::string("My first message with default config")).get();
2525
}
2626

2727
void
@@ -37,7 +37,8 @@ publish_with_non_default_config()
3737

3838
auto topic = client.get_reliable_topic(topicName).get();
3939

40-
topic->publish(std::string("My first message")).get();
40+
topic->publish(std::string("My first message with non-default config"))
41+
.get();
4142
}
4243

4344
int

examples/distributed-topic/reliabletopic/Subscriber.cpp

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
#include <hazelcast/client/topic/reliable_listener.h>
1818

1919
hazelcast::client::topic::reliable_listener
20-
make_listener(std::atomic<int>& n_received_messages, int64_t sequence_id = -1)
20+
make_listener(std::atomic<int>& n_received_messages,
21+
const std::string& topic_name,
22+
int64_t sequence_id = -1)
2123
{
2224
using namespace hazelcast::client::topic;
2325

@@ -27,13 +29,17 @@ make_listener(std::atomic<int>& n_received_messages, int64_t sequence_id = -1)
2729

2830
auto object = message.get_message_object().get<std::string>();
2931
if (object) {
30-
std::cout << "[GenericListener::onMessage] Received message: "
31-
<< *object << " for topic:" << message.get_name();
32+
std::cout << "[on_received] Received message: " << *object
33+
<< " for topic: " << message.get_name() << std::endl;
3234
} else {
33-
std::cout << "[GenericListener::onMessage] Received message with "
34-
"NULL object for topic:"
35-
<< message.get_name();
35+
std::cout << "[on_received] Received message with "
36+
"NULL object for topic: "
37+
<< message.get_name() << std::endl;
3638
}
39+
})
40+
.on_cancel([topic_name]() {
41+
std::cout << "[on_cancel] Cancelling listener for topic "
42+
<< topic_name << std::endl;
3743
});
3844
}
3945

@@ -46,8 +52,8 @@ listen_with_default_config()
4652
auto topic = client.get_reliable_topic(topicName).get();
4753

4854
std::atomic<int> numberOfMessagesReceived{ 0 };
49-
auto listenerId =
50-
topic->add_message_listener(make_listener(numberOfMessagesReceived));
55+
auto listenerId = topic->add_message_listener(
56+
make_listener(numberOfMessagesReceived, topicName));
5157

5258
std::cout << "Registered the listener with listener id:" << listenerId
5359
<< std::endl;
@@ -79,8 +85,8 @@ listen_with_config()
7985
auto topic = client.get_reliable_topic(topicName).get();
8086

8187
std::atomic<int> numberOfMessagesReceived{ 0 };
82-
auto listenerId =
83-
topic->add_message_listener(make_listener(numberOfMessagesReceived));
88+
auto listenerId = topic->add_message_listener(
89+
make_listener(numberOfMessagesReceived, topicName));
8490

8591
std::cout << "Registered the listener with listener id:" << listenerId
8692
<< std::endl;

hazelcast/include/hazelcast/client/reliable_topic.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ class HAZELCAST_API reliable_topic
312312
"Terminating MessageListener %1% on topic: %2%. "
313313
"Reason: HazelcastInstance is shutting down") %
314314
id_ % name_));
315+
break;
315316
case protocol::DISTRIBUTED_OBJECT_DESTROYED:
316317
HZ_LOG(logger_,
317318
finest,
@@ -320,6 +321,7 @@ class HAZELCAST_API reliable_topic
320321
"Terminating MessageListener %1% on topic: %2%. "
321322
"Reason: Topic is destroyed") %
322323
id_ % name_));
324+
break;
323325
default:
324326
HZ_LOG(logger_,
325327
warning,
@@ -339,6 +341,7 @@ class HAZELCAST_API reliable_topic
339341
if (topic_ptr) {
340342
topic_ptr->runners_map_.remove(id_);
341343
}
344+
listener_.on_cancel_();
342345
return true;
343346
}
344347

hazelcast/include/hazelcast/client/topic/reliable_listener.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,37 @@ class HAZELCAST_API reliable_listener final
166166
return std::move(*this);
167167
}
168168

169+
/**
170+
* Set an handler function which will be called when the listener is
171+
* cancelled.
172+
*
173+
* \param h a `void` function object with no parameters
174+
*/
175+
template<typename Handler,
176+
typename = util::enable_if_rvalue_ref_trait<Handler&&>>
177+
reliable_listener& on_cancel(Handler&& h) &
178+
{
179+
on_cancel_ = std::move(h);
180+
return *this;
181+
}
182+
183+
/**
184+
* \copydoc reliable_listener::on_cancel
185+
*/
186+
template<typename Handler,
187+
typename = util::enable_if_rvalue_ref_trait<Handler&&>>
188+
reliable_listener&& on_cancel(Handler&& h) &&
189+
{
190+
on_cancel(std::move(h));
191+
return std::move(*this);
192+
}
193+
169194
private:
170195
using received_handler_t = std::function<void(message&&)>;
171196
using store_sequence_id_handler_t = std::function<void(int64_t)>;
172197
using exception_handler_t =
173198
std::function<bool(const exception::iexception&)>;
199+
using on_cancel_handler_t = std::function<void()>;
174200

175201
bool loss_tolerant_;
176202
int64_t initial_sequence_id_;
@@ -180,6 +206,7 @@ class HAZELCAST_API reliable_listener final
180206
exception_handler_t terminal_{ [](const exception::iexception&) {
181207
return false;
182208
} };
209+
on_cancel_handler_t on_cancel_{util::noop<>};
183210
};
184211

185212
} // namespace topic

hazelcast/test/src/HazelcastTests8.cpp

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1142,12 +1142,14 @@ class ReliableTopicTest : public ClientTest
11421142
: latch1(latch_count)
11431143
, latch_for_seq_id(latch_count)
11441144
, latch_for_termination(latch_count)
1145+
, latch_for_cancel(latch_count)
11451146
, start_sequence(start_sequence)
11461147
{}
11471148

11481149
boost::latch latch1;
11491150
boost::latch latch_for_seq_id;
11501151
boost::latch latch_for_termination;
1152+
boost::latch latch_for_cancel;
11511153
int64_t start_sequence;
11521154
std::vector<topic::message> messages;
11531155
std::vector<int64_t> seq_ids;
@@ -1178,16 +1180,24 @@ class ReliableTopicTest : public ClientTest
11781180
return true;
11791181
};
11801182

1183+
auto cancel_handler =
1184+
[state]() -> bool {
1185+
state->latch_for_cancel.count_down();
1186+
return true;
1187+
};
1188+
11811189
if (is_lvalue) {
11821190
topic::reliable_listener tmp_listener(false, state->start_sequence);
11831191
return tmp_listener.on_received(std::move(on_received))
11841192
.on_store_sequence_id(std::move(on_store_sequence_id))
1185-
.terminate_on_exception(std::move(terminate_on_exception));
1193+
.terminate_on_exception(std::move(terminate_on_exception))
1194+
.on_cancel(std::move(cancel_handler));
11861195
} else {
11871196
return topic::reliable_listener(false, state->start_sequence)
11881197
.on_received(std::move(on_received))
11891198
.on_store_sequence_id(std::move(on_store_sequence_id))
1190-
.terminate_on_exception(std::move(terminate_on_exception));
1199+
.terminate_on_exception(std::move(terminate_on_exception))
1200+
.on_cancel(std::move(cancel_handler));
11911201
}
11921202
}
11931203

@@ -1248,6 +1258,8 @@ TEST_F(ReliableTopicTest, testBasics)
12481258
// remove listener
12491259
ASSERT_TRUE(topic_->remove_message_listener(listener_id_));
12501260
ASSERT_FALSE(topic_->remove_message_listener(listener_id_));
1261+
1262+
ASSERT_OPEN_EVENTUALLY(state->latch_for_cancel);
12511263
}
12521264

12531265
TEST_F(ReliableTopicTest, testListenerSequence)
@@ -1273,6 +1285,8 @@ TEST_F(ReliableTopicTest, testListenerSequence)
12731285

12741286
// remove listener
12751287
ASSERT_TRUE(topic_->remove_message_listener(listener_id_));
1288+
1289+
ASSERT_OPEN_EVENTUALLY(state->latch_for_cancel);
12761290
}
12771291

12781292
TEST_F(ReliableTopicTest, removeMessageListener_whenExisting)
@@ -1295,6 +1309,8 @@ TEST_F(ReliableTopicTest, removeMessageListener_whenExisting)
12951309
ASSERT_EQ(boost::cv_status::timeout,
12961310
state->latch1.wait_for(boost::chrono::seconds(2)));
12971311
ASSERT_EQ(0, state->messages.size());
1312+
1313+
ASSERT_OPEN_EVENTUALLY(state->latch_for_cancel);
12981314
}
12991315

13001316
TEST_F(ReliableTopicTest, removeMessageListener_whenNonExisting)
@@ -1556,6 +1572,7 @@ TEST_F(ReliableTopicTest, testTerminateCase)
15561572
ASSERT_NO_THROW(topic_->publish(item).get());
15571573

15581574
ASSERT_OPEN_EVENTUALLY(state->latch_for_termination);
1575+
ASSERT_OPEN_EVENTUALLY(state->latch_for_cancel);
15591576

15601577
// listener is removed when the exception occured
15611578
}
@@ -1573,6 +1590,7 @@ TEST_F(ReliableTopicTest, testTerminateCaseForLValue)
15731590
ASSERT_NO_THROW(topic_->publish(item).get());
15741591

15751592
ASSERT_OPEN_EVENTUALLY(state->latch_for_termination);
1593+
ASSERT_OPEN_EVENTUALLY(state->latch_for_cancel);
15761594

15771595
// listener is removed when the exception occured
15781596
}

0 commit comments

Comments
 (0)