
Race condition when using ResponseStream::flush() from another thread #1290

Open
bkueng opened this issue Feb 14, 2025 · 6 comments

bkueng (Contributor) commented Feb 14, 2025

I have a use-case where I call ResponseStream::flush() from another thread for an ongoing connection.

In rare cases ThreadSanitizer raises an issue in the form of:

  Read of size 8 at 0x722000004188 by main thread (mutexes: write M0):
    #0 std::__shared_count<(__gnu_cxx::_Lock_policy)2>::_M_swap(std::__shared_count<(__gnu_cxx::_Lock_policy)2>&) /usr/include/c++/14/bits/shared_ptr_base.h:1097 (param-server+0x46a270) (BuildId: 1b65773e8330da60c73033ad3abacf9d10ca9aa8)
    #1 std::__shared_ptr<Pistache::Async::Private::Core, (__gnu_cxx::_Lock_policy)2>::__shared_ptr(std::__shared_ptr<Pistache::Async::Private::Core, (__gnu_cxx::_Lock_policy)2>&&) <null> (param-server+0x777a90) (BuildId: 1b65773e8330da60c73033ad3abacf9d10ca9aa8)
   ...

  Previous write of size 8 at 0x722000004188 by thread T3 (mutexes: write M1, write M2, write M3):
    #0 operator new(unsigned long) <null> (libtsan.so.2+0x66286) (BuildId: c08afb1c60772d9b4e4d4be38d0c0434c5b41990)
    #1 void Pistache::Queue<Pistache::Tcp::Transport::WriteEntry>::push<Pistache::Tcp::Transport::WriteEntry>(Pistache::Tcp::Transport::WriteEntry&&) <null> (param-server+0x77d12e) (BuildId: 1b65773e8330da60c73033ad3abacf9d10ca9aa8)
    ...

  Location is heap block of size 120 at 0x722000004180 allocated by thread T3:
    #0 operator new(unsigned long) <null> (libtsan.so.2+0x66286) (BuildId: c08afb1c60772d9b4e4d4be38d0c0434c5b41990)
    #1 void Pistache::Queue<Pistache::Tcp::Transport::WriteEntry>::push<Pistache::Tcp::Transport::WriteEntry>(Pistache::Tcp::Transport::WriteEntry&&) <null> (param-server+0x77d12e) (BuildId: 1b65773e8330da60c73033ad3abacf9d10ca9aa8)
   ...

  Mutex M0 (0x7220000001d8) created at:
 [ mutex in my application ]

  Mutex M1 (0x728400004c38) created at:
    #0 pthread_mutex_lock <null> (libtsan.so.2+0x1f60e) (BuildId: c08afb1c60772d9b4e4d4be38d0c0434c5b41990)
    #1 __gthread_mutex_lock /usr/include/c++/14/x86_64-redhat-linux/bits/gthr-default.h:762 (param-server+0x50e5ab) (BuildId: 1b65773e8330da60c73033ad3abacf9d10ca9aa8)
    #2 std::mutex::lock() /usr/include/c++/14/bits/std_mutex.h:113 (param-server+0x50e8e2) (BuildId: 1b65773e8330da60c73033ad3abacf9d10ca9aa8)
    #3 std::lock_guard<std::mutex>::lock_guard(std::mutex&) /usr/include/c++/14/bits/std_mutex.h:250 (param-server+0x5133fa) (BuildId: 1b65773e8330da60c73033ad3abacf9d10ca9aa8)
    #4 Pistache::Aio::SyncImpl::addHandler(std::shared_ptr<Pistache::Aio::Handler> const&, bool) <null> (param-server+0x80d365) (BuildId: 1b65773e8330da60c73033ad3abacf9d10ca9aa8)

  Mutex M2 (0x728400004c08) created at:
    #0 pthread_mutex_lock <null> (libtsan.so.2+0x1f60e) (BuildId: c08afb1c60772d9b4e4d4be38d0c0434c5b41990)
    #1 __gthread_mutex_lock /usr/include/c++/14/x86_64-redhat-linux/bits/gthr-default.h:762 (param-server+0x50e5ab) (BuildId: 1b65773e8330da60c73033ad3abacf9d10ca9aa8)
    #2 std::mutex::lock() /usr/include/c++/14/bits/std_mutex.h:113 (param-server+0x50e8e2) (BuildId: 1b65773e8330da60c73033ad3abacf9d10ca9aa8)
    #3 std::lock_guard<std::mutex>::lock_guard(std::mutex&) /usr/include/c++/14/bits/std_mutex.h:250 (param-server+0x5133fa) (BuildId: 1b65773e8330da60c73033ad3abacf9d10ca9aa8)
    #4 Pistache::Aio::SyncImpl::runOnce() <null> (param-server+0x80d8de) (BuildId: 1b65773e8330da60c73033ad3abacf9d10ca9aa8)

  Mutex M3 (0x722400000320) created at:
 [ mutex in my application ]

  Thread T3 (tid=528216, running) created by thread T2 at:
    #0 pthread_create <null> (libtsan.so.2+0x205e6) (BuildId: c08afb1c60772d9b4e4d4be38d0c0434c5b41990)
    #1 std::thread::_M_start_thread(std::unique_ptr<std::thread::_State, std::default_delete<std::thread::_State> >, void (*)()) <null> (libstdc++.so.6+0x4b5f8) (BuildId: 4afeee4717e6cbbc84c67861dfcfdeb165bfdad7)

