Skip to content

Commit 9a5a321

Browse files
Merge pull request #13364 from rabbitmq/recover-ts
Recover "received timestamp" when reading from stream
2 parents 99a09df + 7e71730 commit 9a5a321

File tree

3 files changed

+31
-12
lines changed

3 files changed

+31
-12
lines changed

deps/rabbit/src/mc.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ init(Proto, Data, Anns0, Env) ->
160160
false -> Anns0#{env => Env}
161161
end,
162162
Anns2 = maps:merge(ProtoAnns, Anns1),
163-
Anns = set_received_at_timestamp(Anns2),
163+
Anns = ensure_received_at_timestamp(Anns2),
164164
#?MODULE{protocol = Proto,
165165
data = ProtoData,
166166
annotations = Anns}.
@@ -527,6 +527,9 @@ is_cycle_v1(Queue, [{Queue, Reason} | _])
527527
is_cycle_v1(Queue, [_ | Rem]) ->
528528
is_cycle_v1(Queue, Rem).
529529

530-
set_received_at_timestamp(Anns) ->
530+
ensure_received_at_timestamp(Anns)
531+
when is_map_key(?ANN_RECEIVED_AT_TIMESTAMP, Anns) ->
532+
Anns;
533+
ensure_received_at_timestamp(Anns) ->
531534
Millis = os:system_time(millisecond),
532535
Anns#{?ANN_RECEIVED_AT_TIMESTAMP => Millis}.

deps/rabbit/src/mc_amqp.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,9 @@ essential_properties(#msg_body_encoded{message_annotations = MA} = Msg, recover)
677677
({{symbol, <<"x-exchange">>},
678678
{utf8, Exchange}}, Acc) ->
679679
Acc#{?ANN_EXCHANGE => Exchange};
680+
({{symbol, <<"x-opt-rabbitmq-received-time">>},
681+
{timestamp, Ts}}, Acc) ->
682+
Acc#{?ANN_RECEIVED_AT_TIMESTAMP => Ts};
680683
(_, Acc) ->
681684
Acc
682685
end, Anns, MA)

deps/rabbit/test/mc_unit_SUITE.erl

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ amqpl_compat(_Config) ->
100100
Content = #content{properties = Props,
101101
payload_fragments_rev = Payload},
102102

103-
XName= <<"exch">>,
103+
XName = <<"exch">>,
104104
RoutingKey = <<"apple">>,
105105
{ok, Msg00} = rabbit_basic:message_no_id(XName, RoutingKey, Content),
106106

@@ -148,7 +148,6 @@ amqpl_compat(_Config) ->
148148
<<"x-stream-filter">> := <<"apple">>}, RoutingHeadersX),
149149
ok.
150150

151-
152151
amqpl_table_x_header(_Config) ->
153152
Tbl = [{<<"type">>, longstr, <<"apple">>},
154153
{<<"count">>, long, 99}],
@@ -346,7 +345,11 @@ amqpl_amqp_bin_amqpl(_Config) ->
346345
},
347346
Content = #content{properties = Props,
348347
payload_fragments_rev = [<<"data">>]},
349-
Msg = mc:init(mc_amqpl, Content, annotations()),
348+
Msg0 = mc:init(mc_amqpl, Content, annotations()),
349+
350+
ok = persistent_term:put(incoming_message_interceptors,
351+
[{set_header_timestamp, false}]),
352+
Msg = rabbit_message_interceptor:intercept(Msg0),
350353

351354
?assertEqual(<<"exch">>, mc:exchange(Msg)),
352355
?assertEqual([<<"apple">>], mc:routing_keys(Msg)),
@@ -357,17 +360,25 @@ amqpl_amqp_bin_amqpl(_Config) ->
357360
?assertEqual({utf8, <<"msg-id">>}, mc:message_id(Msg)),
358361
?assertEqual(1, mc:ttl(Msg)),
359362
?assertEqual({utf8, <<"apple">>}, mc:x_header(<<"x-stream-filter">>, Msg)),
360-
?assert(is_integer(mc:get_annotation(rts, Msg))),
363+
ReceivedTs = mc:get_annotation(rts, Msg),
364+
?assert(is_integer(ReceivedTs)),
361365

362366
%% array type non x-headers cannot be converted into amqp
363367
RoutingHeaders = maps:remove(<<"a-array">>, mc:routing_headers(Msg, [])),
364368

365369
%% roundtrip to binary
366370
Msg10Pre = mc:convert(mc_amqp, Msg),
367371
Payload = iolist_to_binary(mc:protocol_state(Msg10Pre)),
368-
Msg10 = mc:init(mc_amqp, Payload, #{}),
372+
Msg10 = mc_amqp:init_from_stream(Payload, #{}),
373+
374+
%% mc annotations should be recovered when reading from a stream.
375+
?assertEqual(<<"exch">>, mc:exchange(Msg10)),
376+
?assertEqual([<<"apple">>], mc:routing_keys(Msg10)),
377+
?assertEqual(ReceivedTs, mc:get_annotation(rts, Msg10)),
378+
369379
?assertMatch(#{<<"x-exchange">> := {utf8, <<"exch">>},
370-
<<"x-routing-key">> := {utf8, <<"apple">>}},
380+
<<"x-routing-key">> := {utf8, <<"apple">>},
381+
<<"x-opt-rabbitmq-received-time">> := {timestamp, ReceivedTs}},
371382
mc:x_headers(Msg10)),
372383
?assertEqual(98, mc:priority(Msg10)),
373384
?assertEqual(true, mc:is_persistent(Msg10)),
@@ -379,7 +390,6 @@ amqpl_amqp_bin_amqpl(_Config) ->
379390
%% at this point the type is now present as a message annotation
380391
?assertEqual({utf8, <<"45">>}, mc:x_header(<<"x-basic-type">>, Msg10)),
381392
?assertEqual(RoutingHeaders, mc:routing_headers(Msg10, [])),
382-
?assert(is_integer(mc:get_annotation(rts, Msg10))),
383393

384394
Sections = amqp10_framing:decode_bin(Payload),
385395
[
@@ -435,9 +445,12 @@ amqpl_amqp_bin_amqpl(_Config) ->
435445
?assertEqual({utf8, <<"msg-id">>}, mc:message_id(MsgL2)),
436446
?assertEqual(1, mc:ttl(MsgL2)),
437447
?assertEqual({utf8, <<"apple">>}, mc:x_header(<<"x-stream-filter">>, MsgL2)),
438-
?assertEqual(RoutingHeaders, mc:routing_headers(MsgL2, [])),
439-
?assert(is_integer(mc:get_annotation(rts, MsgL2))),
440-
ok.
448+
?assertEqual(ReceivedTs, mc:get_annotation(rts, MsgL2)),
449+
RoutingHeaders2 = mc:routing_headers(MsgL2, []),
450+
?assertEqual(RoutingHeaders,
451+
maps:remove(<<"timestamp_in_ms">>, RoutingHeaders2)),
452+
453+
true = persistent_term:erase(incoming_message_interceptors).
441454

442455
amqpl_cc_amqp_bin_amqpl(_Config) ->
443456
Headers = [{<<"CC">>, array, [{longstr, <<"q1">>},

0 commit comments

Comments
 (0)