Skip to content

Commit 4dfa447

Browse files
noxdafoxmichaelklishin
authored andcommitted
Adopt new rabbit_backing_queue:discard implementation
Signed-off-by: Matteo Cafasso <[email protected]> (cherry picked from commit facddb3)
1 parent d6a19bb commit 4dfa447

File tree

3 files changed

+9
-19
lines changed

3 files changed

+9
-19
lines changed

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,7 @@ discard(#delivery{confirm = Confirm,
648648
true -> confirm_messages([MsgId], MTC, QName);
649649
false -> MTC
650650
end,
651-
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
651+
BQS1 = BQ:discard(Msg, SenderPid, BQS),
652652
{BQS1, MTC1}.
653653

654654
run_message_queue(ActiveConsumersChanged, State) ->
@@ -828,7 +828,7 @@ send_reject_publish(#delivery{confirm = true,
828828
amqqueue:get_name(Q), MsgSeqNo),
829829

830830
MTC1 = maps:remove(MsgId, MTC),
831-
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
831+
BQS1 = BQ:discard(Msg, SenderPid, BQS),
832832
State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 };
833833
send_reject_publish(#delivery{confirm = false}, State) ->
834834
State.

deps/rabbit/src/rabbit_priority_queue.erl

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -220,22 +220,12 @@ publish_delivered(Msg, MsgProps, ChPid,
220220
State = #passthrough{bq = BQ, bqs = BQS}) ->
221221
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, BQS)).
222222

223-
%% TODO this is a hack. The BQ api does not give us enough information
224-
%% here - if we had the Msg we could look at its priority and forward
225-
%% to the appropriate sub-BQ. But we don't so we are stuck.
226-
%%
227-
%% But fortunately VQ ignores discard/4, so we can too, *assuming we
228-
%% are talking to VQ*. discard/4 is used by HA, but that's "above" us
229-
%% (if in use) so we don't break that either, just some hypothetical
230-
%% alternate BQ implementation.
231-
discard(_MsgId, _ChPid, State = #state{}) ->
232-
State;
233-
%% We should have something a bit like this here:
234-
%% pick1(fun (_P, BQSN) ->
235-
%% BQ:discard(MsgId, ChPid, BQSN)
236-
%% end, Msg, State);
237-
discard(MsgId, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) ->
238-
?passthrough1(discard(MsgId, ChPid, BQS)).
223+
discard(Msg, ChPid, State = #state{bq = BQ}) ->
224+
pick1(fun (_P, BQSN) ->
225+
BQ:discard(Msg, ChPid, BQSN)
226+
end, Msg, State);
227+
discard(Msg, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) ->
228+
?passthrough1(discard(Msg, ChPid, BQS)).
239229

240230
drain_confirmed(State = #state{bq = BQ}) ->
241231
fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State);

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ publish_delivered(Msg, MsgProps, ChPid, State) ->
544544
State),
545545
{SeqId, a(maybe_update_rates(State1))}.
546546

547-
discard(_MsgId, _ChPid, State) -> State.
547+
discard(_Msg, _ChPid, State) -> State.
548548

549549
drain_confirmed(State = #vqstate { confirmed = C }) ->
550550
case sets:is_empty(C) of

0 commit comments

Comments
 (0)