From 3609ec4f2c902e0ea4266ceba7ad30411e122407 Mon Sep 17 00:00:00 2001 From: Justin Karneges Date: Fri, 13 Dec 2024 14:18:36 -0800 Subject: [PATCH] use async message filters --- src/handler/handlerengine.cpp | 85 +---------- src/handler/httpsession.cpp | 267 +++++++++++++++++++++------------- src/handler/wssession.cpp | 127 +++++++++++++++- src/handler/wssession.h | 15 +- 4 files changed, 308 insertions(+), 186 deletions(-) diff --git a/src/handler/handlerengine.cpp b/src/handler/handlerengine.cpp index 371771ec..86b29a3f 100644 --- a/src/handler/handlerengine.cpp +++ b/src/handler/handlerengine.cpp @@ -1793,77 +1793,7 @@ class HandlerEngine::Private : public QObject { WsSession *s = qobject_cast(target); - if(f.haveContentFilters) - { - // ensure content filters match - QStringList contentFilters; - foreach(const QString &f, s->channelFilters[item.channel]) - { - if(Filter::targets(f) & Filter::MessageContent) - contentFilters += f; - } - if(contentFilters != f.contentFilters) - { - QString errorMessage = QString("content filter mismatch: subscription=%1 message=%2").arg(contentFilters.join(","), f.contentFilters.join(",")); - log_debug("%s", qPrintable(errorMessage)); - return; - } - } - - Filter::Context fc; - fc.subscriptionMeta = s->meta; - fc.publishMeta = item.meta; - fc.zhttpOut = zhttpOut; - fc.currentUri = s->requestData.uri; - fc.route = s->route; - fc.trusted = s->targetTrusted; - - FilterStack filters(fc, s->channelFilters[item.channel]); - - if(filters.sendAction() == Filter::Drop) - return; - - // TODO: hint support for websockets? - if(f.action != PublishFormat::Send && f.action != PublishFormat::Close && f.action != PublishFormat::Refresh) - return; - - WsControlPacket::Item i; - i.cid = s->cid.toUtf8(); - - if(f.action == PublishFormat::Send) - { - QByteArray body = filters.process(f.body); - if(body.isNull()) - { - log_debug("filter error: %s", qPrintable(filters.errorMessage())); - return; - } - - i.type = WsControlPacket::Item::Send; - - switch(f.messageType) - { - case PublishFormat::Text: i.contentType = "text"; break; - case PublishFormat::Binary: i.contentType = "binary"; break; - case PublishFormat::Ping: i.contentType = "ping"; break; - case PublishFormat::Pong: i.contentType = "pong"; break; - default: return; // unrecognized type, skip - } - - i.message = body; - } - else if(f.action == PublishFormat::Close) - { - i.type = WsControlPacket::Item::Close; - i.code = f.code; - i.reason = f.reason; - } - else if(f.action == PublishFormat::Refresh) - { - i.type = WsControlPacket::Item::Refresh; - } - - writeWsControlItems(s->peer, QList() << i); + s->publish(item); } } @@ -2690,7 +2620,7 @@ class HandlerEngine::Private : public QObject { s = new WsSession(this); wsSessionConnectionMap[s] = { - s->send.connect(boost::bind(&Private::wssession_send, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, s)), + s->send.connect(boost::bind(&Private::wssession_send, this, boost::placeholders::_1, s)), s->expired.connect(boost::bind(&Private::wssession_expired, this, s)), s->error.connect(boost::bind(&Private::wssession_error, this, s)) }; @@ -2698,6 +2628,7 @@ class HandlerEngine::Private : public QObject s->cid = QString::fromUtf8(item.cid); s->ttl = item.ttl; s->requestData.uri = item.uri; + s->zhttpOut = zhttpOut; s->refreshExpiration(); cs.wsSessions.insert(s->cid, s); log_debug("added ws session: %s", qPrintable(s->cid)); @@ -3229,16 +3160,8 @@ private slots: writeRetryPacket(addr, rp); } - void wssession_send(int reqId, const QByteArray &type, const QByteArray &message, WsSession *s) + void wssession_send(const WsControlPacket::Item &i, WsSession *s) { - WsControlPacket::Item i; - i.cid = s->cid.toUtf8(); - i.requestId = QByteArray::number(reqId); - i.type = WsControlPacket::Item::Send; - i.contentType = type; - i.message = message; - i.queue = true; - writeWsControlItems(s->peer, QList() << i); } diff --git a/src/handler/httpsession.cpp b/src/handler/httpsession.cpp index cbd8028e..d5d749d1 100644 --- a/src/handler/httpsession.cpp +++ b/src/handler/httpsession.cpp @@ -142,6 +142,19 @@ class HttpSession::Private : public QObject } }; + class QueuedItem + { + public: + PublishItem item; + QList exposeHeaders; + + QueuedItem(const PublishItem &_item, const QList &_exposeHeaders = QList()) : + item(_item), + exposeHeaders(_exposeHeaders) + { + } + }; + friend class UpdateAction; HttpSession *q; @@ -170,10 +183,11 @@ class HttpSession::Private : public QObject bool needUpdate; Priority needUpdatePriority; UpdateAction *pendingAction; - QList publishQueue; + QList publishQueue; QByteArray retryToAddress; RetryRequestPacket retryPacket; LogUtil::Config logConfig; + std::unique_ptr messageFilters; FilterStack *responseFilters; QSet activeChannels; int connectionSubscriptionMax; @@ -189,6 +203,7 @@ class HttpSession::Private : public QObject Connection errorOutConnection; Connection timerConnection; Connection retryTimerConnection; + Connection messageFiltersFinishedConnection; Private(HttpSession *_q, ZhttpRequest *_req, const HttpSession::AcceptData &_adata, const Instruct &_instruct, ZhttpManager *_outZhttp, StatsManager *_stats, RateLimiter *_updateLimiter, PublishLastIds *_publishLastIds, HttpSessionUpdateManager *_updateManager, int _connectionSubscriptionMax) : QObject(_q), @@ -438,74 +453,60 @@ class HttpSession::Private : public QObject channel.prevId = item.id; } - if(f.haveContentFilters) + if(f.action == PublishFormat::Send) { - // ensure content filters match - QStringList contentFilters; - foreach(const QString &f, channels[item.channel].filters) + if(f.haveContentFilters) { - if(Filter::targets(f) & Filter::MessageContent) - contentFilters += f; - } - if(contentFilters != f.contentFilters) - { - errorMessage = QString("content filter mismatch: subscription=%1 message=%2").arg(contentFilters.join(","), f.contentFilters.join(",")); - doError(); - return; + // ensure content filters match + QStringList contentFilters; + foreach(const QString &f, channels[item.channel].filters) + { + if(Filter::targets(f) & Filter::MessageContent) + contentFilters += f; + } + if(contentFilters != f.contentFilters) + { + errorMessage = QString("content filter mismatch: subscription=%1 message=%2").arg(contentFilters.join(","), f.contentFilters.join(",")); + doError(); + return; + } } - } - - QHash prevIds; - QHashIterator it(channels); - while(it.hasNext()) - { - it.next(); - const Instruct::Channel &c = it.value(); - prevIds[c.name] = c.prevId; - } - - Filter::Context fc; - fc.prevIds = prevIds; - fc.subscriptionMeta = instruct.meta; - fc.publishMeta = item.meta; - fc.zhttpOut = outZhttp; - fc.currentUri = currentUri; - fc.route = adata.route; - fc.trusted = adata.trusted; - - FilterStack fs(fc, channels[item.channel].filters); - - if(fs.sendAction() == Filter::Drop) - return; - // NOTE: http-response mode doesn't support a close - // action since it's better to send a real response + publishQueue += QueuedItem(item, exposeHeaders); - if(f.action == PublishFormat::Send) - { QByteArray body; if(f.haveBodyPatch) - { body = applyBodyPatch(instruct.response.body, f.bodyPatch); - } else - { body = f.body; - } - body = fs.process(body); - if(body.isNull()) + messageFilters = std::make_unique(channels[item.channel].filters); + messageFiltersFinishedConnection = messageFilters->finished.connect(boost::bind(&Private::messageFiltersFinished, this, boost::placeholders::_1)); + + QHash prevIds; + QHashIterator it(channels); + while(it.hasNext()) { - errorMessage = QString("filter error: %1").arg(fs.errorMessage()); - doError(); - return; + it.next(); + const Instruct::Channel &c = it.value(); + prevIds[c.name] = c.prevId; } - respond(f.code, f.reason, f.headers, body, exposeHeaders); + Filter::Context fc; + fc.prevIds = prevIds; + fc.subscriptionMeta = instruct.meta; + fc.publishMeta = item.meta; + fc.zhttpOut = outZhttp; + fc.currentUri = currentUri; + fc.route = adata.route; + fc.trusted = adata.trusted; + + // may call messageFiltersFinished immediately + messageFilters->start(fc, body); } - else if(f.action == PublishFormat::Hint) + else { - update(HighPriority); + afterMessageFilters(item, Filter::Send, QByteArray(), QList()); } } else if(f.type == PublishFormat::HttpStream) @@ -514,7 +515,7 @@ class HttpSession::Private : public QObject { if(publishQueue.count() < PUBLISH_QUEUE_MAX) { - publishQueue += item; + publishQueue += QueuedItem(item); if(state == Holding) trySendQueue(); @@ -782,7 +783,8 @@ class HttpSession::Private : public QObject // drop any non-matching queued items while(!publishQueue.isEmpty()) { - PublishItem &item = publishQueue.first(); + const QueuedItem &qi = publishQueue.first(); + const PublishItem &item = qi.item; if(!channels.contains(item.channel)) { @@ -819,12 +821,14 @@ class HttpSession::Private : public QObject { assert(instruct.holdMode == Instruct::StreamHold); - while(!publishQueue.isEmpty() && req->writeBytesAvailable() > 0) + while(!publishQueue.isEmpty() && req->writeBytesAvailable() > 0 && !messageFilters) { - PublishItem item = publishQueue.takeFirst(); + const QueuedItem &qi = publishQueue.first(); + const PublishItem &item = qi.item; if(!channels.contains(item.channel)) { + publishQueue.removeFirst(); log_debug("httpsession: received publish for channel with no subscription, dropping"); continue; } @@ -847,7 +851,7 @@ class HttpSession::Private : public QObject channel.prevId = item.id; } - PublishFormat &f = item.format; + const PublishFormat &f = item.format; if(f.haveContentFilters) { @@ -860,12 +864,16 @@ class HttpSession::Private : public QObject } if(contentFilters != f.contentFilters) { + publishQueue.removeFirst(); errorMessage = QString("content filter mismatch: subscription=%1 message=%2").arg(contentFilters.join(","), f.contentFilters.join(",")); doError(); break; } } + messageFilters = std::make_unique(channels[item.channel].filters); + messageFiltersFinishedConnection = messageFilters->finished.connect(boost::bind(&Private::messageFiltersFinished, this, boost::placeholders::_1)); + QHash prevIds; QHashIterator it(channels); while(it.hasNext()) @@ -884,51 +892,9 @@ class HttpSession::Private : public QObject fc.route = adata.route; fc.trusted = adata.trusted; - FilterStack fs(fc, channels[item.channel].filters); - - if(fs.sendAction() == Filter::Drop) - continue; - - if(f.action == PublishFormat::Send) - { - QByteArray body = fs.process(f.body); - if(body.isNull()) - { - errorMessage = QString("filter error: %1").arg(fs.errorMessage()); - doError(); - break; - } - - writeBody(body); - - // restart keep alive timer - adjustKeepAlive(); - - if(!nextUri.isEmpty() && instruct.nextLinkTimeout >= 0) - { - activeChannels += item.channel; - if(activeChannels.count() == channels.count()) - { - activeChannels.clear(); - - updateManager->registerSession(q, instruct.nextLinkTimeout, nextUri); - } - } - } - else if(f.action == PublishFormat::Hint) - { - // clear queue since any items will be redundant - publishQueue.clear(); - - update(HighPriority); - break; - } - else if(f.action == PublishFormat::Close) - { - prepareToClose(); - req->endBody(); - break; - } + // may call messageFiltersFinished immediately + messageFilters->start(fc, f.body); + return; } if(state == SendingQueue) @@ -1467,6 +1433,101 @@ class HttpSession::Private : public QObject req->writeBody(body); } + void messageFiltersFinished(const Filter::MessageFilter::Result &result) + { + QueuedItem qi = publishQueue.takeFirst(); + + messageFiltersFinishedConnection.disconnect(); + messageFilters.reset(); + + if(!result.errorMessage.isNull()) + { + errorMessage = QString("filter error: %1").arg(result.errorMessage); + doError(); + return; + } + + afterMessageFilters(qi.item, result.sendAction, result.content, qi.exposeHeaders); + + if(state == SendingQueue) + { + if(publishQueue.isEmpty()) + sendQueueDone(); + } + else if(state == Holding) + { + if(!publishQueue.isEmpty()) + { + // if backlogged, turn off timers until we're able to send again + timer->stop(); + updateManager->unregisterSession(q); + } + } + + // FIXME: are these states correct? + if(state == Holding || state == SendingQueue) + trySendQueue(); + } + + void afterMessageFilters(const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content, const QList &exposeHeaders) + { + const PublishFormat &f = item.format; + + if(instruct.holdMode == Instruct::ResponseHold) + { + if(sendAction == Filter::Drop) + return; + + // NOTE: http-response mode doesn't support a close + // action since it's better to send a real response + + if(f.action == PublishFormat::Send) + { + respond(f.code, f.reason, f.headers, content, exposeHeaders); + } + else if(f.action == PublishFormat::Hint) + { + update(HighPriority); + } + } + else if(instruct.holdMode == Instruct::StreamHold) + { + if(sendAction == Filter::Drop) + return; + + if(f.action == PublishFormat::Send) + { + writeBody(content); + + // restart keep alive timer + adjustKeepAlive(); + + if(!nextUri.isEmpty() && instruct.nextLinkTimeout >= 0) + { + activeChannels += item.channel; + if(activeChannels.count() == channels.count()) + { + activeChannels.clear(); + + updateManager->registerSession(q, instruct.nextLinkTimeout, nextUri); + } + } + } + else if(f.action == PublishFormat::Hint) + { + // clear queue since any items will be redundant + publishQueue.clear(); + + update(HighPriority); + } + else if(f.action == PublishFormat::Close) + { + prepareToClose(); + req->endBody(); + } + } + } + private slots: void doError() { diff --git a/src/handler/wssession.cpp b/src/handler/wssession.cpp index d1bf9ff7..f4e27747 100644 --- a/src/handler/wssession.cpp +++ b/src/handler/wssession.cpp @@ -26,6 +26,10 @@ #include #include #include "log.h" +#include "filter.h" +#include "filterstack.h" +#include "publishitem.h" +#include "publishformat.h" #define WSCONTROL_REQUEST_TIMEOUT 8000 @@ -96,6 +100,57 @@ void WsSession::ack(int reqId) } } +void WsSession::publish(const PublishItem &item) +{ + pendingItems += item; + + if(!filters) + processNextItem(); +} + +void WsSession::processNextItem() +{ + if(pendingItems.isEmpty()) + return; + + const PublishItem &item = pendingItems.first(); + const PublishFormat &f = item.format; + + if(f.haveContentFilters) + { + // ensure content filters match + QStringList contentFilters; + foreach(const QString &f, channelFilters[item.channel]) + { + if(Filter::targets(f) & Filter::MessageContent) + contentFilters += f; + } + if(contentFilters != f.contentFilters) + { + QString errorMessage = QString("content filter mismatch: subscription=%1 message=%2").arg(contentFilters.join(","), f.contentFilters.join(",")); + log_debug("%s", qPrintable(errorMessage)); + + pendingItems.removeFirst(); + processNextItem(); + return; + } + } + + filters = std::make_unique(channelFilters[item.channel]); + filtersFinishedConnection = filters->finished.connect(boost::bind(&WsSession::filtersFinished, this, boost::placeholders::_1)); + + Filter::Context fc; + fc.subscriptionMeta = meta; + fc.publishMeta = item.meta; + fc.zhttpOut = zhttpOut; + fc.currentUri = requestData.uri; + fc.route = route; + fc.trusted = targetTrusted; + + // may call filtersFinished immediately + filters->start(fc, f.body); +} + void WsSession::setupRequestTimer() { if(!pendingRequests.isEmpty()) @@ -122,6 +177,68 @@ void WsSession::setupRequestTimer() } } +void WsSession::filtersFinished(const Filter::MessageFilter::Result &result) +{ + PublishItem item = pendingItems.takeFirst(); + + filtersFinishedConnection.disconnect(); + filters.reset(); + + if(!result.errorMessage.isNull()) + { + log_debug("filter error: %s", qPrintable(result.errorMessage)); + processNextItem(); + return; + } + + afterFilters(item, result.sendAction, result.content); + + processNextItem(); +} + +void WsSession::afterFilters(const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content) +{ + if(sendAction == Filter::Drop) + return; + + const PublishFormat &f = item.format; + + // TODO: hint support for websockets? + if(f.action != PublishFormat::Send && f.action != PublishFormat::Close && f.action != PublishFormat::Refresh) + return; + + WsControlPacket::Item i; + i.cid = cid.toUtf8(); + + if(f.action == PublishFormat::Send) + { + i.type = WsControlPacket::Item::Send; + + switch(f.messageType) + { + case PublishFormat::Text: i.contentType = "text"; break; + case PublishFormat::Binary: i.contentType = "binary"; break; + case PublishFormat::Ping: i.contentType = "ping"; break; + case PublishFormat::Pong: i.contentType = "pong"; break; + default: return; // unrecognized type, skip + } + + i.message = content; + } + else if(f.action == PublishFormat::Close) + { + i.type = WsControlPacket::Item::Close; + i.code = f.code; + i.reason = f.reason; + } + else if(f.action == PublishFormat::Refresh) + { + i.type = WsControlPacket::Item::Refresh; + } + + send(i); +} + void WsSession::expireTimer_timeout() { log_debug("timing out ws session: %s", qPrintable(cid)); @@ -139,7 +256,15 @@ void WsSession::delayedTimer_timeout() pendingRequests[reqId] = QDateTime::currentMSecsSinceEpoch() + WSCONTROL_REQUEST_TIMEOUT; setupRequestTimer(); - send(reqId, delayedType, message); + WsControlPacket::Item i; + i.cid = cid.toUtf8(); + i.requestId = QByteArray::number(reqId); + i.type = WsControlPacket::Item::Send; + i.contentType = delayedType; + i.message = message; + i.queue = true; + + send(i); } void WsSession::requestTimer_timeout() diff --git a/src/handler/wssession.h b/src/handler/wssession.h index b502cc91..611c239e 100644 --- a/src/handler/wssession.h +++ b/src/handler/wssession.h @@ -28,6 +28,8 @@ #include #include #include "packet/httprequestdata.h" +#include "packet/wscontrolpacket.h" +#include "filter.h" #include using Signal = boost::signals2::signal; @@ -35,6 +37,9 @@ using Connection = boost::signals2::scoped_connection; class QTimer; +class ZhttpManager; +class PublishItem; + class WsSession : public QObject { Q_OBJECT @@ -63,6 +68,10 @@ class WsSession : public QObject QTimer *expireTimer; QTimer *delayedTimer; QTimer *requestTimer; + QList pendingItems; + ZhttpManager *zhttpOut; + std::unique_ptr filters; + Connection filtersFinishedConnection; WsSession(QObject *parent = 0); ~WsSession(); @@ -71,13 +80,17 @@ class WsSession : public QObject void flushDelayed(); void sendDelayed(const QByteArray &type, const QByteArray &message, int timeout); void ack(int reqId); + void publish(const PublishItem &item); - boost::signals2::signal send; + boost::signals2::signal send; Signal expired; Signal error; private: + void processNextItem(); void setupRequestTimer(); + void filtersFinished(const Filter::MessageFilter::Result &result); + void afterFilters(const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content); private slots: void expireTimer_timeout();