Skip to content

Commit c0d22bf

Browse files
Merge pull request #7816 from rabbitmq/mergify/bp/v3.12.x/pr-7802
Pass the message to `rabbit_backing_queue:discard` callback (backport #7802)
2 parents d01cdf7 + 1e1a02c commit c0d22bf

6 files changed

+24
-30
lines changed

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -641,12 +641,13 @@ send_mandatory(#delivery{mandatory = true,
641641
discard(#delivery{confirm = Confirm,
642642
sender = SenderPid,
643643
flow = Flow,
644-
message = #basic_message{id = MsgId}}, BQ, BQS, MTC, QName) ->
644+
message = Message}, BQ, BQS, MTC, QName) ->
645+
#basic_message{id = MsgId} = Message,
645646
MTC1 = case Confirm of
646647
true -> confirm_messages([MsgId], MTC, QName);
647648
false -> MTC
648649
end,
649-
BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS),
650+
BQS1 = BQ:discard(Message, SenderPid, Flow, BQS),
650651
{BQS1, MTC1}.
651652

652653
run_message_queue(State) -> run_message_queue(false, State).
@@ -809,7 +810,7 @@ send_reject_publish(#delivery{confirm = true,
809810
sender = SenderPid,
810811
flow = Flow,
811812
msg_seq_no = MsgSeqNo,
812-
message = #basic_message{id = MsgId}},
813+
message = Message},
813814
_Delivered,
814815
State = #q{ q = Q,
815816
backing_queue = BQ,
@@ -818,8 +819,9 @@ send_reject_publish(#delivery{confirm = true,
818819
ok = rabbit_classic_queue:send_rejection(SenderPid,
819820
amqqueue:get_name(Q), MsgSeqNo),
820821

822+
#basic_message{id = MsgId} = Message,
821823
MTC1 = maps:remove(MsgId, MTC),
822-
BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS),
824+
BQS1 = BQ:discard(Message, SenderPid, Flow, BQS),
823825
State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 };
824826
send_reject_publish(#delivery{confirm = false},
825827
_Delivered, State) ->

deps/rabbit/src/rabbit_backing_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@
117117

118118
%% Called to inform the BQ about messages which have reached the
119119
%% queue, but are not going to be further passed to BQ.
120-
-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state().
120+
-callback discard(rabbit_types:basic_message(), pid(), flow(), state()) -> state().
121121

122122
%% Return ids of messages which have been confirmed since the last
123123
%% invocation of this function (or initialisation).

deps/rabbit/src/rabbit_mirror_queue_master.erl

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -293,15 +293,16 @@ batch_publish_delivered(Publishes, ChPid, Flow,
293293
State1 = State #state { backing_queue_state = BQS1 },
294294
{AckTags, ensure_monitoring(ChPid, State1)}.
295295

296-
discard(MsgId, ChPid, Flow, State = #state { gm = GM,
297-
backing_queue = BQ,
298-
backing_queue_state = BQS,
299-
seen_status = SS }) ->
296+
discard(Message = #basic_message{id = MsgId},
297+
ChPid, Flow, State = #state { gm = GM,
298+
backing_queue = BQ,
299+
backing_queue_state = BQS,
300+
seen_status = SS }) ->
300301
false = maps:is_key(MsgId, SS), %% ASSERTION
301-
ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}),
302+
ok = gm:broadcast(GM, {discard, ChPid, Flow, Message}),
302303
ensure_monitoring(ChPid,
303304
State #state { backing_queue_state =
304-
BQ:discard(MsgId, ChPid, Flow, BQS) }).
305+
BQ:discard(Message, ChPid, Flow, BQS) }).
305306

306307
dropwhile(Pred, State = #state{backing_queue = BQ,
307308
backing_queue_state = BQS }) ->

deps/rabbit/src/rabbit_mirror_queue_slave.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -957,11 +957,12 @@ process_instruction({batch_publish_delivered, ChPid, Flow, Publishes}, State) ->
957957
end, State1 #state { backing_queue_state = BQS1 },
958958
MsgIdsAndAcks),
959959
{ok, State2};
960-
process_instruction({discard, ChPid, Flow, MsgId}, State) ->
960+
process_instruction({discard, ChPid, Flow,
961+
Msg = #basic_message { id = MsgId }}, State) ->
961962
maybe_flow_ack(ChPid, Flow),
962963
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
963964
publish_or_discard(discarded, ChPid, MsgId, State),
964-
BQS1 = BQ:discard(MsgId, ChPid, Flow, BQS),
965+
BQS1 = BQ:discard(Msg, ChPid, Flow, BQS),
965966
{ok, State1 #state { backing_queue_state = BQS1 }};
966967
process_instruction({drop, Length, Dropped, AckRequired},
967968
State = #state { backing_queue = BQ,

deps/rabbit/src/rabbit_priority_queue.erl

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -249,22 +249,12 @@ batch_publish_delivered(Publishes, ChPid, Flow,
249249
State = #passthrough{bq = BQ, bqs = BQS}) ->
250250
?passthrough2(batch_publish_delivered(Publishes, ChPid, Flow, BQS)).
251251

252-
%% TODO this is a hack. The BQ api does not give us enough information
253-
%% here - if we had the Msg we could look at its priority and forward
254-
%% to the appropriate sub-BQ. But we don't so we are stuck.
255-
%%
256-
%% But fortunately VQ ignores discard/4, so we can too, *assuming we
257-
%% are talking to VQ*. discard/4 is used by HA, but that's "above" us
258-
%% (if in use) so we don't break that either, just some hypothetical
259-
%% alternate BQ implementation.
260-
discard(_MsgId, _ChPid, _Flow, State = #state{}) ->
261-
State;
262-
%% We should have something a bit like this here:
263-
%% pick1(fun (_P, BQSN) ->
264-
%% BQ:discard(MsgId, ChPid, Flow, BQSN)
265-
%% end, Msg, State);
266-
discard(MsgId, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) ->
267-
?passthrough1(discard(MsgId, ChPid, Flow, BQS)).
252+
discard(Msg, ChPid, Flow, State = #state{bq = BQ}) ->
253+
pick1(fun (_P, BQSN) ->
254+
BQ:discard(Msg, ChPid, Flow, BQSN)
255+
end, Msg, State);
256+
discard(Msg, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) ->
257+
?passthrough1(discard(Msg, ChPid, Flow, BQS)).
268258

269259
drain_confirmed(State = #state{bq = BQ}) ->
270260
fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State);

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -588,7 +588,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, State) ->
588588
State2 = ui(State1),
589589
{lists:reverse(SeqIds), a(maybe_update_rates(State2))}.
590590

591-
discard(_MsgId, _ChPid, _Flow, State) -> State.
591+
discard(_Msg, _ChPid, _Flow, State) -> State.
592592

593593
drain_confirmed(State = #vqstate { confirmed = C }) ->
594594
case sets:is_empty(C) of

0 commit comments

Comments
 (0)