Utility for General Purpose Parallelism #4009
Conversation
Following the type recovery generator parallelism fix, as well as noting that faulty parallelism exists elsewhere such as in the slicing, I've created this general-purpose concurrency tool as an easy means to bootstrap effective concurrency for execution-order-independent tasks.
Following #4009, I have replaced the parallelism in the slicing with this utility.
Related: #4010
Please let @bbrehm double-check these two PRs before merging.
    val completionQueue = mutable.ArrayDeque.empty[Future[V]]
    val results         = mutable.ArrayBuffer.empty[Try[V]]

    var done = false
    while (!done) {
      if (completionQueue.size < maxQueueSize && tasks.hasNext) {
        val nextTask = tasks.next()
        completionQueue.append(Future.apply(nextTask()))
      } else if (completionQueue.nonEmpty) {
        val future = completionQueue.removeHead()
        val res    = Try(Await.result(future, Duration.Inf))
        results.append(res)
      } else {
        done = true
      }
    }

    results.toList
Suggested change (replacing the queue-based loop with):

    val futures = tasks.map(task => Future(task())).toSeq // starts them all
    futures.map { future =>
      Try(Await.result(future, Duration.Inf))
    }
doesn't this do the same with less noise?
The point of the queue is to limit how many tasks are loaded into memory at once, which the iterator parameter also helps with. Not sure, on the other hand, whether this pattern makes much of an impact outside of writing the CPG to disk.
I can offer this third way of doing things, slightly adapted from one of my joern scripts (imports added so it compiles as-is):

    import java.util.{Spliterator, Spliterators}
    import java.util.stream.StreamSupport
    import scala.collection.mutable
    import scala.jdk.CollectionConverters._

    def runInParallel[V](it: Iterator[() => V]): mutable.Buffer[V] = {
      StreamSupport
        .stream(Spliterators.spliteratorUnknownSize(it.asJava, Spliterator.NONNULL), /* parallel */ true)
        .map(_.apply())
        .toList
        .asScala
    }
I haven't benchmarked it though - I just assumed the Java streams are optimized well enough (including lazy reading from the stream/iterator)
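For illustration, here is a self-contained check of the spliterator approach (the definition is repeated with its imports so this compiles on its own; `SpliteratorDemo` is a made-up name, and `Collectors.toList` is used instead of `Stream#toList` so it also runs on JDKs older than 16):

```scala
import java.util.{Spliterator, Spliterators}
import java.util.stream.{Collectors, StreamSupport}
import scala.jdk.CollectionConverters._

object SpliteratorDemo {
  // Wrap the Scala iterator as an unsized Java spliterator and let the
  // parallel stream pull from it lazily, fanning work out to the common
  // ForkJoinPool.
  def runInParallel[V](it: Iterator[() => V]): List[V] =
    StreamSupport
      .stream(Spliterators.spliteratorUnknownSize(it.asJava, Spliterator.NONNULL), /* parallel */ true)
      .map[V](_.apply())
      .collect(Collectors.toList[V]())
      .asScala
      .toList
}
```

Note that the spliterator here only reports `NONNULL`, not `ORDERED`, so only the multiset of results is guaranteed, not their order.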
I guess both @maltek's suggestion and mine ignore the fact that @DavidBakerEffendi wants to run x tasks in parallel - our suggestions both start them all eagerly. So I guess it's best to just ignore us.
Final thought: doesn't an ExecutorService with a fixed thread pool do that as well?
But again, feel free to ignore - we didn't mean to hold you up, it was just a thought.
Got back from vacation today. I'll see if @maltek's solution can be specified with a fixed x size, as it looks much cleaner, and Java's way of doing things is likely more battle-tested.
Yeah, my idea was that I don't want to pollute memory too much with tasks that may have unpredictable memory demands. Let me see if I can produce a cleaner variant that addresses this, and maybe reflect the method's intention in its name.
Seems like a ThreadPoolExecutor is what I want; it should limit the number of running tasks.
yes that's what I had in mind re ExecutorService
you get bonus points if you wrap it in a scala.util.Using - that'll ensure that the ExecutorService is always properly closed, similar to Java's try-with-resources
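A sketch of what that combination might look like - `FixedPoolRunner` and `runInParallel` are made-up names, not the utility's actual API, and this is only one way to wire it up:

```scala
import java.util.concurrent.{Callable, Executors, ExecutorService}
import scala.jdk.CollectionConverters._
import scala.util.{Try, Using}

object FixedPoolRunner {

  // ExecutorService only implements AutoCloseable from JDK 19 on, so we give
  // Using an explicit Releasable to stay portable across JDKs.
  implicit val releasePool: Using.Releasable[ExecutorService] =
    pool => pool.shutdown()

  // At most `poolSize` tasks run at a time; Using shuts the pool down even if
  // something throws, similar to Java's try-with-resources.
  def runInParallel[V](tasks: Iterator[() => V], poolSize: Int): List[Try[V]] =
    Using.resource(Executors.newFixedThreadPool(poolSize)) { pool =>
      val callables = tasks.map(t => new Callable[V] { def call(): V = t() }).toList
      pool
        .invokeAll(callables.asJava) // blocks until every task has finished
        .asScala
        // note: a failed task surfaces here as an ExecutionException wrapping
        // the original error, which Try then captures
        .map(future => Try(future.get()))
        .toList
    }
}
```

One caveat against the stated memory goal: `invokeAll` still materializes all callables up front, so only the number of *running* tasks is bounded, not the number of queued ones.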
* Refactored utility as `ConcurrentTaskUtil`
* Exposed both a fixed thread pool and spliterator variant
Given all the feedback, I've added two variants: one with a fixed-size pool, and one that uses @maltek's suggestion of a spliterator. Note that the executor service doesn't play nice with
If this is still a meaningful utility, we can merge it and gradually refer to it as a common source of correct concurrency implementations. This is mostly motivated by observing the pitfalls of my own previous shortcomings in implementing concurrent execution, and wanting to keep this from happening to others.
Just a small side note: (As a side story: I actually had this scenario at university, where I wanted to demonstrate some fancy multi-threaded application, but during the demo my laptop was running on battery. Everything then ran sequentially, which made me look pretty stupid.)
@max-leuthaeuser Good point, I've added checks with the
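For what it's worth, the kind of check discussed here might look like the following sketch (`ExecutionStrategy` is a hypothetical name; the idea is just to fall back to a serial run when the JVM reports a single usable core, e.g. a laptop in power-saving mode, where parallel overhead would only slow things down):

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ExecutionStrategy {
  def run[V](tasks: Iterator[() => V])(implicit ec: ExecutionContext): List[V] = {
    if (Runtime.getRuntime.availableProcessors() <= 1) {
      // single visible core: run serially, no thread hand-off at all
      tasks.map(_()).toList
    } else {
      // eager fan-out onto the given execution context; a bounded pool would
      // additionally cap how many tasks are in flight
      val futures = tasks.map(t => Future(t())).toList
      futures.map(f => Await.result(f, Duration.Inf))
    }
  }
}
```

Both branches preserve the input order, so callers see the same result list either way.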
* Utility for General Purpose Parallelism

  Following the type recovery generator parallelism fix, as well as noting that faulty parallelism exists elsewhere such as in the slicing, I've created this general purpose concurrency tool as an easy means to bootstrap effective concurrency for execution order independent tasks.

* Allowed execution context to be defined as an implicit arg
* [joern-slice] Fixed Parallelism

  Following #4009, I have replaced the parallelism in the slicing with this utility.

* Brought new general purpose concurrent util in
* Increased number of tasks to ensure parallelism better over serial