Skip to content

Commit 1ab33ca

Browse files
authored
Merge pull request #10058 from rabbitmq/stream-filtering-in-stomp
Support stream filtering in STOMP
2 parents 5157f25 + c26201c commit 1ab33ca

File tree

7 files changed

+260
-24
lines changed

7 files changed

+260
-24
lines changed

deps/rabbitmq_stomp/include/rabbit_stomp_headers.hrl

+5-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
-define(HEADER_PERSISTENT, "persistent").
2828
-define(HEADER_PREFETCH_COUNT, "prefetch-count").
2929
-define(HEADER_X_STREAM_OFFSET, "x-stream-offset").
30+
-define(HEADER_X_STREAM_FILTER, "x-stream-filter").
31+
-define(HEADER_X_STREAM_MATCH_UNFILTERED, "x-stream-match-unfiltered").
3032
-define(HEADER_PRIORITY, "priority").
3133
-define(HEADER_RECEIPT, "receipt").
3234
-define(HEADER_REDELIVERED, "redelivered").
@@ -50,6 +52,7 @@
5052
-define(HEADER_X_MESSAGE_TTL, "x-message-ttl").
5153
-define(HEADER_X_QUEUE_NAME, "x-queue-name").
5254
-define(HEADER_X_QUEUE_TYPE, "x-queue-type").
55+
-define(HEADER_X_STREAM_FILTER_SIZE_BYTES, "x-stream-filter-size-bytes").
5356

5457
-define(MESSAGE_ID_SEPARATOR, "@@").
5558

@@ -67,7 +70,8 @@
6770
?HEADER_X_MAX_LENGTH_BYTES,
6871
?HEADER_X_MAX_PRIORITY,
6972
?HEADER_X_MESSAGE_TTL,
70-
?HEADER_X_QUEUE_TYPE
73+
?HEADER_X_QUEUE_TYPE,
74+
?HEADER_X_STREAM_FILTER_SIZE_BYTES
7175
]).
7276

7377
-define(HEADER_PARAMS, [

deps/rabbitmq_stomp/src/rabbit_stomp_frame.erl

+22-9
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
boolean_header/2, boolean_header/3,
1616
integer_header/2, integer_header/3,
1717
binary_header/2, binary_header/3]).
18-
-export([stream_offset_header/2]).
18+
-export([stream_offset_header/1, stream_filter_header/1]).
1919
-export([serialize/1, serialize/2]).
2020

2121
initial_state() -> none.
@@ -211,20 +211,33 @@ binary_header(F, K) ->
211211

212212
binary_header(F, K, D) -> default_value(binary_header(F, K), D).
213213

214-
stream_offset_header(F, D) ->
215-
case binary_header(F, ?HEADER_X_STREAM_OFFSET, D) of
216-
<<"first">> ->
214+
stream_offset_header(F) ->
215+
case binary_header(F, ?HEADER_X_STREAM_OFFSET) of
216+
{ok, <<"first">>} ->
217217
{longstr, <<"first">>};
218-
<<"last">> ->
218+
{ok, <<"last">>} ->
219219
{longstr, <<"last">>};
220-
<<"next">> ->
220+
{ok, <<"next">>} ->
221221
{longstr, <<"next">>};
222-
<<"offset=", OffsetValue/binary>> ->
222+
{ok, <<"offset=", OffsetValue/binary>>} ->
223223
{long, binary_to_integer(OffsetValue)};
224-
<<"timestamp=", TimestampValue/binary>> ->
224+
{ok, <<"timestamp=", TimestampValue/binary>>} ->
225225
{timestamp, binary_to_integer(TimestampValue)};
226226
_ ->
227-
D
227+
not_found
228+
end.
229+
230+
stream_filter_header(F) ->
231+
case binary_header(F, ?HEADER_X_STREAM_FILTER) of
232+
{ok, Str} ->
233+
{array, lists:reverse(
234+
lists:foldl(fun(V, Acc) ->
235+
[{longstr, V}] ++ Acc
236+
end,
237+
[],
238+
binary:split(Str, <<",">>, [global])))};
239+
not_found ->
240+
not_found
228241
end.
229242

