-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduction of timeout in passes #5
Conversation
} | ||
val duration = | ||
if timeout == -1 then Duration.Inf else Duration(stopAt - currentTimeInMs, TimeUnit.MILLISECONDS) | ||
Try(Await.result(future, duration)) match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You have taken care of handling the overall timeout by honouring the remaining timeout at the thread level as well.
We need to use this place to introduce the thread-level timeout.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really not parallelising the processing.
stream.forEach
creates one Future
(at line no 272) for a given part
and then the current main thread will wait for that future to finish (at line no 279). So In principle, you are processing the part
sequentially. Which is as good as not using the Future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In order to implement the parallelism you need to iterate over all parts and start all the Futures
and collect them in collection and apply timeout on Seqeunce of Futures.
Refer this code snippet as sample - https://github.com/Privado-Inc/privado-core/blob/main/src/main/scala/ai/privado/utility/ConcurrentProcessor.scala#L94-L103
No description provided.