From d65d19bb57156978b9ea7b203b48a8a9122340df Mon Sep 17 00:00:00 2001 From: David Baker Effendi Date: Fri, 22 Dec 2023 10:41:56 +0200 Subject: [PATCH 1/6] Utility for General Purpose Parallelism Following the type recovery generator parallelism fix, as well as noting that faulty parallelism exists elsewhere such as in the slicing, I've created this general purpose concurrency tool as an easy means to bootstrap effect concurrency for execution order independent tasks. --- .../utils/ConcurrentTaskExecutionUtil.scala | 53 +++++++++++++++++++ .../ConcurrentTaskExecutionUtilTests.scala | 33 ++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtil.scala create mode 100644 joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtilTests.scala diff --git a/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtil.scala b/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtil.scala new file mode 100644 index 000000000000..4c73459d7d22 --- /dev/null +++ b/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtil.scala @@ -0,0 +1,53 @@ +package io.joern.x2cpg.utils + +import io.shiftleft.utils.ExecutionContextProvider + +import scala.collection.mutable +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.util.Try + +/** Following ConcurrentWriterCpgPass, this creates a generic re-usable utility for bootstrapping basic non-CPG pass + * concurrency. + */ +object ConcurrentTaskExecutionUtil { + + private val MAX_QUEUE_SIZE_DEFAULT = 2 + 4 * Runtime.getRuntime.availableProcessors() + private implicit val ec: ExecutionContext = ExecutionContextProvider.getExecutionContext + + /** Uses the parallel queue strategy from [[io.shiftleft.passes.ConcurrentWriterCpgPass]] to offer a generic means of + * executing an iterator of tasks that share an output type in parallel. + * + * @param tasks + * the tasks to parallelize. + * @param maxQueueSize + * the max number of tasks to queue for parallel execution. + * @tparam V + * the output type of each task. + * @return + * an array of the executed tasks as either a success or failure. + * @see + * [[io.shiftleft.passes.ConcurrentWriterCpgPass]] + */ + def runInParallel[V](tasks: Iterator[() => V], maxQueueSize: Int = MAX_QUEUE_SIZE_DEFAULT): List[Try[V]] = { + val completionQueue = mutable.ArrayDeque.empty[Future[V]] + val results = mutable.ArrayBuffer.empty[Try[V]] + + var done = false + while (!done) { + if (completionQueue.size < maxQueueSize && tasks.hasNext) { + val nextTask = tasks.next() + completionQueue.append(Future.apply(nextTask())) + } else if (completionQueue.nonEmpty) { + val future = completionQueue.removeHead() + val res = Try(Await.result(future, Duration.Inf)) + results.append(res) + } else { + done = true + } + } + + results.toList + } + +} diff --git a/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtilTests.scala b/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtilTests.scala new file mode 100644 index 000000000000..bf571644b6d2 --- /dev/null +++ b/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtilTests.scala @@ -0,0 +1,33 @@ +package io.joern.x2cpg.utils + +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import scala.util.Success + +class ConcurrentTaskExecutionUtilTests extends AnyWordSpec with Matchers { + + "a large number of 'expensive' operations should perform faster in parallel" in { + def problem = Iterator.fill(100)(() => Thread.sleep(10)) + + val parStart = System.nanoTime() + ConcurrentTaskExecutionUtil.runInParallel(problem) + val parTotal = System.nanoTime() - parStart + + val serStart = System.nanoTime() + problem.foreach(x => x()) + val serTotal = System.nanoTime() - serStart + + parTotal should be < serTotal + } + + "provide the means to let the caller handle unsuccessful operations without propagating an exception" in { + val problem = Iterator(() => "Success!", () => "Success!", () => throw new RuntimeException("Failure!")) + val result = ConcurrentTaskExecutionUtil.runInParallel(problem) + result.count { + case Success(_) => true + case _ => false + } shouldBe 2 + } + +} From 500f0f3ccf9eda4d1a41050b5f8524bc572c7518 Mon Sep 17 00:00:00 2001 From: David Baker Effendi Date: Fri, 22 Dec 2023 12:14:25 +0200 Subject: [PATCH 2/6] Allowed execution context to be defined as an implicit arg --- .../io/joern/x2cpg/utils/ConcurrentTaskExecutionUtil.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtil.scala b/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtil.scala index 4c73459d7d22..e0b9dad6b2e5 100644 --- a/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtil.scala +++ b/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtil.scala @@ -12,8 +12,7 @@ import scala.util.Try */ object ConcurrentTaskExecutionUtil { - private val MAX_QUEUE_SIZE_DEFAULT = 2 + 4 * Runtime.getRuntime.availableProcessors() - private implicit val ec: ExecutionContext = ExecutionContextProvider.getExecutionContext + private val MAX_QUEUE_SIZE_DEFAULT = 2 + 4 * Runtime.getRuntime.availableProcessors() /** Uses the parallel queue strategy from [[io.shiftleft.passes.ConcurrentWriterCpgPass]] to offer a generic means of * executing an iterator of tasks that share an output type in parallel. @@ -29,7 +28,9 @@ object ConcurrentTaskExecutionUtil { * @see * [[io.shiftleft.passes.ConcurrentWriterCpgPass]] */ - def runInParallel[V](tasks: Iterator[() => V], maxQueueSize: Int = MAX_QUEUE_SIZE_DEFAULT): List[Try[V]] = { + def runInParallel[V](tasks: Iterator[() => V], maxQueueSize: Int = MAX_QUEUE_SIZE_DEFAULT)(implicit + ec: ExecutionContext = ExecutionContextProvider.getExecutionContext + ): List[Try[V]] = { val completionQueue = mutable.ArrayDeque.empty[Future[V]] val results = mutable.ArrayBuffer.empty[Try[V]] From 471b769514da83efdf87e8ea807b9ac7936fae35 Mon Sep 17 00:00:00 2001 From: David Baker Effendi Date: Tue, 9 Jan 2024 11:01:02 +0200 Subject: [PATCH 3/6] Review notes * Refactored utility as `ConcurrentTaskUtil` * Exposed both a fixed thread pool and spliterator variant --- .../utils/ConcurrentTaskExecutionUtil.scala | 54 ---------------- .../x2cpg/utils/ConcurrentTaskUtil.scala | 62 ++++++++++++++++++ .../ConcurrentTaskExecutionUtilTests.scala | 33 ---------- .../x2cpg/utils/ConcurrentTaskUtilTests.scala | 64 +++++++++++++++++++ 4 files changed, 126 insertions(+), 87 deletions(-) delete mode 100644 joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtil.scala create mode 100644 joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskUtil.scala delete mode 100644 joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtilTests.scala create mode 100644 joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskUtilTests.scala diff --git a/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtil.scala b/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtil.scala deleted file mode 100644 index e0b9dad6b2e5..000000000000 --- a/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtil.scala +++ /dev/null @@ -1,54 +0,0 @@ -package io.joern.x2cpg.utils - -import io.shiftleft.utils.ExecutionContextProvider - -import scala.collection.mutable -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.util.Try - -/** Following ConcurrentWriterCpgPass, this creates a generic re-usable utility for bootstrapping basic non-CPG pass - * concurrency. - */ -object ConcurrentTaskExecutionUtil { - - private val MAX_QUEUE_SIZE_DEFAULT = 2 + 4 * Runtime.getRuntime.availableProcessors() - - /** Uses the parallel queue strategy from [[io.shiftleft.passes.ConcurrentWriterCpgPass]] to offer a generic means of - * executing an iterator of tasks that share an output type in parallel. - * - * @param tasks - * the tasks to parallelize. - * @param maxQueueSize - * the max number of tasks to queue for parallel execution. - * @tparam V - * the output type of each task. - * @return - * an array of the executed tasks as either a success or failure. - * @see - * [[io.shiftleft.passes.ConcurrentWriterCpgPass]] - */ - def runInParallel[V](tasks: Iterator[() => V], maxQueueSize: Int = MAX_QUEUE_SIZE_DEFAULT)(implicit - ec: ExecutionContext = ExecutionContextProvider.getExecutionContext - ): List[Try[V]] = { - val completionQueue = mutable.ArrayDeque.empty[Future[V]] - val results = mutable.ArrayBuffer.empty[Try[V]] - - var done = false - while (!done) { - if (completionQueue.size < maxQueueSize && tasks.hasNext) { - val nextTask = tasks.next() - completionQueue.append(Future.apply(nextTask())) - } else if (completionQueue.nonEmpty) { - val future = completionQueue.removeHead() - val res = Try(Await.result(future, Duration.Inf)) - results.append(res) - } else { - done = true - } - } - - results.toList - } - -} diff --git a/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskUtil.scala b/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskUtil.scala new file mode 100644 index 000000000000..abcb5408e677 --- /dev/null +++ b/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskUtil.scala @@ -0,0 +1,62 @@ +package io.joern.x2cpg.utils + +import java.util +import java.util.concurrent.{Callable, Executors} +import java.util.stream.{Collectors, StreamSupport} +import java.util.{Collections, Spliterator, Spliterators} +import scala.jdk.CollectionConverters.* +import scala.util.Try + +/** A utility for providing out-of-the-box basic concurrent execution for a collection of Scala functions. + */ +object ConcurrentTaskUtil { + + private val MAX_POOL_SIZE = Runtime.getRuntime.availableProcessors() + + /** Uses a thread pool with a limited number of active threads executing a task at any given point. This is effective + * when tasks may require large amounts of memory, or single tasks are too short lived. + * + * @param tasks + * the tasks to parallelize. + * @param maxPoolSize + * the max number of tasks to queue for parallel execution. + * @tparam V + * the output type of each task. + * @return + * an array of the executed tasks as either a success or failure. + */ + def runUsingThreadPool[V](tasks: Iterator[() => V], maxPoolSize: Int = MAX_POOL_SIZE): List[Try[V]] = { + val ex = Executors.newFixedThreadPool(maxPoolSize) + try { + val callables = Collections.list(tasks.map { x => + new Callable[V] { + override def call(): V = x.apply() + } + }.asJavaEnumeration) + ex.invokeAll(callables).asScala.map(x => Try(x.get())).toList + } finally { + ex.shutdown() + } + } + + /** Uses a Spliterator to run a number of tasks in parallel, where any number of threads may be alive at any point. + * This is useful for running a large number of tasks with low memory consumption. Spliterator's default thread pool + * is ForkJoinPool.commonPool(). + * + * @param tasks + * the tasks to parallelize. + * @tparam V + * the output type of each task. + * @return + * an array of the executed tasks as either a success or failure. + */ + def runUsingSpliterator[V](tasks: Iterator[() => V]): List[Try[V]] = { + StreamSupport + .stream(Spliterators.spliteratorUnknownSize(tasks.asJava, Spliterator.NONNULL), /* parallel */ true) + .map(task => Try(task.apply())) + .collect(Collectors.toList()) + .asScala + .toList + } + +} diff --git a/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtilTests.scala b/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtilTests.scala deleted file mode 100644 index bf571644b6d2..000000000000 --- a/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskExecutionUtilTests.scala +++ /dev/null @@ -1,33 +0,0 @@ -package io.joern.x2cpg.utils - -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpec - -import scala.util.Success - -class ConcurrentTaskExecutionUtilTests extends AnyWordSpec with Matchers { - - "a large number of 'expensive' operations should perform faster in parallel" in { - def problem = Iterator.fill(100)(() => Thread.sleep(10)) - - val parStart = System.nanoTime() - ConcurrentTaskExecutionUtil.runInParallel(problem) - val parTotal = System.nanoTime() - parStart - - val serStart = System.nanoTime() - problem.foreach(x => x()) - val serTotal = System.nanoTime() - serStart - - parTotal should be < serTotal - } - - "provide the means to let the caller handle unsuccessful operations without propagating an exception" in { - val problem = Iterator(() => "Success!", () => "Success!", () => throw new RuntimeException("Failure!")) - val result = ConcurrentTaskExecutionUtil.runInParallel(problem) - result.count { - case Success(_) => true - case _ => false - } shouldBe 2 - } - -} diff --git a/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskUtilTests.scala b/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskUtilTests.scala new file mode 100644 index 000000000000..c5de632704f3 --- /dev/null +++ b/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskUtilTests.scala @@ -0,0 +1,64 @@ +package io.joern.x2cpg.utils + +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import scala.util.Success + +class ConcurrentTaskUtilTests extends AnyWordSpec with Matchers { + + "compared to serial execution, concurrent execution" should { + + "perform better against a large number of 'expensive' operations using a spliterator" in { + def problem = Iterator.fill(500)(() => Thread.sleep(10)) + + val parStart = System.nanoTime() + ConcurrentTaskUtil.runUsingSpliterator(problem) + val parTotal = System.nanoTime() - parStart + + val serStart = System.nanoTime() + problem.foreach(x => x()) + val serTotal = System.nanoTime() - serStart + + parTotal should be < serTotal + } + + "perform better against a large number of 'cheap' operations using a thread pool" in { + def problem = Iterator.fill(500)(() => Thread.sleep(1)) + + val parStart = System.nanoTime() + ConcurrentTaskUtil.runUsingThreadPool(problem) + val parTotal = System.nanoTime() - parStart + + val serStart = System.nanoTime() + problem.foreach(x => x()) + val serTotal = System.nanoTime() - serStart + + parTotal should be < serTotal + } + } + + "a large number of operations should not perform faster in a thread pool when compared to a spliterator" in { + def problem = Iterator.fill(1000)(() => Thread.sleep(1)) + + val threadPoolStart = System.nanoTime() + ConcurrentTaskUtil.runUsingThreadPool(problem) + val threadPoolTotal = System.nanoTime() - threadPoolStart + + val spliteratorStart = System.nanoTime() + ConcurrentTaskUtil.runUsingSpliterator(problem) + val spliteratorTotal = System.nanoTime() - spliteratorStart + + threadPoolTotal should be > spliteratorTotal + } + + "provide the means to let the caller handle unsuccessful operations without propagating an exception" in { + val problem = Iterator(() => "Success!", () => "Success!", () => throw new RuntimeException("Failure!")) + val result = ConcurrentTaskUtil.runUsingThreadPool(problem) + result.count { + case Success(_) => true + case _ => false + } shouldBe 2 + } + +} From d323245659595928fe3e2729920cdbc7d8b32448 Mon Sep 17 00:00:00 2001 From: David Baker Effendi Date: Tue, 9 Jan 2024 11:11:50 +0200 Subject: [PATCH 4/6] Scaladoc comment --- .../main/scala/io/joern/x2cpg/utils/ConcurrentTaskUtil.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskUtil.scala b/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskUtil.scala index abcb5408e677..97303cd67d96 100644 --- a/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskUtil.scala +++ b/joern-cli/frontends/x2cpg/src/main/scala/io/joern/x2cpg/utils/ConcurrentTaskUtil.scala @@ -19,7 +19,7 @@ object ConcurrentTaskUtil { * @param tasks * the tasks to parallelize. * @param maxPoolSize - * the max number of tasks to queue for parallel execution. + * the max pool size to allow for active threads. * @tparam V * the output type of each task. * @return From 9a3ab572c946bb1cf63540820a4ff559e6f745f4 Mon Sep 17 00:00:00 2001 From: David Baker Effendi Date: Tue, 9 Jan 2024 11:31:28 +0200 Subject: [PATCH 5/6] Removed parallel comparison as it's too flaky --- .../x2cpg/utils/ConcurrentTaskUtilTests.scala | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskUtilTests.scala b/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskUtilTests.scala index c5de632704f3..422edf70847b 100644 --- a/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskUtilTests.scala +++ b/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskUtilTests.scala @@ -38,20 +38,6 @@ class ConcurrentTaskUtilTests extends AnyWordSpec with Matchers { } } - "a large number of operations should not perform faster in a thread pool when compared to a spliterator" in { - def problem = Iterator.fill(1000)(() => Thread.sleep(1)) - - val threadPoolStart = System.nanoTime() - ConcurrentTaskUtil.runUsingThreadPool(problem) - val threadPoolTotal = System.nanoTime() - threadPoolStart - - val spliteratorStart = System.nanoTime() - ConcurrentTaskUtil.runUsingSpliterator(problem) - val spliteratorTotal = System.nanoTime() - spliteratorStart - - threadPoolTotal should be > spliteratorTotal - } - "provide the means to let the caller handle unsuccessful operations without propagating an exception" in { val problem = Iterator(() => "Success!", () => "Success!", () => throw new RuntimeException("Failure!")) val result = ConcurrentTaskUtil.runUsingThreadPool(problem) From 024189c9215cc4b37322a2b0ac494901688b28f6 Mon Sep 17 00:00:00 2001 From: David Baker Effendi Date: Tue, 9 Jan 2024 16:06:00 +0200 Subject: [PATCH 6/6] Checking for more than 1 processor before running tests --- .../io/joern/x2cpg/utils/ConcurrentTaskUtilTests.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskUtilTests.scala b/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskUtilTests.scala index 422edf70847b..8000878288c6 100644 --- a/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskUtilTests.scala +++ b/joern-cli/frontends/x2cpg/src/test/scala/io/joern/x2cpg/utils/ConcurrentTaskUtilTests.scala @@ -1,5 +1,6 @@ package io.joern.x2cpg.utils +import org.scalatest.Assertions import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -7,9 +8,13 @@ import scala.util.Success class ConcurrentTaskUtilTests extends AnyWordSpec with Matchers { + private def assumeMultipleProcessors = + assume(Runtime.getRuntime.availableProcessors() > 1, "!!! Number of available processors not larger than 1 !!!") + "compared to serial execution, concurrent execution" should { "perform better against a large number of 'expensive' operations using a spliterator" in { + assumeMultipleProcessors def problem = Iterator.fill(500)(() => Thread.sleep(10)) val parStart = System.nanoTime() @@ -24,6 +29,7 @@ class ConcurrentTaskUtilTests extends AnyWordSpec with Matchers { } "perform better against a large number of 'cheap' operations using a thread pool" in { + assumeMultipleProcessors def problem = Iterator.fill(500)(() => Thread.sleep(1)) val parStart = System.nanoTime()