230243
serialize(Frame) ->

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl

+37-7
Original file line numberDiff line numberDiff line change
@@ -676,13 +676,7 @@ do_subscribe(Destination, DestHdr, Frame,
676676
{stop, normal, close_connection(State)};
677677
error ->
678678
ExchangeAndKey = parse_routing(Destination, DfltTopicEx),
679-
StreamOffset = rabbit_stomp_frame:stream_offset_header(Frame, undefined),
680-
Arguments = case StreamOffset of
681-
undefined ->
682-
[];
683-
{Type, Value} ->
684-
[{<<"x-stream-offset">>, Type, Value}]
685-
end,
679+
Arguments = subscribe_arguments(Frame),
686680
try
687681
amqp_channel:subscribe(Channel,
688682
#'basic.consume'{
@@ -722,6 +716,42 @@ do_subscribe(Destination, DestHdr, Frame,
722716
Err
723717
end.
724718

719+
subscribe_arguments(Frame) ->
720+
subscribe_arguments([?HEADER_X_STREAM_OFFSET,
721+
?HEADER_X_STREAM_FILTER,
722+
?HEADER_X_STREAM_MATCH_UNFILTERED], Frame, []).
723+
724+
subscribe_arguments([], _Frame , Acc) ->
725+
Acc;
726+
subscribe_arguments([K | T], Frame, Acc0) ->
727+
Acc1 = subscribe_argument(K, Frame, Acc0),
728+
subscribe_arguments(T, Frame, Acc1).
729+
730+
subscribe_argument(?HEADER_X_STREAM_OFFSET, Frame, Acc) ->
731+
StreamOffset = rabbit_stomp_frame:stream_offset_header(Frame),
732+
case StreamOffset of
733+
not_found ->
734+
Acc;
735+
{OffsetType, OffsetValue} ->
736+
[{list_to_binary(?HEADER_X_STREAM_OFFSET), OffsetType, OffsetValue}] ++ Acc
737+
end;
738+
subscribe_argument(?HEADER_X_STREAM_FILTER, Frame, Acc) ->
739+
StreamFilter = rabbit_stomp_frame:stream_filter_header(Frame),
740+
case StreamFilter of
741+
not_found ->
742+
Acc;
743+
{FilterType, FilterValue} ->
744+
[{list_to_binary(?HEADER_X_STREAM_FILTER), FilterType, FilterValue}] ++ Acc
745+
end;
746+
subscribe_argument(?HEADER_X_STREAM_MATCH_UNFILTERED, Frame, Acc) ->
747+
MatchUnfiltered = rabbit_stomp_frame:boolean_header(Frame, ?HEADER_X_STREAM_MATCH_UNFILTERED),
748+
case MatchUnfiltered of
749+
{ok, MU} ->
750+
[{list_to_binary(?HEADER_X_STREAM_MATCH_UNFILTERED), bool, MU}] ++ Acc;
751+
not_found ->
752+
Acc
753+
end.
754+
725755
check_subscription_access(Destination = {topic, _Topic},
726756
#proc_state{auth_login = _User,
727757
connection = Connection,

deps/rabbitmq_stomp/src/rabbit_stomp_util.erl

+6-2
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,12 @@ build_argument(?HEADER_X_STREAM_MAX_SEGMENT_SIZE_BYTES, Val) ->
296296
{list_to_binary(?HEADER_X_STREAM_MAX_SEGMENT_SIZE_BYTES), long,
297297
list_to_integer(string:strip(Val))};
298298
build_argument(?HEADER_X_QUEUE_TYPE, Val) ->
299-
{list_to_binary(?HEADER_X_QUEUE_TYPE), longstr,
300-
list_to_binary(string:strip(Val))}.
299+
{list_to_binary(?HEADER_X_QUEUE_TYPE), longstr,
300+
list_to_binary(string:strip(Val))};
301+
build_argument(?HEADER_X_STREAM_FILTER_SIZE_BYTES, Val) ->
302+
{list_to_binary(?HEADER_X_STREAM_FILTER_SIZE_BYTES), long,
303+
list_to_integer(string:strip(Val))}.
304+
301305

302306
build_params(EndPoint, Headers) ->
303307
Params = lists:foldl(fun({K, V}, Acc) ->

deps/rabbitmq_stomp/test/frame_SUITE.erl

+23-4
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ all() ->
3939
header_value_with_colon,
4040
headers_escaping_roundtrip,
4141
headers_escaping_roundtrip_without_trailing_lf,
42-
stream_offset_header
42+
stream_offset_header,
43+
stream_filter_header
4344
].
4445

4546
parse_simple_frame(_) ->
@@ -170,17 +171,35 @@ stream_offset_header(_) ->
170171
{{"x-stream-offset", "next"}, {longstr, <<"next">>}},
171172
{{"x-stream-offset", "offset=5000"}, {long, 5000}},
172173
{{"x-stream-offset", "timestamp=1000"}, {timestamp, 1000}},
173-
{{"x-stream-offset", "foo"}, undefined},
174-
{{"some-header", "some value"}, undefined}
174+
{{"x-stream-offset", "foo"}, not_found},
175+
{{"some-header", "some value"}, not_found}
175176
],
176177

177178
lists:foreach(fun({Header, Expected}) ->
178179
?assertEqual(
179180
Expected,
180-
rabbit_stomp_frame:stream_offset_header(#stomp_frame{headers = [Header]}, undefined)
181+
rabbit_stomp_frame:stream_offset_header(#stomp_frame{headers = [Header]})
181182
)
182183
end, TestCases).
183184

185+
stream_filter_header(_) ->
186+
TestCases = [
187+
{{"x-stream-filter", "banana"}, {array, [{longstr, <<"banana">>}]}},
188+
{{"x-stream-filter", "banana,apple"}, {array, [{longstr, <<"banana">>},
189+
{longstr, <<"apple">>}]}},
190+
{{"x-stream-filter", "banana,apple,orange"}, {array, [{longstr, <<"banana">>},
191+
{longstr, <<"apple">>},
192+
{longstr, <<"orange">>}]}},
193+
{{"some-header", "some value"}, not_found}
194+
],
195+
196+
lists:foreach(fun({Header, Expected}) ->
197+
?assertEqual(
198+
Expected,
199+
rabbit_stomp_frame:stream_filter_header(#stomp_frame{headers = [Header]})
200+
)
201+
end, TestCases).
202+
184203
test_frame_serialization(Expected, TrailingLF) ->
185204
{ok, Frame, _} = parse(Expected),
186205
{ok, Val} = rabbit_stomp_frame:header(Frame, "head\r:\ner"),

deps/rabbitmq_stomp/test/python_SUITE_data/src/x_queue_type_stream.py

+2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ def test_stream_queue(self):
2727
'x-queue-type': 'stream',
2828
'x-max-age' : '10h',
2929
'x-stream-max-segment-size-bytes' : 1048576,
30+
'x-stream-filter-size-bytes' : 32,
31+
'x-stream-match-unfiltered' : True,
3032
'durable': True,
3133
'auto-delete': False,
3234
'id': 1234,

deps/rabbitmq_stomp/test/system_SUITE.erl

+165-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ groups() ->
3434
delete_queue_subscribe,
3535
temp_destination_queue,
3636
temp_destination_in_send,
37-
blank_destination_in_send
37+
blank_destination_in_send,
38+
stream_filtering
3839
],
3940

4041
[{version_to_group_name(V), [sequence], Tests}
@@ -102,6 +103,13 @@ init_per_testcase0(publish_unauthorized_error, Config) ->
102103
StompPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stomp),
103104
{ok, ClientFoo} = rabbit_stomp_client:connect(Version, "user", "pass", StompPort),
104105
rabbit_ct_helpers:set_config(Config, [{client_foo, ClientFoo}]);
106+
init_per_testcase0(stream_filtering, Config) ->
107+
case rabbit_ct_helpers:is_mixed_versions() of
108+
true ->
109+
{skip, "mixed version clusters are not supported for stream filtering"};
110+
_ ->
111+
Config
112+
end;
105113
init_per_testcase0(_, Config) ->
106114
Config.
107115

@@ -310,6 +318,162 @@ blank_destination_in_send(Config) ->
310318
"Invalid destination" = proplists:get_value("message", Hdrs),
311319
ok.
312320

321+
stream_filtering(Config) ->
322+
Version = ?config(version, Config),
323+
Client = ?config(stomp_client, Config),
324+
Stream = atom_to_list(?FUNCTION_NAME) ++ "-" ++ integer_to_list(rand:uniform(10000)),
325+
%% subscription just to create the stream from STOMP
326+
SubDestination = "/topic/stream-queue-test",
327+
rabbit_stomp_client:send(
328+
Client, "SUBSCRIBE",
329+
[{"destination", SubDestination},
330+
{"receipt", "foo"},
331+
{"x-queue-name", Stream},
332+
{"x-queue-type", "stream"},
333+
{?HEADER_X_STREAM_FILTER_SIZE_BYTES, "32"},
334+
{"durable", "true"},
335+
{"auto-delete", "false"},
336+
{"id", "1234"},
337+
{"prefetch-count", "1"},
338+
{"ack", "client"}]),
339+
{ok, Client1, _, _} = stomp_receive(Client, "RECEIPT"),
340+
rabbit_stomp_client:send(
341+
Client1, "UNSUBSCRIBE", [{"destination", SubDestination},
342+
{"id", "1234"},
343+
{"receipt", "bar"}]),
344+
{ok, Client2, _, _} = stomp_receive(Client1, "RECEIPT"),
345+
346+
%% we are going to publish several waves of messages with and without filter values.
347+
%% we will then create subscriptions with various filter options
348+
%% and make sure we receive only what we asked for and not all the messages.
349+
350+
StreamDestination = "/amq/queue/" ++ Stream,
351+
%% logic to publish a wave of messages with or without a filter value
352+
WaveCount = 1000,
353+
Publish =
354+
fun(C, FilterValue) ->
355+
lists:foldl(fun(Seq, C0) ->
356+
Headers0 = [{"destination", StreamDestination},
357+
{"receipt", integer_to_list(Seq)}],
358+
Headers = case FilterValue of
359+
undefined ->
360+
Headers0;
361+
_ ->
362+
[{"x-stream-filter-value", FilterValue}] ++ Headers0
363+
end,
364+
rabbit_stomp_client:send(
365+
C0, "SEND", Headers, ["hello"]),
366+
{ok, C1, _, _} = stomp_receive(C0, "RECEIPT"),
367+
C1
368+
end, C, lists:seq(1, WaveCount))
369+
end,
370+
%% publishing messages with the "apple" filter value
371+
Client3 = Publish(Client2, "apple"),
372+
%% publishing messages with no filter value
373+
Client4 = Publish(Client3, undefined),
374+
%% publishing messages with the "orange" filter value
375+
Client5 = Publish(Client4, "orange"),
376+
377+
%% filtering on "apple"
378+
rabbit_stomp_client:send(
379+
Client5, "SUBSCRIBE",
380+
[{"destination", StreamDestination},
381+
{"id", "0"},
382+
{"ack", "client"},
383+
{"prefetch-count", "1"},
384+
{"x-stream-filter", "apple"},
385+
{"x-stream-offset", "first"}]),
386+
{Client6, AppleMessages} = stomp_receive_messages(Client5, Version),
387+
%% we should get less than all the waves combined
388+
?assert(length(AppleMessages) < WaveCount * 3),
389+
%% client-side filtering
390+
AppleFilteredMessages =
391+
lists:filter(fun(H) ->
392+
proplists:get_value("x-stream-filter-value", H) =:= "apple"
393+
end, AppleMessages),
394+
%% we should have only the "apple" messages
395+
?assert(length(AppleFilteredMessages) =:= WaveCount),
396+
rabbit_stomp_client:send(
397+
Client6, "UNSUBSCRIBE", [{"destination", StreamDestination},
398+
{"id", "0"},
399+
{"receipt", "bar"}]),
400+
{ok, Client7, _, _} = stomp_receive(Client6, "RECEIPT"),
401+
402+
%% filtering on "apple" and "orange"
403+
rabbit_stomp_client:send(
404+
Client7, "SUBSCRIBE",
405+
[{"destination", StreamDestination},
406+
{"id", "0"},
407+
{"ack", "client"},
408+
{"prefetch-count", "1"},
409+
{"x-stream-filter", "apple,orange"},
410+
{"x-stream-offset", "first"}]),
411+
{Client8, AppleOrangeMessages} = stomp_receive_messages(Client7, Version),
412+
%% we should get less than all the waves combined
413+
?assert(length(AppleOrangeMessages) < WaveCount * 3),
414+
%% client-side filtering
415+
AppleOrangeFilteredMessages =
416+
lists:filter(fun(H) ->
417+
proplists:get_value("x-stream-filter-value", H) =:= "apple" orelse
418+
proplists:get_value("x-stream-filter-value", H) =:= "orange"
419+
end, AppleOrangeMessages),
420+
%% we should have only the "apple" and "orange" messages
421+
?assert(length(AppleOrangeFilteredMessages) =:= WaveCount * 2),
422+
rabbit_stomp_client:send(
423+
Client8, "UNSUBSCRIBE", [{"destination", StreamDestination},
424+
{"id", "0"},
425+
{"receipt", "bar"}]),
426+
{ok, Client9, _, _} = stomp_receive(Client8, "RECEIPT"),
427+
428+
%% filtering on "apple" and messages without a filter value
429+
rabbit_stomp_client:send(
430+
Client9, "SUBSCRIBE",
431+
[{"destination", StreamDestination},
432+
{"id", "0"},
433+
{"ack", "client"},
434+
{"prefetch-count", "1"},
435+
{"x-stream-filter", "apple"},
436+
{"x-stream-match-unfiltered", "true"},
437+
{"x-stream-offset", "first"}]),
438+
{Client10, AppleUnfilteredMessages} = stomp_receive_messages(Client9, Version),
439+
%% we should get less than all the waves combined
440+
?assert(length(AppleUnfilteredMessages) < WaveCount * 3),
441+
%% client-side filtering
442+
AppleUnfilteredFilteredMessages =
443+
lists:filter(fun(H) ->
444+
proplists:get_value("x-stream-filter-value", H) =:= "apple" orelse
445+
proplists:get_value("x-stream-filter-value", H) =:= undefined
446+
end, AppleUnfilteredMessages),
447+
%% we should have only the "apple" messages and messages without a filter value
448+
?assert(length(AppleUnfilteredFilteredMessages) =:= WaveCount * 2),
449+
rabbit_stomp_client:send(
450+
Client10, "UNSUBSCRIBE", [{"destination", StreamDestination},
451+
{"id", "0"},
452+
{"receipt", "bar"}]),
453+
{ok, _, _, _} = stomp_receive(Client10, "RECEIPT"),
454+
455+
Channel = ?config(amqp_channel, Config),
456+
#'queue.delete_ok'{} = amqp_channel:call(Channel,
457+
#'queue.delete'{queue = list_to_binary(Stream)}),
458+
ok.
459+
460+
stomp_receive_messages(Client, Version) ->
461+
stomp_receive_messages(Client, [], Version).
462+
463+
stomp_receive_messages(Client, Acc, Version) ->
464+
try rabbit_stomp_client:recv(Client) of
465+
{#stomp_frame{command = "MESSAGE",
466+
headers = Headers}, Client1} ->
467+
MsgHeader = rabbit_stomp_util:msg_header_name(Version),
468+
AckValue = proplists:get_value(MsgHeader, Headers),
469+
AckHeader = rabbit_stomp_util:ack_header_name(Version),
470+
rabbit_stomp_client:send(Client1, "ACK", [{AckHeader, AckValue}]),
471+
stomp_receive_messages(Client1, [Headers] ++ Acc, Version)
472+
catch
473+
error:{badmatch, {error, timeout}} ->
474+
{Client, Acc}
475+
end.
476+
313477
stomp_receive(Client, Command) ->
314478
{#stomp_frame{command = Command,
315479
headers = Hdrs,

0 commit comments

Comments
 (0)