There's somehow no debug info for the pistache lib, but this is the full stack when tsan does the reporting:

__tsan::ReportRace(__tsan::ThreadState *, __tsan::RawShadow *, __tsan::Shadow, __tsan::Shadow, unsigned long) 0x00007ffff6e7d970
std::__shared_count::_M_swap shared_ptr_base.h:1097
std::__shared_ptr::__shared_ptr(std::__shared_ptr<…> &&) 0x0000000000777a91
std::shared_ptr::shared_ptr(std::shared_ptr<…> &&) 0x0000000000774514
Pistache::Async::Resolver::Resolver(Pistache::Async::Resolver &&) 0x000000000077187d
Pistache::Async::Deferred::Deferred(Pistache::Async::Deferred<…> &&) 0x00000000007727ef
Pistache::Tcp::Transport::WriteEntry::WriteEntry(Pistache::Tcp::Transport::WriteEntry &&) 0x000000000077f189
Pistache::Queue::pop() 0x00000000007c6d7a
Pistache::PollableQueue::pop() 0x00000000007d0c7e
Pistache::Queue::popSafe() 0x00000000007c49f9
Pistache::Tcp::Transport::handleWriteQueue(bool) 0x00000000007c0f0b
Pistache::Tcp::Transport::flush() 0x00000000007bf5a5
Pistache::Http::ResponseStream::flush() 0x000000000076afab
   ...

Is my usage pattern not supported? Is there another approach?

From what I can see, this violates the 'single consumer' assumption of Pistache::Queue.
Just to test, I added a mutex around the writesQueue accesses in transport.h, which silences the sanitizer.
I'm using commit 5d8d062, very close to master.

dgreatwood (Collaborator) commented:

Hi @bkueng -

Thanks for reporting this issue. A couple of things.

I'd be curious to see the full stack backtrace for both the read (main thread) and the write (T3) in the sanitizer report, if you can provide that.

Because flush does an immediate dequeue and immediate send-initiation, it may need to be protected by a mutex among your own threads. For instance, in tests/streaming_test.cc:80, N_LETTERS (26) threads are used to do parallel write-flush pair sequences, 10 pairs per sequence. However, the mutex responseLock is used in streaming_test.cc to ensure that only one thread at a time may call write and then flush. Do you have a similar mechanism in your own code?

Are you using any other means of sending a response, e.g. calling send(...) at all?

There's somehow no debug info for the pistache lib

[DG] I don't know that any more info is needed beyond the sanitizer report, but if you do want Pistache's internal debug logs, you can use the meson setup options --buildtype=debug -DPISTACHE_DEBUG=true when building Pistache. You can also try the convenience script bldscripts/mesbuilddebug.sh, presuming that set of build options suits you. The debug output will then be in syslog (presuming you're on Linux or BSD).

Is there another approach?

[DG] It is certainly more common to use send rather than flush (see tests/http_server_test.cc for examples). But perhaps you have reasons to use flush in your app?

Thanks again!

bkueng (Contributor, Author) commented Feb 17, 2025

I was not able to trigger it with full debug build, unfortunately.

My setup is similar to the one in streaming_test, except that I only have one main thread and one pistache worker thread.
Both threads can call write and then flush on the stream. I had to add the flush call because otherwise the data is not sent immediately, and I need it to be sent right away.

So to test further, I added a lock around all the write and flush calls, as in the unit test. With that the race gets even harder to trigger, but it still persists.
The problem is that the queue can be popped directly from the pistache worker thread, which is outside my control (and hence I cannot lock against it). This is the call: https://github.com/pistacheio/pistache/blob/master/src/common/transport.cc#L204

I captured a full stack trace for that:

__tsan::OnReport(const __tsan::ReportDesc *, bool) 0x00007ffff6e7a310
<unknown> 0x00007ffff6e7d6b6
<unknown> 0x00007ffff6e7e9a7
std::__shared_count::_M_swap shared_ptr_base.h:1097
std::__shared_ptr::__shared_ptr shared_ptr_base.h:1535
std::shared_ptr::shared_ptr shared_ptr.h:359
Pistache::Async::Resolver::Resolver async.h:849
Pistache::Async::Deferred::Deferred async.h:968
Pistache::Tcp::Transport::WriteEntry::WriteEntry transport.h:194
Pistache::Queue::pop mailbox.h:259
Pistache::PollableQueue::pop mailbox.h:367
Pistache::Queue::popSafe mailbox.h:272
Pistache::Tcp::Transport::handleWriteQueue transport.cc:978
Pistache::Tcp::Transport::onReady transport.cc:209
Pistache::Http::TransportImpl::onReady endpoint.cc:160
Pistache::Aio::SyncImpl::handleFds reactor.cc:367
Pistache::Aio::SyncImpl::runOnce reactor.cc:311
Pistache::Aio::SyncImpl::run reactor.cc:333
<lambda#1>::operator()() const reactor.cc:772
std::__invoke_impl<…>(std::__invoke_other, <lambda#1> &&) invoke.h:61
std::__invoke<…>(<lambda#1> &&) invoke.h:96
std::thread::_Invoker::_M_invoke<…>(std::_Index_tuple<…>) std_thread.h:301
std::thread::_Invoker::operator()() std_thread.h:308
std::thread::_State_impl::_M_run() std_thread.h:253
<unknown> 0x00007ffff6a4b524
<unknown> 0x00007ffff6e15c7a
start_thread 0x00007ffff6798168
__clone3 0x00007ffff681c14c

The other call from flush happens here: https://github.com/pistacheio/pistache/blob/master/src/common/transport.cc#L100

I was able to resolve the problem with these changes (I'm not suggesting this is how it should be solved though):

diff --git a/include/pistache/transport.h b/include/pistache/transport.h
index d406ae1..6e262e1 100644
--- a/include/pistache/transport.h
+++ b/include/pistache/transport.h
@@ -78,6 +78,7 @@ namespace Pistache::Tcp
                                      msg_more_style
 #endif
                     );
+                    Guard guard(writesQueueLock);
                     writesQueue.push(std::move(write));
                 });
         }
