19
19
*/
20
20
21
21
use std:: io:: { self , Read , Write } ;
22
- use std:: time:: Duration ;
23
-
24
- use mio:: net:: TcpStream ;
25
- use mio:: { Events , Interest , Poll } ;
22
+ use std:: net:: TcpStream ;
23
+ use std:: time:: { Duration , Instant } ;
26
24
27
25
use crate :: BoxResult ;
28
26
29
27
/// how long to wait for keepalive events
30
28
/// the communications channels typically exchange data every second, so 2s is reasonable to avoid excess noise
31
- pub const KEEPALIVE_DURATION : Duration = Duration :: from_secs ( 2 ) ;
29
+ #[ cfg( unix) ]
30
+ pub const KEEPALIVE_DURATION : Duration = Duration :: from_secs ( 3 ) ;
32
31
33
32
/// how long to block on polling operations
34
33
const POLL_TIMEOUT : Duration = Duration :: from_millis ( 50 ) ;
35
34
35
+ /// how long to allow for send-operations to complete
36
+ const SEND_TIMEOUT : Duration = Duration :: from_secs ( 5 ) ;
37
+
36
38
/// sends JSON data over a client-server communications stream
37
39
pub fn send ( stream : & mut TcpStream , message : & serde_json:: Value ) -> BoxResult < ( ) > {
40
+ stream. set_write_timeout ( Some ( POLL_TIMEOUT ) ) ?;
41
+
38
42
let serialised_message = serde_json:: to_vec ( message) ?;
39
43
40
44
log:: debug!(
@@ -46,61 +50,72 @@ pub fn send(stream: &mut TcpStream, message: &serde_json::Value) -> BoxResult<()
46
50
let mut output_buffer = vec ! [ 0_u8 ; serialised_message. len( ) + 2 ] ;
47
51
output_buffer[ ..2 ] . copy_from_slice ( & ( serialised_message. len ( ) as u16 ) . to_be_bytes ( ) ) ;
48
52
output_buffer[ 2 ..] . copy_from_slice ( serialised_message. as_slice ( ) ) ;
49
- Ok ( stream. write_all ( & output_buffer) ?)
53
+
54
+ let start = Instant :: now ( ) ;
55
+ let mut total_bytes_written: usize = 0 ;
56
+
57
+ while start. elapsed ( ) < SEND_TIMEOUT {
58
+ match stream. write ( & output_buffer[ total_bytes_written..] ) {
59
+ Ok ( bytes_written) => {
60
+ total_bytes_written += bytes_written;
61
+ if total_bytes_written == output_buffer. len ( ) {
62
+ return Ok ( ( ) ) ;
63
+ }
64
+ }
65
+ Err ( ref e) if e. kind ( ) == std:: io:: ErrorKind :: WouldBlock || e. kind ( ) == std:: io:: ErrorKind :: TimedOut => {
66
+ // unable to write at the moment; keep trying until the full timeout is reached
67
+ continue ;
68
+ }
69
+ Err ( e) => {
70
+ return Err ( Box :: new ( e) ) ;
71
+ }
72
+ }
73
+ }
74
+ Err ( Box :: new ( simple_error:: simple_error!(
75
+ "timed out while attempting to send status-message to {}" ,
76
+ stream. peer_addr( ) ?
77
+ ) ) )
50
78
}
51
79
52
80
/// receives the length-count of a pending message over a client-server communications stream
53
81
fn receive_length ( stream : & mut TcpStream , alive_check : fn ( ) -> bool , results_handler : & mut dyn FnMut ( ) -> BoxResult < ( ) > ) -> BoxResult < u16 > {
54
- let mio_token = crate :: get_global_token ( ) ;
55
- let mut poll = Poll :: new ( ) ?;
56
- poll. registry ( ) . register ( stream, mio_token, Interest :: READABLE ) ?;
57
- let mut events = Events :: with_capacity ( 1 ) ; //only interacting with one stream
82
+ stream. set_read_timeout ( Some ( POLL_TIMEOUT ) ) . expect ( "unable to set TCP read-timeout" ) ;
58
83
59
84
let mut length_bytes_read = 0 ;
60
85
let mut length_spec: [ u8 ; 2 ] = [ 0 ; 2 ] ;
61
- let result: BoxResult < u16 > = ' exiting: loop {
62
- if !alive_check ( ) {
63
- break ' exiting Ok ( 0 ) ;
64
- }
86
+ while alive_check ( ) {
65
87
//waiting to find out how long the next message is
66
88
results_handler ( ) ?; //send any outstanding results between cycles
67
- poll. poll ( & mut events, Some ( POLL_TIMEOUT ) ) ?;
68
- for event in events. iter ( ) {
69
- event. token ( ) ;
70
- loop {
71
- let size = match stream. read ( & mut length_spec[ length_bytes_read..] ) {
72
- Ok ( size) => size,
73
- Err ( ref e) if e. kind ( ) == io:: ErrorKind :: WouldBlock => {
74
- //nothing left to process
75
- break ;
76
- }
77
- Err ( e) => {
78
- break ' exiting Err ( Box :: new ( e) ) ;
79
- }
80
- } ;
81
-
82
- if size == 0 {
83
- if alive_check ( ) {
84
- break ' exiting Err ( Box :: new ( simple_error:: simple_error!( "connection lost" ) ) ) ;
85
- } else {
86
- //shutting down; a disconnect is expected
87
- break ' exiting Err ( Box :: new ( simple_error:: simple_error!( "local shutdown requested" ) ) ) ;
88
- }
89
- }
90
89
91
- length_bytes_read += size;
92
- if length_bytes_read == 2 {
93
- let length = u16:: from_be_bytes ( length_spec) ;
94
- log:: debug!( "received length-spec of {} from {}" , length, stream. peer_addr( ) ?) ;
95
- break ' exiting Ok ( length) ;
96
- } else {
97
- log:: debug!( "received partial length-spec from {}" , stream. peer_addr( ) ?) ;
98
- }
90
+ let size = match stream. read ( & mut length_spec[ length_bytes_read..] ) {
91
+ Ok ( size) => size,
92
+ Err ( ref e) if e. kind ( ) == io:: ErrorKind :: WouldBlock || e. kind ( ) == std:: io:: ErrorKind :: TimedOut => {
93
+ // nothing available to process
94
+ continue ;
95
+ }
96
+ Err ( e) => {
97
+ return Err ( Box :: new ( e) ) ;
98
+ }
99
+ } ;
100
+ if size == 0 {
101
+ if alive_check ( ) {
102
+ return Err ( Box :: new ( simple_error:: simple_error!( "connection lost" ) ) ) ;
103
+ } else {
104
+ // shutting down; a disconnect is expected
105
+ return Err ( Box :: new ( simple_error:: simple_error!( "local shutdown requested" ) ) ) ;
99
106
}
100
107
}
101
- } ;
102
- poll. registry ( ) . deregister ( stream) ?;
103
- result
108
+
109
+ length_bytes_read += size;
110
+ if length_bytes_read == 2 {
111
+ let length = u16:: from_be_bytes ( length_spec) ;
112
+ log:: debug!( "received length-spec of {} from {}" , length, stream. peer_addr( ) ?) ;
113
+ return Ok ( length) ;
114
+ } else {
115
+ log:: debug!( "received partial length-spec from {}" , stream. peer_addr( ) ?) ;
116
+ }
117
+ }
118
+ Err ( Box :: new ( simple_error:: simple_error!( "system shutting down" ) ) )
104
119
}
105
120
106
121
/// receives the data-value of a pending message over a client-server communications stream
@@ -110,62 +125,49 @@ fn receive_payload(
110
125
results_handler : & mut dyn FnMut ( ) -> BoxResult < ( ) > ,
111
126
length : u16 ,
112
127
) -> BoxResult < serde_json:: Value > {
113
- let mio_token = crate :: get_global_token ( ) ;
114
- let mut poll = Poll :: new ( ) ?;
115
- poll. registry ( ) . register ( stream, mio_token, Interest :: READABLE ) ?;
116
- let mut events = Events :: with_capacity ( 1 ) ; //only interacting with one stream
128
+ stream. set_read_timeout ( Some ( POLL_TIMEOUT ) ) . expect ( "unable to set TCP read-timeout" ) ;
117
129
118
130
let mut bytes_read = 0 ;
119
131
let mut buffer = vec ! [ 0_u8 ; length. into( ) ] ;
120
- let result: BoxResult < serde_json:: Value > = ' exiting: loop {
121
- if !alive_check ( ) {
122
- break ' exiting Ok ( serde_json:: from_slice ( & buffer[ 0 ..0 ] ) ?) ;
123
- }
132
+ while alive_check ( ) {
124
133
//waiting to receive the payload
125
134
results_handler ( ) ?; //send any outstanding results between cycles
126
- poll. poll ( & mut events, Some ( POLL_TIMEOUT ) ) ?;
127
- for event in events. iter ( ) {
128
- event. token ( ) ;
129
- loop {
130
- let size = match stream. read ( & mut buffer[ bytes_read..] ) {
131
- Ok ( size) => size,
132
- Err ( ref e) if e. kind ( ) == io:: ErrorKind :: WouldBlock => {
133
- // nothing left to process
134
- break ;
135
- }
136
- Err ( e) => {
137
- break ' exiting Err ( Box :: new ( e) ) ;
138
- }
139
- } ;
140
-
141
- if size == 0 {
142
- if alive_check ( ) {
143
- break ' exiting Err ( Box :: new ( simple_error:: simple_error!( "connection lost" ) ) ) ;
144
- } else {
145
- // shutting down; a disconnect is expected
146
- break ' exiting Err ( Box :: new ( simple_error:: simple_error!( "local shutdown requested" ) ) ) ;
147
- }
148
- }
149
135
150
- bytes_read += size;
151
- if bytes_read == length as usize {
152
- match serde_json:: from_slice ( & buffer) {
153
- Ok ( v) => {
154
- log:: debug!( "received {:?} from {}" , v, stream. peer_addr( ) ?) ;
155
- break ' exiting Ok ( v) ;
156
- }
157
- Err ( e) => {
158
- break ' exiting Err ( Box :: new ( e) ) ;
159
- }
160
- }
161
- } else {
162
- log:: debug!( "received partial payload from {}" , stream. peer_addr( ) ?) ;
136
+ let size = match stream. read ( & mut buffer[ bytes_read..] ) {
137
+ Ok ( size) => size,
138
+ Err ( ref e) if e. kind ( ) == io:: ErrorKind :: WouldBlock || e. kind ( ) == std:: io:: ErrorKind :: TimedOut => {
139
+ // nothing available to process
140
+ continue ;
141
+ }
142
+ Err ( e) => {
143
+ return Err ( Box :: new ( e) ) ;
144
+ }
145
+ } ;
146
+ if size == 0 {
147
+ if alive_check ( ) {
148
+ return Err ( Box :: new ( simple_error:: simple_error!( "connection lost" ) ) ) ;
149
+ } else {
150
+ //shutting down; a disconnect is expected
151
+ return Err ( Box :: new ( simple_error:: simple_error!( "local shutdown requested" ) ) ) ;
152
+ }
153
+ }
154
+
155
+ bytes_read += size;
156
+ if bytes_read == length as usize {
157
+ match serde_json:: from_slice ( & buffer) {
158
+ Ok ( v) => {
159
+ log:: debug!( "received {:?} from {}" , v, stream. peer_addr( ) ?) ;
160
+ return Ok ( v) ;
161
+ }
162
+ Err ( e) => {
163
+ return Err ( Box :: new ( e) ) ;
163
164
}
164
165
}
166
+ } else {
167
+ log:: debug!( "received partial payload from {}" , stream. peer_addr( ) ?) ;
165
168
}
166
- } ;
167
- poll. registry ( ) . deregister ( stream) ?;
168
- result
169
+ }
170
+ Err ( Box :: new ( simple_error:: simple_error!( "system shutting down" ) ) )
169
171
}
170
172
171
173
/// handles the full process of retrieving a message from a client-server communications stream
0 commit comments