Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RedisConnection#Connect(): get rid of spin lock #10265

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions lib/base/io-engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,28 +124,50 @@ void IoEngine::RunEventLoop()
}
}

AsioConditionVariable::AsioConditionVariable(boost::asio::io_context& io, bool init)
AsioEvent::AsioEvent(boost::asio::io_context& io, bool init)
: m_Timer(io)
{
m_Timer.expires_at(init ? boost::posix_time::neg_infin : boost::posix_time::pos_infin);
}

void AsioConditionVariable::Set()
void AsioEvent::Set()
{
m_Timer.expires_at(boost::posix_time::neg_infin);
}

void AsioConditionVariable::Clear()
void AsioEvent::Clear()
{
m_Timer.expires_at(boost::posix_time::pos_infin);
}

void AsioConditionVariable::Wait(boost::asio::yield_context yc)
void AsioEvent::Wait(boost::asio::yield_context yc)
{
boost::system::error_code ec;
m_Timer.async_wait(yc[ec]);
}

AsioSymmetricEvent::AsioSymmetricEvent(boost::asio::io_context& io, bool init)
: m_IsTrue(io, init), m_IsFalse(io, !init)
{
}

void AsioSymmetricEvent::Set()
{
m_IsTrue.Set();
m_IsFalse.Clear();
}

void AsioSymmetricEvent::Clear()
{
m_IsTrue.Clear();
m_IsFalse.Set();
}

void AsioSymmetricEvent::Wait(boost::asio::yield_context yc, bool desiredState)
{
(desiredState ? m_IsTrue : m_IsFalse).Wait(std::move(yc));
}

/**
* Cancels any pending timeout callback.
*
Expand Down
24 changes: 21 additions & 3 deletions lib/base/io-engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,14 @@ class TerminateIoThread : public std::exception
};

/**
* Condition variable which doesn't block I/O threads
* Awaitable flag which doesn't block I/O threads, inspired by threading.Event from Python
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see the updated docs of the class in question.

*
* @ingroup base
*/
class AsioConditionVariable
class AsioEvent
{
public:
AsioConditionVariable(boost::asio::io_context& io, bool init = false);
AsioEvent(boost::asio::io_context& io, bool init = false);

void Set();
void Clear();
Expand All @@ -162,6 +162,24 @@ class AsioConditionVariable
boost::asio::deadline_timer m_Timer;
};

/**
* Like AsioEvent, but additionally supports waiting for an event to be cleared
*
* @ingroup base
*/
class AsioSymmetricEvent
{
public:
AsioSymmetricEvent(boost::asio::io_context& io, bool init = false);

void Set();
void Clear();
void Wait(boost::asio::yield_context yc, bool desiredState = true);

private:
AsioEvent m_IsTrue, m_IsFalse;
};

/**
* I/O timeout emulator
*
Expand Down
12 changes: 3 additions & 9 deletions lib/icingadb/redisconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,12 +302,6 @@ void RedisConnection::Connect(asio::yield_context& yc)

boost::asio::deadline_timer timer (m_Strand.context());

auto waitForReadLoop ([this, &yc]() {
while (!m_Queues.FutureResponseActions.empty()) {
IoEngine::YieldCurrentCoroutine(yc);
}
});

for (;;) {
try {
if (m_Path.IsEmpty()) {
Expand Down Expand Up @@ -339,7 +333,7 @@ void RedisConnection::Connect(asio::yield_context& yc)
}

Handshake(conn, yc);
waitForReadLoop();
m_QueuedReads.Wait(yc, false);
m_TlsConn = std::move(conn);
} else {
Log(m_Parent ? LogNotice : LogInformation, "IcingaDB")
Expand All @@ -350,7 +344,7 @@ void RedisConnection::Connect(asio::yield_context& yc)

icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
Handshake(conn, yc);
waitForReadLoop();
m_QueuedReads.Wait(yc, false);
m_TcpConn = std::move(conn);
}
} else {
Expand All @@ -362,7 +356,7 @@ void RedisConnection::Connect(asio::yield_context& yc)

conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
Handshake(conn, yc);
waitForReadLoop();
m_QueuedReads.Wait(yc, false);
m_UnixConn = std::move(conn);
}

Expand Down
3 changes: 2 additions & 1 deletion lib/icingadb/redisconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ namespace icinga
std::set<QueryPriority> m_SuppressedQueryKinds;

// Indicate that there's something to send/receive
AsioConditionVariable m_QueuedWrites, m_QueuedReads;
AsioEvent m_QueuedWrites;
AsioSymmetricEvent m_QueuedReads;

std::function<void(boost::asio::yield_context& yc)> m_ConnectedCallback;

Expand Down
4 changes: 2 additions & 2 deletions lib/remote/jsonrpcconnection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ class JsonRpcConnection final : public Object
double m_Seen;
boost::asio::io_context::strand m_IoStrand;
std::vector<String> m_OutgoingMessagesQueue;
AsioConditionVariable m_OutgoingMessagesQueued;
AsioConditionVariable m_WriterDone;
AsioEvent m_OutgoingMessagesQueued;
AsioEvent m_WriterDone;
Atomic<bool> m_ShuttingDown;
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;

Expand Down
Loading