Skip to content

Commit e590c9f

Browse files
Merge pull request #10662 from rabbitmq/native-amqp-follow-ups
Native AMQP follow ups
2 parents 15688c3 + 118d846 commit e590c9f

File tree

4 files changed

+97
-62
lines changed

4 files changed

+97
-62
lines changed

deps/rabbit/src/mc.erl

+9
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
correlation_id/1,
2222
user_id/1,
2323
message_id/1,
24+
property/2,
2425
timestamp/1,
2526
priority/1,
2627
set_ttl/2,
@@ -302,6 +303,14 @@ message_id(#?MODULE{protocol = Proto,
302303
message_id(BasicMsg) ->
303304
mc_compat:message_id(BasicMsg).
304305

306+
-spec property(atom(), state()) ->
307+
{utf8, binary()} | undefined.
308+
property(Property, #?MODULE{protocol = Proto,
309+
data = Data}) ->
310+
Proto:property(Property, Data);
311+
property(_Property, _BasicMsg) ->
312+
undefined.
313+
305314
-spec set_ttl(undefined | non_neg_integer(), state()) -> state().
306315
set_ttl(Value, #?MODULE{annotations = Anns} = State) ->
307316
State#?MODULE{annotations = maps:put(ttl, Value, Anns)};

deps/rabbit/src/mc_amqp.erl

+4-15
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,8 @@ property(message_id, #msg{properties = #'v1_0.properties'{message_id = MsgId}})
115115
MsgId;
116116
property(user_id, #msg{properties = #'v1_0.properties'{user_id = UserId}}) ->
117117
UserId;
118+
property(subject, #msg{properties = #'v1_0.properties'{subject = Subject}}) ->
119+
Subject;
118120
property(_Prop, #msg{}) ->
119121
undefined.
120122

@@ -178,13 +180,6 @@ get_property(priority, Msg) ->
178180
_ ->
179181
undefined
180182
end
181-
end;
182-
get_property(subject, Msg) ->
183-
case Msg of
184-
#msg{properties = #'v1_0.properties'{subject = {utf8, Subject}}} ->
185-
Subject;
186-
_ ->
187-
undefined
188183
end.
189184

190185
convert_to(?MODULE, Msg, _Env) ->
@@ -430,10 +425,6 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
430425
Priority = get_property(priority, Msg),
431426
Timestamp = get_property(timestamp, Msg),
432427
Ttl = get_property(ttl, Msg),
433-
RoutingKeys = case get_property(subject, Msg) of
434-
undefined -> undefined;
435-
Subject -> [Subject]
436-
end,
437428

438429
Deaths = case message_annotation(<<"x-death">>, Msg, undefined) of
439430
{list, DeathMaps} ->
@@ -458,10 +449,8 @@ essential_properties(#msg{message_annotations = MA} = Msg) ->
458449
maps_put_truthy(
459450
ttl, Ttl,
460451
maps_put_truthy(
461-
?ANN_ROUTING_KEYS, RoutingKeys,
462-
maps_put_truthy(
463-
deaths, Deaths,
464-
#{})))))),
452+
deaths, Deaths,
453+
#{}))))),
465454
case MA of
466455
[] ->
467456
Anns;

deps/rabbit/src/rabbit_amqp_session.erl

+67-40
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
}).
7777

7878
-record(incoming_link, {
79-
exchange :: rabbit_exchange:name(),
79+
exchange :: rabbit_types:exchange() | rabbit_exchange:name(),
8080
routing_key :: undefined | rabbit_types:routing_key(),
8181
%% queue_name_bin is only set if the link target address refers to a queue.
8282
queue_name_bin :: undefined | rabbit_misc:resource_name(),
@@ -713,9 +713,9 @@ handle_control(#'v1_0.attach'{role = ?SEND_ROLE,
713713
user = User}}) ->
714714
ok = validate_attach(Attach),
715715
case ensure_target(Target, Vhost, User) of
716-
{ok, XName, RoutingKey, QNameBin} ->
716+
{ok, Exchange, RoutingKey, QNameBin} ->
717717
IncomingLink = #incoming_link{
718-
exchange = XName,
718+
exchange = Exchange,
719719
routing_key = RoutingKey,
720720
queue_name_bin = QNameBin,
721721
delivery_count = DeliveryCountInt,
@@ -945,17 +945,14 @@ handle_control(#'v1_0.flow'{handle = Handle} = Flow,
945945
end
946946
end;
947947

948-
handle_control(#'v1_0.detach'{handle = Handle = ?UINT(HandleInt),
949-
closed = Closed},
948+
handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
950949
State0 = #state{queue_states = QStates0,
951950
incoming_links = IncomingLinks,
952951
outgoing_links = OutgoingLinks0,
953952
outgoing_unsettled_map = Unsettled0,
954953
cfg = #cfg{
955-
writer_pid = WriterPid,
956954
vhost = Vhost,
957-
user = #user{username = Username},
958-
channel_num = Ch}}) ->
955+
user = #user{username = Username}}}) ->
959956
Ctag = handle_to_ctag(HandleInt),
960957
%% TODO delete queue if closed flag is set to true? see 2.6.6
961958
%% TODO keep the state around depending on the lifetime
@@ -1011,8 +1008,7 @@ handle_control(#'v1_0.detach'{handle = Handle = ?UINT(HandleInt),
10111008
incoming_links = maps:remove(HandleInt, IncomingLinks),
10121009
outgoing_links = OutgoingLinks,
10131010
outgoing_unsettled_map = Unsettled},
1014-
rabbit_amqp_writer:send_command(WriterPid, Ch, #'v1_0.detach'{handle = Handle,
1015-
closed = Closed}),
1011+
maybe_detach_reply(Detach, State, State0),
10161012
publisher_or_consumer_deleted(State, State0),
10171013
{noreply, State};
10181014

@@ -1533,7 +1529,7 @@ incoming_link_transfer(
15331529
rcv_settle_mode = RcvSettleMode,
15341530
handle = Handle = ?UINT(HandleInt)},
15351531
MsgPart,
1536-
#incoming_link{exchange = XName = #resource{name = XNameBin},
1532+
#incoming_link{exchange = Exchange,
15371533
routing_key = LinkRKey,
15381534
delivery_count = DeliveryCount0,
15391535
incoming_unconfirmed_map = U0,
@@ -1564,20 +1560,16 @@ incoming_link_transfer(
15641560
Sections = amqp10_framing:decode_bin(MsgBin),
15651561
?DEBUG("~s Inbound content:~n ~tp",
15661562
[?MODULE, [amqp10_framing:pprint(Section) || Section <- Sections]]),
1567-
Anns0 = #{?ANN_EXCHANGE => XNameBin},
1568-
Anns = case LinkRKey of
1569-
undefined -> Anns0;
1570-
_ -> Anns0#{?ANN_ROUTING_KEYS => [LinkRKey]}
1571-
end,
1572-
Mc0 = mc:init(mc_amqp, Sections, Anns),
1573-
Mc1 = rabbit_message_interceptor:intercept(Mc0),
1574-
{Mc, RoutingKey} = ensure_routing_key(Mc1),
1575-
check_user_id(Mc, User),
1576-
messages_received(Settled),
1577-
case rabbit_exchange:lookup(XName) of
1578-
{ok, Exchange} ->
1579-
check_write_permitted_on_topic(Exchange, User, RoutingKey),
1580-
QNames = rabbit_exchange:route(Exchange, Mc, #{return_binding_keys => true}),
1563+
case rabbit_exchange_lookup(Exchange) of
1564+
{ok, X = #exchange{name = #resource{name = XNameBin}}} ->
1565+
Anns = #{?ANN_EXCHANGE => XNameBin},
1566+
Mc0 = mc:init(mc_amqp, Sections, Anns),
1567+
{RoutingKey, Mc1} = ensure_routing_key(LinkRKey, Mc0),
1568+
Mc = rabbit_message_interceptor:intercept(Mc1),
1569+
check_user_id(Mc, User),
1570+
messages_received(Settled),
1571+
check_write_permitted_on_topic(X, User, RoutingKey),
1572+
QNames = rabbit_exchange:route(X, Mc, #{return_binding_keys => true}),
15811573
rabbit_trace:tap_in(Mc, QNames, ConnName, ChannelNum, Username, Trace),
15821574
case not Settled andalso
15831575
RcvSettleMode =:= ?V_1_0_RECEIVER_SETTLE_MODE_SECOND of
@@ -1619,19 +1611,29 @@ incoming_link_transfer(
16191611
{error, [Disposition, Detach]}
16201612
end.
16211613

1622-
ensure_routing_key(Mc) ->
1623-
case mc:routing_keys(Mc) of
1624-
[RoutingKey] ->
1625-
{Mc, RoutingKey};
1626-
[] ->
1627-
%% Set the default routing key of AMQP 0.9.1 'basic.publish'{}.
1628-
%% For example, when the client attached to target /exchange/amq.fanout and sends a
1629-
%% message without setting a 'subject' in the message properties, the routing key is
1630-
%% ignored during routing, but receiving code paths still expect some routing key to be set.
1631-
DefaultRoutingKey = <<"">>,
1632-
Mc1 = mc:set_annotation(?ANN_ROUTING_KEYS, [DefaultRoutingKey], Mc),
1633-
{Mc1, DefaultRoutingKey}
1634-
end.
1614+
rabbit_exchange_lookup(X = #exchange{}) ->
1615+
{ok, X};
1616+
rabbit_exchange_lookup(XName = #resource{}) ->
1617+
rabbit_exchange:lookup(XName).
1618+
1619+
ensure_routing_key(LinkRKey, Mc0) ->
1620+
RKey = case LinkRKey of
1621+
undefined ->
1622+
case mc:property(subject, Mc0) of
1623+
undefined ->
1624+
%% Set the default routing key of AMQP 0.9.1 'basic.publish'{}.
1625+
%% For example, when the client attached to target /exchange/amq.fanout and sends a
1626+
%% message without setting a 'subject' in the message properties, the routing key is
1627+
%% ignored during routing, but receiving code paths still expect some routing key to be set.
1628+
<<"">>;
1629+
{utf8, Subject} ->
1630+
Subject
1631+
end;
1632+
_ ->
1633+
LinkRKey
1634+
end,
1635+
Mc = mc:set_annotation(?ANN_ROUTING_KEYS, [RKey], Mc0),
1636+
{RKey, Mc}.
16351637

16361638
process_routing_confirm([], _SenderSettles = true, _, U) ->
16371639
rabbit_global_counters:messages_unroutable_dropped(?PROTOCOL, 1),
@@ -1692,16 +1694,25 @@ ensure_target(#'v1_0.target'{address = Address,
16921694
{ok, Dest} ->
16931695
QNameBin = ensure_terminus(target, Dest, Vhost, User, Durable),
16941696
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
1695-
XName = rabbit_misc:r(Vhost, exchange, list_to_binary(XNameList1)),
1697+
XNameBin = list_to_binary(XNameList1),
1698+
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
16961699
{ok, X} = rabbit_exchange:lookup(XName),
16971700
check_internal_exchange(X),
16981701
check_write_permitted(XName, User),
1702+
%% Pre-declared exchanges are protected against deletion and modification.
1703+
%% Let's cache the whole #exchange{} record to save a
1704+
%% rabbit_exchange:lookup(XName) call each time we receive a message.
1705+
Exchange = case XNameBin of
1706+
<<>> -> X;
1707+
<<"amq.", _/binary>> -> X;
1708+
_ -> XName
1709+
end,
16991710
RoutingKey = case RK of
17001711
undefined -> undefined;
17011712
[] -> undefined;
17021713
_ -> list_to_binary(RK)
17031714
end,
1704-
{ok, XName, RoutingKey, QNameBin};
1715+
{ok, Exchange, RoutingKey, QNameBin};
17051716
{error, _} = E ->
17061717
E
17071718
end;
@@ -2192,6 +2203,22 @@ publisher_or_consumer_deleted(
21922203
ok
21932204
end.
21942205

2206+
%% If we previously already sent a detach with an error condition, and the Detach we
2207+
%% receive here is therefore the client's reply, do not reply again with a 3rd detach.
2208+
maybe_detach_reply(Detach,
2209+
#state{incoming_links = NewIncomingLinks,
2210+
outgoing_links = NewOutgoingLinks,
2211+
cfg = #cfg{writer_pid = WriterPid,
2212+
channel_num = Ch}},
2213+
#state{incoming_links = OldIncomingLinks,
2214+
outgoing_links = OldOutgoingLinks})
2215+
when map_size(NewIncomingLinks) < map_size(OldIncomingLinks) orelse
2216+
map_size(NewOutgoingLinks) < map_size(OldOutgoingLinks) ->
2217+
Reply = Detach#'v1_0.detach'{error = undefined},
2218+
rabbit_amqp_writer:send_command(WriterPid, Ch, Reply);
2219+
maybe_detach_reply(_, _, _) ->
2220+
ok.
2221+
21952222
check_internal_exchange(#exchange{internal = true,
21962223
name = XName}) ->
21972224
protocol_error(?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,

deps/rabbit/test/amqp_client_SUITE.erl

+17-7
Original file line numberDiff line numberDiff line change
@@ -925,35 +925,45 @@ server_closes_link(QType, Config) ->
925925

926926
server_closes_link_exchange(Config) ->
927927
XName = atom_to_binary(?FUNCTION_NAME),
928+
QName = <<"my queue">>,
929+
RoutingKey = <<"my routing key">>,
928930
Ch = rabbit_ct_client_helpers:open_channel(Config),
929931
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = XName}),
930-
932+
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
933+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = QName,
934+
exchange = XName,
935+
routing_key = RoutingKey}),
931936
OpnConf = connection_config(Config),
932937
{ok, Connection} = amqp10_client:open_connection(OpnConf),
933938
{ok, Session} = amqp10_client:begin_session_sync(Connection),
934-
Address = <<"/exchange/", XName/binary, "/some-routing-key">>,
939+
Address = <<"/exchange/", XName/binary, "/", RoutingKey/binary>>,
935940
{ok, Sender} = amqp10_client:attach_sender_link(
936941
Session, <<"test-sender">>, Address),
937942
ok = wait_for_credit(Sender),
938943
?assertMatch(#{publishers := 1}, get_global_counters(Config)),
939944

945+
DTag1 = <<1>>,
946+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag1, <<"m1">>, false)),
947+
ok = wait_for_settlement(DTag1),
948+
940949
%% Server closes the link endpoint due to some AMQP 1.0 external condition:
941950
%% In this test, the external condition is that an AMQP 0.9.1 client deletes the exchange.
942951
#'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = XName}),
943-
ok = rabbit_ct_client_helpers:close_channel(Ch),
944952

945953
%% When we publish the next message, we expect:
946954
%% 1. that the message is released because the exchange doesn't exist anymore, and
947-
DTag = <<255>>,
948-
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag, <<"body">>, false)),
949-
ok = wait_for_settlement(DTag, released),
955+
DTag2 = <<255>>,
956+
ok = amqp10_client:send_msg(Sender, amqp10_msg:new(DTag2, <<"m2">>, false)),
957+
ok = wait_for_settlement(DTag2, released),
950958
%% 2. that the server closes the link, i.e. sends us a DETACH frame.
951959
ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED},
952960
receive {amqp10_event, {link, Sender, {detached, ExpectedError}}} -> ok
953961
after 5000 -> ct:fail("server did not close our outgoing link")
954962
end,
955963
?assertMatch(#{publishers := 0}, get_global_counters(Config)),
956964

965+
#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
966+
ok = rabbit_ct_client_helpers:close_channel(Ch),
957967
ok = end_session_sync(Session),
958968
ok = amqp10_client:close_connection(Connection).
959969

@@ -3351,7 +3361,7 @@ receive_all_messages0(Receiver, Accept, Acc) ->
33513361
false -> ok
33523362
end,
33533363
receive_all_messages0(Receiver, Accept, [Msg | Acc])
3354-
after 500 ->
3364+
after 1000 ->
33553365
lists:reverse(Acc)
33563366
end.
33573367

0 commit comments

Comments
 (0)