Skip to content

Commit 789fc64

Browse files
Merge pull request #13374 from rabbitmq/rabbitmq-server-12743
By @noxdafox: Revival of #9620 Pass the message to rabbit_backing_queue:discard callback
2 parents 0eb65c2 + 4dfa447 commit 789fc64

File tree

4 files changed

+10
-20
lines changed

4 files changed

+10
-20
lines changed

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,7 @@ discard(#delivery{confirm = Confirm,
648648
true -> confirm_messages([MsgId], MTC, QName);
649649
false -> MTC
650650
end,
651-
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
651+
BQS1 = BQ:discard(Msg, SenderPid, BQS),
652652
{BQS1, MTC1}.
653653

654654
run_message_queue(ActiveConsumersChanged, State) ->
@@ -828,7 +828,7 @@ send_reject_publish(#delivery{confirm = true,
828828
amqqueue:get_name(Q), MsgSeqNo),
829829

830830
MTC1 = maps:remove(MsgId, MTC),
831-
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
831+
BQS1 = BQ:discard(Msg, SenderPid, BQS),
832832
State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 };
833833
send_reject_publish(#delivery{confirm = false}, State) ->
834834
State.

deps/rabbit/src/rabbit_backing_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105

106106
%% Called to inform the BQ about messages which have reached the
107107
%% queue, but are not going to be further passed to BQ.
108-
-callback discard(rabbit_types:msg_id(), pid(), state()) -> state().
108+
-callback discard(rabbit_types:basic_message(), pid(), state()) -> state().
109109

110110
%% Return ids of messages which have been confirmed since the last
111111
%% invocation of this function (or initialisation).

deps/rabbit/src/rabbit_priority_queue.erl

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -220,22 +220,12 @@ publish_delivered(Msg, MsgProps, ChPid,
220220
State = #passthrough{bq = BQ, bqs = BQS}) ->
221221
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, BQS)).
222222

223-
%% TODO this is a hack. The BQ api does not give us enough information
224-
%% here - if we had the Msg we could look at its priority and forward
225-
%% to the appropriate sub-BQ. But we don't so we are stuck.
226-
%%
227-
%% But fortunately VQ ignores discard/4, so we can too, *assuming we
228-
%% are talking to VQ*. discard/4 is used by HA, but that's "above" us
229-
%% (if in use) so we don't break that either, just some hypothetical
230-
%% alternate BQ implementation.
231-
discard(_MsgId, _ChPid, State = #state{}) ->
232-
State;
233-
%% We should have something a bit like this here:
234-
%% pick1(fun (_P, BQSN) ->
235-
%% BQ:discard(MsgId, ChPid, BQSN)
236-
%% end, Msg, State);
237-
discard(MsgId, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) ->
238-
?passthrough1(discard(MsgId, ChPid, BQS)).
223+
discard(Msg, ChPid, State = #state{bq = BQ}) ->
224+
pick1(fun (_P, BQSN) ->
225+
BQ:discard(Msg, ChPid, BQSN)
226+
end, Msg, State);
227+
discard(Msg, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) ->
228+
?passthrough1(discard(Msg, ChPid, BQS)).
239229

240230
drain_confirmed(State = #state{bq = BQ}) ->
241231
fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State);

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ publish_delivered(Msg, MsgProps, ChPid, State) ->
544544
State),
545545
{SeqId, a(maybe_update_rates(State1))}.
546546

547-
discard(_MsgId, _ChPid, State) -> State.
547+
discard(_Msg, _ChPid, State) -> State.
548548

549549
drain_confirmed(State = #vqstate { confirmed = C }) ->
550550
case sets:is_empty(C) of

0 commit comments

Comments
 (0)