From 5e2934df0fd3cdd65a4e8bc6f770d41abcc7d891 Mon Sep 17 00:00:00 2001 From: Justin Karneges Date: Wed, 18 Dec 2024 21:54:04 -0800 Subject: [PATCH] httpsession: consolidate response/stream filtering and avoid recursion --- src/handler/httpsession.cpp | 156 +++++++++++------------------------- 1 file changed, 48 insertions(+), 108 deletions(-) diff --git a/src/handler/httpsession.cpp b/src/handler/httpsession.cpp index d5d749d1..b892be12 100644 --- a/src/handler/httpsession.cpp +++ b/src/handler/httpsession.cpp @@ -184,6 +184,7 @@ class HttpSession::Private : public QObject Priority needUpdatePriority; UpdateAction *pendingAction; QList publishQueue; + bool processingSendQueue; QByteArray retryToAddress; RetryRequestPacket retryPacket; LogUtil::Config logConfig; @@ -219,6 +220,7 @@ class HttpSession::Private : public QObject sentOutReqData(0), retries(0), needUpdate(false), + processingSendQueue(false), pendingAction(0), responseFilters(0), connectionSubscriptionMax(_connectionSubscriptionMax), @@ -431,83 +433,10 @@ class HttpSession::Private : public QObject assert(instruct.holdMode == Instruct::ResponseHold); - if(!channels.contains(item.channel)) - { - log_debug("httpsession: received publish for channel with no subscription, dropping"); - return; - } - - Instruct::Channel &channel = channels[item.channel]; - - if(!channel.prevId.isNull()) - { - if(channel.prevId != item.prevId) - { - log_debug("last ID inconsistency (got=%s, expected=%s), retrying", qPrintable(item.prevId), qPrintable(channel.prevId)); - publishLastIds->remove(item.channel); - - update(LowPriority); - return; - } - - channel.prevId = item.id; - } - - if(f.action == PublishFormat::Send) - { - if(f.haveContentFilters) - { - // ensure content filters match - QStringList contentFilters; - foreach(const QString &f, channels[item.channel].filters) - { - if(Filter::targets(f) & Filter::MessageContent) - contentFilters += f; - } - if(contentFilters != f.contentFilters) - { - errorMessage = QString("content filter mismatch: subscription=%1 message=%2").arg(contentFilters.join(","), f.contentFilters.join(",")); - doError(); - return; - } - } - - publishQueue += QueuedItem(item, exposeHeaders); - - QByteArray body; - if(f.haveBodyPatch) - body = applyBodyPatch(instruct.response.body, f.bodyPatch); - else - body = f.body; + publishQueue += QueuedItem(item, exposeHeaders); - messageFilters = std::make_unique(channels[item.channel].filters); - messageFiltersFinishedConnection = messageFilters->finished.connect(boost::bind(&Private::messageFiltersFinished, this, boost::placeholders::_1)); - - QHash prevIds; - QHashIterator it(channels); - while(it.hasNext()) - { - it.next(); - const Instruct::Channel &c = it.value(); - prevIds[c.name] = c.prevId; - } - - Filter::Context fc; - fc.prevIds = prevIds; - fc.subscriptionMeta = instruct.meta; - fc.publishMeta = item.meta; - fc.zhttpOut = outZhttp; - fc.currentUri = currentUri; - fc.route = adata.route; - fc.trusted = adata.trusted; - - // may call messageFiltersFinished immediately - messageFilters->start(fc, body); - } - else - { - afterMessageFilters(item, Filter::Send, QByteArray(), QList()); - } + state = SendingQueue; + trySendQueue(); } else if(f.type == PublishFormat::HttpStream) { @@ -819,7 +748,7 @@ class HttpSession::Private : public QObject void trySendQueue() { - assert(instruct.holdMode == Instruct::StreamHold); + processingSendQueue = true; while(!publishQueue.isEmpty() && req->writeBytesAvailable() > 0 && !messageFilters) { @@ -828,8 +757,8 @@ class HttpSession::Private : public QObject if(!channels.contains(item.channel)) { - publishQueue.removeFirst(); log_debug("httpsession: received publish for channel with no subscription, dropping"); + publishQueue.removeFirst(); continue; } @@ -853,6 +782,13 @@ class HttpSession::Private : public QObject const PublishFormat &f = item.format; + if(f.action != PublishFormat::Send) + { + // skip filters + afterMessageFilters(item, Filter::Send, QByteArray(), QList()); + continue; + } + if(f.haveContentFilters) { // ensure content filters match @@ -871,6 +807,12 @@ class HttpSession::Private : public QObject } } + QByteArray body; + if(f.type == PublishFormat::HttpResponse && f.haveBodyPatch) + body = applyBodyPatch(instruct.response.body, f.bodyPatch); + else + body = f.body; + messageFilters = std::make_unique(channels[item.channel].filters); messageFiltersFinishedConnection = messageFilters->finished.connect(boost::bind(&Private::messageFiltersFinished, this, boost::placeholders::_1)); @@ -892,25 +834,33 @@ class HttpSession::Private : public QObject fc.route = adata.route; fc.trusted = adata.trusted; - // may call messageFiltersFinished immediately - messageFilters->start(fc, f.body); - return; + // may call messageFiltersFinished immediately. if it does, queue + // processing will continue. else, the loop will end and queue + // processing will resume after the filters finish + messageFilters->start(fc, body); } - if(state == SendingQueue) - { - if(publishQueue.isEmpty()) - sendQueueDone(); - } - else if(state == Holding) + if(!messageFilters && instruct.holdMode == Instruct::StreamHold) { - if(!publishQueue.isEmpty()) + // the queue is empty or client buffer is full + + if(state == SendingQueue) { - // if backlogged, turn off timers until we're able to send again - timer->stop(); - updateManager->unregisterSession(q); + if(publishQueue.isEmpty()) + sendQueueDone(); + } + else if(state == Holding) + { + if(!publishQueue.isEmpty()) + { + // if backlogged, turn off timers until we're able to send again + timer->stop(); + updateManager->unregisterSession(q); + } } } + + processingSendQueue = false; } void sendQueueDone() @@ -1448,28 +1398,18 @@ class HttpSession::Private : public QObject } afterMessageFilters(qi.item, result.sendAction, result.content, qi.exposeHeaders); + } - if(state == SendingQueue) - { - if(publishQueue.isEmpty()) - sendQueueDone(); - } - else if(state == Holding) - { - if(!publishQueue.isEmpty()) - { - // if backlogged, turn off timers until we're able to send again - timer->stop(); - updateManager->unregisterSession(q); - } - } + void afterMessageFilters(const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content, const QList &exposeHeaders) + { + processItem(item, sendAction, content, exposeHeaders); - // FIXME: are these states correct? - if(state == Holding || state == SendingQueue) + // if filters finished asynchronously then we need to resume processing + if(!processingSendQueue) trySendQueue(); } - void afterMessageFilters(const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content, const QList &exposeHeaders) + void processItem(const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content, const QList &exposeHeaders) { const PublishFormat &f = item.format;