Skip to content

Commit c20ed70

Browse files
Add capabilities to attach function
for sender and receiver links
1 parent a337684 commit c20ed70

File tree

2 files changed

+69
-6
lines changed

2 files changed

+69
-6
lines changed

deps/amqp10_client/src/amqp10_client.erl

+50-4
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@
2020
attach_sender_link/3,
2121
attach_sender_link/4,
2222
attach_sender_link/5,
23+
attach_sender_link/6,
2324
attach_sender_link_sync/3,
2425
attach_sender_link_sync/4,
2526
attach_sender_link_sync/5,
27+
attach_sender_link_sync/6,
2628
attach_receiver_link/3,
2729
attach_receiver_link/4,
2830
attach_receiver_link/5,
2931
attach_receiver_link/6,
3032
attach_receiver_link/7,
33+
attach_receiver_link/8,
3134
attach_link/2,
3235
detach_link/1,
3336
send_msg/2,
@@ -182,8 +185,19 @@ attach_sender_link_sync(Session, Name, Target, SettleMode) ->
182185
snd_settle_mode(), terminus_durability()) ->
183186
{ok, link_ref()} | link_timeout.
184187
attach_sender_link_sync(Session, Name, Target, SettleMode, Durability) ->
188+
attach_sender_link_sync(Session, Name, Target, SettleMode, Durability, none).
189+
190+
%% @doc Synchronously attach a link on 'Session'.
191+
%% This is a convenience function that awaits attached event
192+
%% for the link before returning.
193+
-spec attach_sender_link_sync(pid(), binary(), binary(),
194+
snd_settle_mode(), terminus_durability(),
195+
binary() | list()) ->
196+
{ok, link_ref()} | link_timeout.
197+
198+
attach_sender_link_sync(Session, Name, Target, SettleMode, Durability, Capabilities) ->
185199
{ok, Ref} = attach_sender_link(Session, Name, Target, SettleMode,
186-
Durability),
200+
Durability, Capabilities),
187201
receive
188202
{amqp10_event, {link, Ref, attached}} ->
189203
{ok, Ref};
@@ -226,6 +240,23 @@ attach_sender_link(Session, Name, Target, SettleMode, Durability) ->
226240
rcv_settle_mode => first},
227241
amqp10_client_session:attach(Session, AttachArgs).
228242

243+
%% @doc Attaches a sender link to a target.
244+
%% This is asynchronous and will notify completion of the attach request to the
245+
%% caller using an amqp10_event of the following format:
246+
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
247+
-spec attach_sender_link(pid(), binary(), binary(),
248+
snd_settle_mode(), terminus_durability(),
249+
binary() | list()) ->
250+
{ok, link_ref()}.
251+
attach_sender_link(Session, Name, Target, SettleMode, Durability, Capabilities) ->
252+
AttachArgs = #{name => Name,
253+
role => {sender, #{address => Target,
254+
durable => Durability,
255+
capabilities => Capabilities}},
256+
snd_settle_mode => SettleMode,
257+
rcv_settle_mode => first},
258+
amqp10_client_session:attach(Session, AttachArgs).
259+
229260
%% @doc Attaches a receiver link to a source.
230261
%% This is asynchronous and will notify completion of the attach request to the
231262
%% caller using an amqp10_event of the following format:
@@ -272,7 +303,19 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) ->
272303
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
273304
terminus_durability(), filter(), properties()) ->
274305
{ok, link_ref()}.
275-
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties)
306+
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) ->
307+
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties, none).
308+
309+
%% @doc Attaches a receiver link to a source.
310+
%% This is asynchronous and will notify completion of the attach request to the
311+
%% caller using an amqp10_event of the following format:
312+
%% {amqp10_event, {link, LinkRef, attached | {detached, Why}}}
313+
-spec attach_receiver_link(pid(), binary(), binary(), snd_settle_mode(),
314+
terminus_durability(), filter(), properties(),
315+
binary() | list()) ->
316+
{ok, link_ref()}.
317+
attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter,
318+
Properties, Capabilities)
276319
when is_pid(Session) andalso
277320
is_binary(Name) andalso
278321
is_binary(Source) andalso
@@ -281,10 +324,13 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Prop
281324
SettleMode == mixed) andalso
282325
is_atom(Durability) andalso
283326
is_map(Filter) andalso
284-
is_map(Properties) ->
327+
is_map(Properties) andalso
328+
is_list(Capabilities) orelse
329+
(Capabilities == none orelse is_binary(Capabilities)) ->
285330
AttachArgs = #{name => Name,
286331
role => {receiver, #{address => Source,
287-
durable => Durability}, self()},
332+
durable => Durability,
333+
capabilities => Capabilities}, self()},
288334
snd_settle_mode => SettleMode,
289335
rcv_settle_mode => first,
290336
filter => Filter,

deps/amqp10_client/src/amqp10_client_session.erl

+19-2
Original file line numberDiff line numberDiff line change
@@ -713,20 +713,24 @@ make_source(#{role := {sender, _}}) ->
713713
make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) ->
714714
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
715715
TranslatedFilter = translate_filters(Filter),
716+
Capabilities = translate_terminus_capabilities(maps:get(capabilities, Source, none)),
716717
#'v1_0.source'{address = {utf8, Address},
717718
durable = {uint, Durable},
718-
filter = TranslatedFilter}.
719+
filter = TranslatedFilter,
720+
capabilities = Capabilities}.
719721

720722
make_target(#{role := {receiver, _Source, _Pid}}) ->
721723
#'v1_0.target'{};
722724
make_target(#{role := {sender, #{address := Address} = Target}}) ->
723725
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
726+
Capabilities = translate_terminus_capabilities(maps:get(capabilities, Target, none)),
724727
TargetAddr = case is_binary(Address) of
725728
true -> {utf8, Address};
726729
false -> Address
727730
end,
728731
#'v1_0.target'{address = TargetAddr,
729-
durable = {uint, Durable}}.
732+
durable = {uint, Durable},
733+
capabilities = Capabilities}.
730734

731735
max_message_size(#{max_message_size := Size})
732736
when is_integer(Size) andalso
@@ -771,6 +775,19 @@ filter_value_type({T, _} = V) when is_atom(T) ->
771775
%% looks like an already tagged type, just pass it through
772776
V.
773777

778+
translate_terminus_capabilities(none) ->
779+
undefined;
780+
translate_terminus_capabilities(Capabilities) when is_binary(Capabilities) ->
781+
{utf8, Capabilities};
782+
translate_terminus_capabilities(CapabilitiesList) when is_list(CapabilitiesList) ->
783+
{list, [filter_capability(V) || V <- CapabilitiesList]}.
784+
785+
filter_capability(V) when is_binary(V) ->
786+
{utf8, V};
787+
filter_capability({T, _} = V) when is_atom(T) ->
788+
%% looks like an already tagged type, just pass it through
789+
V.
790+
774791
% https://people.apache.org/~rgodfrey/amqp-1.0/apache-filters.html
775792
translate_legacy_amqp_headers_binding(LegacyHeaders) ->
776793
{map,

0 commit comments

Comments
 (0)