Skip to content

Commit bd60710

Browse files
Outgoing cleanup (#1838)
1 parent 383f3da commit bd60710

31 files changed

+203
-220
lines changed

cpp/include/Ice/Communicator.h

+2-21
Original file line numberDiff line numberDiff line change
@@ -317,10 +317,7 @@ class ICE_CLASS(ICE_API) Communicator
317317
* @param compress Specifies whether or not the queued batch requests should be compressed before being sent over
318318
* the wire.
319319
*/
320-
virtual void flushBatchRequests(CompressBatch compress)
321-
{
322-
flushBatchRequestsAsync(compress).get();
323-
}
320+
ICE_MEMBER(ICE_API) void flushBatchRequests(CompressBatch compress);
324321

325322
/**
326323
* Flush any pending batch requests for this communicator. This means all batch requests invoked on fixed proxies
@@ -345,23 +342,7 @@ class ICE_CLASS(ICE_API) Communicator
345342
* the wire.
346343
* @return The future object for the invocation.
347344
*/
348-
template<template<typename> class P = ::std::promise>
349-
auto flushBatchRequestsAsync(CompressBatch compress)
350-
-> decltype(::std::declval<P<void>>().get_future())
351-
{
352-
using Promise = P<void>;
353-
auto promise = ::std::make_shared<Promise>();
354-
flushBatchRequestsAsync(compress,
355-
[promise](::std::exception_ptr ex)
356-
{
357-
promise->set_exception(::std::move(ex));
358-
},
359-
[promise](bool)
360-
{
361-
promise->set_value();
362-
});
363-
return promise->get_future();
364-
}
345+
ICE_MEMBER(ICE_API) std::future<void> flushBatchRequestsAsync(CompressBatch compress);
365346

366347
/**
367348
* Add the Admin object with all its facets to the provided object adapter. If <code>Ice.Admin.ServerId</code> is

cpp/include/Ice/Connection.h

+7-45
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
#include <Ice/Exception.h>
1313
#include <Ice/StreamHelpers.h>
1414
#include <Ice/Comparable.h>
15-
#include <Ice/OutgoingAsync.h>
1615
#include <optional>
1716
#include <Ice/ObjectAdapterF.h>
1817
#include <Ice/Identity.h>
@@ -304,10 +303,7 @@ class ICE_CLASS(ICE_API) Connection
304303
* @param compress Specifies whether or not the queued batch requests should be compressed before being sent over
305304
* the wire.
306305
*/
307-
virtual void flushBatchRequests(CompressBatch compress)
308-
{
309-
flushBatchRequestsAsync(compress).get();
310-
}
306+
ICE_MEMBER(ICE_API) void flushBatchRequests(CompressBatch compress);
311307

312308
/**
313309
* Flush any pending batch requests for this connection. This means all batch requests invoked on fixed proxies
@@ -330,23 +326,7 @@ class ICE_CLASS(ICE_API) Connection
330326
* the wire.
331327
* @return The future object for the invocation.
332328
*/
333-
template<template<typename> class P = ::std::promise>
334-
auto flushBatchRequestsAsync(CompressBatch compress)
335-
-> decltype(::std::declval<P<void>>().get_future())
336-
{
337-
using Promise = P<void>;
338-
auto promise = ::std::make_shared<Promise>();
339-
flushBatchRequestsAsync(compress,
340-
[promise](::std::exception_ptr ex)
341-
{
342-
promise->set_exception(::std::move(ex));
343-
},
344-
[promise](bool)
345-
{
346-
promise->set_value();
347-
});
348-
return promise->get_future();
349-
}
329+
ICE_MEMBER(ICE_API) std::future<void> flushBatchRequestsAsync(CompressBatch compress);
350330

351331
/**
352332
* Set a close callback on the connection. The callback is called by the connection when it's closed. The callback
@@ -366,10 +346,7 @@ class ICE_CLASS(ICE_API) Connection
366346
/**
367347
* Send a heartbeat message.
368348
*/
369-
virtual void heartbeat()
370-
{
371-
heartbeatAsync().get();
372-
}
349+
ICE_MEMBER(ICE_API) void heartbeat();
373350

374351
/**
375352
* Send a heartbeat message.
@@ -385,28 +362,13 @@ class ICE_CLASS(ICE_API) Connection
385362
* Send a heartbeat message.
386363
* @return The future object for the invocation.
387364
*/
388-
template<template<typename> class P = ::std::promise>
389-
auto heartbeatAsync()
390-
-> decltype(::std::declval<P<void>>().get_future())
391-
{
392-
using Promise = P<void>;
393-
auto promise = ::std::make_shared<Promise>();
394-
heartbeatAsync([promise](::std::exception_ptr ex)
395-
{
396-
promise->set_exception(::std::move(ex));
397-
},
398-
[promise](bool)
399-
{
400-
promise->set_value();
401-
});
402-
return promise->get_future();
403-
}
365+
ICE_MEMBER(ICE_API) std::future<void> heartbeatAsync();
404366

405367
/**
406368
* Set the active connection management parameters.
407369
* @param timeout The timeout value in seconds, must be &gt;= 0.
408370
* @param close The close condition
409-
* @param heartbeat The hertbeat condition
371+
* @param heartbeat The heartbeat condition
410372
*/
411373
virtual void setACM(const std::optional<int>& timeout, const std::optional<ACMClose>& close, const std::optional<ACMHeartbeat>& heartbeat) = 0;
412374

@@ -481,7 +443,7 @@ class ICE_CLASS(ICE_API) IPConnectionInfo : public ::Ice::ConnectionInfo
481443

482444
/**
483445
* One-shot constructor to initialize all data members.
484-
* @param underlying The information of the underyling transport or null if there's no underlying transport.
446+
* @param underlying The information of the underlying transport or null if there's no underlying transport.
485447
* @param incoming Whether or not the connection is an incoming or outgoing connection.
486448
* @param adapterName The name of the adapter associated with the connection.
487449
* @param connectionId The connection id.
@@ -540,7 +502,7 @@ class ICE_CLASS(ICE_API) TCPConnectionInfo : public ::Ice::IPConnectionInfo
540502

541503
/**
542504
* One-shot constructor to initialize all data members.
543-
* @param underlying The information of the underyling transport or null if there's no underlying transport.
505+
* @param underlying The information of the underlying transport or null if there's no underlying transport.
544506
* @param incoming Whether or not the connection is an incoming or outgoing connection.
545507
* @param adapterName The name of the adapter associated with the connection.
546508
* @param connectionId The connection id.

cpp/include/Ice/Ice.h

-2
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,12 @@
99
#include <Ice/Config.h>
1010
#include <Ice/Comparable.h>
1111
#include <Ice/StreamHelpers.h>
12-
#include <Ice/OutgoingAsync.h>
1312
#include <Ice/Proxy.h>
1413
#include <Ice/Current.h>
1514
#include <Ice/LocalException.h>
1615
#include <optional>
1716
#include <Ice/Object.h>
1817
#include <Ice/SlicedData.h>
19-
#include <Ice/OutgoingAsync.h>
2018
#include <Ice/IncomingAsync.h>
2119
#include <Ice/FactoryTable.h>
2220
#include <Ice/FactoryTableInit.h>

cpp/include/Ice/OutgoingAsync.h

+48-35
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
#define ICE_OUTGOING_ASYNC_H
77

88
#include <IceUtil/Timer.h>
9-
#include <Ice/OutgoingAsyncF.h>
109
#include <Ice/CommunicatorF.h>
1110
#include <Ice/ConnectionIF.h>
1211
#include <Ice/ObjectAdapterF.h>
@@ -23,6 +22,7 @@
2322
namespace IceInternal
2423
{
2524

25+
class OutgoingAsyncBase;
2626
class RetryException;
2727
class CollocatedRequestHandler;
2828

@@ -136,6 +136,8 @@ class ICE_API OutgoingAsyncBase : public virtual OutgoingAsyncCompletionCallback
136136
static const unsigned char Sent;
137137
};
138138

139+
using OutgoingAsyncBasePtr = ::std::shared_ptr<OutgoingAsyncBase>;
140+
139141
//
140142
// Base class for proxy based invocations. This class handles the
141143
// retry for proxy invocations. It also ensures the child observer is
@@ -164,7 +166,7 @@ class ICE_API ProxyOutgoingAsyncBase : public OutgoingAsyncBase,
164166

165167
protected:
166168

167-
ProxyOutgoingAsyncBase(const Ice::ObjectPrx&);
169+
ProxyOutgoingAsyncBase(Ice::ObjectPrx);
168170
~ProxyOutgoingAsyncBase();
169171

170172
void invokeImpl(bool);
@@ -184,14 +186,16 @@ class ICE_API ProxyOutgoingAsyncBase : public OutgoingAsyncBase,
184186
bool _sent;
185187
};
186188

189+
using ProxyOutgoingAsyncBasePtr = ::std::shared_ptr<ProxyOutgoingAsyncBase>;
190+
187191
//
188192
// Class for handling Slice operation invocations
189193
//
190194
class ICE_API OutgoingAsync : public ProxyOutgoingAsyncBase
191195
{
192196
public:
193197

194-
OutgoingAsync(const Ice::ObjectPrx&, bool);
198+
OutgoingAsync(Ice::ObjectPrx, bool);
195199

196200
void prepare(const std::string&, Ice::OperationMode, const Ice::Context&);
197201

@@ -220,7 +224,7 @@ class ICE_API OutgoingAsync : public ProxyOutgoingAsyncBase
220224
{
221225
_os.writeEmptyEncapsulation(_encoding);
222226
}
223-
void writeParamEncaps(const ::Ice::Byte* encaps, ::std::int32_t size)
227+
void writeParamEncaps(const ::Ice::Byte* encaps, std::int32_t size)
224228
{
225229
if(size == 0)
226230
{
@@ -239,16 +243,13 @@ class ICE_API OutgoingAsync : public ProxyOutgoingAsyncBase
239243
bool _synchronous;
240244
};
241245

242-
}
243-
244-
namespace IceInternal
245-
{
246+
using OutgoingAsyncPtr = ::std::shared_ptr<OutgoingAsync>;
246247

247248
class ICE_API LambdaInvoke : public virtual OutgoingAsyncCompletionCallback
248249
{
249250
public:
250251

251-
LambdaInvoke(std::function<void(::std::exception_ptr)> exception, std::function<void(bool)> sent) :
252+
LambdaInvoke(std::function<void(std::exception_ptr)> exception, std::function<void(bool)> sent) :
252253
_exception(std::move(exception)), _sent(std::move(sent))
253254
{
254255
}
@@ -263,25 +264,21 @@ class ICE_API LambdaInvoke : public virtual OutgoingAsyncCompletionCallback
263264
virtual void handleInvokeException(std::exception_ptr, OutgoingAsyncBase*) const override;
264265
virtual void handleInvokeResponse(bool, OutgoingAsyncBase*) const override;
265266

266-
std::function<void(::std::exception_ptr)> _exception;
267+
std::function<void(std::exception_ptr)> _exception;
267268
std::function<void(bool)> _sent;
268269
std::function<void(bool)> _response;
269270
};
270271

271-
template<typename Promise>
272+
template<typename R>
272273
class PromiseInvoke : public virtual OutgoingAsyncCompletionCallback
273274
{
274275
public:
275276

276-
auto
277-
getFuture() -> decltype(std::declval<Promise>().get_future())
278-
{
279-
return _promise.get_future();
280-
}
277+
std::future<R> getFuture() { return _promise.get_future(); }
281278

282279
protected:
283280

284-
Promise _promise;
281+
std::promise<R> _promise;
285282
std::function<void(bool)> _response;
286283

287284
private:
@@ -388,11 +385,11 @@ class LambdaOutgoing : public OutgoingAsyncT<R>, public LambdaInvoke
388385
{
389386
public:
390387

391-
LambdaOutgoing(const Ice::ObjectPrx& proxy,
388+
LambdaOutgoing(Ice::ObjectPrx proxy,
392389
std::function<void(R)> response,
393-
std::function<void(::std::exception_ptr)> ex,
390+
std::function<void(std::exception_ptr)> ex,
394391
std::function<void(bool)> sent) :
395-
OutgoingAsyncT<R>(proxy, false), LambdaInvoke(std::move(ex), std::move(sent))
392+
OutgoingAsyncT<R>(std::move(proxy), false), LambdaInvoke(std::move(ex), std::move(sent))
396393
{
397394
_response = [this, response = std::move(response)](bool ok)
398395
{
@@ -424,11 +421,11 @@ class LambdaOutgoing<void> : public OutgoingAsyncT<void>, public LambdaInvoke
424421
{
425422
public:
426423

427-
LambdaOutgoing(const Ice::ObjectPrx& proxy,
424+
LambdaOutgoing(Ice::ObjectPrx proxy,
428425
std::function<void()> response,
429-
std::function<void(::std::exception_ptr)> ex,
426+
std::function<void(std::exception_ptr)> ex,
430427
std::function<void(bool)> sent) :
431-
OutgoingAsyncT<void>(proxy, false), LambdaInvoke(std::move(ex), std::move(sent))
428+
OutgoingAsyncT<void>(std::move(proxy), false), LambdaInvoke(std::move(ex), std::move(sent))
432429
{
433430
_response = [this, response = std::move(response)](bool ok)
434431
{
@@ -460,11 +457,11 @@ class CustomLambdaOutgoing : public OutgoingAsync, public LambdaInvoke
460457
{
461458
public:
462459

463-
CustomLambdaOutgoing(const Ice::ObjectPrx& proxy,
460+
CustomLambdaOutgoing(Ice::ObjectPrx proxy,
464461
std::function<void(Ice::InputStream*)> read,
465-
std::function<void(::std::exception_ptr)> ex,
462+
std::function<void(std::exception_ptr)> ex,
466463
std::function<void(bool)> sent) :
467-
OutgoingAsync(proxy, false), LambdaInvoke(std::move(ex), std::move(sent))
464+
OutgoingAsync(std::move(proxy), false), LambdaInvoke(std::move(ex), std::move(sent))
468465
{
469466
_response = [this, read = std::move(read)](bool ok)
470467
{
@@ -495,13 +492,13 @@ class CustomLambdaOutgoing : public OutgoingAsync, public LambdaInvoke
495492
}
496493
};
497494

498-
template<typename P, typename R>
499-
class PromiseOutgoing : public OutgoingAsyncT<R>, public PromiseInvoke<P>
495+
template<typename R>
496+
class PromiseOutgoing : public OutgoingAsyncT<R>, public PromiseInvoke<R>
500497
{
501498
public:
502499

503-
PromiseOutgoing(const Ice::ObjectPrx& proxy, bool sync) :
504-
OutgoingAsyncT<R>(proxy, sync)
500+
PromiseOutgoing(Ice::ObjectPrx proxy, bool sync) :
501+
OutgoingAsyncT<R>(std::move(proxy), sync)
505502
{
506503
this->_response = [this](bool ok)
507504
{
@@ -521,13 +518,13 @@ class PromiseOutgoing : public OutgoingAsyncT<R>, public PromiseInvoke<P>
521518
}
522519
};
523520

524-
template<typename P>
525-
class PromiseOutgoing<P, void> : public OutgoingAsyncT<void>, public PromiseInvoke<P>
521+
template<>
522+
class PromiseOutgoing<void> : public OutgoingAsyncT<void>, public PromiseInvoke<void>
526523
{
527524
public:
528525

529-
PromiseOutgoing(const Ice::ObjectPrx& proxy, bool sync) :
530-
OutgoingAsyncT<void>(proxy, sync)
526+
PromiseOutgoing(Ice::ObjectPrx proxy, bool sync) :
527+
OutgoingAsyncT<void>(std::move(proxy), sync)
531528
{
532529
this->_response = [&](bool ok)
533530
{
@@ -556,12 +553,28 @@ class PromiseOutgoing<P, void> : public OutgoingAsyncT<void>, public PromiseInvo
556553
{
557554
if(done)
558555
{
559-
PromiseInvoke<P>::_promise.set_value();
556+
PromiseInvoke<void>::_promise.set_value();
560557
}
561558
return false;
562559
}
563560
};
564561

562+
template<typename R, typename Obj, typename Fn, typename... Args>
563+
inline std::future<R> makePromiseOutgoing(bool sync, Obj obj, Fn fn, Args&&... args)
564+
{
565+
auto outAsync = std::make_shared<PromiseOutgoing<R>>(*obj, sync);
566+
(obj->*fn)(outAsync, std::forward<Args>(args)...);
567+
return outAsync->getFuture();
568+
}
569+
570+
template<typename R, typename Re, typename E, typename S, typename Obj, typename Fn, typename... Args>
571+
inline std::function<void()> makeLambdaOutgoing(Re r, E e, S s, Obj obj, Fn fn, Args&&... args)
572+
{
573+
auto outAsync = std::make_shared<LambdaOutgoing<R>>(*obj, std::move(r), std::move(e), std::move(s));
574+
(obj->*fn)(outAsync, std::forward<Args>(args)...);
575+
return [outAsync]() { outAsync->cancel(); };
576+
}
577+
565578
}
566579

567580
#endif

0 commit comments

Comments
 (0)