-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor ratio stats for build speed increase #521
base: master
Are you sure you want to change the base?
Refactor ratio stats for build speed increase #521
Conversation
def bootstrap_worker( | ||
data_array, fun, num_kwargs, n, nboot, start, end, result_queue | ||
): | ||
ests = [] | ||
for _ in range(start, end): | ||
sample_indices = np.random.choice( | ||
data_array.shape[0], size=n, replace=True | ||
) | ||
sample_array = data_array[sample_indices] | ||
if fun.__name__ == "cod" or num_kwargs == 1: | ||
ests.append(fun(sample_array[:, 0])) | ||
elif fun.__name__ == "prd": | ||
ests.append(fun(sample_array[:, 0], sample_array[:, 1])) | ||
else: | ||
raise Exception( | ||
"Input function should require 1 argument or be assesspy.prd." # noqa | ||
) | ||
result_queue.put(ests) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function sets up for our parallel processing, it is the unit of work that a single core will be doing. It randomly samples just like the prior code.
for _ in range(start, end): | ||
sample_indices = np.random.choice( | ||
data_array.shape[0], size=n, replace=True | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We substitute the old pandas sampling for a faster np.random.choice()
sampling.
def parallel_bootstrap( | ||
data_array, fun, num_kwargs, n, nboot, num_processes=4 | ||
): | ||
processes = [] | ||
result_queue = mp.Queue() | ||
chunk_size = nboot // num_processes | ||
|
||
for i in range(num_processes): | ||
start = i * chunk_size | ||
end = start + chunk_size if i < num_processes - 1 else nboot | ||
p = mp.Process( | ||
target=bootstrap_worker, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function allocates the size of each process, and divides the ests
bootstrap calculation into n_processes
. This conditional if i < num_processes - 1 else nboot
handles the case in which the nboot
number isn't cleanly divisble by num_processes
results = [] | ||
for _ in range(num_processes): | ||
results.extend(result_queue.get()) | ||
|
||
for p in processes: | ||
p.join() | ||
|
||
return results |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Grabs the data from all the processes and combines them at the end
result_queue.put(ests) | ||
|
||
def parallel_bootstrap( | ||
data_array, fun, num_kwargs, n, nboot, num_processes=4 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
num_processes=4
is the optimal number of cores here. 3 to 4 is a big speed increase but 4, 8, and 16 all give similar times. I'm guessing this is because of the data transfer bottleneck between cores.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Question, non-blocking] I'm guessing you checked this, but do we know for sure that the machine that this was tested on had more than 4 cores available?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We discussed this in person and there were only 4 cores available, but currently aws only allows a single 4 core DPU for processing. If they change this we could probably get much faster speeds with more cores.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really nice work! It's a bummer to hear that the Spark code either didn't work or wasn't faster, but I don't have enough Spark experience to advise on a path forward at this point. Maybe it would make sense to take another crack at it in the future, but in the meantime I like the improvements you've made here, and I'm on board with the plan to make these changes to the assesspy package.
My recommended path forward would be to recreate these changes in a branch of assesspy, bundle and push the code from that branch to S3 as a .zip file, and then test it out by updating the sc.addPyFile()
call in this model definition to point to the new version of the package. Then once we get the assesspy branch merged and released, we can update the ratio_stats
model to depend on the new version. Does that sound reasonable to everyone else?
result_queue.put(ests) | ||
|
||
def parallel_bootstrap( | ||
data_array, fun, num_kwargs, n, nboot, num_processes=4 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Question, non-blocking] I'm guessing you checked this, but do we know for sure that the machine that this was tested on had more than 4 cores available?
@wagnerlmichael I mocked up a working set of functions using only Spark-compatible abstractions and dropped the result in a gist. The result seems to be pretty fast, at least on the limited subset of columns I calculated. You should be able to drop that code into an Athena Spark notebook and run it without modification. It runs the COD stat calculations at the Can you take another crack at this when you get some free time, building off the linked gist? I'm happy to walk through what I did in the functions/how I figured them out. If the gist is stuff you've already tried then let me know and I'll think about another way forward. |
Summary
This is not a final PR for review, but a progress update to determine next steps.
Currently all of the assesspy functions used in this script are copied in. If we were to move forward with this solution, they would need to be refactored in the actual package rather than copy/pasted and changed in this script.
Currently with these changes the build time for
reporting.ratio_stats
table is ~15 minutes, a large improvement over the previous ~1 hour. All of this speed up came from editing theboot_ci
function. I'm not sure how much speed we could get from editing the other functions. Changing sampling from pandas to numpy index sampling contributed to about ~10% of the speed up whereas parallel processing contributed to ~90% of the speed up.Dev table here:
"z_ci_436_refactor_ratio_stats_job_to_use_pyspark_reporting"."ratio_stats"
Other strategies tried
Spark
I tried for a while with different spark strategies. First I attempted to convert the data frame to a spark data frame and sample on that, but that didn't work. It was extremely slow, I'm assuming this was the case due to computationally intensive transformations from pandas to spark to pandas.
I tried to get around this issue by using a pandas udf. Supposedly, this allows the spark api to operate on the pandas data frame in a columnar format, maintaining speed increases from distributed processing. This also resulted in much longer build times or errors I couldn't work through.
I also tried a single pandas df conversion to spark, and then edit the remaining data structures in
boot_ci
so that they were all spark compatible, I also could not get this speed up working.I am new to spark, so it is very possible I missed something obvious or there are remaining workable solutions.
Numba and Dask
I tried basic numba parallelization and Dask parallelization, but neither were able to be imported in properly. I because this is because they both have C bindings and Athena doesn't allow for this with third-party package additions.
concurrent.futures
I tried using this built-in python func but the parallelization was failing due to a pickling error, I switched to
multiprocessing
and that finally worked.Considerations on current strategy
If we were to move forward with this solution, we would need to decide how to reconcile the changed
boot_ci
function with the assesspy build. One option is to edit the package itself and include a boolean param that turns parallel processing on/off. Another option is too just keep the copy pasted functions in this script, but that creates two sources of truth for the assesspy functions which isn't ideal.One potential upside of not using spark is that we can potentially maintain these functions in assesspy rather than building out an entirely new set of spark assesspy functions.
Other ways forward
We could also continue to develop here. Two other paths forward for me could be: