Skip to content

Commit b8244f7

Browse files
authored
Pull from socket up to 10 times in stream test utils (#13588)
To make sure to have enough data to complete a command.
1 parent 41dfa6a commit b8244f7

File tree

3 files changed

+20
-30
lines changed

3 files changed

+20
-30
lines changed

deps/rabbitmq_ct_helpers/src/stream_test_utils.erl

+16-9
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ subscribe(Sock, C0, Stream, SubscriptionId, InitialCredit) ->
7777
{{response, 1, {subscribe, ?RESPONSE_CODE_OK}}, C1} = receive_stream_commands(Sock, C0),
7878
{ok, C1}.
7979

80+
credit(Sock, Subscription, Credit) ->
81+
CreditFrame = rabbit_stream_core:frame({credit, Subscription, Credit}),
82+
ok = gen_tcp:send(Sock, CreditFrame),
83+
ok.
84+
8085
unsubscribe(Sock, C0, SubscriptionId) ->
8186
UnsubscribeFrame = rabbit_stream_core:frame({request, 1, {unsubscribe, SubscriptionId}}),
8287
ok = gen_tcp:send(Sock, UnsubscribeFrame),
@@ -149,20 +154,22 @@ sub_batch_entry_compressed(Sequence, Bodies) ->
149154
<<Sequence:64, 1:1, 1:3, 0:4, (length(Bodies)):16, (byte_size(Uncompressed)):32,
150155
CompressedLen:32, Compressed:CompressedLen/binary>>.
151156

157+
152158
receive_stream_commands(Sock, C0) ->
159+
receive_stream_commands(gen_tcp, Sock, C0).
160+
161+
receive_stream_commands(Transport, Sock, C0) ->
162+
receive_stream_commands(Transport, Sock, C0, 10).
163+
164+
receive_stream_commands(_Transport, _Sock, C0, 0) ->
165+
rabbit_stream_core:next_command(C0);
166+
receive_stream_commands(Transport, Sock, C0, N) ->
153167
case rabbit_stream_core:next_command(C0) of
154168
empty ->
155-
case gen_tcp:recv(Sock, 0, 5000) of
169+
case Transport:recv(Sock, 0, 5000) of
156170
{ok, Data} ->
157171
C1 = rabbit_stream_core:incoming_data(Data, C0),
158-
case rabbit_stream_core:next_command(C1) of
159-
empty ->
160-
{ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
161-
rabbit_stream_core:next_command(
162-
rabbit_stream_core:incoming_data(Data2, C1));
163-
Res ->
164-
Res
165-
end;
172+
receive_stream_commands(Transport, Sock, C1, N - 1);
166173
{error, Err} ->
167174
ct:fail("error receiving stream data ~w", [Err])
168175
end;

deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl

+2-1
Original file line numberDiff line numberDiff line change
@@ -839,7 +839,8 @@ publish_via_stream_protocol(Stream, MsgPerBatch, Config) ->
839839
{ok, C5} = stream_test_utils:publish(S, C4, PublisherId2, SequenceFrom2, Payloads2),
840840

841841
SubscriptionId = 97,
842-
{ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 1),
842+
{ok, C6} = stream_test_utils:subscribe(S, C5, Stream, SubscriptionId, _InitialCredit = 0),
843+
ok = stream_test_utils:credit(S, SubscriptionId, 1),
843844
%% delivery of first batch of messages
844845
{{deliver, SubscriptionId, _Bin1}, C7} = stream_test_utils:receive_stream_commands(S, C6),
845846
{ok, S, C7}.

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

+2-20
Original file line numberDiff line numberDiff line change
@@ -1569,26 +1569,8 @@ wait_for_socket_close(Transport, S, Attempt) ->
15691569
closed
15701570
end.
15711571

1572-
receive_commands(Transport, S, C0) ->
1573-
case rabbit_stream_core:next_command(C0) of
1574-
empty ->
1575-
case Transport:recv(S, 0, 5000) of
1576-
{ok, Data} ->
1577-
C1 = rabbit_stream_core:incoming_data(Data, C0),
1578-
case rabbit_stream_core:next_command(C1) of
1579-
empty ->
1580-
{ok, Data2} = Transport:recv(S, 0, 5000),
1581-
rabbit_stream_core:next_command(
1582-
rabbit_stream_core:incoming_data(Data2, C1));
1583-
Res ->
1584-
Res
1585-
end;
1586-
{error, Err} ->
1587-
ct:fail("error receiving data ~w", [Err])
1588-
end;
1589-
Res ->
1590-
Res
1591-
end.
1572+
receive_commands(Transport, S, C) ->
1573+
stream_test_utils:receive_stream_commands(Transport, S, C).
15921574

15931575
get_osiris_counters(Config) ->
15941576
rabbit_ct_broker_helpers:rpc(Config,

0 commit comments

Comments
 (0)