Skip to content

Commit cb8add2

Browse files
committed
Keep the Channel in clients receivers & senders
Channel contains the Arc<ChannelInner> so that ```rust let streaming_stuff = Client::new(init_channel()).some_call(); ``` does not die due to channel being dropped at the end of the line. Signed-off-by: Thomas Bessou <[email protected]>
1 parent 963c6c3 commit cb8add2

File tree

1 file changed

+42
-8
lines changed

1 file changed

+42
-8
lines changed

src/call/client.rs

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,12 @@ impl Call {
129129
tag,
130130
)
131131
});
132-
Ok(ClientUnaryReceiver::new(call, cq_f, method.resp_de()))
132+
Ok(ClientUnaryReceiver::new(
133+
call,
134+
cq_f,
135+
method.resp_de(),
136+
channel,
137+
))
133138
}
134139

135140
pub fn client_streaming<Req, Resp>(
@@ -151,11 +156,12 @@ impl Call {
151156
});
152157

153158
let share_call = Arc::new(SpinLock::new(ShareCall::new(call, cq_f)));
154-
let sink = ClientCStreamSender::new(share_call.clone(), method.req_ser());
159+
let sink = ClientCStreamSender::new(share_call.clone(), method.req_ser(), channel);
155160
let recv = ClientCStreamReceiver {
156161
call: share_call,
157162
resp_de: method.resp_de(),
158163
finished: false,
164+
_channel_keepalive: channel.clone(),
159165
};
160166
Ok((sink, recv))
161167
}
@@ -189,7 +195,12 @@ impl Call {
189195
grpc_sys::grpcwrap_call_recv_initial_metadata(call.call, ctx, tag)
190196
});
191197

192-
Ok(ClientSStreamReceiver::new(call, cq_f, method.resp_de()))
198+
Ok(ClientSStreamReceiver::new(
199+
call,
200+
cq_f,
201+
method.resp_de(),
202+
channel,
203+
))
193204
}
194205

195206
pub fn duplex_streaming<Req, Resp>(
@@ -216,8 +227,8 @@ impl Call {
216227
});
217228

218229
let share_call = Arc::new(SpinLock::new(ShareCall::new(call, cq_f)));
219-
let sink = ClientDuplexSender::new(share_call.clone(), method.req_ser());
220-
let recv = ClientDuplexReceiver::new(share_call, method.resp_de());
230+
let sink = ClientDuplexSender::new(share_call.clone(), method.req_ser(), channel);
231+
let recv = ClientDuplexReceiver::new(share_call, method.resp_de(), channel);
221232
Ok((sink, recv))
222233
}
223234
}
@@ -230,14 +241,21 @@ pub struct ClientUnaryReceiver<T> {
230241
call: Call,
231242
resp_f: BatchFuture,
232243
resp_de: DeserializeFn<T>,
244+
_channel_keepalive: Channel,
233245
}
234246

235247
impl<T> ClientUnaryReceiver<T> {
236-
fn new(call: Call, resp_f: BatchFuture, resp_de: DeserializeFn<T>) -> ClientUnaryReceiver<T> {
248+
fn new(
249+
call: Call,
250+
resp_f: BatchFuture,
251+
resp_de: DeserializeFn<T>,
252+
_channel_keepalive: &Channel,
253+
) -> ClientUnaryReceiver<T> {
237254
ClientUnaryReceiver {
238255
call,
239256
resp_f,
240257
resp_de,
258+
_channel_keepalive: _channel_keepalive.clone(),
241259
}
242260
}
243261

@@ -276,6 +294,7 @@ pub struct ClientCStreamReceiver<T> {
276294
call: Arc<SpinLock<ShareCall>>,
277295
resp_de: DeserializeFn<T>,
278296
finished: bool,
297+
_channel_keepalive: Channel,
279298
}
280299

281300
impl<T> ClientCStreamReceiver<T> {
@@ -326,15 +345,21 @@ pub struct StreamingCallSink<Req> {
326345
sink_base: SinkBase,
327346
close_f: Option<BatchFuture>,
328347
req_ser: SerializeFn<Req>,
348+
_channel_keepalive: Channel,
329349
}
330350

331351
impl<Req> StreamingCallSink<Req> {
332-
fn new(call: Arc<SpinLock<ShareCall>>, req_ser: SerializeFn<Req>) -> StreamingCallSink<Req> {
352+
fn new(
353+
call: Arc<SpinLock<ShareCall>>,
354+
req_ser: SerializeFn<Req>,
355+
_channel_keepalive: &Channel,
356+
) -> StreamingCallSink<Req> {
333357
StreamingCallSink {
334358
call,
335359
sink_base: SinkBase::new(false),
336360
close_f: None,
337361
req_ser,
362+
_channel_keepalive: _channel_keepalive.clone(),
338363
}
339364
}
340365

@@ -490,17 +515,20 @@ impl<H: ShareCallHolder, T> ResponseStreamImpl<H, T> {
490515
#[must_use = "if unused the ClientSStreamReceiver may immediately cancel the RPC"]
491516
pub struct ClientSStreamReceiver<Resp> {
492517
imp: ResponseStreamImpl<ShareCall, Resp>,
518+
_channel_keepalive: Channel,
493519
}
494520

495521
impl<Resp> ClientSStreamReceiver<Resp> {
496522
fn new(
497523
call: Call,
498524
finish_f: BatchFuture,
499525
de: DeserializeFn<Resp>,
526+
_channel_keepalive: &Channel,
500527
) -> ClientSStreamReceiver<Resp> {
501528
let share_call = ShareCall::new(call, finish_f);
502529
ClientSStreamReceiver {
503530
imp: ResponseStreamImpl::new(share_call, de),
531+
_channel_keepalive: _channel_keepalive.clone(),
504532
}
505533
}
506534

@@ -528,12 +556,18 @@ impl<Resp> Stream for ClientSStreamReceiver<Resp> {
528556
#[must_use = "if unused the ClientDuplexReceiver may immediately cancel the RPC"]
529557
pub struct ClientDuplexReceiver<Resp> {
530558
imp: ResponseStreamImpl<Arc<SpinLock<ShareCall>>, Resp>,
559+
_channel_keepalive: Channel,
531560
}
532561

533562
impl<Resp> ClientDuplexReceiver<Resp> {
534-
fn new(call: Arc<SpinLock<ShareCall>>, de: DeserializeFn<Resp>) -> ClientDuplexReceiver<Resp> {
563+
fn new(
564+
call: Arc<SpinLock<ShareCall>>,
565+
de: DeserializeFn<Resp>,
566+
_channel_keepalive: &Channel,
567+
) -> ClientDuplexReceiver<Resp> {
535568
ClientDuplexReceiver {
536569
imp: ResponseStreamImpl::new(call, de),
570+
_channel_keepalive: _channel_keepalive.clone(),
537571
}
538572
}
539573

0 commit comments

Comments
 (0)