From c41d198633b7879a39da0b6f9e1d3bee604d1c05 Mon Sep 17 00:00:00 2001 From: Benoit Foucher Date: Wed, 17 Apr 2024 21:05:26 +0200 Subject: [PATCH] Added comments to connection's implementation (#2047) --- cpp/src/Ice/ConnectionI.cpp | 258 +++++++++-------- cpp/src/Ice/ConnectionI.h | 2 +- csharp/src/Ice/ConnectionI.cs | 260 ++++++++++-------- .../main/java/com/zeroc/Ice/ConnectionI.java | 254 +++++++++-------- 4 files changed, 439 insertions(+), 335 deletions(-) diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 75a9d28089b..62f5b867979 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -537,7 +537,7 @@ Ice::ConnectionI::isFinished() const return false; } - if (_state != StateFinished || _dispatchCount != 0) + if (_state != StateFinished || _upcallCount != 0) { return false; } @@ -562,7 +562,7 @@ void Ice::ConnectionI::waitUntilHolding() const { std::unique_lock lock(_mutex); - _conditionVariable.wait(lock, [this] { return _state >= StateHolding && _dispatchCount == 0; }); + _conditionVariable.wait(lock, [this] { return _state >= StateHolding && _upcallCount == 0; }); } void @@ -576,7 +576,7 @@ Ice::ConnectionI::waitUntilFinished() // guarantee that there are no outstanding calls when deactivate() // is called on the servant locators. // - _conditionVariable.wait(lock, [this] { return _state >= StateFinished && _dispatchCount == 0; }); + _conditionVariable.wait(lock, [this] { return _state >= StateFinished && _upcallCount == 0; }); assert(_state == StateFinished); @@ -631,7 +631,7 @@ Ice::ConnectionI::monitor(const chrono::steady_clock::time_point& now, const ACM (acm.heartbeat != ACMHeartbeat::HeartbeatOff && _writeStream.b.empty() && now >= (_acmLastActivity + chrono::duration_cast(acm.timeout) / 4))) { - if (acm.heartbeat != ACMHeartbeat::HeartbeatOnDispatch || _dispatchCount > 0) + if (acm.heartbeat != ACMHeartbeat::HeartbeatOnDispatch || _upcallCount > 0) { sendHeartbeatNow(); } @@ -660,7 +660,7 @@ Ice::ConnectionI::monitor(const chrono::steady_clock::time_point& now, const ACM setState(StateClosed, make_exception_ptr(ConnectionTimeoutException(__FILE__, __LINE__))); } else if ( - acm.close != ACMClose::CloseOnInvocation && _dispatchCount == 0 && _batchRequestQueue->isEmpty() && + acm.close != ACMClose::CloseOnInvocation && _upcallCount == 0 && _batchRequestQueue->isEmpty() && _asyncRequests.empty()) { // @@ -1109,7 +1109,7 @@ Ice::ConnectionI::sendResponse(int32_t, OutputStream* os, uint8_t compressFlag) try { - if (--_dispatchCount == 0) + if (--_upcallCount == 0) { if (_state == StateFinished) { @@ -1127,7 +1127,7 @@ Ice::ConnectionI::sendResponse(int32_t, OutputStream* os, uint8_t compressFlag) OutgoingMessage message(os, compressFlag > 0); sendMessage(message); - if (_state == StateClosing && _dispatchCount == 0) + if (_state == StateClosing && _upcallCount == 0) { initiateShutdown(); } @@ -1148,7 +1148,7 @@ Ice::ConnectionI::sendNoResponse() try { - if (--_dispatchCount == 0) + if (--_upcallCount == 0) { if (_state == StateFinished) { @@ -1163,7 +1163,7 @@ Ice::ConnectionI::sendNoResponse() rethrow_exception(_exception); } - if (_state == StateClosing && _dispatchCount == 0) + if (_state == StateClosing && _upcallCount == 0) { initiateShutdown(); } @@ -1179,7 +1179,7 @@ Ice::ConnectionI::invokeException(int32_t, exception_ptr ex, int invokeNum) { // // Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't - // called in case of a fatal exception we decrement _dispatchCount here. + // called in case of a fatal exception we decrement _upcallCount here. // std::lock_guard lock(_mutex); @@ -1187,9 +1187,9 @@ Ice::ConnectionI::invokeException(int32_t, exception_ptr ex, int invokeNum) if (invokeNum > 0) { - assert(_dispatchCount >= invokeNum); - _dispatchCount -= invokeNum; - if (_dispatchCount == 0) + assert(_upcallCount >= invokeNum); + _upcallCount -= invokeNum; + if (_upcallCount == 0) { if (_state == StateFinished) { @@ -1382,7 +1382,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) ObjectAdapterIPtr adapter; OutgoingAsyncBasePtr outAsync; HeartbeatCallback heartbeatCallback; - int dispatchCount = 0; + int upcallCount = 0; ThreadPoolMessage msg(current, *this); { @@ -1399,14 +1399,15 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) return; } - SocketOperation readyOp = current.operation; try { unscheduleTimeout(current.operation); SocketOperation writeOp = SocketOperationNone; SocketOperation readOp = SocketOperationNone; - if (readyOp & SocketOperationWrite) + + // If writes are ready, write the data from the connection's write buffer (_writeStream) + if (current.operation & SocketOperationWrite) { if (_observer) { @@ -1419,140 +1420,166 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } } - while (readyOp & SocketOperationRead) + // If reads are ready, read the data into the connection's read buffer (_readStream). The data is read + // until: + // - the full message is read (the transport read returns SocketOperationNone) and + // the read buffer is fully filled + // - the read operation on the transport can't continue without blocking + if (current.operation & SocketOperationRead) { - if (_observer && !_readHeader) - { - _observer.startRead(_readStream); - } - - readOp = read(_readStream); - if (readOp & SocketOperationRead) - { - break; - } - if (_observer && !_readHeader) - { - assert(_readStream.i == _readStream.b.end()); - _observer.finishRead(_readStream); - } - - if (_readHeader) // Read header if necessary. + while (true) { - _readHeader = false; - - if (_observer) + if (_observer && !_readHeader) { - _observer->receivedBytes(static_cast(headerSize)); + _observer.startRead(_readStream); } - // - // Connection is validated on first message. This is only used by - // setState() to check wether or not we can print a connection - // warning (a client might close the connection forcefully if the - // connection isn't validated, we don't want to print a warning - // in this case). - // - _validated = true; - - ptrdiff_t pos = _readStream.i - _readStream.b.begin(); - if (pos < headerSize) + readOp = read(_readStream); + if (readOp & SocketOperationRead) { - // - // This situation is possible for small UDP packets. - // - throw IllegalMessageSizeException(__FILE__, __LINE__); + // Can't continue without blocking, exit out of the loop. + break; } - _readStream.i = _readStream.b.begin(); - const byte* m; - _readStream.readBlob(m, static_cast(sizeof(magic))); - if (m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3]) - { - throw BadMagicException(__FILE__, __LINE__, "", Ice::ByteSeq(&m[0], &m[0] + sizeof(magic))); - } - ProtocolVersion pv; - _readStream.read(pv); - checkSupportedProtocol(pv); - EncodingVersion ev; - _readStream.read(ev); - checkSupportedProtocolEncoding(ev); - - uint8_t messageType; - _readStream.read(messageType); - uint8_t compressByte; - _readStream.read(compressByte); - int32_t size; - _readStream.read(size); - if (size < headerSize) + if (_observer && !_readHeader) { - throw IllegalMessageSizeException(__FILE__, __LINE__); + assert(_readStream.i == _readStream.b.end()); + _observer.finishRead(_readStream); } - if (size > static_cast(_messageSizeMax)) - { - Ex::throwMemoryLimitException(__FILE__, __LINE__, static_cast(size), _messageSizeMax); - } - if (static_cast(size) > _readStream.b.size()) + // If read header is true, we're reading a new Ice protocol message and we need to read + // the message header. + if (_readHeader) { - _readStream.b.resize(static_cast(size)); + // The next read will read the remainder of the message. + _readHeader = false; + + if (_observer) + { + _observer->receivedBytes(static_cast(headerSize)); + } + + // + // Connection is validated on first message. This is only used by + // setState() to check wether or not we can print a connection + // warning (a client might close the connection forcefully if the + // connection isn't validated, we don't want to print a warning + // in this case). + // + _validated = true; + + // Full header should be read because the size of _readStream is always headerSize (14) when + // reading a new message (see the code that sets _readHeader = true). + ptrdiff_t pos = _readStream.i - _readStream.b.begin(); + if (pos < headerSize) + { + // + // This situation is possible for small UDP packets. + // + throw IllegalMessageSizeException(__FILE__, __LINE__); + } + + // Decode the header. + _readStream.i = _readStream.b.begin(); + const byte* m; + _readStream.readBlob(m, static_cast(sizeof(magic))); + if (m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3]) + { + throw BadMagicException(__FILE__, __LINE__, "", Ice::ByteSeq(&m[0], &m[0] + sizeof(magic))); + } + ProtocolVersion pv; + _readStream.read(pv); + checkSupportedProtocol(pv); + EncodingVersion ev; + _readStream.read(ev); + checkSupportedProtocolEncoding(ev); + + uint8_t messageType; + _readStream.read(messageType); + uint8_t compressByte; + _readStream.read(compressByte); + int32_t size; + _readStream.read(size); + if (size < headerSize) + { + throw IllegalMessageSizeException(__FILE__, __LINE__); + } + + // Resize the read buffer to the message size. + if (size > static_cast(_messageSizeMax)) + { + Ex::throwMemoryLimitException( + __FILE__, + __LINE__, + static_cast(size), + _messageSizeMax); + } + if (static_cast(size) > _readStream.b.size()) + { + _readStream.b.resize(static_cast(size)); + } + _readStream.i = _readStream.b.begin() + pos; } - _readStream.i = _readStream.b.begin() + pos; - } - if (_readStream.i != _readStream.b.end()) - { - if (_endpoint->datagram()) + if (_readStream.i != _readStream.b.end()) { - throw DatagramLimitException(__FILE__, __LINE__); // The message was truncated. + if (_endpoint->datagram()) + { + throw DatagramLimitException(__FILE__, __LINE__); // The message was truncated. + } + continue; } - continue; + break; } - break; } + // readOp and writeOp are set to the operations that the transport read or write calls from above returned. + // They indicate which operations will need to be monitored by the thread pool's selector when this method + // returns. SocketOperation newOp = static_cast(readOp | writeOp); - readyOp = static_cast(readyOp & ~newOp); - assert(readyOp || newOp); + + // Operations that are ready. For example, if message was called with SocketOperationRead and the transport + // read returned SocketOperationNone, reads are considered done: there's no additional data to read. + SocketOperation readyOp = static_cast(current.operation & ~newOp); if (_state <= StateNotValidated) { + // If the connection is still not validated and there's still data to read or write, continue waiting + // for data to read or write. if (newOp) { - // - // Wait for all the transceiver conditions to be - // satisfied before continuing. - // scheduleTimeout(newOp); _threadPool->update(shared_from_this(), current.operation, newOp); return; } + // Initialize the connection if it's not initialized yet. if (_state == StateNotInitialized && !initialize(current.operation)) { return; } + // Validate the connection if it's not validated yet. if (_state <= StateNotValidated && !validate(current.operation)) { return; } + // The connection is validated and doesn't need additional data to be read or written. So unregister + // it from the thread pool's selector. _threadPool->unregister(shared_from_this(), current.operation); - // - // We start out in holding state. - // + // The connection starts in the holding state. It will be activated by the connection factory. setState(StateHolding); if (_connectionStartCompleted) { connectionStartCompleted = std::move(_connectionStartCompleted); - ++dispatchCount; + ++upcallCount; _connectionStartCompleted = nullptr; _connectionStartFailed = nullptr; } } - else + else // The connection is active or waits for the CloseConnection message. { assert(_state <= StateClosingPending); @@ -1562,6 +1589,8 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) // if (readyOp & SocketOperationRead) { + // At this point, the protocol message is fully read and can therefore be decoded by parseMessage. + // parseMessage returns the operation to wait for readiness next. newOp = static_cast( newOp | parseMessage( current.stream, @@ -1571,18 +1600,22 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) adapter, outAsync, heartbeatCallback, - dispatchCount)); + upcallCount)); } if (readyOp & SocketOperationWrite) { + // At this point the message from _writeStream is fully written and the next message can be written. + newOp = static_cast(newOp | sendNextMessage(sentCBs)); if (!sentCBs.empty()) { - ++dispatchCount; + ++upcallCount; } } + // If the connection is not closed yet, we can schedule the read or write timeout and update the thread + // pool selector to wait for readiness of read, write or both operations. if (_state < StateClosed) { scheduleTimeout(newOp); @@ -1595,12 +1628,15 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) _acmLastActivity = chrono::steady_clock::now(); } - if (dispatchCount == 0) + if (upcallCount == 0) { return; // Nothing to dispatch we're done! } - _dispatchCount += dispatchCount; + _upcallCount += upcallCount; + + // There's something to dispatch so we mark IO as completed to elect a new leader thread and let IO be + // performed on this new leader thread while this thread continues with dispatching the up-calls. io.completed(); } catch (const DatagramLimitException&) // Expected. @@ -1786,8 +1822,8 @@ ConnectionI::dispatch( if (dispatchedCount > 0) { std::lock_guard lock(_mutex); - _dispatchCount -= dispatchedCount; - if (_dispatchCount == 0) + _upcallCount -= dispatchedCount; + if (_upcallCount == 0) { // // Only initiate shutdown if not already done. It might @@ -2011,7 +2047,7 @@ Ice::ConnectionI::finish(bool close) std::lock_guard lock(_mutex); setState(StateFinished); - if (_dispatchCount == 0) + if (_upcallCount == 0) { reap(); } @@ -2129,7 +2165,7 @@ Ice::ConnectionI::ConnectionI( _readStream(_instance.get(), Ice::currentProtocolEncoding), _readHeader(false), _writeStream(_instance.get(), Ice::currentProtocolEncoding), - _dispatchCount(0), + _upcallCount(0), _state(StateNotInitialized), _shutdownInitiated(false), _initialized(false), @@ -2209,7 +2245,7 @@ Ice::ConnectionI::~ConnectionI() assert(!_closeCallback); assert(!_heartbeatCallback); assert(_state == StateFinished); - assert(_dispatchCount == 0); + assert(_upcallCount == 0); assert(_sendStreams.empty()); assert(_asyncRequests.empty()); } @@ -2483,7 +2519,7 @@ Ice::ConnectionI::setState(State state) _conditionVariable.notify_all(); - if (_state == StateClosing && _dispatchCount == 0) + if (_state == StateClosing && _upcallCount == 0) { try { @@ -2499,7 +2535,7 @@ Ice::ConnectionI::setState(State state) void Ice::ConnectionI::initiateShutdown() { - assert(_state == StateClosing && _dispatchCount == 0); + assert(_state == StateClosing && _upcallCount == 0); if (_shutdownInitiated) { diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index a1c573a5b2c..8349d07d543 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -367,7 +367,7 @@ namespace Ice Observer _observer; - int _dispatchCount; + int _upcallCount; State _state; // The current state. bool _shutdownInitiated; diff --git a/csharp/src/Ice/ConnectionI.cs b/csharp/src/Ice/ConnectionI.cs index 26bd342d028..0c35682db2e 100644 --- a/csharp/src/Ice/ConnectionI.cs +++ b/csharp/src/Ice/ConnectionI.cs @@ -51,9 +51,7 @@ public void start(StartCallback callback) return; } - // - // We start out in holding state. - // + // The connection starts in the holding state. It will be activated by the connection factory. setState(StateHolding); } } @@ -219,7 +217,7 @@ public bool isFinished() try { - if (_state != StateFinished || _dispatchCount != 0) + if (_state != StateFinished || _upcallCount != 0) { return false; } @@ -249,7 +247,7 @@ public void waitUntilHolding() { lock (this) { - while (_state < StateHolding || _dispatchCount > 0) + while (_state < StateHolding || _upcallCount > 0) { Monitor.Wait(this); } @@ -266,7 +264,7 @@ public void waitUntilFinished() // guarantee that there are no outstanding calls when deactivate() // is called on the servant locators. // - while (_state < StateFinished || _dispatchCount > 0) + while (_state < StateFinished || _upcallCount > 0) { Monitor.Wait(this); } @@ -332,7 +330,7 @@ public void monitor(long now, ACMConfig acm) (acm.heartbeat != ACMHeartbeat.HeartbeatOff && _writeStream.isEmpty() && now >= (_acmLastActivity + acm.timeout / 4))) { - if (acm.heartbeat != ACMHeartbeat.HeartbeatOnDispatch || _dispatchCount > 0) + if (acm.heartbeat != ACMHeartbeat.HeartbeatOnDispatch || _upcallCount > 0) { sendHeartbeatNow(); } @@ -361,7 +359,7 @@ public void monitor(long now, ACMConfig acm) setState(StateClosed, new ConnectionTimeoutException()); } else if (acm.close != ACMClose.CloseOnInvocation && - _dispatchCount == 0 && _batchRequestQueue.isEmpty() && + _upcallCount == 0 && _batchRequestQueue.isEmpty() && _asyncRequests.Count == 0) { // @@ -736,7 +734,7 @@ public void sendResponse(int requestId, OutputStream os, byte compressFlag, bool try { - if (--_dispatchCount == 0) + if (--_upcallCount == 0) { if (_state == StateFinished) { @@ -753,7 +751,7 @@ public void sendResponse(int requestId, OutputStream os, byte compressFlag, bool sendMessage(new OutgoingMessage(os, compressFlag > 0, true)); - if (_state == StateClosing && _dispatchCount == 0) + if (_state == StateClosing && _upcallCount == 0) { initiateShutdown(); } @@ -773,7 +771,7 @@ public void sendNoResponse() try { - if (--_dispatchCount == 0) + if (--_upcallCount == 0) { if (_state == StateFinished) { @@ -788,7 +786,7 @@ public void sendNoResponse() throw _exception; } - if (_state == StateClosing && _dispatchCount == 0) + if (_state == StateClosing && _upcallCount == 0) { initiateShutdown(); } @@ -818,9 +816,9 @@ public void invokeException(int requestId, LocalException ex, int invokeNum, boo if (invokeNum > 0) { - Debug.Assert(_dispatchCount >= invokeNum); - _dispatchCount -= invokeNum; - if (_dispatchCount == 0) + Debug.Assert(_upcallCount >= invokeNum); + _upcallCount -= invokeNum; + if (_upcallCount == 0) { if (_state == StateFinished) { @@ -1023,7 +1021,7 @@ public override void message(ref ThreadPoolCurrent current) StartCallback startCB = null; Queue sentCBs = null; MessageInfo info = new MessageInfo(); - int dispatchCount = 0; + int upcallCount = 0; ThreadPoolMessage msg = new ThreadPoolMessage(this); try @@ -1040,14 +1038,15 @@ public override void message(ref ThreadPoolCurrent current) return; } - int readyOp = current.operation; try { unscheduleTimeout(current.operation); int writeOp = SocketOperation.None; int readOp = SocketOperation.None; - if ((readyOp & SocketOperation.Write) != 0) + + // If writes are ready, write the data from the connection's write buffer (_writeStream) + if ((current.operation & SocketOperation.Write) != 0) { if (_observer != null) { @@ -1060,131 +1059,155 @@ public override void message(ref ThreadPoolCurrent current) } } - while ((readyOp & SocketOperation.Read) != 0) + // If reads are ready, read the data into the connection's read buffer (_readStream). The data is + // read until: + // - the full message is read (the transport read returns SocketOperationNone) and + // the read buffer is fully filled + // - the read operation on the transport can't continue without blocking + if ((current.operation & SocketOperation.Read) != 0) { - IceInternal.Buffer buf = _readStream.getBuffer(); - - if (_observer != null && !_readHeader) + while (true) { - observerStartRead(buf); - } + IceInternal.Buffer buf = _readStream.getBuffer(); - readOp = read(buf); - if ((readOp & SocketOperation.Read) != 0) - { - break; - } - if (_observer != null && !_readHeader) - { - Debug.Assert(!buf.b.hasRemaining()); - observerFinishRead(buf); - } - - if (_readHeader) // Read header if necessary. - { - _readHeader = false; - - if (_observer != null) + if (_observer != null && !_readHeader) { - _observer.receivedBytes(Protocol.headerSize); + observerStartRead(buf); } - // - // Connection is validated on first message. This is only used by - // setState() to check wether or not we can print a connection - // warning (a client might close the connection forcefully if the - // connection isn't validated, we don't want to print a warning - // in this case). - // - _validated = true; - - int pos = _readStream.pos(); - if (pos < Protocol.headerSize) + readOp = read(buf); + if ((readOp & SocketOperation.Read) != 0) { - // - // This situation is possible for small UDP packets. - // - throw new IllegalMessageSizeException(); + // Can't continue without blocking, exit out of the loop. + break; } - - _readStream.pos(0); - byte[] m = new byte[4]; - m[0] = _readStream.readByte(); - m[1] = _readStream.readByte(); - m[2] = _readStream.readByte(); - m[3] = _readStream.readByte(); - if (m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] || - m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3]) + if (_observer != null && !_readHeader) { - BadMagicException ex = new BadMagicException(); - ex.badMagic = m; - throw ex; + Debug.Assert(!buf.b.hasRemaining()); + observerFinishRead(buf); } - ProtocolVersion pv = new ProtocolVersion(); - pv.ice_readMembers(_readStream); - Protocol.checkSupportedProtocol(pv); - EncodingVersion ev = new EncodingVersion(); - ev.ice_readMembers(_readStream); - Protocol.checkSupportedProtocolEncoding(ev); - - _readStream.readByte(); // messageType - _readStream.readByte(); // compress - int size = _readStream.readInt(); - if (size < Protocol.headerSize) + // If read header is true, we're reading a new Ice protocol message and we need to read + // the message header. + if (_readHeader) { - throw new IllegalMessageSizeException(); - } + // The next read will read the remainder of the message. + _readHeader = false; - if (size > _messageSizeMax) - { - Ex.throwMemoryLimitException(size, _messageSizeMax); - } - if (size > _readStream.size()) - { - _readStream.resize(size); + if (_observer != null) + { + _observer.receivedBytes(Protocol.headerSize); + } + + // + // Connection is validated on first message. This is only used by + // setState() to check wether or not we can print a connection + // warning (a client might close the connection forcefully if the + // connection isn't validated, we don't want to print a warning + // in this case). + // + _validated = true; + + // Full header should be read because the size of _readStream is always headerSize (14) + // when reading a new message (see the code that sets _readHeader = true). + int pos = _readStream.pos(); + if (pos < Protocol.headerSize) + { + // + // This situation is possible for small UDP packets. + // + throw new IllegalMessageSizeException(); + } + + // Decode the header. + _readStream.pos(0); + byte[] m = new byte[4]; + m[0] = _readStream.readByte(); + m[1] = _readStream.readByte(); + m[2] = _readStream.readByte(); + m[3] = _readStream.readByte(); + if (m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] || + m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3]) + { + BadMagicException ex = new BadMagicException(); + ex.badMagic = m; + throw ex; + } + + ProtocolVersion pv = new ProtocolVersion(); + pv.ice_readMembers(_readStream); + Protocol.checkSupportedProtocol(pv); + EncodingVersion ev = new EncodingVersion(); + ev.ice_readMembers(_readStream); + Protocol.checkSupportedProtocolEncoding(ev); + + _readStream.readByte(); // messageType + _readStream.readByte(); // compress + int size = _readStream.readInt(); + if (size < Protocol.headerSize) + { + throw new IllegalMessageSizeException(); + } + + // Resize the read buffer to the message size. + if (size > _messageSizeMax) + { + Ex.throwMemoryLimitException(size, _messageSizeMax); + } + if (size > _readStream.size()) + { + _readStream.resize(size); + } + _readStream.pos(pos); } - _readStream.pos(pos); - } - if (buf.b.hasRemaining()) - { - if (_endpoint.datagram()) + if (buf.b.hasRemaining()) { - throw new DatagramLimitException(); // The message was truncated. + if (_endpoint.datagram()) + { + throw new DatagramLimitException(); // The message was truncated. + } + continue; } - continue; + break; } - break; } + // readOp and writeOp are set to the operations that the transport read or write calls from above + // returned. They indicate which operations will need to be monitored by the thread pool's selector + // when this method returns. int newOp = readOp | writeOp; - readyOp &= ~newOp; - Debug.Assert(readyOp != 0 || newOp != 0); + + // Operations that are ready. For example, if message was called with SocketOperationRead and the + // transport read returned SocketOperationNone, reads are considered done: there's no additional + // data to read. + int readyOp = current.operation & ~newOp; if (_state <= StateNotValidated) { + // If the connection is still not validated and there's still data to read or write, continue + // waiting for data to read or write. if (newOp != 0) { - // - // Wait for all the transceiver conditions to be - // satisfied before continuing. - // scheduleTimeout(newOp); _threadPool.update(this, current.operation, newOp); return; } + // Initialize the connection if it's not initialized yet. if (_state == StateNotInitialized && !initialize(current.operation)) { return; } + // Validate the connection if it's not validated yet. if (_state <= StateNotValidated && !validate(current.operation)) { return; } + // The connection is validated and doesn't need additional data to be read or written. So + // unregister it from the thread pool's selector. _threadPool.unregister(this, current.operation); // @@ -1197,7 +1220,7 @@ public override void message(ref ThreadPoolCurrent current) _startCallback = null; if (startCB != null) { - ++dispatchCount; + ++upcallCount; } } } @@ -1211,19 +1234,26 @@ public override void message(ref ThreadPoolCurrent current) // if ((readyOp & SocketOperation.Read) != 0) { + // At this point, the protocol message is fully read and can therefore be decoded by + // parseMessage. parseMessage returns the operation to wait for readiness next. newOp |= parseMessage(ref info); - dispatchCount += info.messageDispatchCount; + upcallCount += info.messageDispatchCount; } if ((readyOp & SocketOperation.Write) != 0) { + // At this point the message from _writeStream is fully written and the next message can be + // written. + newOp |= sendNextMessage(out sentCBs); if (sentCBs != null) { - ++dispatchCount; + ++upcallCount; } } + // If the connection is not closed yet, we can schedule the read or write timeout and update the + // thread pool selector to wait for readiness of read, write or both operations. if (_state < StateClosed) { scheduleTimeout(newOp); @@ -1236,13 +1266,15 @@ public override void message(ref ThreadPoolCurrent current) _acmLastActivity = Time.currentMonotonicTimeMillis(); } - if (dispatchCount == 0) + if (upcallCount == 0) { return; // Nothing to dispatch we're done! } - _dispatchCount += dispatchCount; + _upcallCount += upcallCount; + // There's something to dispatch so we mark IO as completed to elect a new leader thread and let IO + // be performed on this new leader thread while this thread continues with dispatching the up-calls. msg.completed(ref current); } catch (DatagramLimitException) // Expected. @@ -1378,8 +1410,8 @@ private void dispatch(StartCallback startCB, Queue sentCBs, Mes { lock (this) { - _dispatchCount -= dispatchedCount; - if (_dispatchCount == 0) + _upcallCount -= dispatchedCount; + if (_upcallCount == 0) { // // Only initiate shutdown if not already done. It @@ -1572,7 +1604,7 @@ _exception is CommunicatorDestroyedException || { setState(StateFinished); - if (_dispatchCount == 0) + if (_upcallCount == 0) { reap(); } @@ -1699,7 +1731,7 @@ internal ConnectionI(Instance instance, ACMMonitor monitor, Transceiver transcei _readStreamPos = -1; _writeStream = new OutputStream(instance, Util.currentProtocolEncoding); _writeStreamPos = -1; - _dispatchCount = 0; + _upcallCount = 0; _state = StateNotInitialized; _compressionLevel = initData.properties.getPropertyAsIntWithDefault("Ice.Compression.Level", 1); @@ -1971,7 +2003,7 @@ _exception is ObjectAdapterDeactivatedException || Monitor.PulseAll(this); - if (_state == StateClosing && _dispatchCount == 0) + if (_state == StateClosing && _upcallCount == 0) { try { @@ -1986,7 +2018,7 @@ _exception is ObjectAdapterDeactivatedException || private void initiateShutdown() { - Debug.Assert(_state == StateClosing && _dispatchCount == 0); + Debug.Assert(_state == StateClosing && _upcallCount == 0); if (_shutdownInitiated) { @@ -3048,7 +3080,7 @@ internal void completed(LocalException ex) private int _readStreamPos; private int _writeStreamPos; - private int _dispatchCount; + private int _upcallCount; private int _state; // The current state. private bool _shutdownInitiated; diff --git a/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java b/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java index 57e7d0987c7..ad440c14afa 100644 --- a/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java +++ b/java/src/Ice/src/main/java/com/zeroc/Ice/ConnectionI.java @@ -183,7 +183,7 @@ public synchronized boolean isActiveOrHolding() { } public synchronized boolean isFinished() { - if (_state != StateFinished || _dispatchCount != 0) { + if (_state != StateFinished || _upcallCount != 0) { return false; } @@ -199,7 +199,7 @@ public synchronized void throwException() { } public synchronized void waitUntilHolding() throws InterruptedException { - while (_state < StateHolding || _dispatchCount > 0) { + while (_state < StateHolding || _upcallCount > 0) { wait(); } } @@ -211,7 +211,7 @@ public synchronized void waitUntilFinished() throws InterruptedException { // guarantee that there are no outstanding calls when deactivate() // is called on the servant locators. // - while (_state < StateFinished || _dispatchCount > 0) { + while (_state < StateFinished || _upcallCount > 0) { wait(); } @@ -265,7 +265,7 @@ public synchronized void monitor(long now, com.zeroc.IceInternal.ACMConfig acm) || (acm.heartbeat != ACMHeartbeat.HeartbeatOff && _writeStream.isEmpty() && now >= (_acmLastActivity + acm.timeout / 4))) { - if (acm.heartbeat != ACMHeartbeat.HeartbeatOnDispatch || _dispatchCount > 0) { + if (acm.heartbeat != ACMHeartbeat.HeartbeatOnDispatch || _upcallCount > 0) { sendHeartbeatNow(); } } @@ -289,7 +289,7 @@ public synchronized void monitor(long now, com.zeroc.IceInternal.ACMConfig acm) // setState(StateClosed, new ConnectionTimeoutException()); } else if (acm.close != ACMClose.CloseOnInvocation - && _dispatchCount == 0 + && _upcallCount == 0 && _batchRequestQueue.isEmpty() && _asyncRequests.isEmpty()) { // @@ -620,7 +620,7 @@ public Void call() throws Exception { private synchronized void sendResponseImpl(OutputStream os, byte compressFlag) { try { - if (--_dispatchCount == 0) { + if (--_upcallCount == 0) { if (_state == StateFinished) { reap(); } @@ -630,7 +630,7 @@ private synchronized void sendResponseImpl(OutputStream os, byte compressFlag) { if (_state < StateClosed) { sendMessage(new OutgoingMessage(os, compressFlag != 0, true)); - if (_state == StateClosing && _dispatchCount == 0) { + if (_state == StateClosing && _upcallCount == 0) { initiateShutdown(); } } @@ -646,7 +646,7 @@ public void sendNoResponse() { synchronized (this) { assert (_state > StateNotValidated); try { - if (--_dispatchCount == 0) { + if (--_upcallCount == 0) { if (_state == StateFinished) { reap(); } @@ -658,7 +658,7 @@ public void sendNoResponse() { throw (LocalException) _exception.fillInStackTrace(); } - if (_state == StateClosing && _dispatchCount == 0) { + if (_state == StateClosing && _upcallCount == 0) { // // We may be executing on the "main thread" (e.g., in Android together with a custom // dispatcher) @@ -696,10 +696,10 @@ public synchronized void invokeException( setState(StateClosed, ex); if (invokeNum > 0) { - assert (_dispatchCount > 0); - _dispatchCount -= invokeNum; - assert (_dispatchCount >= 0); - if (_dispatchCount == 0) { + assert (_upcallCount > 0); + _upcallCount -= invokeNum; + assert (_upcallCount >= 0); + if (_upcallCount == 0) { if (_state == StateFinished) { reap(); } @@ -781,7 +781,7 @@ public void message(com.zeroc.IceInternal.ThreadPoolCurrent current) { StartCallback startCB = null; java.util.List sentCBs = null; MessageInfo info = null; - int dispatchCount = 0; + int upcallCount = 0; synchronized (this) { if (_state >= StateClosed) { @@ -792,14 +792,15 @@ public void message(com.zeroc.IceInternal.ThreadPoolCurrent current) { return; } - int readyOp = current.operation; try { unscheduleTimeout(current.operation); int writeOp = SocketOperation.None; int readOp = SocketOperation.None; - if ((readyOp & SocketOperation.Write) != 0) { + // If writes are ready, write the data from the connection's write buffer + // (_writeStream) + if ((current.operation & SocketOperation.Write) != 0) { final Buffer buf = _writeStream.getBuffer(); if (_observer != null) { observerStartWrite(buf); @@ -810,130 +811,152 @@ public void message(com.zeroc.IceInternal.ThreadPoolCurrent current) { } } - while ((readyOp & SocketOperation.Read) != 0) { - final Buffer buf = _readStream.getBuffer(); - if (_observer != null && !_readHeader) { - observerStartRead(buf); - } - - readOp = read(buf); - if ((readOp & SocketOperation.Read) != 0) { - break; - } - if (_observer != null && !_readHeader) { - assert (!buf.b.hasRemaining()); - observerFinishRead(buf); - } - - if (_readHeader) // Read header if necessary. - { - _readHeader = false; + // If reads are ready, read the data into the connection's read buffer + // (_readStream). The data is read until: + // - the full message is read (the transport read returns SocketOperationNone) + // and the read buffer is fully filled + // - the read operation on the transport can't continue without blocking + if ((current.operation & SocketOperation.Read) != 0) { + while (true) { + final Buffer buf = _readStream.getBuffer(); + if (_observer != null && !_readHeader) { + observerStartRead(buf); + } - if (_observer != null) { - _observer.receivedBytes(Protocol.headerSize); + readOp = read(buf); + if ((readOp & SocketOperation.Read) != 0) { + // Can't continue without blocking, exit out of the loop. + break; + } + if (_observer != null && !_readHeader) { + assert (!buf.b.hasRemaining()); + observerFinishRead(buf); } - // - // Connection is validated on first message. This is only used by - // setState() to check wether or not we can print a connection - // warning (a client might close the connection forcefully if the - // connection isn't validated, we don't want to print a warning - // in this case). - // - _validated = true; + // If read header is true, we're reading a new Ice protocol message and we need + // to read the message header. + if (_readHeader) { + // The next read will read the remainder of the message. + _readHeader = false; + + if (_observer != null) { + _observer.receivedBytes(Protocol.headerSize); + } - int pos = _readStream.pos(); - if (pos < Protocol.headerSize) { // - // This situation is possible for small UDP packets. + // Connection is validated on first message. This is only used by + // setState() to check wether or not we can print a connection + // warning (a client might close the connection forcefully if the + // connection isn't validated, we don't want to print a warning + // in this case). // - throw new IllegalMessageSizeException(); - } + _validated = true; + + // Full header should be read because the size of _readStream is always + // headerSize (14) when reading a new message (see the code that sets + // _readHeader = true). + int pos = _readStream.pos(); + if (pos < Protocol.headerSize) { + // + // This situation is possible for small UDP packets. + // + throw new IllegalMessageSizeException(); + } - _readStream.pos(0); - byte[] m = new byte[4]; - m[0] = _readStream.readByte(); - m[1] = _readStream.readByte(); - m[2] = _readStream.readByte(); - m[3] = _readStream.readByte(); - if (m[0] != Protocol.magic[0] - || m[1] != Protocol.magic[1] - || m[2] != Protocol.magic[2] - || m[3] != Protocol.magic[3]) { - BadMagicException ex = new BadMagicException(); - ex.badMagic = m; - throw ex; - } + // Decode the header. + _readStream.pos(0); + byte[] m = new byte[4]; + m[0] = _readStream.readByte(); + m[1] = _readStream.readByte(); + m[2] = _readStream.readByte(); + m[3] = _readStream.readByte(); + if (m[0] != Protocol.magic[0] + || m[1] != Protocol.magic[1] + || m[2] != Protocol.magic[2] + || m[3] != Protocol.magic[3]) { + BadMagicException ex = new BadMagicException(); + ex.badMagic = m; + throw ex; + } - _readProtocol.ice_readMembers(_readStream); - Protocol.checkSupportedProtocol(_readProtocol); + _readProtocol.ice_readMembers(_readStream); + Protocol.checkSupportedProtocol(_readProtocol); - _readProtocolEncoding.ice_readMembers(_readStream); - Protocol.checkSupportedProtocolEncoding(_readProtocolEncoding); + _readProtocolEncoding.ice_readMembers(_readStream); + Protocol.checkSupportedProtocolEncoding(_readProtocolEncoding); - _readStream.readByte(); // messageType - _readStream.readByte(); // compress - int size = _readStream.readInt(); - if (size < Protocol.headerSize) { - throw new IllegalMessageSizeException(); - } + _readStream.readByte(); // messageType + _readStream.readByte(); // compress + int size = _readStream.readInt(); + if (size < Protocol.headerSize) { + throw new IllegalMessageSizeException(); + } - if (size > _messageSizeMax) { - com.zeroc.IceInternal.Ex.throwMemoryLimitException(size, _messageSizeMax); - } - if (size > _readStream.size()) { - _readStream.resize(size); + // Resize the read buffer to the message size. + if (size > _messageSizeMax) { + com.zeroc.IceInternal.Ex.throwMemoryLimitException(size, _messageSizeMax); + } + if (size > _readStream.size()) { + _readStream.resize(size); + } + _readStream.pos(pos); } - _readStream.pos(pos); - } - if (_readStream.pos() != _readStream.size()) { - if (_endpoint.datagram()) { - // The message was truncated. - throw new DatagramLimitException(); + if (_readStream.pos() != _readStream.size()) { + if (_endpoint.datagram()) { + // The message was truncated. + throw new DatagramLimitException(); + } + continue; } - continue; + break; } - break; } + // readOp and writeOp are set to the operations that the transport read or write + // calls from above returned. They indicate which operations will need to be + // monitored by the thread pool's selector when this method returns. int newOp = readOp | writeOp; - readyOp = readyOp & ~newOp; - assert (readyOp != 0 || newOp != 0); + + // Operations that are ready. For example, if message was called with + // SocketOperationRead and the transport read returned SocketOperationNone, + // reads are considered done: there's no additional data to read. + int readyOp = current.operation & ~newOp; if (_state <= StateNotValidated) { + // If the connection is still not validated and there's still data to read or + // write, continue waiting for data to read or write. if (newOp != 0) { - // - // Wait for all the transceiver conditions to be - // satisfied before continuing. - // scheduleTimeout(newOp); _threadPool.update(this, current.operation, newOp); return; } + // Initialize the connection if it's not initialized yet. if (_state == StateNotInitialized && !initialize(current.operation)) { return; } + // Validate the connection if it's not validate yet. if (_state <= StateNotValidated && !validate(current.operation)) { return; } + // The connection is validated and doesn't need additional data to be read or + // written. So unregister it from the thread pool's selector. _threadPool.unregister(this, current.operation); - // - // We start out in holding state. - // + // The connection starts in the holding state. It will be activated by the + // connection factory. setState(StateHolding); if (_startCallback != null) { startCB = _startCallback; _startCallback = null; if (startCB != null) { - ++dispatchCount; + ++upcallCount; } } - } else { + } else { // The connection is active or waits for the CloseConnection message. assert (_state <= StateClosingPending); // @@ -943,20 +966,29 @@ public void message(com.zeroc.IceInternal.ThreadPoolCurrent current) { if ((readyOp & SocketOperation.Read) != 0) { // Optimization: use the thread's stream. info = new MessageInfo(current.stream); + + // At this point, the protocol message is fully read and can therefore be + // decoded by parseMessage. parseMessage returns the operation to wait for + // readiness next. newOp |= parseMessage(info); - dispatchCount += info.messageDispatchCount; + upcallCount += info.messageDispatchCount; } if ((readyOp & SocketOperation.Write) != 0) { + // At this point the message from _writeStream is fully written and the next + // message can be written. sentCBs = new java.util.LinkedList<>(); newOp |= sendNextMessage(sentCBs); if (!sentCBs.isEmpty()) { - ++dispatchCount; + ++upcallCount; } else { sentCBs = null; } } + // If the connection is not closed yet, we can schedule the read or write + // timeout and update the thread pool selector to wait for readiness of + // read, write or both operations. if (_state < StateClosed) { scheduleTimeout(newOp); _threadPool.update(this, current.operation, newOp); @@ -967,11 +999,15 @@ public void message(com.zeroc.IceInternal.ThreadPoolCurrent current) { _acmLastActivity = Time.currentMonotonicTimeMillis(); } - if (dispatchCount == 0) { + if (upcallCount == 0) { return; // Nothing to dispatch we're done! } - _dispatchCount += dispatchCount; + _upcallCount += upcallCount; + + // There's something to dispatch so we mark IO as completed to elect a new + // leader thread and let IO be performed on this new leader thread while + // this thread continues with dispatching the up-calls. current.ioCompleted(); } catch (DatagramLimitException ex) // Expected. { @@ -1100,8 +1136,8 @@ protected void dispatch( boolean shutdown = false; synchronized (this) { - _dispatchCount -= dispatchedCount; - if (_dispatchCount == 0) { + _upcallCount -= dispatchedCount; + if (_upcallCount == 0) { // // Only initiate shutdown if not already done. It might // have already been done if the sent callback or AMI @@ -1318,7 +1354,7 @@ public Void call() throws Exception { synchronized (this) { setState(StateFinished); - if (_dispatchCount == 0) { + if (_upcallCount == 0) { reap(); } } @@ -1434,7 +1470,7 @@ public ConnectionI( _readStreamPos = -1; _writeStream = new OutputStream(instance, Protocol.currentProtocolEncoding); _writeStreamPos = -1; - _dispatchCount = 0; + _upcallCount = 0; _state = StateNotInitialized; int compressionLevel = @@ -1472,7 +1508,7 @@ protected synchronized void finalize() throws Throwable { try { com.zeroc.IceUtilInternal.Assert.FinalizerAssert(_startCallback == null); com.zeroc.IceUtilInternal.Assert.FinalizerAssert(_state == StateFinished); - com.zeroc.IceUtilInternal.Assert.FinalizerAssert(_dispatchCount == 0); + com.zeroc.IceUtilInternal.Assert.FinalizerAssert(_upcallCount == 0); com.zeroc.IceUtilInternal.Assert.FinalizerAssert(_sendStreams.isEmpty()); com.zeroc.IceUtilInternal.Assert.FinalizerAssert(_asyncRequests.isEmpty()); } catch (java.lang.Exception ex) { @@ -1696,7 +1732,7 @@ private void setState(int state) { notifyAll(); - if (_state == StateClosing && _dispatchCount == 0) { + if (_state == StateClosing && _upcallCount == 0) { try { initiateShutdown(); } catch (LocalException ex) { @@ -1706,7 +1742,7 @@ private void setState(int state) { } private void initiateShutdown() { - assert (_state == StateClosing && _dispatchCount == 0); + assert (_state == StateClosing && _upcallCount == 0); if (_shutdownInitiated) { return; @@ -2690,7 +2726,7 @@ public void completed(LocalException ex) { private int _readStreamPos; private int _writeStreamPos; - private int _dispatchCount; + private int _upcallCount; private int _state; // The current state. private boolean _shutdownInitiated = false;