Skip to content

Commit

Permalink
use async message filters
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges committed Dec 18, 2024
1 parent 5fc4533 commit 3609ec4
Show file tree
Hide file tree
Showing 4 changed files with 308 additions and 186 deletions.
85 changes: 4 additions & 81 deletions src/handler/handlerengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1793,77 +1793,7 @@ class HandlerEngine::Private : public QObject
{
WsSession *s = qobject_cast<WsSession*>(target);

if(f.haveContentFilters)
{
// ensure content filters match
QStringList contentFilters;
foreach(const QString &f, s->channelFilters[item.channel])
{
if(Filter::targets(f) & Filter::MessageContent)
contentFilters += f;
}
if(contentFilters != f.contentFilters)
{
QString errorMessage = QString("content filter mismatch: subscription=%1 message=%2").arg(contentFilters.join(","), f.contentFilters.join(","));
log_debug("%s", qPrintable(errorMessage));
return;
}
}

Filter::Context fc;
fc.subscriptionMeta = s->meta;
fc.publishMeta = item.meta;
fc.zhttpOut = zhttpOut;
fc.currentUri = s->requestData.uri;
fc.route = s->route;
fc.trusted = s->targetTrusted;

FilterStack filters(fc, s->channelFilters[item.channel]);

if(filters.sendAction() == Filter::Drop)
return;

// TODO: hint support for websockets?
if(f.action != PublishFormat::Send && f.action != PublishFormat::Close && f.action != PublishFormat::Refresh)
return;

WsControlPacket::Item i;
i.cid = s->cid.toUtf8();

if(f.action == PublishFormat::Send)
{
QByteArray body = filters.process(f.body);
if(body.isNull())
{
log_debug("filter error: %s", qPrintable(filters.errorMessage()));
return;
}

i.type = WsControlPacket::Item::Send;

switch(f.messageType)
{
case PublishFormat::Text: i.contentType = "text"; break;
case PublishFormat::Binary: i.contentType = "binary"; break;
case PublishFormat::Ping: i.contentType = "ping"; break;
case PublishFormat::Pong: i.contentType = "pong"; break;
default: return; // unrecognized type, skip
}

i.message = body;
}
else if(f.action == PublishFormat::Close)
{
i.type = WsControlPacket::Item::Close;
i.code = f.code;
i.reason = f.reason;
}
else if(f.action == PublishFormat::Refresh)
{
i.type = WsControlPacket::Item::Refresh;
}

writeWsControlItems(s->peer, QList<WsControlPacket::Item>() << i);
s->publish(item);
}
}

Expand Down Expand Up @@ -2690,14 +2620,15 @@ class HandlerEngine::Private : public QObject
{
s = new WsSession(this);
wsSessionConnectionMap[s] = {
s->send.connect(boost::bind(&Private::wssession_send, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, s)),
s->send.connect(boost::bind(&Private::wssession_send, this, boost::placeholders::_1, s)),
s->expired.connect(boost::bind(&Private::wssession_expired, this, s)),
s->error.connect(boost::bind(&Private::wssession_error, this, s))
};
s->peer = packet.from;
s->cid = QString::fromUtf8(item.cid);
s->ttl = item.ttl;
s->requestData.uri = item.uri;
s->zhttpOut = zhttpOut;
s->refreshExpiration();
cs.wsSessions.insert(s->cid, s);
log_debug("added ws session: %s", qPrintable(s->cid));
Expand Down Expand Up @@ -3229,16 +3160,8 @@ private slots:
writeRetryPacket(addr, rp);
}

void wssession_send(int reqId, const QByteArray &type, const QByteArray &message, WsSession *s)
void wssession_send(const WsControlPacket::Item &i, WsSession *s)
{
WsControlPacket::Item i;
i.cid = s->cid.toUtf8();
i.requestId = QByteArray::number(reqId);
i.type = WsControlPacket::Item::Send;
i.contentType = type;
i.message = message;
i.queue = true;

writeWsControlItems(s->peer, QList<WsControlPacket::Item>() << i);
}

Expand Down
Loading

0 comments on commit 3609ec4

Please sign in to comment.