
Utility for General Purpose Parallelism #4009

Merged: 7 commits, Jan 9, 2024

Conversation

DavidBakerEffendi
Collaborator

@DavidBakerEffendi DavidBakerEffendi commented Dec 22, 2023

Following the type recovery generator parallelism fix, as well as noting that faulty parallelism exists elsewhere such as in the slicing, I've created this general purpose concurrency tool as an easy means to bootstrap effective concurrency for execution order independent tasks.

@DavidBakerEffendi added the labels performance (Improves the performance of Joern) and frontends (Relates to the shared x2cpg class) on Dec 22, 2023
@DavidBakerEffendi DavidBakerEffendi self-assigned this Dec 22, 2023
DavidBakerEffendi added a commit that referenced this pull request Dec 22, 2023
Following #4009, I have replaced the parallelism in the slicing with this utility.
@DavidBakerEffendi
Collaborator Author

Related: #4010

@max-leuthaeuser
Contributor

Please let @bbrehm double check on these two PRs before merging.

Comment on lines 34 to 51
val completionQueue = mutable.ArrayDeque.empty[Future[V]]
val results = mutable.ArrayBuffer.empty[Try[V]]

var done = false
while (!done) {
if (completionQueue.size < maxQueueSize && tasks.hasNext) {
val nextTask = tasks.next()
completionQueue.append(Future.apply(nextTask()))
} else if (completionQueue.nonEmpty) {
val future = completionQueue.removeHead()
val res = Try(Await.result(future, Duration.Inf))
results.append(res)
} else {
done = true
}
}

results.toList
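For readers skimming the diff, the bounded completion-queue pattern under review can be sketched in self-contained Java using only JDK APIs. This is a hedged sketch, not the PR's actual code: the squaring tasks and the `runAll` name are made up for illustration, and the pool is sized to the queue bound so at most `maxQueueSize` tasks are in flight at once.

```java
import java.util.*;
import java.util.concurrent.*;

public class BoundedCompletion {
    // Keeps at most maxQueueSize futures in flight, mirroring the Scala loop:
    // enqueue while there is room and tasks remain, otherwise drain the head.
    static List<Integer> runAll(Iterator<Callable<Integer>> tasks, int maxQueueSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(maxQueueSize);
        ArrayDeque<Future<Integer>> inFlight = new ArrayDeque<>();
        List<Integer> results = new ArrayList<>();
        try {
            while (true) {
                if (inFlight.size() < maxQueueSize && tasks.hasNext()) {
                    inFlight.add(pool.submit(tasks.next()));
                } else if (!inFlight.isEmpty()) {
                    // Blocks on the oldest future, like Await.result(future, Duration.Inf)
                    results.add(inFlight.removeFirst().get());
                } else {
                    break;
                }
            }
        } finally {
            pool.shutdown(); // stop accepting work; queued tasks still complete
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        List<Callable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            final int n = i;
            tasks.add(() -> n * n); // hypothetical CPU-bound task
        }
        System.out.println(runAll(tasks.iterator(), 2)); // [1, 4, 9, 16, 25]
    }
}
```

Because results are drained from the head of the deque, they come back in submission order regardless of which task finishes first.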
Contributor

Suggested change
val completionQueue = mutable.ArrayDeque.empty[Future[V]]
val results = mutable.ArrayBuffer.empty[Try[V]]
var done = false
while (!done) {
if (completionQueue.size < maxQueueSize && tasks.hasNext) {
val nextTask = tasks.next()
completionQueue.append(Future.apply(nextTask()))
} else if (completionQueue.nonEmpty) {
val future = completionQueue.removeHead()
val res = Try(Await.result(future, Duration.Inf))
results.append(res)
} else {
done = true
}
}
results.toList
val futures = tasks.map(task => Future(task())).toSeq
// starts them all ^
futures.map { future =>
Try(Await.result(future, Duration.Inf))
}

Contributor

doesn't this do the same with less noise?

Collaborator Author

The point of the queue is to limit how many tasks are loaded into memory at once, which is also helped along by the use of the iterator parameter. Not sure, on the other hand, whether this pattern makes much of an impact outside of writing the CPG to disk.

@maltek (Contributor) commented Jan 3, 2024

I can offer this third way of doing things, slightly adapted from one of my joern scripts:

import java.util.{Spliterator, Spliterators}
import java.util.stream.StreamSupport
import scala.collection.mutable
import scala.jdk.CollectionConverters.*

def runInParallel[V](it: Iterator[() => V]): mutable.Buffer[V] = {
  StreamSupport
    .stream(Spliterators.spliteratorUnknownSize(it.asJava, Spliterator.NONNULL), /* parallel */ true)
    .map(_.apply())
    .toList
    .asScala
}

I haven't benchmarked it, though; I just assumed the Java streams are optimized well enough (including lazy reading from the stream/iterator).
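Since `StreamSupport`, `Spliterators`, and `Spliterator` are all JDK APIs, the same idea can be sketched directly in plain Java. This is an illustrative sketch, not the PR's code: the class name, the `Supplier`-based task type, and the squaring tasks are made up, and the results are sorted before printing because a parallel stream built from an unknown-size spliterator need not preserve completion order.

```java
import java.util.*;
import java.util.function.Supplier;
import java.util.stream.*;

public class ParallelFromIterator {
    // Builds a parallel stream from a plain iterator of tasks and runs them,
    // pulling from the iterator lazily as the stream demands elements.
    static <V> List<V> runInParallel(Iterator<Supplier<V>> tasks) {
        return StreamSupport
            .stream(Spliterators.spliteratorUnknownSize(tasks, Spliterator.NONNULL), /* parallel */ true)
            .map(Supplier::get)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Supplier<Integer>> tasks = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            final int n = i;
            tasks.add(() -> n * n); // hypothetical task
        }
        List<Integer> results = runInParallel(tasks.iterator());
        Collections.sort(results); // sorted only to make the output deterministic
        System.out.println(results); // [0, 1, 4, 9]
    }
}
```

Note that, as discussed below, this variant hands scheduling to the common fork-join pool rather than capping the number of in-flight tasks explicitly.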

Contributor

I guess both @maltek's and my suggestions ignore the fact that @DavidBakerEffendi wants to run x tasks in parallel; our suggestions both start them all eagerly.

So I guess it's best to just ignore us.
Final thought: doesn't ExecutorService with a fixed threadpool do that as well?
But again, feel free to ignore, we didn't mean to hold you up, it was just a thought.

Collaborator Author

Got back from vacation today. I'll see if @maltek's solution can be specified with a fixed size x, as it looks much cleaner and Java's way of doing things is likely more battle-tested.

Yeah, my idea was that I don't want to pollute the memory too much for tasks which may produce unpredictable memory demands. Let me see if I can produce a cleaner variant that addresses this, and maybe reflect the intention of the method in the name.

Collaborator Author

Seems like a ThreadPoolExecutor is what I want; it should limit the number of running tasks.
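A fixed-size pool from `Executors.newFixedThreadPool` caps how many tasks run concurrently at the pool size, though its work queue is unbounded, so submitted-but-unstarted tasks still accumulate in memory. A minimal sketch of the pattern, with hypothetical task bodies (the squaring tasks are not from the PR):

```java
import java.util.*;
import java.util.concurrent.*;

public class BoundedPool {
    public static void main(String[] args) throws Exception {
        // At most 2 tasks execute at any moment; the rest wait in the pool's queue.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 1; i <= 5; i++) {
                final int n = i;
                futures.add(pool.submit(() -> n * n)); // hypothetical task
            }
            int sum = 0;
            for (Future<Integer> f : futures) {
                sum += f.get(); // blocks until that task completes
            }
            System.out.println(sum); // 1 + 4 + 9 + 16 + 25 = 55
        } finally {
            pool.shutdown(); // stop accepting tasks; let queued ones finish
        }
    }
}
```

The try-finally around `shutdown()` foreshadows the lifecycle question raised below: unlike a try-with-resources, it makes explicit which of `shutdown` and `shutdownNow` is intended.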

Contributor

Yes, that's what I had in mind re ExecutorService.

You get bonus points if you wrap it in a scala.util.Using; that'll ensure the ExecutorService is always properly closed, similar to Java's try-with-resources.

* Refactored utility as `ConcurrentTaskUtil`
* Exposed both a fixed thread pool and spliterator variant
@DavidBakerEffendi
Collaborator Author

Given all the feedback, I've added two variants: one with a fixed-size pool, and one that uses @maltek's suggestion of a spliterator.

Note that the executor service doesn't play nicely with AutoCloseable, since the shutdown and shutdownNow functions make closing ambiguous, so I've used a try-finally here with a call to shutdown.

If this is still a meaningful utility, we can merge it and gradually refer to it as a common source for implementing correct concurrency. This comes mostly from observing the pitfalls of my own previous concurrency implementations and wanting to spare others the same mistakes.

@max-leuthaeuser
Contributor

max-leuthaeuser commented Jan 9, 2024

Just a small side note:
These unit tests will fail on machines where Runtime.getRuntime.availableProcessors() returns 1.
This can be the case for virtual environments or local/physical developer machines running on battery or in certain energy savings modes.
(Maybe we can tag the tests to run only if we have > 1 available processors.)

(As a side story: I actually hit this scenario at university, where I wanted to demonstrate some fancy multi-threaded application, but my laptop was running on battery at the time. Everything ran sequentially, which made me look pretty stupid.)

@DavidBakerEffendi
Collaborator Author

@max-leuthaeuser Good point. I've added checks with the assume call to ensure these tests are skipped if Runtime.getRuntime.availableProcessors() <= 1.
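ScalaTest's assume cancels a test (rather than failing it) when its precondition doesn't hold. Stripped of the test framework, the guard reduces to a predicate on the core count; the following Java sketch, with a hypothetical method name, shows the idea:

```java
public class ProcessorGuard {
    // Parallelism-sensitive tests only make sense when more than one core is available.
    static boolean shouldRunParallelTest(int availableProcessors) {
        return availableProcessors > 1;
    }

    public static void main(String[] args) {
        System.out.println(shouldRunParallelTest(1)); // single core: skip the test
        System.out.println(shouldRunParallelTest(8)); // multi-core: run it
    }
}
```

In a real test, the argument would be `Runtime.getRuntime().availableProcessors()` evaluated at test time, since the value can change with power-saving modes or container CPU limits.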

@DavidBakerEffendi DavidBakerEffendi merged commit 126f942 into master Jan 9, 2024
5 checks passed
@DavidBakerEffendi DavidBakerEffendi deleted the dave/gen-purpose-parallelism branch January 9, 2024 14:51
DavidBakerEffendi added a commit that referenced this pull request Jan 9, 2024
Following #4009, I have replaced the parallelism in the slicing with this utility.
DavidBakerEffendi added a commit that referenced this pull request Jan 10, 2024
* Utility for General Purpose Parallelism
Following the type recovery generator parallelism fix, as well as noting that faulty parallelism exists elsewhere such as in the slicing, I've created this general purpose concurrency tool as an easy means to bootstrap effective concurrency for execution order independent tasks.

* Allowed execution context to be defined as an implicit arg

* [joern-slice] Fixed Parallelism
Following #4009, I have replaced the parallelism in the slicing with this utility.

* Brought new general purpose concurrent util in

* Increased number of tasks to ensure parallelism better over serial