
fix: Reuse online store for materialization writes #166

Merged

merged 2 commits into master from reuse_connections on Jan 31, 2025

Conversation

omirandadev
Collaborator

What this PR does / why we need it:

This PR allows instances of the OnlineStore to be reused from within the Spark materialization engine, so a new connection to the store does not need to be created for each write.
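A minimal sketch of the pattern this enables (illustrative names only, not the actual Feast engine code): create the store once per worker and reuse it for every batch write, instead of reconnecting on each write.

_store = None

def get_online_store(make_store):
    """Lazily create and cache one store instance per worker process."""
    global _store
    if _store is None:
        _store = make_store()  # connection is established only once
    return _store

def write_batches(batches, make_store):
    store = get_online_store(make_store)
    for batch in batches:
        store.write(batch)  # every write reuses the same connection

class FakeStore:
    """Stand-in for a real online store, for demonstration only."""
    def __init__(self):
        print("connection opened")
    def write(self, batch):
        print(f"wrote {len(batch)} rows")

write_batches([[1, 2], [3]], FakeStore)
write_batches([[4]], FakeStore)  # no second "connection opened"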

Which issue(s) this PR fixes:

Misc

"""Load pandas df to online store"""
for pdf in iterator:
pdf_row_count = pdf.shape[0]
start_time = time.time()
# convert to pyarrow table
if pdf_row_count == 0:
print("INFO!!! Dataframe has 0 records to process")
return
continue
Collaborator

Shouldn't we break instead of continue? If the row count is zero, we should stop looping, which is essentially what the return was doing.

Collaborator Author

The reason I made this change is that I'm not certain an empty pdf necessarily implies the remaining pdfs in the partition are also empty. The pull_latest_from_table_or_query method applies a filter within its SQL query, namely

WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}')

and that filter makes me think it is possible for one of the pdfs to be empty.

Collaborator

We can leave it as return. Empty partitions in Spark may lead to empty data frames in the iterator (when you have less data than the number of partitions).

If you have a data frame with X partitions, mapInPandas converts each partition to an Iterator[DataFrame]. Each DataFrame in the iterator has a default length of 10,000 rows. If the partition has more than 10K records, the iterator lets you iterate over each DataFrame in turn.
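For readers, a minimal runnable sketch of this batching behavior (not the PR's code; the batch size is governed by spark.sql.execution.arrow.maxRecordsPerBatch, which defaults to 10,000):

from typing import Iterator

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(25_000).repartition(2)  # 2 partitions of ~12.5K rows each

def count_batches(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # Each partition arrives as a stream of DataFrames of up to 10K rows.
    # An early return here skips only the rest of the current partition;
    # the other partitions are processed independently.
    for batch in iterator:
        yield pd.DataFrame({"batch_rows": [batch.shape[0]]})

# Expect rows like 10000 and 2500 per partition (exact splits may vary).
df.mapInPandas(count_batches, schema="batch_rows long").show()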

Collaborator Author

Okay, changed it back.

@omirandadev omirandadev requested a review from piket January 30, 2025 22:54
@omirandadev omirandadev merged commit b5f14f2 into master Jan 31, 2025
23 checks passed
@omirandadev omirandadev deleted the reuse_connections branch January 31, 2025 20:39