@@ -261,6 +262,7 @@ namespace Pistache::Tcp
         std::shared_ptr<EventMethEpollEquiv> epoll_fd;
 #endif
 
+        Lock writesQueueLock;
         PollableQueue<WriteEntry> writesQueue;
         std::unordered_map<Fd, std::deque<WriteEntry>> toWrite;
         Lock toWriteLock;
diff --git a/src/common/transport.cc b/src/common/transport.cc
index f9a3fe8..d78c23b 100644
--- a/src/common/transport.cc
+++ b/src/common/transport.cc
@@ -968,6 +968,7 @@ namespace Pistache::Tcp
         // Let's drain the queue
         for (;;)
         {
+            Guard guard(writesQueueLock);
             auto write = writesQueue.popSafe();
             if (!write)
                 break;

Hence I think it needs a change in the pistache library.

dgreatwood (Collaborator) commented:

Thanks, this is helpful data.

Some sort of lock, as you described, could work. I'd also like to ponder whether response-flush could bypass the write queue completely, thus avoiding the issue of a lock on the queue. But I will need some more time to review that.

It would still be helpful to have the full stack trace you see for the flush call as well, if you have it.

Thanks once more.

bkueng (Contributor, Author) commented Feb 18, 2025

Thanks for the quick reply.

I'd also like to ponder whether response-flush could bypass the write queue completely, thus avoiding the issue of a lock on the queue

I was also thinking in that direction, to let the epoll worker thread do the actual work. But I don't have enough knowledge about the library to propose something concrete.

It would still be helpful to have the full stack trace you see for the flush call as well, if you have it.

Yes, like this one (the ... are my application code):

Pistache::Queue::pop() 0x00000000007c6d7a
Pistache::PollableQueue::pop() 0x00000000007d0c7e
Pistache::Queue::popSafe() 0x00000000007c49f9
Pistache::Tcp::Transport::handleWriteQueue(bool) 0x00000000007c0f0b
Pistache::Tcp::Transport::flush() 0x00000000007bf5a5
Pistache::Http::ResponseStream::flush() 0x000000000076afab
   ...

I don't need a solution in pistache immediately; for now I can use the added lock, since my application isn't performance-critical.

dgreatwood (Collaborator) commented:

Perfect, thanks. I will look for a solution here when I have the chance/time.

dgreatwood added a commit to dgreatwood/pistachefork that referenced this issue Mar 5, 2025
…1290

http.cc
Remove the call to transport_->flush in ResponseStream::flush. The
call was creating corruption in writesQueue as per Issue pistacheio#1290. See
code comment for more details.
dgreatwood (Collaborator) commented Mar 5, 2025

Hi @bkueng,

I made a branch on my personal Pistache fork that attempts to address this issue. I wonder if you could try it out?
git clone -b flushRace --single-branch https://github.com/dgreatwood/pistachefork.git

Alternatively, if it's easier for you to edit your own Pistache source, please just comment out the call transport_->flush(); in ResponseStream::flush() in src/common/http.cc. The comment in my branch explains why I think this is necessary and sufficient. And of course, you'd need to remove the mutex you added, to enable the test.

I'd be grateful to hear back whether the above change fixes the issue in your scenario, and also doesn't cause any other issues.

Thanks in advance!
Duncan
P.S. I don't think my prior idea of bypassing the write queue works: sometimes when sending we have to give up and put the write back on the write queue, because the Fd is not ready for writing. So the write queue has to be involved. However, I believe there is no need to call transport_->flush(), which is what's causing the issue.
