[DISCUSSION] [object_store] New crate with object store combinators / utilities #14

Open
alamb opened this issue Mar 8, 2025 · 13 comments
Labels
enhancement New feature or request

Comments

@alamb
Contributor

alamb commented Mar 8, 2025

Please describe what you are trying to do.

TLDR: let's combine forces rather than all reimplementing caching / chunking / etc in object_store!

The ObjectStore trait is flexible, and it is common to compose a stack of ObjectStores, with one store wrapping an underlying store

For example, the ThrottledStore and LimitStore provided with the object_store crate do exactly this (a small composition sketch follows the diagram below):

┌──────────────────────────────┐
│        ThrottledStore        │
│(adds user configured delays) │
└──────────────────────────────┘
                ▲               
                │               
                │               
┌──────────────────────────────┐
│      Inner ObjectStore       │
│   (for example, AmazonS3)    │
└──────────────────────────────┘
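
For illustration (not from the original post), a minimal sketch of stacking the built-in combinators, assuming object_store's ThrottledStore / LimitStore APIs (field names may vary between versions) and an InMemory store standing in for AmazonS3:

    use std::{sync::Arc, time::Duration};

    use object_store::limit::LimitStore;
    use object_store::memory::InMemory;
    use object_store::throttle::{ThrottleConfig, ThrottledStore};
    use object_store::ObjectStore;

    fn build_store() -> Arc<dyn ObjectStore> {
        // Inner store (InMemory here; AmazonS3 or similar in production)
        let inner = InMemory::new();

        // Add a user-configured delay to every GET call
        let throttled = ThrottledStore::new(
            inner,
            ThrottleConfig {
                wait_get_per_call: Duration::from_millis(10),
                ..Default::default()
            },
        );

        // Cap the number of concurrent requests hitting the wrapped store
        Arc::new(LimitStore::new(throttled, 16))
    }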

Many Different Behaviors

There are many types of behaviors that can be implemented this way. Some examples I am aware of:

  1. The ThrottledStore and LimitStore provided with the object store crate
  2. Runs on a different tokio runtime (such as the DeltaIOStorageBackend in delta-rs from @ion-elgreco)
  3. Limit the total size of any individual request (e.g. the LimitedRequestSizeObjectStore from Timeouts reading "large" files from object stores over "slow" connections datafusion#15067)
  4. Break single large requests into multiple concurrent small requests ("chunking") - I think @crepererum is working on this in influx
  5. Caches results of requests locally using memory / disk (see ObjectStoreMemCache in influxdb3_core), and this one in slatedb @criccomini (thanks @ion-elgreco for the pointer)
  6. Collect statistics / traces and report metrics (see ObjectStoreMetrics in influxdb3_core)
  7. Visualization of object store requests over time

Desired behavior is varied and application specific

Also, depending on the needs of the particular app, the ideal behavior / policy is likely different.

For example,

  1. In the case of Timeouts reading "large" files from object stores over "slow" connections datafusion#15067, splitting one large request into several small requests made in series is likely the desired approach (maximize the chance they succeed)
  2. If you are trying to maximize read bandwidth in a cloud server setting, splitting up ("Chunking") large requests into several parallel ones may be desired
  3. If you are trying to minimize costs (for example doing bulk reorganizations / compactions on historical data that are not latency sensitive), using a single request for large objects (what is done today) might be desired
  4. Maybe you want to adapt more dynamically to network and object store conditions as described in Exploiting Cloud Object Storage for High-Performance Analytics

So the point is that I don't think any one individual policy will work for all use cases (though we can certainly discuss changing the default policy)

Since ObjectStore is already composable, I already see projects implementing these types of things independently (for example, delta-rs and influxdb_iox both have a cross runtime object stores, and @mildbyte from splitgraph implemented some sort of visualization of object store requests over time)

I believe this is similar to the OpenDAL concept of layers but @Xuanwo please correct me if I am wrong

Desired Solution

I would like it to be easier for users of object_store to access such features without having to implement custom wrappers independently and in parallel

Alternatives

New object_store_util crate

One alternative is to make a new crate named object_store_util or similar, mirroring futures-util and tokio-util, that has a bunch of these ObjectStore combinators

This could be housed outside of the apache organization, but I think it would be most valuable for the community if it was inside

Add additional policies to provided implementations

An alternative is to implement more sophisticated default implementations (for example, adding more options to the AmazonS3 implementation).

One upside of this approach is it could take advantage of implementation specific features

One downside is additional code and configuration complexity, especially as the different strategies are all applicable to multiple stores (e.g. GCP, S3 and Azure). Another downside is that specifying the policy might be complex (for example, specifying concurrency along with chunking and under what circumstances each should be used).

Additional context

@tustvold
Contributor

tustvold commented Mar 8, 2025

Thank you for starting this discussion, I think we should definitely provide more utilities/primitives in this space.

The ThrottledStore and LimitStore provided with the object store crate

FWIW these should probably be deprecated and re-implemented at the HttpClient level.

Collect statistics / traces and report metrics (see ObjectStoreMetrics in influxdb3_core)
Runs on a different tokio runtime (such as the DeltaIOStorageBackend in delta-rs from @ion-elgreco)
Visualization of object store requests over time

Now that we have the HttpClient abstraction, I think this is the level at which I would encourage implementing most of these.

Limit the total size of any individual request (e.g. the LimitedRequestSizeObjectStore from apache/datafusion#15067)
Break single large requests into multiple concurrent small requests ("chunking") - I think @crepererum is working on this in influx

This feels like something better built into some sort of TransferManager that sits on top of the ObjectStore API, as opposed to baking it in at the ObjectStore level. Perhaps in a similar vein to BufWriter.

This would, for example, allow registering a single ObjectStore, but then having different IO configurations for different areas of the stack. It would also potentially allow for greater concurrency, as the ObjectStore API has no mechanism by which chunks fetched in parallel could be returned out of order. This would be especially useful when downloading files to disk, as it avoids needing to hold chunks in memory unnecessarily.
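
To make the idea concrete, here is one purely hypothetical shape such a TransferManager could take; none of these names exist in object_store today, this is just a sketch of the concept:

    use std::ops::Range;
    use std::path::PathBuf;

    /// Hypothetical policy options owned by the manager rather than the store
    struct TransferOptions {
        /// split large GETs into chunks of this many bytes
        chunk_size: u64,
        /// how many chunks to fetch in parallel
        max_concurrency: usize,
    }

    /// Hypothetical manager sitting on top of the ObjectStore API,
    /// in a similar vein to BufWriter
    trait TransferManager {
        /// Download `range` of the object at `location` to a local file,
        /// fetching chunks concurrently and writing each at its byte offset,
        /// so out-of-order completion never requires buffering whole chunks
        /// in memory.
        fn download_to_file(
            &self,
            location: &str,
            range: Range<u64>,
            dest: PathBuf,
            opts: &TransferOptions,
        ) -> std::io::Result<()>;
    }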

See #267 for some prior discussion.

Add additional policies to provided implementations

FWIW all the first-party implementations share a lot of the same underlying logic, e.g. with things like GetClient, and so it may actually not be all that bad

@alamb
Contributor Author

alamb commented Mar 8, 2025

This feels like something better built into some sort of TransferManager that sits on top of the ObjectStore API, as opposed to baking it in at the ObjectStore level. Perhaps in a similar vein to BufWriter.

I think there is room for both some lower-level ObjectStore wrappers and a more full-featured transfer manager or higher abstraction, depending on the needs and resources of the underlying application

@tustvold
Contributor

tustvold commented Mar 8, 2025

I've created apache/arrow-rs#7253 as an example of how the HttpClient abstraction can be used for more fine-grained control of requests, including spawning IO to a separate tokio runtime.
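
As a rough illustration of the separate-runtime idea (this is not the code in apache/arrow-rs#7253, just the general tokio pattern it builds on): spawn the request future onto a dedicated runtime's Handle and await only the JoinHandle from the caller's runtime.

    use tokio::runtime::Handle;

    // Run the (placeholder) request body on a dedicated IO runtime, so the
    // caller's runtime only polls the JoinHandle
    async fn get_on_io_runtime(io: Handle) -> Vec<u8> {
        io.spawn(async {
            // ... perform the actual HTTP request here ...
            Vec::new()
        })
        .await
        .expect("IO task panicked")
    }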

@flaneur2020

flaneur2020 commented Mar 9, 2025

I believe this is similar to the OpenDAL concept of layers but @Xuanwo please correct me if I am wrong

I suppose this might also be related to the "Operator" concept in OpenDAL, which can help handle the chunking & concurrency parameters in a builder pattern like this:

    let s = op
        .reader_with("hello.txt")
        .concurrent(8)
        .chunk(256)
        .await?
        .into_stream(1024..2048)
        .await?;

Layer is a bit lower level than the Operator imo; we can wrap cache & metrics around S3 operations like read and write, while chunking & concurrency are handled at the Operator level 🤔.

@alamb
Contributor Author

alamb commented Mar 18, 2025

@crepererum and I spoke about this issue today.

In the case of apache/datafusion#15067, splitting one large request into several small requests made in series is likely the desired approach (maximize the chance they succeed)

@crepererum rightly pointed out that implementing retries (aka #15) would be better than splitting into smaller requests to fit within a timeout, as the retry mechanism automatically adjusts to current network conditions

However, we otherwise have a few more potential items we may propose upstreaming:

  • RacingReads (reduce latency of first (and complete) fetch by running multiple requests in parallel)
  • Chunking (reduce latency of completing requests on time)
  • MemoryCache (cache in memory, but handle teeing responses, implement streaming logic)
  • DiskCache (takes advantage of io_uring)
  • Testing framework

Roughly speaking, what we are thinking is:

@alamb
Contributor Author

alamb commented Mar 18, 2025

@criccomini I am curious if you would have a use for RacingReads. This basically would reduce the overall latency for object store requests by running multiple requests in parallel and returning the one that completed first. The tradeoff is that this strategy increases $$$ linearly as it makes more requests
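
As a rough sketch of the idea (hypothetical, not an existing object_store combinator), the race itself can be expressed with futures::future::select_ok, which resolves with the first request that succeeds and drops the rest:

    use futures::future::{select_ok, BoxFuture};

    // Issue the same read several times and keep whichever finishes first.
    // Cost grows roughly linearly with the number of racers.
    async fn racing_get(
        racers: Vec<BoxFuture<'static, std::io::Result<Vec<u8>>>>,
    ) -> std::io::Result<Vec<u8>> {
        // select_ok resolves with the first Ok result and cancels the
        // remaining futures by dropping them; it errors only if every
        // racer fails
        let (bytes, _rest) = select_ok(racers).await?;
        Ok(bytes)
    }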

@ryzhyk

ryzhyk commented Mar 19, 2025

@crepererum rightly pointed out that implementing retries (aka #7242) would be better than splitting into smaller requests to fit within a timeout, as the retry mechanism automatically adjusts to current network conditions

Isn't there an upper bound on the timeout (30s by default)? And if the bound isn't large enough to push that 200MiB row group through a slow connection, won't the request fail anyway? And even if the request succeeds eventually, relying on retries to dynamically adjust the timeout seems wasteful compared to bounding request size, improving the chances the request will succeed the first time.

@alamb
Contributor Author

alamb commented Mar 19, 2025

@crepererum rightly pointed out that implementing retries (aka #7242) would be better than splitting into smaller requests to fit within a timeout, as the retry mechanism automatically adjusts to current network conditions

Isn't there an upper bound on the timeout (30s by default)? And if the bound isn't large enough to push that 200MiB row group through a slow connection, won't the request fail anyway?

I think the idea is that you don't re-request the entire object, only the remaining bytes

So let's say you had a 200 MB request but the network can only retrieve 10 MB in 30s (a minimal sketch of this resume-only-the-remainder loop follows the list below):

  • The first request would fetch the first 10MB but timeout
  • Then the retry would request the remaining 190MB
  • The second request would fetch the second 10MB and timeout
  • Then the retry would request the remaining 180MB
  • .. and so on
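
A minimal sketch of that loop (hypothetical names, generic over whatever performs a single range request; the real logic would live inside the retry machinery):

    use std::ops::Range;

    // What one attempt produced: the bytes that arrived before it either
    // completed or hit the timeout (hypothetical type, for illustration)
    struct Attempt {
        data: Vec<u8>,
        timed_out: bool,
    }

    // Fetch `range`, re-requesting only the bytes not yet received after
    // each timeout, e.g. 0..200MB, then 10MB..200MB, then 20MB..200MB, ...
    fn fetch_with_resume(
        mut do_request: impl FnMut(Range<u64>) -> Attempt,
        range: Range<u64>,
        max_attempts: usize,
    ) -> Option<Vec<u8>> {
        let mut buf = Vec::new();
        let mut next = range.start;
        for _ in 0..max_attempts {
            let attempt = do_request(next..range.end);
            next += attempt.data.len() as u64;
            buf.extend_from_slice(&attempt.data);
            if next >= range.end && !attempt.timed_out {
                return Some(buf);
            }
            // Timed out: loop again, asking only for the remaining bytes
        }
        None // gave up after max_attempts
    }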

I agree this is not clear -- I will post the same on #15

@ryzhyk

ryzhyk commented Mar 19, 2025

That makes a lot of sense, thanks for clarifying! So this means that the same data won't get fetched multiple times, which is nice. Does the user still need to configure a large enough retry_timeout and max number of retries, or do those bounds not apply in this scenario where every retry fetches some data?

@alamb
Contributor Author

alamb commented Mar 20, 2025

Does the user still need to configure large enough retry_timeout and max number of retries or those bounds won't apply in this scenario where every retry fetches some data?

I am not sure yet -- it will depend on how the feature is implemented. It is interesting to think about what to do when the process is making (very) slow progress.

@alamb
Contributor Author

alamb commented Mar 20, 2025

Migrating from arrow-rs issue #7251

@criccomini
Contributor

@criccomini I am curious if you would have a use for RacingReads. This basically would reduce the overall latency for object store requests by running multiple requests in parallel and returning the one that completed first. The tradeoff is that this strategy increases $$$ linearly as it makes more requests

This is a nice to have for us. It's certainly crossed my mind, but we haven't implemented it yet. In some cases, I suspect SlateDB users will want low latency at all costs. In other cases, cost is the main thing. :)

@alamb
Contributor Author

alamb commented Apr 17, 2025

Chunked Reads as requested in this ticket is similar

ryzhyk pushed a commit to feldera/feldera that referenced this issue Apr 18, 2025
Pipelines with many delta connectors hit timeout errors, likely due to
apache/arrow-rs-object-store#14

Until that is fixed, we introduce a mechanism to restrict the number of
concurrent readers across all delta connectors.

From the docs:

--

Maximum number of concurrent object store reads performed by all Delta
Lake connectors.

This setting is used to limit the number of concurrent reads of the
object store in a pipeline with a large number of Delta Lake connectors.
When multiple connectors are simultaneously reading from the object
store, this can lead to transport timeouts.

When enabled, this setting limits the number of concurrent reads across
all connectors. This is a global setting that affects all Delta Lake
connectors, and not just the connector where it is specified. It should
therefore be used at most once in a pipeline.  If multiple connectors
specify this setting, they must all use the same value.

The default value is 6.

Signed-off-by: Leonid Ryzhyk <[email protected]>
github-merge-queue bot pushed a commit to feldera/feldera that referenced this issue Apr 19, 2025