Skip to content

Commit 862517b

Browse files
Configure capabilities on the source/target field in the ATTACH frame
1 parent b4efe04 commit 862517b

File tree

10 files changed

+525
-52
lines changed

10 files changed

+525
-52
lines changed

deps/amqp10_client/.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,5 @@ amqp10_client.d
2626

2727
# Downloaded ActiveMQ.
2828
/test/system_SUITE_data/apache-activemq-*
29+
/test/system_SUITE_data/ibmmq/mq-container
30+
/test/system_SUITE_data/ibmmq/*.tar.gz

deps/amqp10_client/BUILD.bazel

+4-2
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,14 @@ rabbitmq_integration_suite(
116116
size = "medium",
117117
additional_beam = [
118118
"test/activemq_ct_helpers.beam",
119+
"test/ibmmq_ct_helpers.beam",
119120
"test/mock_server.beam",
120121
],
121122
data = [
122-
"@activemq//:exec_dir",
123+
"@activemq//:exec_dir",
123124
],
124125
test_env = {
125-
"ACTIVEMQ": "$TEST_SRCDIR/$TEST_WORKSPACE/external/activemq/bin/activemq",
126+
"ACTIVEMQ": "$TEST_SRCDIR/$TEST_WORKSPACE/external/activemq/bin/activemq"
126127
},
127128
deps = TEST_DEPS,
128129
)
@@ -139,6 +140,7 @@ eunit(
139140
name = "eunit",
140141
compiled_suites = [
141142
":test_activemq_ct_helpers_beam",
143+
":test_ibmmq_ct_helpers_beam",
142144
":test_mock_server_beam",
143145
],
144146
target = ":test_erlang_app",

deps/amqp10_client/Makefile

+2-2
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,6 @@ $(ACTIVEMQ): \
6868
test/system_SUITE_data/apache-activemq-$(ACTIVEMQ_VERSION)-bin.tar.gz:
6969
$(gen_verbose) $(call core_http_get,$@,$(ACTIVEMQ_URL))
7070

71-
tests:: $(ACTIVEMQ)
71+
tests:: $(ACTIVEMQ)
7272

73-
ct ct-system: $(ACTIVEMQ)
73+
ct ct-system: $(ACTIVEMQ)

deps/amqp10_client/app.bzl

+8
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
126126
app_name = "amqp10_client",
127127
erlc_opts = "//:test_erlc_opts",
128128
)
129+
erlang_bytecode(
130+
name = "test_ibmmq_ct_helpers_beam",
131+
testonly = True,
132+
srcs = ["test/ibmmq_ct_helpers.erl"],
133+
outs = ["test/ibmmq_ct_helpers.beam"],
134+
app_name = "amqp10_client",
135+
erlc_opts = "//:test_erlc_opts",
136+
)
129137
erlang_bytecode(
130138
name = "test_mock_server_beam",
131139
testonly = True,

deps/amqp10_client/src/amqp10_client.erl

+6-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
end_session/1,
2020
attach_sender_link/3,
2121
attach_sender_link/4,
22-
attach_sender_link/5,
22+
attach_sender_link/5,
2323
attach_sender_link_sync/3,
2424
attach_sender_link_sync/4,
2525
attach_sender_link_sync/5,
@@ -166,6 +166,8 @@ end_session(Pid) ->
166166
%% for the link before returning.
167167
attach_sender_link_sync(Session, Name, Target) ->
168168
attach_sender_link_sync(Session, Name, Target, mixed).
169+
-spec attach_sender_link_sync(pid(), binary(), binary()) ->
170+
{ok, link_ref()} | link_timeout.
169171

170172
%% @doc Synchronously attach a link on 'Session'.
171173
%% This is a convenience function that awaits attached event
@@ -271,9 +273,10 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
271273
%% caller using an amqp10_event of the following format:
272274
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
273275
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
274-
terminus_durability(), filter(), properties()) ->
276+
terminus_durability(), filter(), properties()) ->
275277
{ok, link_ref()}.
276-
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
278+
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter,
279+
Properties)
277280
when is_pid(Session) andalso
278281
is_binary(Name) andalso
279282
is_binary(Source) andalso

deps/amqp10_client/src/amqp10_client_session.erl

+26-4
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,14 @@
7373
-type rcv_settle_mode() :: first | second.
7474

7575
-type terminus_durability() :: none | configuration | unsettled_state.
76+
-type terminus_capabilities() :: none | binary() | list().
7677

7778
-type target_def() :: #{address => link_address(),
78-
durable => terminus_durability()}.
79+
durable => terminus_durability(),
80+
capabilities => terminus_capabilities()}.
7981
-type source_def() :: #{address => link_address(),
80-
durable => terminus_durability()}.
82+
durable => terminus_durability(),
83+
capabilities => terminus_capabilities()}.
8184

8285
-type attach_role() :: {sender, target_def()} | {receiver, source_def(), pid()}.
8386

@@ -109,6 +112,7 @@
109112
-export_type([snd_settle_mode/0,
110113
rcv_settle_mode/0,
111114
terminus_durability/0,
115+
terminus_capabilities/0,
112116
attach_args/0,
113117
attach_role/0,
114118
target_def/0,
@@ -713,20 +717,24 @@ make_source(#{role := {sender, _}}) ->
713717
make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) ->
714718
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
715719
TranslatedFilter = translate_filters(Filter),
720+
Capabilities = translate_terminus_capabilities(maps:get(capabilities, Source, none)),
716721
#'v1_0.source'{address = {utf8, Address},
717722
durable = {uint, Durable},
718-
filter = TranslatedFilter}.
723+
filter = TranslatedFilter,
724+
capabilities = Capabilities}.
719725

720726
make_target(#{role := {receiver, _Source, _Pid}}) ->
721727
#'v1_0.target'{};
722728
make_target(#{role := {sender, #{address := Address} = Target}}) ->
723729
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
730+
Capabilities = translate_terminus_capabilities(maps:get(capabilities, Target, none)),
724731
TargetAddr = case is_binary(Address) of
725732
true -> {utf8, Address};
726733
false -> Address
727734
end,
728735
#'v1_0.target'{address = TargetAddr,
729-
durable = {uint, Durable}}.
736+
durable = {uint, Durable},
737+
capabilities = Capabilities}.
730738

731739
max_message_size(#{max_message_size := Size})
732740
when is_integer(Size) andalso
@@ -771,6 +779,19 @@ filter_value_type({T, _} = V) when is_atom(T) ->
771779
%% looks like an already tagged type, just pass it through
772780
V.
773781

782+
translate_terminus_capabilities(none) ->
783+
undefined;
784+
translate_terminus_capabilities(Capabilities) when is_binary(Capabilities) ->
785+
{symbol, Capabilities};
786+
translate_terminus_capabilities(CapabilitiesList) when is_list(CapabilitiesList) ->
787+
{array, symbol, [filter_capability(V) || V <- CapabilitiesList]}.
788+
789+
filter_capability(V) when is_binary(V) ->
790+
{symbol, V};
791+
filter_capability({T, _} = V) when is_atom(T) ->
792+
%% looks like an already tagged type, just pass it through
793+
V.
794+
774795
% https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html
775796
translate_legacy_amqp_headers_binding(LegacyHeaders) ->
776797
{map,
@@ -846,6 +867,7 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
846867
rcv_settle_mode = rcv_settle_mode(Args),
847868
target = Target,
848869
max_message_size = MaxMessageSize},
870+
849871
ok = Send(Attach, State),
850872

851873
Ref = make_link_ref(Role, self(), OutHandle),

deps/amqp10_client/test/activemq_ct_helpers.erl

+1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ start_activemq_nodes(Config) ->
6363
ActivemqCmd = ?config(activemq_cmd, Config1),
6464
TCPPort = rabbit_ct_broker_helpers:get_node_config(Config1, 0, tcp_port_amqp),
6565
ConfigFile = ?config(activemq_config_filename, Config1),
66+
ct:log("Running ~p", [ActivemqCmd]),
6667
Cmd = [ActivemqCmd,
6768
"start",
6869
{"-Dtestsuite.tcp_port_amqp=~b", [TCPPort]},
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(ibmmq_ct_helpers).
9+
10+
-include_lib("common_test/include/ct.hrl").
11+
12+
-export([setup_steps/0,
13+
teardown_steps/0,
14+
init_config/1,
15+
start_ibmmq_server/1,
16+
stop_ibmmq_server/1]).
17+
18+
setup_steps() ->
19+
[fun init_config/1,
20+
fun start_ibmmq_server/1
21+
].
22+
23+
teardown_steps() ->
24+
[
25+
fun stop_ibmmq_server/1
26+
].
27+
28+
init_config(Config) ->
29+
NodeConfig = [{tcp_port_amqp, 5672}],
30+
rabbit_ct_helpers:set_config(Config, [ {rmq_nodes, [NodeConfig]},
31+
{rmq_hostname, "localhost"},
32+
{tcp_hostname_amqp, "localhost"},
33+
{sasl, {plain, <<"app">>, <<"passw0rd">>}} ]).
34+
35+
start_ibmmq_server(Config) ->
36+
IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]),
37+
Cmd = [IBMmqCmd, "start"],
38+
ct:log("Running command ~p", [Cmd]),
39+
case rabbit_ct_helpers:exec(Cmd, []) of
40+
{ok, _} -> wait_for_ibmmq_nodes(Config);
41+
Error -> ct:pal("Error: ~tp", [Error]),
42+
{skip, "Failed to start IBM MQ"}
43+
end.
44+
45+
wait_for_ibmmq_nodes(Config) ->
46+
Hostname = ?config(rmq_hostname, Config),
47+
Ports = rabbit_ct_broker_helpers:get_node_configs(Config, tcp_port_amqp),
48+
wait_for_ibmmq_ports(Config, Hostname, Ports).
49+
50+
wait_for_ibmmq_ports(Config, Hostname, [Port | Rest]) ->
51+
ct:log("Waiting for IBM MQ on port ~b", [Port]),
52+
case wait_for_ibmmq_port(Hostname, Port, 60) of
53+
ok ->
54+
ct:log("IBM MQ ready on port ~b", [Port]),
55+
wait_for_ibmmq_ports(Config, Hostname, Rest);
56+
{error, _} ->
57+
Msg = lists:flatten(
58+
io_lib:format(
59+
"Failed to start IBM MQ on port ~b; see IBM MQ logs",
60+
[Port])),
61+
ct:pal(?LOW_IMPORTANCE, Msg, []),
62+
{skip, Msg}
63+
end;
64+
wait_for_ibmmq_ports(Config, _, []) ->
65+
Config.
66+
67+
wait_for_ibmmq_port(_, _, 0) ->
68+
{error, econnrefused};
69+
wait_for_ibmmq_port(Hostname, Port, Retries) ->
70+
case gen_tcp:connect(Hostname, Port, []) of
71+
{ok, Connection} ->
72+
gen_tcp:close(Connection),
73+
ok;
74+
{error, econnrefused} ->
75+
timer:sleep(1000),
76+
wait_for_ibmmq_port(Hostname, Port, Retries - 1);
77+
Error ->
78+
Error
79+
end.
80+
81+
stop_ibmmq_server(Config) ->
82+
IBMmqCmd = filename:join([?config(data_dir, Config), "ibmmq_runner"]),
83+
Cmd = [IBMmqCmd, "stop"],
84+
ct:log("Running command ~p", [Cmd]),
85+
case rabbit_ct_helpers:exec(Cmd, []) of
86+
{ok, _} -> Config;
87+
Error -> ct:pal("Error: ~tp", [Error]),
88+
{skip, "Failed to stop IBM MQ"}
89+
end.

0 commit comments

Comments
 (0)