-
Perhaps @metesynnada or @ozankabak have some thoughts in this area or can help.
-
Yes, we are building a streaming system using DataFusion -- happy to discuss. However, it is holiday time for our team right now, so we will circle back to you towards the end of the month 🙂
-
@dadepo

```rust
/// Updates the DeltaTable to the most recent state committed to the transaction log.
#[cfg(not(any(feature = "parquet", feature = "parquet2")))]
pub async fn update(&mut self) -> Result<(), DeltaTableError> {
    self.update_incremental(None).await
}
```

In Spark, the delta libs automatically fetch the latest and greatest for you IIRC, but maybe that's not the case for …. In any case, even after calling …, there's a dedicated ….

@houqp, if you're still involved in these, any opinions?
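To make the stale-read behavior concrete, here is a minimal self-contained sketch. These are hypothetical mock types, not the real deltalake API: the point is only that a long-lived table handle keeps serving the snapshot it loaded until something like `update()` re-reads the transaction log.

```rust
// Hypothetical mock, NOT the real deltalake crate. It models why a
// long-lived handle keeps returning the same data between reads.
struct MockTransactionLog {
    latest_version: i64,
}

struct MockDeltaTable {
    // Snapshot of the log this handle was opened against.
    loaded_version: i64,
}

impl MockDeltaTable {
    fn open(log: &MockTransactionLog) -> Self {
        Self { loaded_version: log.latest_version }
    }

    /// Re-reads the log; analogous in spirit to `DeltaTable::update()`.
    fn update(&mut self, log: &MockTransactionLog) {
        self.loaded_version = log.latest_version;
    }

    /// A "read" only ever sees the loaded snapshot.
    fn read_version(&self) -> i64 {
        self.loaded_version
    }
}

fn main() {
    let mut log = MockTransactionLog { latest_version: 1 };
    let mut table = MockDeltaTable::open(&log);
    assert_eq!(table.read_version(), 1);

    // A writer commits new data to the log...
    log.latest_version = 2;

    // ...but the stale handle still reads the old snapshot,
    assert_eq!(table.read_version(), 1);

    // until an explicit update() refreshes it.
    table.update(&log);
    assert_eq!(table.read_version(), 2);
}
```

This matches the symptom described later in the thread: two reads 30 seconds apart return identical data, while restarting the process (i.e. re-opening the handle) returns fresh data.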
-
Although I don't have extensive knowledge about …, taking a page from Kafka's playbook: it maintains group id offsets to keep track of data. In an ideal design, the source should be capable of determining what data will come next. There's ….
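The Kafka-style bookkeeping mentioned above can be sketched in plain Rust. This is an illustrative toy, not a Kafka or DataFusion API; the names (`OffsetTracker`, `next_offset`, `commit`) are invented for the example.

```rust
use std::collections::HashMap;

// Toy version of consumer-group offset tracking: the source itself
// remembers how far each group has read, so it always knows what
// data comes next.
#[derive(Default)]
struct OffsetTracker {
    // group id -> next offset to read
    committed: HashMap<String, u64>,
}

impl OffsetTracker {
    /// Next offset to read for this group; 0 if never committed.
    fn next_offset(&self, group_id: &str) -> u64 {
        self.committed.get(group_id).copied().unwrap_or(0)
    }

    /// Commit after a batch is fully processed, so a restart
    /// resumes here instead of re-reading everything.
    fn commit(&mut self, group_id: &str, last_processed: u64) {
        self.committed.insert(group_id.to_string(), last_processed + 1);
    }
}

fn main() {
    let mut tracker = OffsetTracker::default();
    assert_eq!(tracker.next_offset("my-group"), 0);
    tracker.commit("my-group", 41); // processed offsets 0..=41
    assert_eq!(tracker.next_offset("my-group"), 42);
}
```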
-
This is from the ASF Slack: https://the-asf.slack.com/archives/C01QUFS30TD/p1686751525575309, a conversation between @dadepo and myself.
Dade
Where the data that needs to be queried/transformed is supposed to be a stream
I stumbled on this https://github.com/datafusion-contrib/datafusion-streams but it looks like that is mostly an abandoned effort…
Andrew Lamb
I would recommend looking at "infinite" streams as in https://docs.rs/datafusion/latest/datafusion/datasource/streaming/struct.StreamingTable.html
I think the basics are there, though there is definitely more work to be done, in the realm of documentation and probable features
Dade
Got a couple of questions:
Will I be correct to say that the end result of this is a RecordBatchStream, which is essentially a stream of RecordBatch?
What would be the general approach if I want to make use of a delta lake table for instance as the streaming source?
Does using this mean keeping track of some state? For example, to allow DataFusion to keep track of the part of the stream it has already processed?
Andrew Lamb
I don't think DataFusion can track state of what has been processed at the moment -- that would have to be done out of line.
Dade
Specifically, I am trying to create my own StreamingTable that reads data as a stream from a delta lake table.
I have a Java process that writes the “rate” data (this is a test data source) as a stream to a delta table.
I can confirm I can read this as a stream using Scala from the console.
This continually reads the data and writes it out to the console.
So my first question: to see if I can read this delta table as a stream, I first tried reading it normally, then introduced a sleep, and then read it again. Something like
I noticed that both reads give the same data despite the 30 second delay, but if I stop the process and run it again, it gives fresh data. Any reason why this is the case?
I see the core of having the StreamingTable seems to be the execute function on the PartitionStream trait…and looking for an example implementation I found
https://github.com/apache/arrow-datafusion/blob/01eb72af4ccdc911ba3cfe22e41f2d71389c5eb9/datafusion/core/tests/memory_limit.rs#L259
Which is basically returning the record batch as a stream.
If I am going to provide an implementation of this for a delta table I open manually, does that mean continually executing the query that fetches the data from the table? For starters, that does not seem to even work, based on my previous message… plus it feels ad hoc.
Is there any feature that can be used, for example, to only start reading from where the last batch ended?
Will I have to do these kind of bookkeeping manually?
Or perhaps I am going about this with the wrong design/approach?
The only way I could get a new batch is to basically load and register the table afresh.
I still have doubts about this. It does not feel optimal. And how can I guarantee the next read starts from where the last read ended?
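The "start from where the last read ended" bookkeeping asked about here could be kept outside DataFusion, as suggested earlier in the thread. A minimal sketch, with hypothetical types (this is not a datafusion or deltalake API): a cursor remembers how many rows were consumed, so each poll returns only what was appended since the previous one, instead of re-registering the table afresh.

```rust
// Hypothetical reader keeping the cursor the engine does not keep for us.
// The "table" is modeled as a growing slice of rows.
struct ResumableReader {
    rows_consumed: usize,
}

impl ResumableReader {
    fn new() -> Self {
        Self { rows_consumed: 0 }
    }

    /// Return only the rows appended since the last poll and
    /// advance the cursor to the end of the table.
    fn poll_new<'a>(&mut self, table: &'a [i64]) -> &'a [i64] {
        let start = self.rows_consumed.min(table.len());
        self.rows_consumed = table.len();
        &table[start..]
    }
}

fn main() {
    let mut reader = ResumableReader::new();
    let mut table = vec![1, 2, 3];
    assert_eq!(reader.poll_new(&table), &[1, 2, 3]);

    // Nothing new yet: the second poll yields an empty slice.
    assert_eq!(reader.poll_new(&table), &[] as &[i64]);

    // A writer appends; the next poll starts where the last ended.
    table.extend([4, 5]);
    assert_eq!(reader.poll_new(&table), &[4, 5]);
}
```

For a real delta table, the cursor would more naturally be the last-seen table version from the transaction log rather than a row count, but the shape of the bookkeeping is the same.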