Refactor ratio stats for build speed increase #521

Draft · wants to merge 21 commits into master
Conversation


@wagnerlmichael wagnerlmichael commented Jun 25, 2024

Summary

This is not a final PR for review, but a progress update to determine next steps.

Currently all of the assesspy functions used in this script are copied in. If we were to move forward with this solution, they would need to be refactored in the actual package rather than copy/pasted and changed in this script.

With these changes, the build time for the reporting.ratio_stats table is ~15 minutes, a large improvement over the previous ~1 hour. All of the speedup came from editing the boot_ci function; I'm not sure how much more we could gain by editing the other functions. Switching the sampling from pandas to numpy index sampling contributed roughly 10% of the speedup, while parallel processing contributed the other ~90%.

Dev table here: "z_ci_436_refactor_ratio_stats_job_to_use_pyspark_reporting"."ratio_stats"

Other strategies tried

Spark

I tried several Spark strategies. First, I attempted to convert the data frame to a Spark data frame and sample on that, but that didn't work: it was extremely slow, presumably because of the computationally intensive conversions from pandas to Spark and back to pandas.

I tried to get around this by using a pandas UDF, which supposedly lets the Spark API operate on the pandas data frame in a columnar format while keeping the speed gains from distributed processing. This also resulted in much longer build times or errors I couldn't work through.

I also tried converting the pandas data frame to Spark once and then editing the remaining data structures in boot_ci so that they were all Spark compatible, but I could not get that speedup working either.

I am new to Spark, so it is very possible I missed something obvious or that there are still workable solutions.

Numba and Dask

I tried basic Numba parallelization and Dask parallelization, but neither could be imported properly. I believe this is because they both have C bindings, which Athena doesn't allow for third-party package additions.

concurrent.futures

I tried using this built-in Python module, but the parallelization failed due to a pickling error. I switched to multiprocessing and that finally worked.

Considerations on current strategy

If we move forward with this solution, we need to decide how to reconcile the changed boot_ci function with the assesspy build. One option is to edit the package itself and add a boolean parameter that turns parallel processing on or off, as sketched below. Another option is to just keep the copy-pasted functions in this script, but that creates two sources of truth for the assesspy functions, which isn't ideal.
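A minimal sketch of what that package-level option could look like, assuming a Pool-based implementation for brevity (the parameter names and the use of multiprocessing.Pool are illustrative only, not the current assesspy API or the exact Process/Queue approach in this PR):

import multiprocessing as mp
import numpy as np

def _one_boot(args):
    # Draw one bootstrap sample by index and apply the statistic to it.
    data, fun, n, seed = args
    rng = np.random.default_rng(seed)
    idx = rng.integers(0, data.shape[0], size=n)
    return fun(data[idx])

def boot_ci(fun, data, nboot=1000, alpha=0.05, parallel=False, num_processes=4):
    # Hypothetical signature: one implementation in the package,
    # with callers opting in to multiprocessing via parallel=True.
    data = np.asarray(data)
    n = data.shape[0]
    tasks = [(data, fun, n, seed) for seed in range(nboot)]
    if parallel:
        with mp.Pool(num_processes) as pool:
            ests = pool.map(_one_boot, tasks)
    else:
        ests = [_one_boot(t) for t in tasks]
    low, high = np.quantile(ests, [alpha / 2, 1 - alpha / 2])
    return low, high

For example, boot_ci(np.median, ratios, parallel=True) would return the same kind of confidence interval as the serial path, just computed across several processes.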

One potential upside of not using spark is that we can potentially maintain these functions in assesspy rather than building out an entirely new set of spark assesspy functions.

Other ways forward

We could also continue to develop here. Two other paths forward for me could be:

  • Try to spend more time figuring out spark
  • Try further non-spark speed up in other functions

@wagnerlmichael wagnerlmichael linked an issue Jun 25, 2024 that may be closed by this pull request
Comment on lines 159 to 176
def bootstrap_worker(
    data_array, fun, num_kwargs, n, nboot, start, end, result_queue
):
    ests = []
    for _ in range(start, end):
        sample_indices = np.random.choice(
            data_array.shape[0], size=n, replace=True
        )
        sample_array = data_array[sample_indices]
        if fun.__name__ == "cod" or num_kwargs == 1:
            ests.append(fun(sample_array[:, 0]))
        elif fun.__name__ == "prd":
            ests.append(fun(sample_array[:, 0], sample_array[:, 1]))
        else:
            raise Exception(
                "Input function should require 1 argument or be assesspy.prd."  # noqa
            )
    result_queue.put(ests)
wagnerlmichael (Member Author):

This function sets up our parallel processing: it is the unit of work that a single core will do. It randomly samples just like the prior code.

Comment on lines 163 to 166
    for _ in range(start, end):
        sample_indices = np.random.choice(
            data_array.shape[0], size=n, replace=True
        )
wagnerlmichael (Member Author):

We replace the old pandas sampling with faster np.random.choice() index sampling.
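For illustration, the two approaches look roughly like this (df and the ratio column are placeholders; df.sample() stands in for the prior pandas-based resampling):

import numpy as np
import pandas as pd

df = pd.DataFrame({"ratio": np.random.uniform(0.8, 1.2, 100_000)})
data_array = df.to_numpy()
n = len(df)

# Old approach: resample the DataFrame itself, which carries pandas index
# and block-manager overhead on every draw.
boot_pd = df.sample(n=n, replace=True)

# New approach: draw integer indices with numpy and slice the raw array.
idx = np.random.choice(data_array.shape[0], size=n, replace=True)
boot_np = data_array[idx]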

Comment on lines 178 to 189
def parallel_bootstrap(
    data_array, fun, num_kwargs, n, nboot, num_processes=4
):
    processes = []
    result_queue = mp.Queue()
    chunk_size = nboot // num_processes

    for i in range(num_processes):
        start = i * chunk_size
        end = start + chunk_size if i < num_processes - 1 else nboot
        p = mp.Process(
            target=bootstrap_worker,
@wagnerlmichael wagnerlmichael (Member Author) Jun 26, 2024

This function allocates the work for each process, dividing the ests bootstrap calculation across num_processes. The conditional if i < num_processes - 1 else nboot handles the case in which nboot isn't cleanly divisible by num_processes.
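For example, with hypothetical values of nboot = 1001 and num_processes = 4, the last chunk simply absorbs the remainder:

nboot, num_processes = 1001, 4
chunk_size = nboot // num_processes  # 250
bounds = [
    (i * chunk_size, (i + 1) * chunk_size if i < num_processes - 1 else nboot)
    for i in range(num_processes)
]
# bounds == [(0, 250), (250, 500), (500, 750), (750, 1001)]
# i.e. the last process runs 251 iterations instead of 250.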

Comment on lines 204 to 211
    results = []
    for _ in range(num_processes):
        results.extend(result_queue.get())

    for p in processes:
        p.join()

    return results
wagnerlmichael (Member Author):

Grabs the data from all the processes and combines it at the end. Note that the queue is drained before join() is called; a child process that has put items on a multiprocessing.Queue may not exit until those items have been consumed.

    result_queue.put(ests)

def parallel_bootstrap(
    data_array, fun, num_kwargs, n, nboot, num_processes=4
@wagnerlmichael wagnerlmichael (Member Author) Jun 26, 2024

num_processes=4 is the optimal number of cores here. Going from 3 to 4 processes is a big speed increase, but 4, 8, and 16 all give similar times. I'm guessing this is because of the data transfer bottleneck between cores.

jeancochrane (Contributor):

[Question, non-blocking] I'm guessing you checked this, but do we know for sure that the machine that this was tested on had more than 4 cores available?

wagnerlmichael (Member Author):

We discussed this in person: there were only 4 cores available, since AWS currently only allows a single 4-core DPU for this processing. If they change this, we could probably get much faster speeds with more cores.
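For reference, a quick way to confirm what the executor actually exposes to Python (this only reports what the OS/container makes visible, not the DPU allocation itself):

import multiprocessing as mp
import os

print(os.cpu_count())                 # logical CPUs the OS reports
print(mp.cpu_count())                 # same value via multiprocessing
print(len(os.sched_getaffinity(0)))   # CPUs this process may actually use (Linux only)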

@jeancochrane jeancochrane (Contributor) left a comment

Really nice work! It's a bummer to hear that the Spark code either didn't work or wasn't faster, but I don't have enough Spark experience to advise on a path forward at this point. Maybe it would make sense to take another crack at it in the future, but in the meantime I like the improvements you've made here, and I'm on board with the plan to make these changes to the assesspy package.

My recommended path forward would be to recreate these changes in a branch of assesspy, bundle and push the code from that branch to S3 as a .zip file, and then test it out by updating the sc.addPyFile() call in this model definition to point to the new version of the package. Then once we get the assesspy branch merged and released, we can update the ratio_stats model to depend on the new version. Does that sound reasonable to everyone else?
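A rough sketch of that test setup, assuming the branch has already been zipped and uploaded (the S3 path is a placeholder, not the actual bucket):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Make the zipped package importable on the driver and the executors.
spark.sparkContext.addPyFile("s3://example-bucket/packages/assesspy.zip")

import assesspy  # noqa: E402  # imported after addPyFile on purpose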


@dfsnow dfsnow (Member) commented Jul 2, 2024

@wagnerlmichael I mocked up a working set of functions using only Spark-compatible abstractions and dropped the result in a gist. The result seems to be pretty fast, at least on the limited subset of columns I calculated.

You should be able to drop that code into an Athena Spark notebook and run it without modification. It runs the COD stat calculations at the township_code level in 1m45s. I didn't mock up the other functions but it should be fairly straightforward to build out from here.

Can you take another crack at this when you get some free time, building off the linked gist? I'm happy to walk through what I did in the functions/how I figured them out. If the gist is stuff you've already tried then let me know and I'll think about another way forward.
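The gist itself isn't reproduced here, but the general shape of a Spark-native, per-township COD calculation could look something like this (the source table, column names, and the applyInPandas pattern are assumptions, not necessarily what the gist does):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.table("reporting.example_sales_ratios")  # placeholder source table

def cod_per_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # COD: mean absolute deviation from the median ratio,
    # expressed as a percentage of the median ratio.
    ratio = pdf["sale_ratio"]
    med = ratio.median()
    cod = 100 * (ratio - med).abs().mean() / med
    return pd.DataFrame(
        {"township_code": [pdf["township_code"].iloc[0]], "cod": [cod]}
    )

cod_by_township = sdf.groupBy("township_code").applyInPandas(
    cod_per_group, schema="township_code string, cod double"
)
cod_by_township.show()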

Successfully merging this pull request may close these issues.

Refactor ratio_stats job to use Pyspark