Skip to content

Commit

Permalink
httpsession: consolidate response/stream filtering and avoid recursion
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges committed Dec 19, 2024
1 parent 47e4958 commit 5e2934d
Showing 1 changed file with 48 additions and 108 deletions.
156 changes: 48 additions & 108 deletions src/handler/httpsession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ class HttpSession::Private : public QObject
Priority needUpdatePriority;
UpdateAction *pendingAction;
QList<QueuedItem> publishQueue;
bool processingSendQueue;
QByteArray retryToAddress;
RetryRequestPacket retryPacket;
LogUtil::Config logConfig;
Expand Down Expand Up @@ -219,6 +220,7 @@ class HttpSession::Private : public QObject
sentOutReqData(0),
retries(0),
needUpdate(false),
processingSendQueue(false),
pendingAction(0),
responseFilters(0),
connectionSubscriptionMax(_connectionSubscriptionMax),
Expand Down Expand Up @@ -431,83 +433,10 @@ class HttpSession::Private : public QObject

assert(instruct.holdMode == Instruct::ResponseHold);

if(!channels.contains(item.channel))
{
log_debug("httpsession: received publish for channel with no subscription, dropping");
return;
}

Instruct::Channel &channel = channels[item.channel];

if(!channel.prevId.isNull())
{
if(channel.prevId != item.prevId)
{
log_debug("last ID inconsistency (got=%s, expected=%s), retrying", qPrintable(item.prevId), qPrintable(channel.prevId));
publishLastIds->remove(item.channel);

update(LowPriority);
return;
}

channel.prevId = item.id;
}

if(f.action == PublishFormat::Send)
{
if(f.haveContentFilters)
{
// ensure content filters match
QStringList contentFilters;
foreach(const QString &f, channels[item.channel].filters)
{
if(Filter::targets(f) & Filter::MessageContent)
contentFilters += f;
}
if(contentFilters != f.contentFilters)
{
errorMessage = QString("content filter mismatch: subscription=%1 message=%2").arg(contentFilters.join(","), f.contentFilters.join(","));
doError();
return;
}
}

publishQueue += QueuedItem(item, exposeHeaders);

QByteArray body;
if(f.haveBodyPatch)
body = applyBodyPatch(instruct.response.body, f.bodyPatch);
else
body = f.body;
publishQueue += QueuedItem(item, exposeHeaders);

messageFilters = std::make_unique<Filter::MessageFilterStack>(channels[item.channel].filters);
messageFiltersFinishedConnection = messageFilters->finished.connect(boost::bind(&Private::messageFiltersFinished, this, boost::placeholders::_1));

QHash<QString, QString> prevIds;
QHashIterator<QString, Instruct::Channel> it(channels);
while(it.hasNext())
{
it.next();
const Instruct::Channel &c = it.value();
prevIds[c.name] = c.prevId;
}

Filter::Context fc;
fc.prevIds = prevIds;
fc.subscriptionMeta = instruct.meta;
fc.publishMeta = item.meta;
fc.zhttpOut = outZhttp;
fc.currentUri = currentUri;
fc.route = adata.route;
fc.trusted = adata.trusted;

// may call messageFiltersFinished immediately
messageFilters->start(fc, body);
}
else
{
afterMessageFilters(item, Filter::Send, QByteArray(), QList<QByteArray>());
}
state = SendingQueue;
trySendQueue();
}
else if(f.type == PublishFormat::HttpStream)
{
Expand Down Expand Up @@ -819,7 +748,7 @@ class HttpSession::Private : public QObject

void trySendQueue()
{
assert(instruct.holdMode == Instruct::StreamHold);
processingSendQueue = true;

while(!publishQueue.isEmpty() && req->writeBytesAvailable() > 0 && !messageFilters)
{
Expand All @@ -828,8 +757,8 @@ class HttpSession::Private : public QObject

if(!channels.contains(item.channel))
{
publishQueue.removeFirst();
log_debug("httpsession: received publish for channel with no subscription, dropping");
publishQueue.removeFirst();
continue;
}

Expand All @@ -853,6 +782,13 @@ class HttpSession::Private : public QObject

const PublishFormat &f = item.format;

if(f.action != PublishFormat::Send)
{
// skip filters
afterMessageFilters(item, Filter::Send, QByteArray(), QList<QByteArray>());
continue;
}

if(f.haveContentFilters)
{
// ensure content filters match
Expand All @@ -871,6 +807,12 @@ class HttpSession::Private : public QObject
}
}

QByteArray body;
if(f.type == PublishFormat::HttpResponse && f.haveBodyPatch)
body = applyBodyPatch(instruct.response.body, f.bodyPatch);
else
body = f.body;

messageFilters = std::make_unique<Filter::MessageFilterStack>(channels[item.channel].filters);
messageFiltersFinishedConnection = messageFilters->finished.connect(boost::bind(&Private::messageFiltersFinished, this, boost::placeholders::_1));

Expand All @@ -892,25 +834,33 @@ class HttpSession::Private : public QObject
fc.route = adata.route;
fc.trusted = adata.trusted;

// may call messageFiltersFinished immediately
messageFilters->start(fc, f.body);
return;
// may call messageFiltersFinished immediately. if it does, queue
// processing will continue. else, the loop will end and queue
// processing will resume after the filters finish
messageFilters->start(fc, body);
}

if(state == SendingQueue)
{
if(publishQueue.isEmpty())
sendQueueDone();
}
else if(state == Holding)
if(!messageFilters && instruct.holdMode == Instruct::StreamHold)
{
if(!publishQueue.isEmpty())
// the queue is empty or client buffer is full

if(state == SendingQueue)
{
// if backlogged, turn off timers until we're able to send again
timer->stop();
updateManager->unregisterSession(q);
if(publishQueue.isEmpty())
sendQueueDone();
}
else if(state == Holding)
{
if(!publishQueue.isEmpty())
{
// if backlogged, turn off timers until we're able to send again
timer->stop();
updateManager->unregisterSession(q);
}
}
}

processingSendQueue = false;
}

void sendQueueDone()
Expand Down Expand Up @@ -1448,28 +1398,18 @@ class HttpSession::Private : public QObject
}

afterMessageFilters(qi.item, result.sendAction, result.content, qi.exposeHeaders);
}

if(state == SendingQueue)
{
if(publishQueue.isEmpty())
sendQueueDone();
}
else if(state == Holding)
{
if(!publishQueue.isEmpty())
{
// if backlogged, turn off timers until we're able to send again
timer->stop();
updateManager->unregisterSession(q);
}
}
void afterMessageFilters(const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content, const QList<QByteArray> &exposeHeaders)
{
processItem(item, sendAction, content, exposeHeaders);

// FIXME: are these states correct?
if(state == Holding || state == SendingQueue)
// if filters finished asynchronously then we need to resume processing
if(!processingSendQueue)
trySendQueue();
}

void afterMessageFilters(const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content, const QList<QByteArray> &exposeHeaders)
void processItem(const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content, const QList<QByteArray> &exposeHeaders)
{
const PublishFormat &f = item.format;

Expand Down

0 comments on commit 5e2934d

Please sign in to comment.