Skip to content

Commit

Permalink
wssession: avoid recursion
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges committed Dec 19, 2024
1 parent 3609ec4 commit 47e4958
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 47 deletions.
94 changes: 49 additions & 45 deletions src/handler/wssession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ WsSession::WsSession(QObject *parent) :
nextReqId(0),
logLevel(LOG_LEVEL_DEBUG),
targetTrusted(false),
ttl(0)
ttl(0),
processingSendQueue(false)
{
expireTimer = new QTimer(this);
expireTimer->setSingleShot(true);
Expand Down Expand Up @@ -102,53 +103,58 @@ void WsSession::ack(int reqId)

void WsSession::publish(const PublishItem &item)
{
pendingItems += item;
publishQueue += item;

if(!filters)
processNextItem();
if(!processingSendQueue)
trySendQueue();
}

void WsSession::processNextItem()
void WsSession::trySendQueue()
{
if(pendingItems.isEmpty())
return;

const PublishItem &item = pendingItems.first();
const PublishFormat &f = item.format;
processingSendQueue = true;

if(f.haveContentFilters)
while(!publishQueue.isEmpty() && !filters)
{
// ensure content filters match
QStringList contentFilters;
foreach(const QString &f, channelFilters[item.channel])
const PublishItem &item = publishQueue.first();
const PublishFormat &f = item.format;

if(f.haveContentFilters)
{
if(Filter::targets(f) & Filter::MessageContent)
contentFilters += f;
// ensure content filters match
QStringList contentFilters;
foreach(const QString &f, channelFilters[item.channel])
{
if(Filter::targets(f) & Filter::MessageContent)
contentFilters += f;
}
if(contentFilters != f.contentFilters)
{
QString errorMessage = QString("content filter mismatch: subscription=%1 message=%2").arg(contentFilters.join(","), f.contentFilters.join(","));
log_debug("%s", qPrintable(errorMessage));

publishQueue.removeFirst();
continue;
}
}
if(contentFilters != f.contentFilters)
{
QString errorMessage = QString("content filter mismatch: subscription=%1 message=%2").arg(contentFilters.join(","), f.contentFilters.join(","));
log_debug("%s", qPrintable(errorMessage));

pendingItems.removeFirst();
processNextItem();
return;
}
filters = std::make_unique<Filter::MessageFilterStack>(channelFilters[item.channel]);
filtersFinishedConnection = filters->finished.connect(boost::bind(&WsSession::filtersFinished, this, boost::placeholders::_1));

Filter::Context fc;
fc.subscriptionMeta = meta;
fc.publishMeta = item.meta;
fc.zhttpOut = zhttpOut;
fc.currentUri = requestData.uri;
fc.route = route;
fc.trusted = targetTrusted;

// may call filtersFinished immediately. if it does, queue processing
// will continue. else, the loop will end and queue processing will
// resume after the filters finish
filters->start(fc, f.body);
}

filters = std::make_unique<Filter::MessageFilterStack>(channelFilters[item.channel]);
filtersFinishedConnection = filters->finished.connect(boost::bind(&WsSession::filtersFinished, this, boost::placeholders::_1));

Filter::Context fc;
fc.subscriptionMeta = meta;
fc.publishMeta = item.meta;
fc.zhttpOut = zhttpOut;
fc.currentUri = requestData.uri;
fc.route = route;
fc.trusted = targetTrusted;

// may call filtersFinished immediately
filters->start(fc, f.body);
processingSendQueue = false;
}

void WsSession::setupRequestTimer()
Expand Down Expand Up @@ -179,21 +185,19 @@ void WsSession::setupRequestTimer()

void WsSession::filtersFinished(const Filter::MessageFilter::Result &result)
{
PublishItem item = pendingItems.takeFirst();
PublishItem item = publishQueue.takeFirst();

filtersFinishedConnection.disconnect();
filters.reset();

if(!result.errorMessage.isNull())
{
log_debug("filter error: %s", qPrintable(result.errorMessage));
processNextItem();
return;
}

afterFilters(item, result.sendAction, result.content);
else
afterFilters(item, result.sendAction, result.content);

processNextItem();
// if filters finished asynchronously then we need to resume processing
if(!processingSendQueue)
trySendQueue();
}

void WsSession::afterFilters(const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content)
Expand Down
5 changes: 3 additions & 2 deletions src/handler/wssession.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@ class WsSession : public QObject
QTimer *expireTimer;
QTimer *delayedTimer;
QTimer *requestTimer;
QList<PublishItem> pendingItems;
QList<PublishItem> publishQueue;
ZhttpManager *zhttpOut;
std::unique_ptr<Filter::MessageFilter> filters;
Connection filtersFinishedConnection;
bool processingSendQueue;

WsSession(QObject *parent = 0);
~WsSession();
Expand All @@ -87,7 +88,7 @@ class WsSession : public QObject
Signal error;

private:
void processNextItem();
void trySendQueue();
void setupRequestTimer();
void filtersFinished(const Filter::MessageFilter::Result &result);
void afterFilters(const PublishItem &item, Filter::SendAction sendAction, const QByteArray &content);
Expand Down

0 comments on commit 47e4958

Please sign in to comment.