
fix: Using BatchStatement instead of execute_concurrent_with_args #163

Open · EXPEbdodla wants to merge 3 commits into master from use_batch
Conversation

@EXPEbdodla (Collaborator) commented on Jan 6, 2025

What this PR does / why we need it:

fix: Using BatchStatement instead of execute_concurrent_with_args

Which issue(s) this PR fixes:

  1. execute_concurrent_with_args was taking longer to insert records. We now use BatchStatement to write all records for a given entity_key as one batch, which avoids per-statement network round trips (see the sketch after this list). Grouping different entity_keys into a single batch would require BatchType.LOGGED mode, which the docs say has a performance impact, so we use BatchType.UNLOGGED mode and batch on the partition key.
  2. Concurrency is managed using queues.
  3. The TTL is set at the row level (at insert time) instead of at the table level.
  4. Rate limiting is used to control writes. We noticed an impact on read performance while materializations run; with this we can control write throughput and reduce the impact on reads.
  5. Removed the Expedia-specific spark_kafka_processor.py code, because we are upgrading the materialization process to Spark 3.5 and also noticed increased write latency for streaming ingestion tasks using mapInPandas().
  6. Feature view tags with the prefix online_store_ can be used to override online store configurations.
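
A minimal sketch of the per-entity-key batching described in item 1, using the DataStax Python driver. The contact point, keyspace, table schema, and the rows_by_entity_key grouping are illustrative assumptions, not part of this PR:

from cassandra.cluster import Cluster
from cassandra.query import BatchStatement, BatchType

cluster = Cluster(["127.0.0.1"])       # assumed contact point
session = cluster.connect("feast_ks")  # assumed keyspace

insert_cql = session.prepare(
    "INSERT INTO online_table (feature_name, value, entity_key, event_ts) "
    "VALUES (?, ?, ?, ?)"  # assumed schema
)

# One UNLOGGED batch per entity_key: every statement in the batch hits the
# same partition, so we skip the batch-log overhead of LOGGED batches while
# still collapsing many per-statement network round trips into one request.
for entity_key_bin, rows in rows_by_entity_key.items():  # hypothetical grouping
    batch = BatchStatement(batch_type=BatchType.UNLOGGED)
    for feature_name, value_blob, event_ts in rows:
        batch.add(insert_cql, (feature_name, value_blob, entity_key_bin, event_ts))
    session.execute(batch)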

Misc

Comment on lines 505 to 534
futures = []
for batch in batches:
    futures.append(session.execute_async(batch))
    if len(futures) >= config.online_store.write_concurrency:
        # Raises exception if at least one of the batches fails
        try:
            for future in futures:
                future.result()
            futures = []
        except Exception as exc:
            logger.error(f"Error writing a batch: {exc}")
            print(f"Error writing a batch: {exc}")
            raise Exception("Error writing a batch") from exc

if len(futures) > 0:
    try:
        for future in futures:
            future.result()
        futures = []
    except Exception as exc:
        logger.error(f"Error writing a batch: {exc}")
        print(f"Error writing a batch: {exc}")
        raise Exception("Error writing a batch") from exc

# execute_concurrent_with_args(
#     session,
#     insert_cql,
#     rows,
#     concurrency=config.online_store.write_concurrency,
# )
This no longer allows for write_concurrency to be set in the feature_store.yaml then, right?

@EXPEbdodla (Collaborator, Author) replied:

Still using that; see line feast-dev#508.

ah, missed that. thanks

EXPEbdodla force-pushed the use_batch branch 3 times, most recently from 01e6130 to 075c4c0, on January 15, 2025.
EXPEbdodla force-pushed the use_batch branch 2 times, most recently from a0d66f2 to c535a17, on February 6, 2025.
EXPEbdodla changed the title from "fix: Trying BatchStatement instead of execute_concurrent_with_args" to "fix: Using BatchStatement instead of execute_concurrent_with_args" on Feb 6, 2025.
# this happens N-1 times, will be corrected outside:
if progress:
    progress(1)
ttl = online_store_config.key_ttl_seconds or 0
@zabarn commented on Feb 11, 2025:
How are we setting the feature-view-level TTL now? Or do we only want to support the feature_store.yaml TTL?

@EXPEbdodla (Collaborator, Author) replied:

We're moving that logic to the materialization process, so we can stay seamlessly in sync with open source Feast.
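
For illustration, a per-row TTL can be bound at insert time in CQL rather than set as the table's default_time_to_live. A sketch, assuming a session and schema like those in the earlier sketch; the actual statement in the PR may differ:

insert_cql = session.prepare(
    "INSERT INTO online_table (feature_name, value, entity_key, event_ts) "
    "VALUES (?, ?, ?, ?) USING TTL ?"  # assumed schema; TTL is a bind marker
)

ttl = online_store_config.key_ttl_seconds or 0  # in CQL, TTL 0 means no expiry
session.execute(
    insert_cql, (feature_name, value_blob, entity_key_bin, event_ts, ttl)
)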

Comment on lines +378 to +381
write_concurrency = online_store_config.write_concurrency
write_rate_limit = online_store_config.write_rate_limit
concurrent_queue: Queue = Queue(maxsize=write_concurrency)
rate_limiter = SlidingWindowRateLimiter(write_rate_limit, 1)
@OscarMiranda0 commented on Feb 11, 2025:
Ideally, I think we should allow the spark materialization engine to manage the rate limiter. This implementation is a local rate limiter, where the true rate limit is N*write_rate_limit, where N is the number of workers. The objects in the _mapByPartition method are local to the executor as well, so we can't just add the rate limiter there. We've seen concurrent writes be an issue for other online stores as well, so while this is okay for now, I think in the future we should move towards having a centrally managed rate manager (probably the driver).

@EXPEbdodla (Collaborator, Author) replied on Feb 11, 2025:

I agree with you; this should be generic. I'm trying to fix the problem for ScyllaDB at the moment. For now, I've noted in the config description that the rate limiter is per executor for the Spark materialization engine. Making it generic across all online stores I'll leave as a future enhancement.
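
For context, a minimal sketch of a sliding-window rate limiter with the acquire() interface the diff uses. This is an assumed implementation; the PR's actual SlidingWindowRateLimiter may differ:

import time
from collections import deque
from threading import Lock


class SlidingWindowRateLimiter:
    # Allow at most max_calls acquisitions per window_seconds (sketch).

    def __init__(self, max_calls: int, window_seconds: float) -> None:
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self._timestamps: deque = deque()
        self._lock = Lock()

    def acquire(self) -> bool:
        # Non-blocking: returns True if a slot is free in the current window.
        now = time.monotonic()
        with self._lock:
            # Evict timestamps that have slid out of the window.
            while self._timestamps and now - self._timestamps[0] >= self.window_seconds:
                self._timestamps.popleft()
            if len(self._timestamps) < self.max_calls:
                self._timestamps.append(now)
                return True
            return False

As noted above, each executor holds its own instance, so the effective cluster-wide rate is roughly N * write_rate_limit for N executors.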

Comment on lines +395 to +413
for entity_key, values, timestamp, created_ts in data:
    batch = BatchStatement(batch_type=BatchType.UNLOGGED)
    entity_key_bin = serialize_entity_key(
        entity_key,
        entity_key_serialization_version=config.entity_key_serialization_version,
    ).hex()
    for feature_name, val in values.items():
        params: Tuple[str, bytes, str, datetime] = (
            feature_name,
            val.SerializeToString(),
            entity_key_bin,
            timestamp,
        )
        batch.add(insert_cql, params)

    # Wait until the rate limiter allows
    if not rate_limiter.acquire():
        while not rate_limiter.acquire():
            time.sleep(0.001)


I see we're rate limiting the number of batches per second. If we have a specific entity key with a large number of feature values, we could end up with batches containing a very large number of insert statements, which cost more to process than those for a less popular entity key. My assumption would be that we should rate limit the number of insert_cql statements instead, but maybe I'm making an incorrect assumption somewhere?

@EXPEbdodla (Collaborator, Author) replied:

The limit on the number of inserts per batch is 65,536. For now, a batch contains inserts specific to one entity key, so it acts on only one partition, and ideally all entity keys have the same number of features. Given that high default per-batch limit, I don't see this as a risk.
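
If very large partitions ever did become a concern, one option (not part of this PR; max_statements_per_batch is a hypothetical knob) would be to chunk a partition's inserts into fixed-size UNLOGGED sub-batches:

from cassandra.query import BatchStatement, BatchType


def chunked_batches(insert_cql, params_list, max_statements_per_batch=100):
    # Yield UNLOGGED batches capped at max_statements_per_batch statements,
    # all still targeting the same partition (sketch).
    batch = BatchStatement(batch_type=BatchType.UNLOGGED)
    count = 0
    for params in params_list:
        batch.add(insert_cql, params)
        count += 1
        if count >= max_statements_per_batch:
            yield batch
            batch = BatchStatement(batch_type=BatchType.UNLOGGED)
            count = 0
    if count:
        yield batch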
