47
47
send_limit :: non_neg_integer (),
48
48
log :: undefined | osiris_log :state (),
49
49
last_listener_offset = undefined :: undefined | osiris :offset ()}).
50
+ -record (request ,
51
+ {start :: integer (),
52
+ content :: term ()}).
50
53
-record (stream_connection_state ,
51
54
{data :: rabbit_stream_core :state (), blocked :: boolean (),
52
55
consumers :: #{subscription_id () => # consumer {}}}).
89
92
transport :: tcp | ssl ,
90
93
proxy_socket :: undefined | ranch_transport :socket (),
91
94
correlation_id_sequence :: integer (),
92
- outstanding_requests :: #{integer () => term ()},
93
- deliver_version :: rabbit_stream_core :command_version ()}).
95
+ outstanding_requests :: #{integer () => # request {}},
96
+ deliver_version :: rabbit_stream_core :command_version (),
97
+ request_timeout :: pos_integer (),
98
+ outstanding_requests_timer :: undefined | timer :tref ()}).
94
99
-record (configuration ,
95
100
{initial_credits :: integer (),
96
101
credits_required_for_unblocking :: integer (),
@@ -223,6 +228,8 @@ init([KeepaliveSup,
223
228
socket_op (Sock ,
224
229
fun (S ) -> rabbit_net :socket_ends (S , inbound ) end ),
225
230
DeliverVersion = ? VERSION_1 ,
231
+ RequestTimeout = application :get_env (rabbitmq_stream ,
232
+ request_timeout , 60_000 ),
226
233
Connection =
227
234
# stream_connection {name =
228
235
rabbit_data_coercion :to_binary (ConnStr ),
@@ -249,6 +256,7 @@ init([KeepaliveSup,
249
256
rabbit_net :maybe_get_proxy_socket (Sock ),
250
257
correlation_id_sequence = 0 ,
251
258
outstanding_requests = #{},
259
+ request_timeout = RequestTimeout ,
252
260
deliver_version = DeliverVersion },
253
261
State =
254
262
# stream_connection_state {consumers = #{},
@@ -962,6 +970,37 @@ open(info, emit_stats,
962
970
StatemData ) ->
963
971
Connection1 = emit_stats (Connection , State ),
964
972
{keep_state , StatemData # statem_data {connection = Connection1 }};
973
+ open (info , check_outstanding_requests ,
974
+ # statem_data {connection = # stream_connection {outstanding_requests = Requests ,
975
+ request_timeout = Timeout } = Connection0 } =
976
+ StatemData ) ->
977
+ Time = erlang :monotonic_time (millisecond ),
978
+ rabbit_log :debug (" Checking outstanding requests at ~tp : ~tp " , [Time , Requests ]),
979
+ HasTimedOut = maps :fold (fun (_ , # request {}, true ) ->
980
+ true ;
981
+ (K , # request {content = Ctnt , start = Start }, false ) ->
982
+ case (Time - Start ) > Timeout of
983
+ true ->
984
+ rabbit_log :debug (" Request ~tp with content ~tp has timed out" ,
985
+ [K , Ctnt ]),
986
+
987
+ true ;
988
+ false ->
989
+ false
990
+ end
991
+ end , false , Requests ),
992
+ case HasTimedOut of
993
+ true ->
994
+ rabbit_log_connection :info (" Forcing stream connection ~tp closing: request to client timed out" ,
995
+ [self ()]),
996
+ _ = demonitor_all_streams (Connection0 ),
997
+ {stop , {request_timeout , <<" Request timeout" >>}};
998
+ false ->
999
+ Connection1 = ensure_outstanding_requests_timer (
1000
+ Connection0 # stream_connection {outstanding_requests_timer = undefined }
1001
+ ),
1002
+ {keep_state , StatemData # statem_data {connection = Connection1 }}
1003
+ end ;
965
1004
open (info , {shutdown , Explanation } = Reason ,
966
1005
# statem_data {connection = Connection }) ->
967
1006
% % rabbitmq_management or rabbitmq_stream_management plugin
@@ -2531,7 +2570,7 @@ handle_frame_post_auth(Transport,
2531
2570
[RC ])
2532
2571
end ,
2533
2572
case maps :take (CorrelationId , Requests0 ) of
2534
- {#{ subscription_id := SubscriptionId } = Msg , Rs } ->
2573
+ {# request { content = #{ subscription_id := SubscriptionId } = Msg } , Rs } ->
2535
2574
Stream = stream_from_consumers (SubscriptionId , Consumers ),
2536
2575
rabbit_log :debug (" Received consumer update response for subscription "
2537
2576
" ~tp on stream ~tp , correlation ID ~tp " ,
@@ -2904,31 +2943,58 @@ maybe_register_consumer(VirtualHost,
2904
2943
Active .
2905
2944
2906
2945
maybe_send_consumer_update (Transport ,
2907
- Connection =
2908
- # stream_connection {socket = S ,
2909
- correlation_id_sequence = CorrIdSeq ,
2910
- outstanding_requests = OutstandingRequests0 },
2946
+ Connection = # stream_connection {
2947
+ socket = S ,
2948
+ correlation_id_sequence = CorrIdSeq },
2911
2949
Consumer ,
2912
2950
Active ,
2913
2951
Msg ) ->
2914
- # consumer {configuration = # consumer_configuration {
2915
- stream = Stream ,
2916
- subscription_id = SubscriptionId
2917
- }} = Consumer ,
2918
- rabbit_log :debug (" SAC subscription ~tp on ~tp , active = ~tp , " ++
2919
- " sending consumer update with correlation ID ~tp " ,
2920
- [SubscriptionId , Stream , Active , CorrIdSeq ]),
2921
- Frame =
2922
- rabbit_stream_core :frame ({request , CorrIdSeq ,
2923
- {consumer_update , SubscriptionId , Active }}),
2952
+ # consumer {configuration =
2953
+ # consumer_configuration {subscription_id = SubscriptionId }} = Consumer ,
2954
+ Frame = rabbit_stream_core :frame ({request , CorrIdSeq ,
2955
+ {consumer_update , SubscriptionId , Active }}),
2956
+
2957
+ Connection1 = register_request (Connection , Msg ),
2924
2958
2925
- OutstandingRequests1 =
2926
- maps :put (CorrIdSeq ,
2927
- Msg ,
2928
- OutstandingRequests0 ),
2929
2959
send (Transport , S , Frame ),
2930
- Connection # stream_connection {correlation_id_sequence = CorrIdSeq + 1 ,
2931
- outstanding_requests = OutstandingRequests1 }.
2960
+ Connection1 .
2961
+
2962
+ register_request (# stream_connection {outstanding_requests = Requests0 ,
2963
+ correlation_id_sequence = CorrIdSeq } = C ,
2964
+ RequestContent ) ->
2965
+ rabbit_log :debug (" Registering RPC request ~tp with correlation ID ~tp " ,
2966
+ [RequestContent , CorrIdSeq ]),
2967
+
2968
+ Requests1 = maps :put (CorrIdSeq , request (RequestContent ), Requests0 ),
2969
+
2970
+ ensure_outstanding_requests_timer (
2971
+ C # stream_connection {correlation_id_sequence = CorrIdSeq + 1 ,
2972
+ outstanding_requests = Requests1 }).
2973
+
2974
+ request (Content ) ->
2975
+ # request {start = erlang :monotonic_time (millisecond ),
2976
+ content = Content }.
2977
+
2978
+ ensure_outstanding_requests_timer (# stream_connection {
2979
+ outstanding_requests = Requests ,
2980
+ outstanding_requests_timer = undefined
2981
+ } = C ) when map_size (Requests ) =:= 0 ->
2982
+ C ;
2983
+ ensure_outstanding_requests_timer (# stream_connection {
2984
+ outstanding_requests = Requests ,
2985
+ outstanding_requests_timer = TRef
2986
+ } = C ) when map_size (Requests ) =:= 0 ->
2987
+ erlang :cancel_timer (TRef , [{async , true }, {info , false }]),
2988
+ C # stream_connection {outstanding_requests_timer = undefined };
2989
+ ensure_outstanding_requests_timer (# stream_connection {
2990
+ outstanding_requests = Requests ,
2991
+ outstanding_requests_timer = undefined ,
2992
+ request_timeout = Timeout
2993
+ } = C ) when map_size (Requests ) > 0 ->
2994
+ TRef = erlang :send_after (Timeout , self (), check_outstanding_requests ),
2995
+ C # stream_connection {outstanding_requests_timer = TRef };
2996
+ ensure_outstanding_requests_timer (C ) ->
2997
+ C .
2932
2998
2933
2999
maybe_unregister_consumer (_ , _ , false = _Sac , Requests ) ->
2934
3000
Requests ;
@@ -2945,9 +3011,10 @@ maybe_unregister_consumer(VirtualHost,
2945
3011
ConsumerName = consumer_name (Properties ),
2946
3012
2947
3013
Requests1 = maps :fold (
2948
- fun (_ , #{active := false ,
2949
- subscription_id := SubId ,
2950
- stepping_down := true }, Acc ) when SubId =:= SubscriptionId ->
3014
+ fun (_ , # request {content =
3015
+ #{active := false ,
3016
+ subscription_id := SubId ,
3017
+ stepping_down := true }}, Acc ) when SubId =:= SubscriptionId ->
2951
3018
_ = rabbit_stream_sac_coordinator :activate_consumer (VirtualHost ,
2952
3019
Stream ,
2953
3020
ConsumerName ),
0 commit comments