Skip to content

Commit 118d846

Browse files
ansdmichaelklishin
authored andcommitted
Fix routing key for AMQP 0.9.1 reading from a stream
when message was published to a stream via the stream protocol. Fixes the following test: ``` ./mvnw test -Dtest=AmqpInteroperabilityTest#publishToStreamConsumeFromStreamQueue ```
1 parent 99dd040 commit 118d846

File tree

4 files changed

+35
-36
lines changed

4 files changed

+35
-36
lines changed

deps/rabbit/src/mc.erl

+9
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
correlation_id/1,
2222
user_id/1,
2323
message_id/1,
24+
property/2,
2425
timestamp/1,
2526
priority/1,
2627
set_ttl/2,
@@ -302,6 +303,14 @@ message_id(#?MODULE{protocol = Proto,
302303
message_id(BasicMsg) ->
303304
mc_compat:message_id(BasicMsg).
304305

306+
-spec property(atom(), state()) ->
307+
{utf8, binary()} | undefined.
308+
property(Property, #?MODULE{protocol = Proto,
309+
data = Data}) ->
310+
Proto:property(Property, Data);
311+
property(_Property, _BasicMsg) ->
312+
undefined.
313+
305314
-spec set_ttl(undefined | non_neg_integer(), state()) -> state().
306315
set_ttl(Value, #?MODULE{annotations = Anns} = State) ->
307316
State#?MODULE{annotations = maps:put(ttl, Value, Anns)};

deps/rabbit/src/mc_amqp.erl

+4-15
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ property(message_id, #msg{properties = #'v1_0.properties'{message_id = MsgId}})
115115
MsgId;
116116
property(user_id, #msg{properties = #'v1_0.properties'{user_id = UserId}}) ->
117117
UserId;
118+
property(subject, #msg{properties = #'v1_0.properties'{subject = Subject}}) ->
119+
Subject;
118120
property(_Prop, #msg{}) ->
119121
undefined.
120122

@@ -178,13 +180,6 @@ get_property(priority, Msg) ->
178180
_ ->
179181
undefined
180182
end
181-
end;
182-
get_property(subject, Msg) ->
183-
case Msg of
184-
#msg{properties = #'v1_0.properties'{subject = {utf8, Subject}}} ->
185-
Subject;
186-
_ ->
187-
undefined
188183
end.
189184

190185
convert_to(?MODULE, Msg, _Env) ->
@@ -430,10 +425,6 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
430425
Priority = get_property(priority, Msg),
431426
Timestamp = get_property(timestamp, Msg),
432427
Ttl = get_property(ttl, Msg),
433-
RoutingKeys = case get_property(subject, Msg) of
434-
undefined -> undefined;
435-
Subject -> [Subject]
436-
end,
437428

438429
Deaths = case message_annotation(<<"x-death">>, Msg, undefined) of
439430
{list, DeathMaps} ->
@@ -458,10 +449,8 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
458449
maps_put_truthy(
459450
ttl, Ttl,
460451
maps_put_truthy(
461-
?ANN_ROUTING_KEYS, RoutingKeys,
462-
maps_put_truthy(
463-
deaths, Deaths,
464-
#{})))))),
452+
deaths, Deaths,
453+
#{}))))),
465454
case MA of
466455
[] ->
467456
Anns;

deps/rabbit/src/rabbit_amqp_session.erl

+21-20
Original file line numberDiff line numberDiff line change
@@ -1562,14 +1562,10 @@ incoming_link_transfer(
15621562
[?MODULE, [amqp10_framing:pprint(Section) || Section <- Sections]]),
15631563
case rabbit_exchange_lookup(Exchange) of
15641564
{ok, X = #exchange{name = #resource{name = XNameBin}}} ->
1565-
Anns0 = #{?ANN_EXCHANGE => XNameBin},
1566-
Anns = case LinkRKey of
1567-
undefined -> Anns0;
1568-
_ -> Anns0#{?ANN_ROUTING_KEYS => [LinkRKey]}
1569-
end,
1565+
Anns = #{?ANN_EXCHANGE => XNameBin},
15701566
Mc0 = mc:init(mc_amqp, Sections, Anns),
1571-
Mc1 = rabbit_message_interceptor:intercept(Mc0),
1572-
{Mc, RoutingKey} = ensure_routing_key(Mc1),
1567+
{RoutingKey, Mc1} = ensure_routing_key(LinkRKey, Mc0),
1568+
Mc = rabbit_message_interceptor:intercept(Mc1),
15731569
check_user_id(Mc, User),
15741570
messages_received(Settled),
15751571
check_write_permitted_on_topic(X, User, RoutingKey),
@@ -1620,19 +1616,24 @@ rabbit_exchange_lookup(X = #exchange{}) ->
16201616
rabbit_exchange_lookup(XName = #resource{}) ->
16211617
rabbit_exchange:lookup(XName).
16221618

1623-
ensure_routing_key(Mc) ->
1624-
case mc:routing_keys(Mc) of
1625-
[RoutingKey] ->
1626-
{Mc, RoutingKey};
1627-
[] ->
1628-
%% Set the default routing key of AMQP 0.9.1 'basic.publish'{}.
1629-
%% For example, when the client attached to target /exchange/amq.fanout and sends a
1630-
%% message without setting a 'subject' in the message properties, the routing key is
1631-
%% ignored during routing, but receiving code paths still expect some routing key to be set.
1632-
DefaultRoutingKey = <<"">>,
1633-
Mc1 = mc:set_annotation(?ANN_ROUTING_KEYS, [DefaultRoutingKey], Mc),
1634-
{Mc1, DefaultRoutingKey}
1635-
end.
1619+
ensure_routing_key(LinkRKey, Mc0) ->
1620+
RKey = case LinkRKey of
1621+
undefined ->
1622+
case mc:property(subject, Mc0) of
1623+
undefined ->
1624+
%% Set the default routing key of AMQP 0.9.1 'basic.publish'{}.
1625+
%% For example, when the client attached to target /exchange/amq.fanout and sends a
1626+
%% message without setting a 'subject' in the message properties, the routing key is
1627+
%% ignored during routing, but receiving code paths still expect some routing key to be set.
1628+
<<"">>;
1629+
{utf8, Subject} ->
1630+
Subject
1631+
end;
1632+
_ ->
1633+
LinkRKey
1634+
end,
1635+
Mc = mc:set_annotation(?ANN_ROUTING_KEYS, [RKey], Mc0),
1636+
{RKey, Mc}.
16361637

16371638
process_routing_confirm([], _SenderSettles = true, _, U) ->
16381639
rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1),

deps/rabbit/test/amqp_client_SUITE.erl

+1-1
Original file line numberDiff line numberDiff line change
@@ -3361,7 +3361,7 @@ receive_all_messages0(Receiver, Accept, Acc) ->
33613361
false -> ok
33623362
end,
33633363
receive_all_messages0(Receiver, Accept, [Msg | Acc])
3364-
after 500 ->
3364+
after 1000 ->
33653365
lists:reverse(Acc)
33663366
end.
33673367

0 commit comments

Comments
 (0)