Refactor WorkerThread runloop; avoid pathological starvation of pollers #4247

Open

armanbilge wants to merge 5 commits into series/3.6.x

Conversation

armanbilge (Member)

Fixes #4228.

At a high level, a WorkerThread is always in one of the following three states:

  1. Working (primarily on the local queue).
  2. Looking for work (external or stolen).
  3. Parked.

Previously, (3) was a separate parkLoop(), but (1) and (2) were entangled in the top-level runloop. Now, (2) is refactored into a separate lookForWork() loop. Thus, the top-level runloop is only responsible for tracking "ticks", such that polling runs every 64 ticks.

[diagram: worker-runloop-fsm (the worker thread run loop state machine)]
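
For orientation, here is a minimal, self-contained sketch of that three-state shape. Every name in it (WorkerSketch, PollingInterval, poll, steal, the queue types) is an illustrative placeholder, not the actual WorkerThread internals.

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.LockSupport
import scala.collection.mutable

// Sketch of the worker's three states: working, looking for work, parked.
final class WorkerSketch(
    localQueue: mutable.Queue[Runnable],            // this worker's local queue
    externalQueue: ConcurrentLinkedQueue[Runnable], // queue shared with outside submitters
    done: AtomicBoolean                             // runtime shutdown flag
) extends Thread {

  private[this] val PollingInterval = 64

  // Stand-in for timer/selector polling.
  private[this] def poll(): Unit = ()

  // Stand-in for stealing from a sibling worker; null means nothing was stolen.
  private[this] def steal(): Runnable = null

  // States (2) and (3): look for external or stolen work, parking when none is
  // found. Returns once something lands in the local queue or the runtime shuts down.
  private[this] def lookForWork(): Unit = {
    while (!done.get()) {
      val external = externalQueue.poll()
      if (external ne null) { localQueue.enqueue(external); return }
      val stolen = steal()
      if (stolen ne null) { localQueue.enqueue(stolen); return }
      // State (3): nothing to do, park until a scheduling event unparks us.
      LockSupport.park(this)
    }
  }

  // Top-level runloop: state (1), working on the local queue, plus the tick
  // bookkeeping so that polling runs once every PollingInterval ticks.
  override def run(): Unit = {
    var ticks = 0
    while (!done.get()) {
      if (ticks % PollingInterval == 0) poll()
      if (localQueue.isEmpty) lookForWork()
      if (localQueue.nonEmpty) localQueue.dequeue().run()
      ticks += 1
    }
  }
}

In the PR itself lookForWork() is a small @switch-based state machine (see the diff excerpt below), but the division of responsibilities follows this shape.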

Comment on lines +355 to +361
def lookForWork(): Unit = {
  var state = 0
  while (!done.get()) {
    (state: @switch) match {
      case 0 =>
        // Check the external queue after a failed dequeue from the local
        // queue (due to the local queue being empty).
armanbilge (Member, Author) commented on Jan 21, 2025

Actually, we could lift case 0 out of the loop. It's always guaranteed to run exactly once, and never again. The loop just transitions between cases 1 and 2.
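
For illustration, a rough sketch of that suggestion, with hypothetical placeholder members (done, checkExternalQueue, tryStealing, park) rather than the real WorkerThread fields:

import java.util.concurrent.atomic.AtomicBoolean
import scala.annotation.switch

object LiftedLookForWorkSketch {
  private val done = new AtomicBoolean(false)

  private def checkExternalQueue(): Unit = () // former case 0, runs exactly once
  private def tryStealing(): Boolean = false  // former case 1
  private def park(): Unit = ()               // former case 2

  def lookForWork(): Unit = {
    // The one-time external-queue check is lifted out of the loop ...
    checkExternalQueue()
    // ... so the loop only has to alternate between the two remaining states.
    var state = 1
    while (!done.get()) {
      (state: @switch) match {
        case 1 =>
          // try to find work by stealing; park on the next iteration if none was found
          if (tryStealing()) return else state = 2
        case 2 =>
          // park, then go back to looking for work
          park()
          state = 1
      }
    }
  }
}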

armanbilge linked an issue on Jan 21, 2025 that may be closed by this pull request
armanbilge (Member, Author)

I think the failures are legit 😕 something must be broken.

armanbilge (Member, Author) commented on Jan 23, 2025

Aha, all the CI failures are on JDK 21, and this is the test it's hanging on.

if (javaMajorVersion >= 21)
  "block in-place on virtual threads" in real {
    val loomExec = classOf[Executors]
      .getDeclaredMethod("newVirtualThreadPerTaskExecutor")
      .invoke(null)
      .asInstanceOf[ExecutorService]
    val loomEc = ExecutionContext.fromExecutor(loomExec)
    IO.blocking {
      classOf[Thread]
        .getDeclaredMethod("isVirtual")
        .invoke(Thread.currentThread())
        .asInstanceOf[Boolean]
    }.evalOn(loomEc)
  }

Edit: but ... we are getting this weirdness on the new test I added in this PR.

java.lang.InterruptedException
        at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1100)
        at java.base/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:230)
        at cats.effect.IOPlatformSpecification.$anonfun$platformSpecs$184(IOPlatformSpecification.scala:601)
        at cats.effect.unsafe.WorkerThread.lookForWork$1(WorkerThread.scala:391)
        at cats.effect.unsafe.WorkerThread.run(WorkerThread.scala:819)

try {
  latch.await() // wait until next task is in external queue
} catch {
  case _: InterruptedException => // ignore, runtime is shutting down
}
armanbilge (Member, Author)

The stray InterruptedException is apparently fatal and was nuking all the runtimes, causing remaining tests to hang. We only noticed this on JDK 21 CI runners because the last test in the suite happens to be JDK 21+ only.

djspiewak (Member)

Haven't reviewed the code yet, but I ran some benchmarks to sanity check. Looks like a very slight regression (note the parTraverse results in particular), but considering it fixes a number of fairness issues, I don't consider that to be an impediment.

Before

[info] Benchmark                                             (cpuTokens)   (size)   Mode  Cnt     Score   Error    Units
[info] ParallelBenchmark.parTraverse                               10000     1000  thrpt   10   755.287 ± 1.087    ops/s
[info] ParallelBenchmark.traverse                                  10000     1000  thrpt   10    68.832 ± 0.112    ops/s
[info] WorkStealingBenchmark.alloc                                   N/A  1000000  thrpt   10    11.866 ± 0.089  ops/min
[info] WorkStealingBenchmark.manyThreadsSchedulingBenchmark          N/A  1000000  thrpt   10    24.042 ± 2.950  ops/min
[info] WorkStealingBenchmark.runnableScheduling                      N/A  1000000  thrpt   10  2541.428 ± 5.020  ops/min
[info] WorkStealingBenchmark.runnableSchedulingScalaGlobal           N/A  1000000  thrpt   10  1874.609 ± 6.710  ops/min
[info] WorkStealingBenchmark.scheduling                              N/A  1000000  thrpt   10    25.641 ± 3.496  ops/min

After

[info] Benchmark                                             (cpuTokens)   (size)   Mode  Cnt     Score   Error    Units
[info] ParallelBenchmark.parTraverse                               10000     1000  thrpt   10   732.683 ± 1.429    ops/s
[info] ParallelBenchmark.traverse                                  10000     1000  thrpt   10    68.975 ± 0.100    ops/s
[info] WorkStealingBenchmark.alloc                                   N/A  1000000  thrpt   10    11.892 ± 0.135  ops/min
[info] WorkStealingBenchmark.manyThreadsSchedulingBenchmark          N/A  1000000  thrpt   10    30.569 ± 3.926  ops/min
[info] WorkStealingBenchmark.runnableScheduling                      N/A  1000000  thrpt   10  2576.645 ± 5.654  ops/min
[info] WorkStealingBenchmark.runnableSchedulingScalaGlobal           N/A  1000000  thrpt   10  1905.347 ± 2.478  ops/min
[info] WorkStealingBenchmark.scheduling                              N/A  1000000  thrpt   10    25.254 ± 4.653  ops/min

Successfully merging this pull request may close these issues:

External queue can starve timers/pollers