fix: Using BatchStatement instead of execute_concurrent_with_args #163
base: master
Conversation
futures = []
for batch in batches:
    futures.append(session.execute_async(batch))
    if len(futures) >= config.online_store.write_concurrency:
        # Raises an exception if at least one of the batches fails
        try:
            for future in futures:
                future.result()
            futures = []
        except Exception as exc:
            logger.error(f"Error writing a batch: {exc}")
            print(f"Error writing a batch: {exc}")
            raise Exception("Error writing a batch") from exc

if len(futures) > 0:
    try:
        for future in futures:
            future.result()
        futures = []
    except Exception as exc:
        logger.error(f"Error writing a batch: {exc}")
        print(f"Error writing a batch: {exc}")
        raise Exception("Error writing a batch") from exc

# execute_concurrent_with_args(
#     session,
#     insert_cql,
#     rows,
#     concurrency=config.online_store.write_concurrency,
# )
This no longer allows write_concurrency to be set in the feature_store.yaml then, right?
Still using that; refer to line feast-dev#508.
ah, missed that. thanks
Force-pushed from 01e6130 to 075c4c0
Force-pushed from a0d66f2 to c535a17
# this happens N-1 times, will be corrected outside:
if progress:
    progress(1)
ttl = online_store_config.key_ttl_seconds or 0
How are we setting the feature view level TTL now? Or are we only wanting to do feature_store.yaml TTL?
We're moving that logic to the materialization process, so we can seamlessly stay in sync with open-source Feast.
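For context, the feature_store.yaml-level TTL discussed in this thread is the key_ttl_seconds option read in the snippet quoted above (ttl = online_store_config.key_ttl_seconds or 0). A minimal sketch of how such a store-wide TTL could be attached to each insert, assuming a CQL template with a USING TTL clause (the template below is hypothetical, not necessarily the statement the online store actually uses):

ttl = online_store_config.key_ttl_seconds or 0  # in CQL, TTL 0 means "never expire"

# Hypothetical insert template for illustration only; the real statement lives in
# the Cassandra/ScyllaDB online store module.
insert_cql_template = (
    "INSERT INTO {fqtable} (feature_name, value, entity_key, event_ts) "
    "VALUES (?, ?, ?, ?)"
)
if ttl > 0:
    insert_cql_template += f" USING TTL {ttl}"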
Force-pushed from fce1ea4 to b6ff0f9
write_concurrency = online_store_config.write_concurrency
write_rate_limit = online_store_config.write_rate_limit
concurrent_queue: Queue = Queue(maxsize=write_concurrency)
rate_limiter = SlidingWindowRateLimiter(write_rate_limit, 1)
Ideally, I think we should let the Spark materialization engine manage the rate limiter. This implementation is a local rate limiter, so the effective overall rate is N * write_rate_limit, where N is the number of workers. The objects in the _mapByPartition method are local to the executor as well, so we can't just add the rate limiter there. We've seen concurrent writes be an issue for other online stores too, so while this is okay for now, I think we should eventually move towards a centrally managed rate limiter (probably on the driver).
I agree with you; this should be generic. I'm trying to fix the problem for ScyllaDB at the moment.
For now, I've noted in the config description that the rate limiter is per executor for the Spark materialization engine. Making it generic across all online stores I'll leave as a future enhancement.
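For readers following along: the surrounding code only assumes a limiter constructed as SlidingWindowRateLimiter(max_calls, period) with a non-blocking acquire() that returns a bool. A minimal sliding-window sketch with that interface might look like the following (an illustrative reconstruction, not necessarily the implementation shipped in this PR):

import time
from collections import deque
from threading import Lock


class SlidingWindowRateLimiter:
    """Allow at most max_calls acquisitions within any sliding window of period seconds."""

    def __init__(self, max_calls: int, period: float) -> None:
        self.max_calls = max_calls
        self.period = period
        self._timestamps: deque = deque()
        self._lock = Lock()

    def acquire(self) -> bool:
        """Non-blocking: record the call and return True if a slot is free, else False."""
        now = time.monotonic()
        with self._lock:
            # Evict timestamps that have slid out of the window.
            while self._timestamps and now - self._timestamps[0] >= self.period:
                self._timestamps.popleft()
            if len(self._timestamps) < self.max_calls:
                self._timestamps.append(now)
                return True
            return False

Callers then poll acquire(), as in the time.sleep(0.001) loop quoted further down, or wrap it in a blocking helper.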
for entity_key, values, timestamp, created_ts in data:
    batch = BatchStatement(batch_type=BatchType.UNLOGGED)
    entity_key_bin = serialize_entity_key(
        entity_key,
        entity_key_serialization_version=config.entity_key_serialization_version,
    ).hex()
    for feature_name, val in values.items():
        params: Tuple[str, bytes, str, datetime] = (
            feature_name,
            val.SerializeToString(),
            entity_key_bin,
            timestamp,
        )
        batch.add(insert_cql, params)

    # Wait until the rate limiter allows
    if not rate_limiter.acquire():
        while not rate_limiter.acquire():
            time.sleep(0.001)
I see we're rate limiting the number of batches per second. If we have a specific entity key with a large number of feature values, we could end up with batches containing a very large number of insert statements, which will cost more to process than those for a less popular entity key. My assumption would be that we should rate limit the number of insert_cql statements instead, but maybe I'm making an incorrect assumption somewhere?
The limit on the number of inserts per batch is 65,536. For now each batch contains inserts specific to one entity key, so it acts on only one partition. Ideally all entity keys will have the same number of features, so I don't see this as a risk given the high per-batch limit.
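That said, if per-entity batches ever do grow very large, one way to bound the cost would be to split a single entity's inserts across capped sub-batches. A minimal sketch (batches_for_entity and MAX_STATEMENTS_PER_BATCH are hypothetical names, not part of this PR):

from cassandra.query import BatchStatement, BatchType

MAX_STATEMENTS_PER_BATCH = 100  # illustrative cap, far below the 65,536 hard limit


def batches_for_entity(insert_cql, feature_params):
    """Yield UNLOGGED batches for one entity key, each holding at most
    MAX_STATEMENTS_PER_BATCH insert statements."""
    batch = BatchStatement(batch_type=BatchType.UNLOGGED)
    count = 0
    for params in feature_params:
        batch.add(insert_cql, params)
        count += 1
        if count >= MAX_STATEMENTS_PER_BATCH:
            yield batch
            batch = BatchStatement(batch_type=BatchType.UNLOGGED)
            count = 0
    if count:
        yield batch  # remainder

Since every statement still targets the same entity key, splitting preserves the single-partition property while capping per-batch size.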
What this PR does / why we need it:
fix: Using BatchStatement instead of execute_concurrent_with_args
Which issue(s) this PR fixes:
mapInPandas()
online_store_ to override online store configurations.
Misc