Merge remote-tracking branch 'origin/main' into acm5
bernardnormier committed Apr 23, 2024
2 parents 5f67d4d + 320c695 commit 8e5bb36
Showing 6 changed files with 227 additions and 302 deletions.
207 changes: 97 additions & 110 deletions cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -18,37 +18,38 @@ using namespace IceInternal;

namespace
{
class InvokeAllAsync final : public ExecutorWorkItem
class ExecuteDispatchAll final : public ExecutorWorkItem
{
public:
InvokeAllAsync(
ExecuteDispatchAll(
const OutgoingAsyncBasePtr& outAsync,
OutputStream* os,
InputStream& stream,
const CollocatedRequestHandlerPtr& handler,
int32_t requestId,
int32_t batchRequestNum)
int32_t dispatchCount)
: _outAsync(outAsync),
_os(os),
_stream(stream.instance(), currentProtocolEncoding),
_handler(handler),
_requestId(requestId),
_batchRequestNum(batchRequestNum)
_dispatchCount(dispatchCount)
{
_stream.swap(stream);
}

void run() final
{
if (_handler->sentAsync(_outAsync.get()))
{
_handler->invokeAll(_os, _requestId, _batchRequestNum);
_handler->dispatchAll(_stream, _requestId, _dispatchCount);
}
}

private:
OutgoingAsyncBasePtr _outAsync;
OutputStream* _os;
InputStream _stream;
CollocatedRequestHandlerPtr _handler;
int32_t _requestId;
int32_t _batchRequestNum;
int32_t _dispatchCount;
};

void fillInValue(OutputStream* os, int pos, int32_t value)
@@ -100,7 +101,7 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs
{
outAsync->invokeExceptionAsync();
}
_adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count.
_adapter->decDirectCount(); // dispatchAll won't be called, decrease the direct count.
return;
}

@@ -123,12 +124,10 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs
}

AsyncStatus
CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum, bool synchronous)
CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestCount, bool synchronous)
{
//
// Increase the direct count to prevent the thread pool from being destroyed before
// invokeAll is called. This will also throw if the object adapter has been deactivated.
//
// Increase the direct count to prevent the thread pool from being destroyed before dispatchAll is called. This will
// also throw if the object adapter has been deactivated.
_adapter->incDirectCount();

int requestId = 0;
Expand All @@ -155,28 +154,56 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int ba
throw;
}

OutputStream* os = outAsync->getOs();
if (_traceLevels->protocol >= 1)
{
fillInValue(os, 10, static_cast<int32_t>(os->b.size()));
if (requestId > 0)
{
fillInValue(os, headerSize, requestId);
}
else if (batchRequestCount > 0)
{
fillInValue(os, headerSize, batchRequestCount);
}
traceSend(*os, _logger, _traceLevels);
}

outAsync->attachCollocatedObserver(_adapter, requestId);

InputStream is(os->instance(), os->getEncoding(), *os);

if (batchRequestCount > 0)
{
is.pos(sizeof(requestBatchHdr));
}
else
{
is.pos(sizeof(requestHdr));
}

int dispatchCount = batchRequestCount == 0 ? 1 : batchRequestCount;

if (!synchronous || !_response || _reference->getInvocationTimeout() > 0)
{
// Don't invoke from the user thread if async or invocation timeout is set
_adapter->getThreadPool()->execute(make_shared<InvokeAllAsync>(
_adapter->getThreadPool()->execute(make_shared<ExecuteDispatchAll>(
outAsync->shared_from_this(),
outAsync->getOs(),
is,
shared_from_this(),
requestId,
batchRequestNum));
dispatchCount));
}
else if (_hasExecutor)
{
_adapter->getThreadPool()->executeFromThisThread(make_shared<InvokeAllAsync>(
_adapter->getThreadPool()->executeFromThisThread(make_shared<ExecuteDispatchAll>(
outAsync->shared_from_this(),
outAsync->getOs(),
is,
shared_from_this(),
requestId,
batchRequestNum));
dispatchCount));
}
else // Optimization: directly call invokeAll if there's no custom executor.
else // Optimization: directly call dispatchAll if there's no custom executor.
{
//
// Make sure to hold a reference on this handler while the call is being
@@ -187,64 +214,14 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int ba
CollocatedRequestHandlerPtr self(shared_from_this());
if (sentAsync(outAsync))
{
invokeAll(outAsync->getOs(), requestId, batchRequestNum);
dispatchAll(is, requestId, dispatchCount);
}
}
return AsyncStatusQueued;
}

void
CollocatedRequestHandler::sendResponse(int32_t requestId, OutputStream* os)
{
OutgoingAsyncBasePtr outAsync;
{
lock_guard<mutex> lock(_mutex);
assert(_response);

if (_traceLevels->protocol >= 1)
{
fillInValue(os, 10, static_cast<int32_t>(os->b.size()));
}

InputStream is(os->instance(), os->getEncoding(), *os, true); // Adopting the OutputStream's buffer.
is.pos(sizeof(replyHdr) + 4);

if (_traceLevels->protocol >= 1)
{
traceRecv(is, _logger, _traceLevels);
}

map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(requestId);
if (q != _asyncRequests.end())
{
is.swap(*q->second->getIs());
if (q->second->response())
{
outAsync = q->second;
}
_asyncRequests.erase(q);
}
}

if (outAsync)
{
// We invoke the response using a thread-pool thread. If the invocation is a lambda async invocation, we want
// the callbacks to execute in a thread-pool thread - never in the application thread that sent the response
// via AMD.
outAsync->invokeResponseAsync();
}

_adapter->decDirectCount();
}

void
CollocatedRequestHandler::sendNoResponse()
{
_adapter->decDirectCount();
}

void
CollocatedRequestHandler::invokeException(int32_t requestId, exception_ptr ex)
CollocatedRequestHandler::dispatchException(int32_t requestId, exception_ptr ex)
{
handleException(requestId, ex);
_adapter->decDirectCount();
Expand Down Expand Up @@ -282,37 +259,11 @@ CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync)
}

void
CollocatedRequestHandler::invokeAll(OutputStream* os, int32_t requestId, int32_t batchRequestNum)
CollocatedRequestHandler::dispatchAll(InputStream& is, int32_t requestId, int32_t dispatchCount)
{
if (_traceLevels->protocol >= 1)
{
fillInValue(os, 10, static_cast<int32_t>(os->b.size()));
if (requestId > 0)
{
fillInValue(os, headerSize, requestId);
}
else if (batchRequestNum > 0)
{
fillInValue(os, headerSize, batchRequestNum);
}
traceSend(*os, _logger, _traceLevels);
}

InputStream is(os->instance(), os->getEncoding(), *os);

if (batchRequestNum > 0)
{
is.pos(sizeof(requestBatchHdr));
}
else
{
is.pos(sizeof(requestHdr));
}

int invokeNum = batchRequestNum > 0 ? batchRequestNum : 1;
try
{
while (invokeNum > 0)
while (dispatchCount > 0)
{
//
// Increase the direct count for the dispatch. We increase it again here for
@@ -344,12 +295,12 @@ CollocatedRequestHandler::invokeAll(OutputStream* os, int32_t requestId, int32_t
sendResponse(makeOutgoingResponse(current_exception(), request.current()));
}

--invokeNum;
--dispatchCount;
}
}
catch (...)
{
invokeException(requestId, current_exception()); // Fatal invocation exception
dispatchException(requestId, current_exception()); // Fatal invocation exception
}

_adapter->decDirectCount();
Expand Down Expand Up @@ -394,16 +345,52 @@ CollocatedRequestHandler::sendResponse(OutgoingResponse response)
{
if (_response)
{
sendResponse(response.current().requestId, &response.outputStream());
}
else
{
sendNoResponse();
OutgoingAsyncBasePtr outAsync;
{
lock_guard<mutex> lock(_mutex);
assert(_response);

auto os = &response.outputStream();

if (_traceLevels->protocol >= 1)
{
fillInValue(os, 10, static_cast<int32_t>(os->b.size()));
}

InputStream is(os->instance(), os->getEncoding(), *os, true); // Adopting the OutputStream's buffer.
is.pos(sizeof(replyHdr) + 4);

if (_traceLevels->protocol >= 1)
{
traceRecv(is, _logger, _traceLevels);
}

map<int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.find(response.current().requestId);
if (q != _asyncRequests.end())
{
is.swap(*q->second->getIs());
if (q->second->response())
{
outAsync = q->second;
}
_asyncRequests.erase(q);
}
}

if (outAsync)
{
// We invoke the response using a thread-pool thread. If the invocation is a lambda async invocation, we
// want the callbacks to execute in a thread-pool thread - never in the application thread that sent the
// response via AMD.
outAsync->invokeResponseAsync();
}
}

_adapter->decDirectCount();
}
catch (...)
{
// Fatal invocation exception
invokeException(response.current().requestId, current_exception());
dispatchException(response.current().requestId, current_exception());
}
}
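
A minimal sketch of the flow these changes set up: invokeAsyncRequest now fills in the header, traces the message, wraps the marshaled request in an InputStream positioned past the header, and derives a dispatch count, so dispatchAll only has to loop. The names, types, and header sizes below are illustrative stand-ins, not the real Ice APIs.

// Sketch only: illustrates the reader handoff and dispatch-count rule used above.
#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

namespace sketch
{
    // Assumed header sizes, for illustration only.
    constexpr std::size_t requestHdrSize = 22;
    constexpr std::size_t requestBatchHdrSize = 18;

    struct RequestReader // stand-in for Ice::InputStream over the marshaled request
    {
        const std::vector<std::uint8_t>* buffer = nullptr;
        std::size_t pos = 0;
    };

    // A plain request carries one dispatch; a batch carries batchRequestCount of them.
    inline std::int32_t toDispatchCount(std::int32_t batchRequestCount)
    {
        return batchRequestCount == 0 ? 1 : batchRequestCount;
    }

    // Wrap the marshaled bytes and skip the (batch) request header, as
    // invokeAsyncRequest now does before handing off to dispatchAll.
    inline RequestReader makeReader(const std::vector<std::uint8_t>& marshaled, std::int32_t batchRequestCount)
    {
        return RequestReader{&marshaled, batchRequestCount > 0 ? requestBatchHdrSize : requestHdrSize};
    }

    // Shape of the dispatch loop: unmarshal and dispatch one request per iteration.
    inline void dispatchAll(
        RequestReader& reader,
        std::int32_t dispatchCount,
        const std::function<void(RequestReader&)>& dispatchOne)
    {
        while (dispatchCount > 0)
        {
            dispatchOne(reader);
            --dispatchCount;
        }
    }
}

Under this sketch, a batch of three requests is wrapped once (reader positioned past the batch header) and dispatched with dispatchCount == 3, matching the loop in dispatchAll.
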
6 changes: 2 additions & 4 deletions cpp/src/Ice/CollocatedRequestHandler.h
@@ -43,15 +43,13 @@ namespace IceInternal

bool sentAsync(OutgoingAsyncBase*);

void invokeAll(Ice::OutputStream*, std::int32_t, std::int32_t);
void dispatchAll(Ice::InputStream&, std::int32_t, std::int32_t);

private:
void handleException(std::int32_t, std::exception_ptr);

void sendResponse(Ice::OutgoingResponse);
void sendResponse(std::int32_t, Ice::OutputStream*);
void sendNoResponse();
void invokeException(std::int32_t, std::exception_ptr);
void dispatchException(std::int32_t, std::exception_ptr);

const std::shared_ptr<Ice::ObjectAdapterI> _adapter;
const bool _hasExecutor;
6 changes: 3 additions & 3 deletions cpp/src/Ice/ConnectionFactory.cpp
@@ -98,10 +98,10 @@ namespace
};

#if TARGET_OS_IPHONE != 0
class FinishCall final : public ExecutorWorkItem
class ExecuteFinish final : public ExecutorWorkItem
{
public:
FinishCall(const IncomingConnectionFactoryPtr& factory) : _factory(factory) {}
ExecuteFinish(const IncomingConnectionFactoryPtr& factory) : _factory(factory) {}

void run() final { _factory->finish(); }

@@ -1843,7 +1843,7 @@ IceInternal::IncomingConnectionFactory::setState(State state)
else
{
#if TARGET_OS_IPHONE != 0
_adapter->getThreadPool()->execute(make_shared<FinishCall>(shared_from_this()));
_adapter->getThreadPool()->execute(make_shared<ExecuteFinish>(shared_from_this()));
#endif
state = StateFinished;
}
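
The ConnectionFactory change is a pure rename (FinishCall becomes ExecuteFinish), matching the Execute* naming used for ExecuteDispatchAll above: a small work item that captures what it needs and performs the deferred work in run() when the thread pool gets to it. A generic, self-contained sketch of that pattern follows; WorkItem, ThreadPool, and ExecuteCallback are hypothetical stand-ins, not the Ice thread-pool API.

// Generic sketch of the work-item pattern used by ExecuteFinish and ExecuteDispatchAll.
#include <functional>
#include <memory>
#include <queue>
#include <utility>

namespace sketch
{
    class WorkItem
    {
    public:
        virtual ~WorkItem() = default;
        virtual void run() = 0; // the deferred work, executed on a pool thread
    };

    class ThreadPool
    {
    public:
        void execute(std::shared_ptr<WorkItem> item) { _queue.push(std::move(item)); }

        // Drain the queue; in a real thread pool this happens on worker threads.
        void drain()
        {
            while (!_queue.empty())
            {
                _queue.front()->run();
                _queue.pop();
            }
        }

    private:
        std::queue<std::shared_ptr<WorkItem>> _queue;
    };

    // Analogous to ExecuteFinish: capture the target, call back into it from run().
    class ExecuteCallback final : public WorkItem
    {
    public:
        explicit ExecuteCallback(std::function<void()> callback) : _callback(std::move(callback)) {}
        void run() final { _callback(); }

    private:
        std::function<void()> _callback;
    };
}

Typical use under these assumptions: pool.execute(std::make_shared<ExecuteCallback>([&factory] { factory.finish(); })); the callback then runs when the pool drains its queue.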