fix: Reuse online store for materialization writes #166
Conversation
"""Load pandas df to online store""" | ||
for pdf in iterator: | ||
pdf_row_count = pdf.shape[0] | ||
start_time = time.time() | ||
# convert to pyarrow table | ||
if pdf_row_count == 0: | ||
print("INFO!!! Dataframe has 0 records to process") | ||
return | ||
continue |
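For context, a hedged sketch of how this fragment might look as a complete `mapInPandas` function after the change; `_write_batch` is a hypothetical stand-in for the real online store write, not the actual engine code:

```python
import time
from typing import Iterator

import pandas as pd


def _write_batch(pdf: pd.DataFrame) -> None:
    """Hypothetical stand-in for the actual online store write."""
    print(f"writing {len(pdf)} rows")


def load_df_to_online_store(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    """Load pandas df to online store (used via DataFrame.mapInPandas)."""
    rows_written = 0
    for pdf in iterator:
        pdf_row_count = pdf.shape[0]
        start_time = time.time()
        if pdf_row_count == 0:
            print("INFO!!! Dataframe has 0 records to process")
            continue  # the change under review; the thread below reverts it to `return`
        _write_batch(pdf)
        rows_written += pdf_row_count
        print(f"wrote {pdf_row_count} rows in {time.time() - start_time:.2f}s")
    # mapInPandas expects an iterator of DataFrames back; declare a matching
    # schema, e.g. "rows_written long", on the mapInPandas call.
    yield pd.DataFrame({"rows_written": [rows_written]})
```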
Shouldn't we `break` instead of `continue`? If the row count is zero we should stop looping, which is essentially what the `return` was doing.
The reason I made this change is that I am not certain that encountering an empty pdf necessarily implies the remaining pdfs in the partition are also empty. The `pull_latest_from_table_or_query` method applies a filter within its SQL query, namely
`WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}')`
and that filter makes me think it is possible for one of the pdfs to be empty.
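To make that concrete, a rough sketch of how such a query string might be assembled; the table name and variable values here are illustrative, not the actual implementation of `pull_latest_from_table_or_query`:

```python
# Illustrative values only; the real ones come from the materialization job.
timestamp_field = "event_timestamp"
start_date_str = "2024-01-01 00:00:00"
end_date_str = "2024-01-02 00:00:00"

query = f"""
    SELECT *
    FROM driver_hourly_stats  -- hypothetical source table
    WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}')
                                AND TIMESTAMP('{end_date_str}')
"""
# If no rows fall inside the time window, the resulting DataFrame (and thus
# some pdfs in the partition iterator) can legitimately be empty.
```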
We can leave it as `return`. Maybe empty partitions in Spark lead to empty DataFrames in the iterator (when you have less data than the number of partitions).
If you have a DataFrame with X partitions, `mapInPandas` converts each partition to an `Iterator[DataFrame]`. Each DataFrame in the iterator has a default length of 10,000 rows (the `spark.sql.execution.arrow.maxRecordsPerBatch` setting), so if a partition has more than 10K records, the iterator yields it as multiple DataFrames.
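For illustration, a small runnable sketch of that batching behavior, with the Arrow batch size lowered so the splitting is visible:

```python
from typing import Iterator

import pandas as pd
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master("local[2]")
    # Shrink the Arrow batch size so each partition is delivered as
    # several small DataFrames inside the mapInPandas iterator.
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "2")
    .getOrCreate()
)

# 10 rows over 3 partitions: each partition's iterator yields several batches.
# With fewer rows than partitions, some partitions come through empty
# (as an iterator with no batches, or an empty DataFrame, depending on version).
df = spark.range(10).repartition(3)


def count_batches(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for i, pdf in enumerate(iterator):
        print(f"batch {i}: {pdf.shape[0]} rows")
    yield pd.DataFrame({"done": [True]})


df.mapInPandas(count_batches, schema="done boolean").collect()
```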
Okay, changed it back.
What this PR does / why we need it:
This PR allows instances of the OnlineStore to be shared within the Spark materialization engine, so a new connection to the store does not need to be created for each write.
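A minimal sketch of the idea, assuming a hypothetical cached accessor and a stub store class; the real engine's types and write path differ:

```python
from functools import lru_cache
from typing import Iterator

import pandas as pd


class OnlineStore:
    """Stub standing in for the real online store client."""

    def __init__(self) -> None:
        print("opening store connection")  # now happens once per worker

    def write_batch(self, pdf: pd.DataFrame) -> None:
        print(f"writing {len(pdf)} rows")


@lru_cache(maxsize=1)
def get_online_store() -> OnlineStore:
    """Hypothetical accessor: build (and connect) the store once per
    worker process and reuse it for every subsequent batch write."""
    return OnlineStore()


def load_df_to_online_store(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    store = get_online_store()  # shared instance, not a fresh connection per write
    for pdf in iterator:
        if pdf.shape[0] == 0:
            return  # final behavior agreed in the review thread above
        store.write_batch(pdf)
    yield pd.DataFrame({"done": [True]})
```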
Which issue(s) this PR fixes:
Misc