Skip to content

Commit e7273f6

Browse files
Merge pull request #10059 from rabbitmq/cache-oldest-timestamp
QQ: Cache oldest_entry_timestamp
2 parents 1ab33ca + a5ce25e commit e7273f6

File tree

2 files changed

+65
-15
lines changed

2 files changed

+65
-15
lines changed

deps/rabbit/src/rabbit_fifo.erl

+25-15
Original file line numberDiff line numberDiff line change
@@ -983,7 +983,7 @@ which_module(3) -> ?MODULE.
983983
capacity :: term(),
984984
gc = #aux_gc{} :: #aux_gc{},
985985
tick_pid,
986-
unused2}).
986+
cache = #{} :: map()}).
987987

988988
init_aux(Name) when is_atom(Name) ->
989989
%% TODO: catch specific exception throw if table already exists
@@ -1102,21 +1102,31 @@ handle_aux(_RaState, cast, tick, #?AUX{name = Name,
11021102
handle_aux(_RaState, cast, eol, #?AUX{name = Name} = Aux, Log, _) ->
11031103
ets:delete(rabbit_fifo_usage, Name),
11041104
{no_reply, Aux, Log};
1105-
handle_aux(_RaState, {call, _From}, oldest_entry_timestamp, Aux,
1105+
handle_aux(_RaState, {call, _From}, oldest_entry_timestamp,
1106+
#?AUX{cache = Cache} = Aux0,
11061107
Log0, #?MODULE{} = State) ->
1107-
{Ts, Log} = case smallest_raft_index(State) of
1108-
%% if there are no entries, we return current timestamp
1109-
%% so that any previously obtained entries are considered
1110-
%% older than this
1111-
undefined ->
1112-
{erlang:system_time(millisecond), Log0};
1113-
Idx when is_integer(Idx) ->
1114-
%% TODO: make more defensive to avoid potential crash
1115-
{{_, _, {_, Meta, _, _}}, Log1} = ra_log:fetch(Idx, Log0),
1116-
#{ts := Timestamp} = Meta,
1117-
{Timestamp, Log1}
1118-
end,
1119-
{reply, {ok, Ts}, Aux, Log};
1108+
{CachedIdx, CachedTs} = maps:get(oldest_entry, Cache, {undefined, undefined}),
1109+
case smallest_raft_index(State) of
1110+
%% if there are no entries, we return current timestamp
1111+
%% so that any previously obtained entries are considered
1112+
%% older than this
1113+
undefined ->
1114+
Aux1 = Aux0#?AUX{cache = maps:remove(oldest_entry, Cache)},
1115+
{reply, {ok, erlang:system_time(millisecond)}, Aux1, Log0};
1116+
CachedIdx ->
1117+
%% cache hit
1118+
{reply, {ok, CachedTs}, Aux0, Log0};
1119+
Idx when is_integer(Idx) ->
1120+
case ra_log:fetch(Idx, Log0) of
1121+
{{_, _, {_, #{ts := Timestamp}, _, _}}, Log1} ->
1122+
Aux1 = Aux0#?AUX{cache = Cache#{oldest_entry =>
1123+
{Idx, Timestamp}}},
1124+
{reply, {ok, Timestamp}, Aux1, Log1};
1125+
{undefined, Log1} ->
1126+
%% fetch failed
1127+
{reply, {error, failed_to_get_timestamp}, Aux0, Log1}
1128+
end
1129+
end;
11201130
handle_aux(_RaState, {call, _From}, {peek, Pos}, Aux0,
11211131
Log0, MacState) ->
11221132
case rabbit_fifo:query_peek(Pos, MacState) of

deps/rabbit/test/quorum_queue_SUITE.erl

+40
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ all_tests() ->
156156
delete_if_unused,
157157
queue_ttl,
158158
peek,
159+
oldest_entry_timestamp,
159160
peek_with_wrong_queue_type,
160161
message_ttl,
161162
message_ttl_policy,
@@ -2729,6 +2730,45 @@ peek(Config) ->
27292730
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
27302731
ok.
27312732

2733+
oldest_entry_timestamp(Config) ->
2734+
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
2735+
2736+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
2737+
QQ = ?config(queue_name, Config),
2738+
?assertEqual({'queue.declare_ok', QQ, 0, 0},
2739+
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
2740+
{<<"x-max-in-memory-length">>, long, 2}])),
2741+
2742+
Msg1 = <<"msg1">>,
2743+
VHost = <<"%2F">>,
2744+
ServerId = binary_to_atom(<<VHost/binary, "_", QQ/binary>>, utf8),
2745+
2746+
?assertMatch({ok, Ts} when is_integer(Ts),
2747+
rabbit_ct_broker_helpers:rpc(Config, 0, ra,
2748+
aux_command,
2749+
[ServerId, oldest_entry_timestamp])),
2750+
publish(Ch, QQ, Msg1),
2751+
wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
2752+
2753+
?assertMatch({ok, Ts} when is_integer(Ts),
2754+
rabbit_ct_broker_helpers:rpc(Config, 0, ra,
2755+
aux_command,
2756+
[ServerId, oldest_entry_timestamp])),
2757+
?assertMatch({ok, Ts} when is_integer(Ts),
2758+
rabbit_ct_broker_helpers:rpc(Config, 0, ra,
2759+
aux_command,
2760+
[ServerId, oldest_entry_timestamp])),
2761+
2762+
{'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}),
2763+
Now = erlang:system_time(millisecond),
2764+
timer:sleep(100),
2765+
?assertMatch({ok, Ts2} when Ts2 > Now,
2766+
rabbit_ct_broker_helpers:rpc(Config, 0, ra,
2767+
aux_command,
2768+
[ServerId, oldest_entry_timestamp])),
2769+
2770+
ok.
2771+
27322772
-define(STATUS_MATCH(N, T),
27332773
[{<<"Node Name">>, N},
27342774
{<<"Raft State">>, _},

0 commit comments

Comments
 (0)