From 513185051061adb97d72f71f85b495c331dda090 Mon Sep 17 00:00:00 2001
From: Abdullahsab3
Date: Sun, 29 Dec 2024 11:22:19 +0100
Subject: [PATCH] Rust API documentation

Signed-off-by: Abdullahsab3
---
 docs/src/rust/check_constraints.rs            |  10 +-
 docs/src/rust/read_cdf.rs                     |  30 +-
 .../appending-overwriting-delta-lake-table.md |  72 +++-
 docs/usage/create-delta-lake-table.md         |  47 ++
 .../deleting-rows-from-delta-lake-table.md    |  28 +-
 docs/usage/examining-table.md                 | 168 +++++---
 docs/usage/loading-table.md                   | 111 +++--
 docs/usage/managing-tables.md                 |  39 +-
 docs/usage/merging-tables.md                  | 400 +++++++++++++-----
 docs/usage/optimize/delta-lake-z-order.md     |  20 +-
 .../small-file-compaction-with-optimize.md    | 334 ++++++++++++---
 docs/usage/querying-delta-tables.md           |  19 +
 docs/usage/read-cdf.md                        |   2 +
 13 files changed, 966 insertions(+), 314 deletions(-)

diff --git a/docs/src/rust/check_constraints.rs b/docs/src/rust/check_constraints.rs
index fbc2cf18d6..cd3e88b95f 100644
--- a/docs/src/rust/check_constraints.rs
+++ b/docs/src/rust/check_constraints.rs
@@ -6,17 +6,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     // --8<-- [start:add_constraint]
     let table = deltalake::open_table("../rust/tests/data/simple_table").await?;
     let ops = DeltaOps(table);
-    ops.with_constraint("id_gt_0", "id > 0").await?;
+    ops.add_constraint().with_constraint("id_gt_0", "id > 0").await?;
     // --8<-- [end:add_constraint]
 
     // --8<-- [start:add_data]
-    let table = deltalake::open_table("../rust/tests/data/simple_table").await?;
-    let schema = table.get_state().arrow_schema()?;
+    let mut table = deltalake::open_table("../rust/tests/data/simple_table").await?;
+    let schema = table.snapshot()?.arrow_schema()?;
     let invalid_values: Vec<Arc<dyn Array>> = vec![
         Arc::new(Int32Array::from(vec![-10]))
     ];
     let batch = RecordBatch::try_new(schema, invalid_values)?;
-    table.write(vec![batch]).await?;
+    let mut writer = RecordBatchWriter::for_table(&table)?;
+    writer.write(batch).await?;
+    writer.flush_and_commit(&mut table).await?;
     // --8<-- [end:add_data]
 
     Ok(())
diff --git a/docs/src/rust/read_cdf.rs b/docs/src/rust/read_cdf.rs
index 04de31e2bc..392ffa3835 100644
--- a/docs/src/rust/read_cdf.rs
+++ b/docs/src/rust/read_cdf.rs
@@ -1,15 +1,37 @@
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    let table = deltalake::open_table("../rust/tests/data/cdf-table").await?;
+    let table = deltalake::open_table("tmp/some-table").await?;
+    let ctx = SessionContext::new();
     let ops = DeltaOps(table);
-    let cdf = ops.load_cdf()
+    let cdf = ops
+        .load_cdf()
         .with_starting_version(0)
         .with_ending_version(4)
         .build()
         .await?;
 
-    arrow_cast::pretty::print_batches(&cdf)?;
+    let batches = collect_batches(
+        cdf.properties().output_partitioning().partition_count(),
+        &cdf,
+        ctx,
+    ).await?;
+    arrow_cast::pretty::print_batches(&batches)?;
+
     Ok(())
-}
\ No newline at end of file
+}
+
+async fn collect_batches(
+    num_partitions: usize,
+    stream: &impl ExecutionPlan,
+    ctx: SessionContext,
+) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
+    let mut batches = vec![];
+    for p in 0..num_partitions {
+        let data: Vec<RecordBatch> =
+            collect_sendable_stream(stream.execute(p, ctx.task_ctx())?).await?;
+        batches.extend_from_slice(&data);
+    }
+    Ok(batches)
+}
diff --git a/docs/usage/appending-overwriting-delta-lake-table.md b/docs/usage/appending-overwriting-delta-lake-table.md
index 397edb9d0d..68d00cdfcb 100644
--- a/docs/usage/appending-overwriting-delta-lake-table.md
+++ b/docs/usage/appending-overwriting-delta-lake-table.md
@@ -18,12 +18,30 @@ Suppose you have a Delta table with the following contents:
 
 Append two additional rows of
 data to the table:
 
-```python
-from deltalake import write_deltalake, DeltaTable
-
-df = pd.DataFrame({"num": [8, 9], "letter": ["dd", "ee"]})
-write_deltalake("tmp/some-table", df, mode="append")
-```
+=== "Python"
+
+    ```python
+    from deltalake import write_deltalake, DeltaTable
+
+    df = pd.DataFrame({"num": [8, 9], "letter": ["dd", "ee"]})
+    write_deltalake("tmp/some-table", df, mode="append")
+    ```
+
+=== "Rust"
+    ```rust
+    let table = open_table("tmp/some-table").await?;
+    DeltaOps(table).write(vec![RecordBatch::try_new(
+        Arc::new(Schema::new(vec![
+            Field::new("num", DataType::Int32, false),
+            Field::new("letter", DataType::Utf8, false),
+        ])),
+        vec![
+            Arc::new(Int32Array::from(vec![8, 9])),
+            Arc::new(StringArray::from(vec!["dd", "ee"])),
+        ],
+    )?]).with_save_mode(SaveMode::Append).await?;
+    ```
 
 Here are the updated contents of the Delta table:
 
 ```
 +-------+----------+
 |   num | letter   |
 |-------+----------|
 |     1 | a        |
 |     2 | b        |
 |     3 | c        |
 |     8 | dd       |
 |     9 | ee       |
 +-------+----------+
 ```
 
@@ -44,12 +62,27 @@ Now let's see how to perform an overwrite transaction.
 ## Delta Lake overwrite transactions
 
 Now let's see how to overwrite the existing Delta table.
-
-```python
-df = pd.DataFrame({"num": [11, 22], "letter": ["aa", "bb"]})
-write_deltalake("tmp/some-table", df, mode="overwrite")
-```
-
+=== "Python"
+    ```python
+    df = pd.DataFrame({"num": [11, 22], "letter": ["aa", "bb"]})
+    write_deltalake("tmp/some-table", df, mode="overwrite")
+    ```
+
+=== "Rust"
+    ```rust
+    let table = open_table("tmp/some-table").await?;
+    DeltaOps(table).write(vec![RecordBatch::try_new(
+        Arc::new(Schema::new(vec![
+            Field::new("num", DataType::Int32, false),
+            Field::new("letter", DataType::Utf8, false),
+        ])),
+        vec![
+            Arc::new(Int32Array::from(vec![11, 22])),
+            Arc::new(StringArray::from(vec!["aa", "bb"])),
+        ],
+    )?]).with_save_mode(SaveMode::Overwrite).await?;
+    ```
 Here are the contents of the Delta table after the overwrite operation:
 
 ```
@@ -63,9 +96,20 @@ Here are the contents of the Delta table after the overwrite operation:
 Overwriting just performs a logical delete. It doesn't physically remove
 the previous data from storage. Time travel back to the previous version
 to confirm that the old version of the table is still accessible.
 
-```python
-dt = DeltaTable("tmp/some-table", version=1)
+=== "Python"
+    ```python
+    dt = DeltaTable("tmp/some-table", version=1)
+    ```
+
+=== "Rust"
+    ```rust
+    let mut table = open_table("tmp/some-table").await?;
+    table.load_version(1).await?;
+    ```
+
+
+```
 +-------+----------+
 |   num | letter   |
 |-------+----------|
diff --git a/docs/usage/create-delta-lake-table.md b/docs/usage/create-delta-lake-table.md
index 2d76ac4cf7..5ab06ee2c8 100644
--- a/docs/usage/create-delta-lake-table.md
+++ b/docs/usage/create-delta-lake-table.md
@@ -24,6 +24,53 @@ You can easily write a DataFrame to a Delta table.
 df.write_delta("tmp/some-table")
 ```
 
+=== "Rust"
+
+    ```rust
+    let delta_ops = DeltaOps::try_from_uri("tmp/some-table").await?;
+    let mut table = delta_ops
+        .create()
+        .with_table_name("some-table")
+        .with_save_mode(SaveMode::Overwrite)
+        .with_columns(
+            StructType::new(vec![
+                StructField::new(
+                    "num".to_string(),
+                    DataType::Primitive(PrimitiveType::Integer),
+                    true,
+                ),
+                StructField::new(
+                    "letter".to_string(),
+                    DataType::Primitive(PrimitiveType::String),
+                    true,
+                ),
+            ])
+            .fields()
+            .cloned(),
+        )
+        .await?;
+
+    let mut record_batch_writer =
+        deltalake::writer::RecordBatchWriter::for_table(&mut table)?;
+    record_batch_writer
+        .write(
+            RecordBatch::try_new(
+                Arc::new(Schema::new(vec![
+                    Field::new("num", arrow::datatypes::DataType::Int32, true),
+                    Field::new("letter", arrow::datatypes::DataType::Utf8, true),
+                ])),
+                vec![
+                    Arc::new(Int32Array::from(vec![1, 2, 3])),
+                    Arc::new(StringArray::from(vec![
+                        "a", "b", "c",
+                    ])),
+                ],
+            )?,
+        )
+        .await?;
+    record_batch_writer.flush_and_commit(&mut table).await?;
+    ```
+
 Here are the contents of the Delta table in storage:
 
 ```
diff --git a/docs/usage/deleting-rows-from-delta-lake-table.md b/docs/usage/deleting-rows-from-delta-lake-table.md
index 6c852100e6..3fc2ec7a5b 100644
--- a/docs/usage/deleting-rows-from-delta-lake-table.md
+++ b/docs/usage/deleting-rows-from-delta-lake-table.md
@@ -17,11 +17,29 @@ Suppose you have the following Delta table with four rows:
 
 Here's how to delete all the rows where the `num` is greater than 2:
 
-```python
-dt = DeltaTable("tmp/my-table")
-dt.delete("num > 2")
-```
-
+=== "Python"
+
+    ```python
+    dt = DeltaTable("tmp/my-table")
+    dt.delete("num > 2")
+    ```
+
+=== "Rust"
+    ```rust
+    let table = deltalake::open_table("./data/simple_table").await?;
+    let (table, delete_metrics) = DeltaOps(table)
+        .delete()
+        .with_predicate(col("num").gt(lit(2)))
+        .await?;
+    ```
+    `with_predicate` expects an argument that can be translated into a DataFusion `Expr`. It can be built either with the DataFusion expression API, as above, or from a SQL `WHERE` clause:
+    ```rust
+    let table = deltalake::open_table("./data/simple_table").await?;
+    let (table, delete_metrics) = DeltaOps(table)
+        .delete()
+        .with_predicate("num > 2")
+        .await?;
+    ```
 Here are the contents of the Delta table after the delete operation has been performed:
 
 ```
diff --git a/docs/usage/examining-table.md b/docs/usage/examining-table.md
index dbe5e2c585..631d8a8ef8 100644
--- a/docs/usage/examining-table.md
+++ b/docs/usage/examining-table.md
@@ -16,12 +16,20 @@ The delta log maintains basic metadata about a table, including:
 Get metadata from a table with the
 [DeltaTable.metadata()][deltalake.table.DeltaTable.metadata] method:
 
-``` python
->>> from deltalake import DeltaTable
->>> dt = DeltaTable("../rust/tests/data/simple_table")
->>> dt.metadata()
-Metadata(id: 5fba94ed-9794-4965-ba6e-6ee3c0d22af9, name: None, description: None, partitionColumns: [], created_time: 1587968585495, configuration={})
-```
+=== "Python"
+    ``` python
+    >>> from deltalake import DeltaTable
+    >>> dt = DeltaTable("../rust/tests/data/simple_table")
+    >>> dt.metadata()
+    Metadata(id: 5fba94ed-9794-4965-ba6e-6ee3c0d22af9, name: None, description: None, partitionColumns: [], created_time: 1587968585495, configuration={})
+    ```
+=== "Rust"
+    ```rust
+    let table = deltalake::open_table("../rust/tests/data/simple_table").await?;
+    let metadata = table.metadata()?;
+    println!("metadata: {:?}", metadata);
+    ```
+
 
 ## Schema
 
@@ -32,30 +40,50 @@ PyArrow schema.
 The first allows you to introspect any column-level metadata stored in the
 schema, while the latter represents the schema the table will be loaded
 into.
 
-Use [DeltaTable.schema][deltalake.table.DeltaTable.schema] to retrieve the delta lake schema:
-
-``` python
->>> from deltalake import DeltaTable
->>> dt = DeltaTable("../rust/tests/data/simple_table")
->>> dt.schema()
-Schema([Field(id, PrimitiveType("long"), nullable=True)])
-```
-
-These schemas have a JSON representation that can be retrieved. To
-reconstruct from json, use
-[DeltaTable.schema.to_json()][deltalake.schema.Schema.to_json].
-``` python
->>> dt.schema().to_json()
-'{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}'
-```
-Use [DeltaTable.schema.to_pyarrow()][deltalake.schema.Schema.to_pyarrow] to retrieve the PyArrow schema:
-
-``` python
->>> dt.schema().to_pyarrow()
-id: int64
-```
+=== "Python"
+    Use [DeltaTable.schema][deltalake.table.DeltaTable.schema] to retrieve the delta lake schema:
+    ``` python
+    >>> from deltalake import DeltaTable
+    >>> dt = DeltaTable("../rust/tests/data/simple_table")
+    >>> dt.schema()
+    Schema([Field(id, PrimitiveType("long"), nullable=True)])
+    ```
+=== "Rust"
+    Use `DeltaTable::get_schema` to retrieve the delta lake schema:
+    ```rust
+    let table = deltalake::open_table("./data/simple_table").await?;
+    let schema = table.get_schema()?;
+    println!("schema: {:?}", schema);
+    ```
+These schemas have a JSON representation that can be retrieved.
+
+=== "Python"
+    To retrieve the JSON representation, use [DeltaTable.schema.to_json()][deltalake.schema.Schema.to_json].
+    ``` python
+    >>> dt.schema().to_json()
+    '{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}'
+    ```
+=== "Rust"
+    Use `serde_json` to serialize the schema to JSON:
+
+    ```rust
+    println!("{}", serde_json::to_string_pretty(&schema)?);
+    ```
+It is also possible to retrieve the Arrow schema:
+=== "Python"
+
+    Use [DeltaTable.schema.to_pyarrow()][deltalake.schema.Schema.to_pyarrow] to retrieve the PyArrow schema:
+
+    ``` python
+    >>> dt.schema().to_pyarrow()
+    id: int64
+    ```
+=== "Rust"
+    ```rust
+    let arrow_schema = table.snapshot()?.arrow_schema()?;
+    println!("arrow_schema: {:?}", arrow_schema);
+    ```
 
 ## History
 
@@ -73,22 +101,27 @@ default, unless otherwise specified by the table configuration
 
 To view the available history, use `DeltaTable.history`:
 
-
-``` python
-from deltalake import DeltaTable
-
-dt = DeltaTable("../rust/tests/data/simple_table")
-dt.history()
-```
-
-```
-[{'timestamp': 1587968626537, 'operation': 'DELETE', 'operationParameters': {'predicate': '["((`id` % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]'}, 'readVersion': 3, 'isBlindAppend': False},
- {'timestamp': 1587968614187, 'operation': 'UPDATE', 'operationParameters': {'predicate': '((id#697L % cast(2 as bigint)) = cast(0 as bigint))'}, 'readVersion': 2, 'isBlindAppend': False},
- {'timestamp': 1587968604143, 'operation': 'WRITE', 'operationParameters': {'mode': 'Overwrite', 'partitionBy': '[]'}, 'readVersion': 1, 'isBlindAppend': False},
- {'timestamp': 1587968596254, 'operation': 'MERGE', 'operationParameters': {'predicate': '(oldData.`id` = newData.`id`)'}, 'readVersion': 0, 'isBlindAppend': False},
- {'timestamp': 1587968586154, 'operation': 'WRITE', 'operationParameters': {'mode': 'ErrorIfExists', 'partitionBy': '[]'}, 'isBlindAppend': True}]
-```
-
+=== "Python"
+    ``` python
+    from deltalake import DeltaTable
+
+    dt = DeltaTable("../rust/tests/data/simple_table")
+    dt.history()
+    ```
+
+    ```
+    [{'timestamp': 1587968626537, 'operation': 'DELETE', 'operationParameters': {'predicate': '["((`id` % CAST(2 AS BIGINT)) = CAST(0 AS BIGINT))"]'}, 'readVersion': 3, 'isBlindAppend': False},
+     {'timestamp': 1587968614187, 'operation': 'UPDATE', 'operationParameters': {'predicate': '((id#697L % cast(2 as bigint)) = cast(0 as bigint))'}, 'readVersion': 2, 'isBlindAppend': False},
+     {'timestamp': 1587968604143, 'operation': 'WRITE', 'operationParameters': {'mode': 'Overwrite', 'partitionBy': '[]'}, 'readVersion': 1, 'isBlindAppend': False},
+     {'timestamp': 1587968596254, 'operation': 'MERGE', 'operationParameters': {'predicate': '(oldData.`id` = newData.`id`)'}, 'readVersion': 0, 'isBlindAppend': False},
+     {'timestamp': 1587968586154, 'operation': 'WRITE', 'operationParameters': {'mode': 'ErrorIfExists', 'partitionBy': '[]'}, 'isBlindAppend': True}]
+    ```
+=== "Rust"
+    ```rust
+    let table = deltalake::open_table("../rust/tests/data/simple_table").await?;
+    let history = table.history(None).await?;
+    println!("Table history: {:#?}", history);
+    ```
 ## Current Add Actions
 
 The active state for a delta table is determined by the Add actions,
@@ -96,21 +129,36 @@ which provide the list of files that are part of the table and metadata
 about them, such as creation time, size, and statistics. You can get a
 data frame of the add actions data using `DeltaTable.get_add_actions`:
 
-``` python
->>> from deltalake import DeltaTable
->>> dt = DeltaTable("../rust/tests/data/delta-0.8.0")
->>> dt.get_add_actions(flatten=True).to_pandas()
-                                                path  size_bytes   modification_time  data_change  num_records  null_count.value  min.value  max.value
-0  part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a...         440 2021-03-06 15:16:07         True            2                 0          0          2
-1  part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe...
440 2021-03-06 15:16:16 True 2 0 2 4 -``` - +=== "Python" + ``` python + >>> from deltalake import DeltaTable + >>> dt = DeltaTable("../rust/tests/data/delta-0.8.0") + >>> dt.get_add_actions(flatten=True).to_pandas() + path size_bytes modification_time data_change num_records null_count.value min.value max.value + 0 part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a... 440 2021-03-06 15:16:07 True 2 0 0 2 + 1 part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe... 440 2021-03-06 15:16:16 True 2 0 2 4 + ``` + +=== "Rust" + ```rust + let table = deltalake::open_table("./data/simple_table").await?; + let actions = table.snapshot()?.add_actions_table(true)?; + println!("{}", pretty_format_batches(&vec![actions])?); + ``` This works even with past versions of the table: -``` python ->>> dt = DeltaTable("../rust/tests/data/delta-0.8.0", version=0) ->>> dt.get_add_actions(flatten=True).to_pandas() - path size_bytes modification_time data_change num_records null_count.value min.value max.value -0 part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a... 440 2021-03-06 15:16:07 True 2 0 0 2 -1 part-00001-911a94a2-43f6-4acb-8620-5e68c265498... 445 2021-03-06 15:16:07 True 3 0 2 4 -``` \ No newline at end of file +=== "Python" + ``` python + >>> dt = DeltaTable("../rust/tests/data/delta-0.8.0", version=0) + >>> dt.get_add_actions(flatten=True).to_pandas() + path size_bytes modification_time data_change num_records null_count.value min.value max.value + 0 part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a... 440 2021-03-06 15:16:07 True 2 0 0 2 + 1 part-00001-911a94a2-43f6-4acb-8620-5e68c265498... 445 2021-03-06 15:16:07 True 3 0 2 4 + ``` +=== "Rust" + ```rust + let mut table = deltalake::open_table("./data/simple_table").await?; + table.load_version(0).await?; + let actions = table.snapshot()?.add_actions_table(true)?; + println!("{}", pretty_format_batches(&vec![actions])?); + ``` \ No newline at end of file diff --git a/docs/usage/loading-table.md b/docs/usage/loading-table.md index 6c99bf7f74..478ed2391a 100644 --- a/docs/usage/loading-table.md +++ b/docs/usage/loading-table.md @@ -16,10 +16,20 @@ options](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfig [gcs options](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html#variants). -```python ->>> storage_options = {"AWS_ACCESS_KEY_ID": "THE_AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY":"THE_AWS_SECRET_ACCESS_KEY"} ->>> dt = DeltaTable("../rust/tests/data/delta-0.2.0", storage_options=storage_options) -``` +=== "Python" + ```python + >>> storage_options = {"AWS_ACCESS_KEY_ID": "THE_AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY":"THE_AWS_SECRET_ACCESS_KEY"} + >>> dt = DeltaTable("../rust/tests/data/delta-0.2.0", storage_options=storage_options) + ``` + +=== "Rust" + ```rust + let storage_options = HashMap::from_iter(vec![ + ("AWS_ACCESS_KEY_ID".to_string(), "THE_AWS_ACCESS_KEY_ID".to_string()), + ("AWS_SECRET_ACCESS_KEY".to_string(), "THE_AWS_SECRET_ACCESS_KEY".to_string()), + ]); + let table = open_table_with_storage_options("../rust/tests/data/delta-0.2.0", storage_options).await?; + ``` The configuration can also be provided via the environment, and the basic service provider is derived from the URL being used. We try to @@ -64,26 +74,51 @@ For AWS Glue catalog, use AWS environment variables to authenticate. You can check whether or not a Delta table exists at a particular path by using the `DeltaTable.is_deltatable()` method. 
-```python
-from deltalake import DeltaTable
-
-table_path = "<path/to/valid/table>"
-DeltaTable.is_deltatable(table_path)
-# True
-
-invalid_table_path = "<path/to/nonexistent/table>"
-DeltaTable.is_deltatable(invalid_table_path)
-# False
-
-bucket_table_path = "<path/to/valid/table/in/bucket>"
-storage_options = {
-    "AWS_ACCESS_KEY_ID": "THE_AWS_ACCESS_KEY_ID",
-    "AWS_SECRET_ACCESS_KEY": "THE_AWS_SECRET_ACCESS_KEY",
-    ...
-}
-DeltaTable.is_deltatable(bucket_table_path, storage_options)
-# True
-```
+=== "Python"
+
+    ```python
+    from deltalake import DeltaTable
+
+    table_path = "<path/to/valid/table>"
+    DeltaTable.is_deltatable(table_path)
+    # True
+
+    invalid_table_path = "<path/to/nonexistent/table>"
+    DeltaTable.is_deltatable(invalid_table_path)
+    # False
+
+    bucket_table_path = "<path/to/valid/table/in/bucket>"
+    storage_options = {
+        "AWS_ACCESS_KEY_ID": "THE_AWS_ACCESS_KEY_ID",
+        "AWS_SECRET_ACCESS_KEY": "THE_AWS_SECRET_ACCESS_KEY",
+        ...
+    }
+    DeltaTable.is_deltatable(bucket_table_path, storage_options)
+    # True
+    ```
+
+=== "Rust"
+
+    ```rust
+    let table_path = "<path/to/valid/table>";
+    let builder = deltalake::DeltaTableBuilder::from_uri(table_path);
+    builder.build()?.verify_deltatable_existence().await?;
+    // true
+
+    let invalid_table_path = "<path/to/nonexistent/table>";
+    let builder = deltalake::DeltaTableBuilder::from_uri(invalid_table_path);
+    builder.build()?.verify_deltatable_existence().await?;
+    // false
+
+    let bucket_table_path = "<path/to/valid/table/in/bucket>";
+    let storage_options = HashMap::from_iter(vec![
+        ("AWS_ACCESS_KEY_ID".to_string(), "THE_AWS_ACCESS_KEY_ID".to_string()),
+        ("AWS_SECRET_ACCESS_KEY".to_string(), "THE_AWS_SECRET_ACCESS_KEY".to_string()),
+    ]);
+    let builder = deltalake::DeltaTableBuilder::from_uri(bucket_table_path).with_storage_options(storage_options);
+    builder.build()?.verify_deltatable_existence().await?;
+    // true
+    ```
 
 ## Custom Storage Backends
 
@@ -127,18 +162,30 @@ ds = dt.to_pyarrow_dataset(filesystem=filesystem)
 
 To load previous table states, you can provide the version number you wish to load:
 
-```python
->>> dt = DeltaTable("../rust/tests/data/simple_table", version=2)
-```
+=== "Python"
+    ```python
+    >>> dt = DeltaTable("../rust/tests/data/simple_table", version=2)
+    ```
+=== "Rust"
+    ```rust
+    let mut table = open_table("./data/simple_table").await?;
+    table.load_version(2).await?;
+    ```
+
 Once you've loaded a table, you can also change versions using either a
 version number or datetime string:
 
-```python
->>> dt.load_version(1)
->>> dt.load_with_datetime("2021-11-04 00:05:23.283+00:00")
-```
-
+=== "Python"
+    ```python
+    >>> dt.load_version(1)
+    >>> dt.load_with_datetime("2021-11-04 00:05:23.283+00:00")
+    ```
+=== "Rust"
+    ```rust
+    table.load_version(1).await?;
+    table.load_with_datetime("2021-11-04T00:05:23.283+00:00".parse().unwrap()).await?;
+    ```
 !!! warning
 
     Previous table versions may not exist if they have been vacuumed, in
diff --git a/docs/usage/managing-tables.md b/docs/usage/managing-tables.md
index b3ab3540e1..9d465ee63e 100644
--- a/docs/usage/managing-tables.md
+++ b/docs/usage/managing-tables.md
@@ -14,15 +14,24 @@ to.
 
 Use `DeltaTable.vacuum` to perform the vacuum operation. Note that to prevent accidental deletion, the function performs a dry-run by default: it will only list the files to be deleted. Pass `dry_run=False` to actually delete files.
-``` python ->>> dt = DeltaTable("../rust/tests/data/simple_table") ->>> dt.vacuum() -['../rust/tests/data/simple_table/part-00006-46f2ff20-eb5d-4dda-8498-7bfb2940713b-c000.snappy.parquet', - '../rust/tests/data/simple_table/part-00190-8ac0ae67-fb1d-461d-a3d3-8dc112766ff5-c000.snappy.parquet', - '../rust/tests/data/simple_table/part-00164-bf40481c-4afd-4c02-befa-90f056c2d77a-c000.snappy.parquet', - ...] ->>> dt.vacuum(dry_run=False) # Don't run this unless you are sure! -``` +=== "Python" + ``` python + >>> dt = DeltaTable("../rust/tests/data/simple_table") + >>> dt.vacuum() + ['../rust/tests/data/simple_table/part-00006-46f2ff20-eb5d-4dda-8498-7bfb2940713b-c000.snappy.parquet', + '../rust/tests/data/simple_table/part-00190-8ac0ae67-fb1d-461d-a3d3-8dc112766ff5-c000.snappy.parquet', + '../rust/tests/data/simple_table/part-00164-bf40481c-4afd-4c02-befa-90f056c2d77a-c000.snappy.parquet', + ...] + >>> dt.vacuum(dry_run=False) # Don't run this unless you are sure! + ``` +=== "Rust" + ```rust + let mut table = open_table("./data/simple_table").await?; + let (table, vacuum_metrics) = DeltaOps(table).vacuum().with_dry_run(true).await?; + println!("Files deleted: {:?}", vacuum_metrics.files_deleted); + + let (table, vacuum_metrics) = DeltaOps(table).vacuum().with_dry_run(false).await?; + ``` ## Optimizing tables @@ -34,3 +43,15 @@ A table `dt = DeltaTable(...)` has two methods for optimizing it: - `dt.optimize.z_order()` to compact and apply Z Ordering. See the section [Small file compaction](./optimize/small-file-compaction-with-optimize.md) for more information and a detailed example on `compact`, and the section [Z Order](./optimize/delta-lake-z-order.md) for more information on `z_order`. + +=== "Python" + ```python + dt = DeltaTable(...) + dt.optimize.compact() + ``` + +=== "Rust" + ```rust + let mut table = open_table("./data/simple_table").await?; + let (table, metrics) = DeltaOps(table).optimize().with_type(OptimizeType::Compact).await?; + ``` diff --git a/docs/usage/merging-tables.md b/docs/usage/merging-tables.md index 0e105caa92..d00209a0fa 100644 --- a/docs/usage/merging-tables.md +++ b/docs/usage/merging-tables.md @@ -41,41 +41,105 @@ You can define the rules for the update using the `updates` keyword. If a `predi For example, let’s update the value of a column in the target table based on a matching row in the source table. 
-```python
-from deltalake import DeltaTable, write_deltalake
-import pyarrow as pa
-
-# define target table
-> target_data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
-> write_deltalake("tmp_table", target_data)
-> dt = DeltaTable("tmp_table")
-> dt.to_pandas().sort_values("x", ignore_index=True)
-
-   x  y
-0  1  4
-1  2  5
-2  3  6
-
-# define source table
-> source_data = pa.table({"x": [2, 3], "y": [5,8]})
-> source_data
-
-   x  y
-0  2  5
-1  3  8
-
-# define merge logic
-> (
->     dt.merge(
->         source=source_data,
->         predicate="target.x = source.x",
->         source_alias="source",
->         target_alias="target")
->     .when_matched_update(
->         updates={"x": "source.x", "y":"source.y"})
->     .execute()
-> )
-```
+=== "Python"
+    ```python
+    from deltalake import DeltaTable, write_deltalake
+    import pyarrow as pa
+
+    # define target table
+    > target_data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
+    > write_deltalake("tmp_table", target_data)
+    > dt = DeltaTable("tmp_table")
+    > dt.to_pandas().sort_values("x", ignore_index=True)
+
+       x  y
+    0  1  4
+    1  2  5
+    2  3  6
+
+    # define source table
+    > source_data = pa.table({"x": [2, 3], "y": [5,8]})
+    > source_data
+
+       x  y
+    0  2  5
+    1  3  8
+
+    # define merge logic
+    > (
+    >     dt.merge(
+    >         source=source_data,
+    >         predicate="target.x = source.x",
+    >         source_alias="source",
+    >         target_alias="target")
+    >     .when_matched_update(
+    >         updates={"x": "source.x", "y":"source.y"})
+    >     .execute()
+    > )
+    ```
+
+=== "Rust"
+    ```rust
+    // define target table
+    let delta_ops = DeltaOps::try_from_uri("tmp/some-table").await?;
+    let mut table = delta_ops
+        .create()
+        .with_table_name("some-table")
+        .with_save_mode(SaveMode::Overwrite)
+        .with_columns(
+            StructType::new(vec![
+                StructField::new(
+                    "x".to_string(),
+                    DataType::Primitive(PrimitiveType::Integer),
+                    true,
+                ),
+                StructField::new(
+                    "y".to_string(),
+                    DataType::Primitive(PrimitiveType::Integer),
+                    true,
+                ),
+            ])
+            .fields()
+            .cloned(),
+        )
+        .await?;
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("x", arrow::datatypes::DataType::Int32, true),
+        Field::new("y", arrow::datatypes::DataType::Int32, true),
+    ]));
+    let mut record_batch_writer = deltalake::writer::RecordBatchWriter::for_table(&mut table)?;
+    record_batch_writer
+        .write(RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(Int32Array::from(vec![4, 5, 6])),
+            ],
+        )?)
+        .await?;
+
+    record_batch_writer.flush_and_commit(&mut table).await?;
+
+    // define source table
+    let ctx = SessionContext::new();
+    let source_data = ctx.read_batch(RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(Int32Array::from(vec![2, 3])),
+            Arc::new(Int32Array::from(vec![5, 8])),
+        ],
+    )?)?;
+
+    // define merge logic
+    DeltaOps(table)
+        .merge(source_data, "target.x = source.x")
+        .with_source_alias("source")
+        .with_target_alias("target")
+        .when_matched_update(|update|
+            update
+                .update("x", "source.x")
+                .update("y", "source.y"))?
+        .await?;
+    ```
 
 First, we match rows for which the `x` values are the same using `predicate="target.x = source.x"`. We then update the `x` and `y` values of the matched row with the new (source) values using `updates={"x": "source.x", "y":"source.y"}`.
@@ -99,51 +163,119 @@ Use [`when_not_matched_insert`][deltalake.table.TableMerger.when_not_matched_ins
 
 For example, let's say we start with the same target table:
 
-```python
-> target_data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
-> write_deltalake("tmp_table", target_data)
-> dt = DeltaTable("tmp_table")
-
-   x  y
-0  1  4
-1  2  5
-2  3  6
-```
+=== "Python"
+    ```python
+    > target_data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
+    > write_deltalake("tmp_table", target_data)
+    > dt = DeltaTable("tmp_table")
+
+       x  y
+    0  1  4
+    1  2  5
+    2  3  6
+    ```
+=== "Rust"
+    ```rust
+    let delta_ops = DeltaOps::try_from_uri("./data/simple_table").await?;
+    let mut table = delta_ops
+        .create()
+        .with_table_name("some-table")
+        .with_save_mode(SaveMode::Overwrite)
+        .with_columns(
+            StructType::new(vec![
+                StructField::new(
+                    "x".to_string(),
+                    DataType::Primitive(PrimitiveType::Integer),
+                    true,
+                ),
+                StructField::new(
+                    "y".to_string(),
+                    DataType::Primitive(PrimitiveType::Integer),
+                    true,
+                ),
+            ])
+            .fields()
+            .cloned(),
+        )
+        .await?;
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("x", arrow::datatypes::DataType::Int32, true),
+        Field::new("y", arrow::datatypes::DataType::Int32, true),
+    ]));
+    let mut record_batch_writer = deltalake::writer::RecordBatchWriter::for_table(&mut table)?;
+    record_batch_writer
+        .write(RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(Int32Array::from(vec![4, 5, 6])),
+            ],
+        )?)
+        .await?;
+
+    record_batch_writer.flush_and_commit(&mut table).await?;
+    ```
 
 And we want to merge only new records from our source data, without duplication:
 
-```python
-> source_data = pa.table({"x": [2,3,7], "y": [4,5,8]})
-
-   x  y
-0  2  5
-1  3  6
-2  7  8
-```
+=== "Python"
+    ```python
+    > source_data = pa.table({"x": [2,3,7], "y": [4,5,8]})
+
+       x  y
+    0  2  4
+    1  3  5
+    2  7  8
+    ```
+=== "Rust"
+    ```rust
+    let ctx = SessionContext::new();
+    let source_data = ctx.read_batch(RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(Int32Array::from(vec![2, 3, 7])),
+            Arc::new(Int32Array::from(vec![4, 5, 8])),
+        ],
+    )?)?;
+    ```
 
 The `MERGE` syntax would be as follows:
 
-```python
-(
-    dt.merge(
-        source=source_data,
-        predicate="target.x = source.x",
-        source_alias="source",
-        target_alias="target")
-    .when_not_matched_insert(
-        updates={"x": "source.x", "y":"source.y"})
-    .execute()
-)
-
-> # inspect result
-> print(dt.to_pandas().sort_values("x", ignore_index=True))
-
-   x  y
-0  1  4
-1  2  5
-2  3  6
-3  7  8
-```
+=== "Python"
+    ```python
+    (
+        dt.merge(
+            source=source_data,
+            predicate="target.x = source.x",
+            source_alias="source",
+            target_alias="target")
+        .when_not_matched_insert(
+            updates={"x": "source.x", "y":"source.y"})
+        .execute()
+    )
+
+    > # inspect result
+    > print(dt.to_pandas().sort_values("x", ignore_index=True))
+
+       x  y
+    0  1  4
+    1  2  5
+    2  3  6
+    3  7  8
+    ```
+
+=== "Rust"
+    ```rust
+    DeltaOps(table)
+        .merge(source_data, "target.x = source.x")
+        .with_source_alias("source")
+        .with_target_alias("target")
+        .when_not_matched_insert(
+            |insert| insert.set("x", "source.x").set("y", "source.y")
+        )?.await?;
+    ```
 
 The new row has been successfully added to the target dataset.
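+In Rust, awaiting the merge builder also yields a metrics struct alongside the
+updated table, which can be used to verify what the merge did. A minimal
+sketch, assuming the `MergeMetrics` struct and its `num_target_rows_inserted`
+field exposed by recent versions of the crate:
+
+```rust
+let (table, metrics) = DeltaOps(table)
+    .merge(source_data, "target.x = source.x")
+    .with_source_alias("source")
+    .with_target_alias("target")
+    .when_not_matched_insert(
+        |insert| insert.set("x", "source.x").set("y", "source.y")
+    )?
+    .await?;
+// Inspect how many rows the merge actually inserted.
+println!("rows inserted: {}", metrics.num_target_rows_inserted);
+```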
@@ -164,18 +296,29 @@ source_data = pa.table({"x": [2, 3], "deleted": [False, True]}) You can delete the rows that match a predicate (in this case `"deleted" = True`) using: -```python -( - dt.merge( - source=source_data, - predicate="target.x = source.x", - source_alias="source", - target_alias="target") +=== "Python" + ```python + ( + dt.merge( + source=source_data, + predicate="target.x = source.x", + source_alias="source", + target_alias="target") + .when_matched_delete( + predicate="source.deleted = true") + .execute() + ) + ``` +=== "Rust" + ```rust + DeltaOps(table) + .merge(source_data, "target.x = source.x") + .with_source_alias("source") + .with_target_alias("target") .when_matched_delete( - predicate="source.deleted = true") - .execute() -) -``` + |delete| delete.predicate("source.deleted = true") + )?.await?; + ``` This will result in: @@ -197,25 +340,41 @@ To perform an upsert operation, use `when_matched_update` and `when_not_matched_ For example: -```python -target_data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) -write_deltalake("tmp_table", target_data) -dt = DeltaTable("tmp_table") -source_data = pa.table({"x": [2, 3, 5], "y": [5, 8, 11]}) +=== "Python" -( - dt.merge( - source=source_data, - predicate="target.x = source.x", - source_alias="source", - target_alias="target") + ```python + target_data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]}) + write_deltalake("tmp_table", target_data) + dt = DeltaTable("tmp_table") + source_data = pa.table({"x": [2, 3, 5], "y": [5, 8, 11]}) + + ( + dt.merge( + source=source_data, + predicate="target.x = source.x", + source_alias="source", + target_alias="target") + .when_matched_update( + updates={"x": "source.x", "y":"source.y"}) + .when_not_matched_insert( + updates={"x": "source.x", "y":"source.y"}) + .execute() + ) + ``` +=== "Rust" + ```rust + DeltaOps(table) + .merge(source_data, "target.x = source.x") + .with_source_alias("source") + .with_target_alias("target") .when_matched_update( - updates={"x": "source.x", "y":"source.y"}) + |update| update.update("x", "source.x").update("y", "source.y"), + )? .when_not_matched_insert( - updates={"x": "source.x", "y":"source.y"}) - .execute() -) -``` + |insert| insert.set("x", "source.x").set("y", "source.y"), + )? + .await?; + ``` This will give you the following output: @@ -235,21 +394,38 @@ Use the [`when_matched_delete`][deltalake.table.TableMerger.when_matched_delete] For example, given the same `target_data` and `source_data` used in the section above: -```python -( - dt.merge( - source=source_data, - predicate="target.x = source.x", - source_alias="source", - target_alias="target") +=== "Python" + ```python + ( + dt.merge( + source=source_data, + predicate="target.x = source.x", + source_alias="source", + target_alias="target") + .when_matched_update( + updates={"x": "source.x", "y":"source.y"}) + .when_not_matched_insert( + updates={"x": "source.x", "y":"source.y"}) + .when_not_matched_by_source_delete() + .execute() + ) + ``` +=== "Rust" + ```rust + DeltaOps(table) + .merge(source_data, "target.x = source.x") + .with_source_alias("source") + .with_target_alias("target") .when_matched_update( - updates={"x": "source.x", "y":"source.y"}) + |update| update.update("x", "source.x").update("y", "source.y"), + )? .when_not_matched_insert( - updates={"x": "source.x", "y":"source.y"}) - .when_not_matched_by_source_delete() - .execute() -) -``` + |insert| insert.set("x", "source.x").set("y", "source.y"), + )? + .when_not_matched_by_source_delete(|delete| delete)? 
+        .await?;
+    ```
 
 This will result in:
 
diff --git a/docs/usage/optimize/delta-lake-z-order.md b/docs/usage/optimize/delta-lake-z-order.md
index 1eb03a871e..d415d3c6ed 100644
--- a/docs/usage/optimize/delta-lake-z-order.md
+++ b/docs/usage/optimize/delta-lake-z-order.md
@@ -10,7 +10,19 @@ If you Z Order the data by the `country` column, then individuals from the same
 
 Here's how to Z Order a Delta table:
 
-```python
-dt = DeltaTable("tmp")
-dt.optimize.z_order(["country"])
-```
+=== "Python"
+    ```python
+    dt = DeltaTable("tmp")
+    dt.optimize.z_order(["country"])
+    ```
+
+=== "Rust"
+    ```rust
+    let table = open_table("tmp").await?;
+
+    let (table, metrics) = DeltaOps(table)
+        .optimize()
+        .with_type(OptimizeType::ZOrder(vec!["country".to_string()]))
+        .await?;
+    println!("{:?}", metrics);
+    ```
diff --git a/docs/usage/optimize/small-file-compaction-with-optimize.md b/docs/usage/optimize/small-file-compaction-with-optimize.md
index d86bba54f7..f5a6561380 100644
--- a/docs/usage/optimize/small-file-compaction-with-optimize.md
+++ b/docs/usage/optimize/small-file-compaction-with-optimize.md
@@ -16,55 +16,172 @@ Let's start by creating a Delta table with a lot of small files so we can demo
 
 Start by writing a function that generates one thousand rows of random data given a timestamp.
 
-```python
-def record_observations(date: datetime) -> pa.Table:
-    """Pulls data for a certain datetime"""
-    nrows = 1000
-    return pa.table(
-        {
-            "date": pa.array([date.date()] * nrows),
-            "timestamp": pa.array([date] * nrows),
-            "value": pc.random(nrows),
-        }
-    )
-```
+=== "Python"
+    ```python
+    def record_observations(date: datetime) -> pa.Table:
+        """Pulls data for a certain datetime"""
+        nrows = 1000
+        return pa.table(
+            {
+                "date": pa.array([date.date()] * nrows),
+                "timestamp": pa.array([date] * nrows),
+                "value": pc.random(nrows),
+            }
+        )
+    ```
+
+=== "Rust"
+    ```rust
+    pub fn record_observations(timestamp: DateTime<Utc>) -> RecordBatch {
+        let nrows = 1000;
+        let date = timestamp.date_naive();
+        let value = (0..nrows)
+            .map(|_| rand::random::<f64>())
+            .collect::<Vec<f64>>();
+        let date = (0..nrows).map(|_| date).collect::<Vec<_>>();
+        let timestamp = (0..nrows)
+            .map(|_| timestamp.timestamp_micros())
+            .collect::<Vec<i64>>();
+
+        let schema = Schema::new(vec![
+            Field::new("date", arrow::datatypes::DataType::Date32, false),
+            Field::new(
+                "timestamp",
+                arrow::datatypes::DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
+                false,
+            ),
+            Field::new("value", arrow::datatypes::DataType::Float64, false),
+        ]);
+
+        RecordBatch::try_new(
+            Arc::new(schema),
+            vec![
+                Arc::new(Date32Array::from(
+                    date.iter()
+                        .map(|d| Date32Type::from_naive_date(*d))
+                        .collect::<Vec<i32>>(),
+                )),
+                Arc::new(TimestampMicrosecondArray::from(timestamp).with_timezone("UTC")),
+                Arc::new(Float64Array::from(value)),
+            ],
+        )
+        .unwrap()
+    }
+    ```
 
 Let's run this function and observe the output:
 
-```python
-record_observations(datetime(2021, 1, 1, 12)).to_pandas()
-
-	date	timestamp	value
-0	2021-01-01	2021-01-01 12:00:00	0.3186397383362023
-1	2021-01-01	2021-01-01 12:00:00	0.04253766974259088
-2	2021-01-01	2021-01-01 12:00:00	0.9355682965171573
-…
-999	2021-01-01	2021-01-01 12:00:00	0.23207037062879843
-```
+=== "Python"
+    ```python
+    record_observations(datetime(2021, 1, 1, 12)).to_pandas()
+
+    	date	timestamp	value
+    0	2021-01-01	2021-01-01 12:00:00	0.3186397383362023
+    1	2021-01-01	2021-01-01 12:00:00	0.04253766974259088
+    2	2021-01-01	2021-01-01 12:00:00	0.9355682965171573
+    …
+    999	2021-01-01	2021-01-01 12:00:00	0.23207037062879843
+    ```
+
+=== "Rust"
+    ```rust
+    let batch = record_observations("2021-01-01T12:00:00Z".parse::<DateTime<Utc>>().unwrap());
+    println!("{}", pretty_format_batches(&vec![batch])?);
+    // +------------+---------------------+------------------------+
+    // | date       | timestamp           | value                  |
+    // +------------+---------------------+------------------------+
+    // | 2021-01-01 | 2021-01-01T12:00:00 | 0.4061923494886005     |
+    // | 2021-01-01 | 2021-01-01T12:00:00 | 0.9987878410434536     |
+    // | 2021-01-01 | 2021-01-01T12:00:00 | 0.5731950954440364     |
+    // | 2021-01-01 | 2021-01-01T12:00:00 | 0.44535166836074713    |
+    // | 2021-01-01 | 2021-01-01T12:00:00 | 0.7122994421129841     |
+    // | 2021-01-01 | 2021-01-01T12:00:00 | 0.09947198303405769    |
+    // | 2021-01-01 | 2021-01-01T12:00:00 | 0.02835490232344251    |
+    // | 2021-01-01 | 2021-01-01T12:00:00 | 0.565059582551305      |
+    // | 2021-01-01 | 2021-01-01T12:00:00 | 0.2149121627261419     |
+    // ...
+    ```
 
 Let's write 100 hours worth of data to the Delta table.
 
-```python
-# Every hour starting at midnight on 2021-01-01
-hours_iter = (datetime(2021, 1, 1) + timedelta(hours=i) for i in itertools.count())
-
-# Write 100 hours worth of data
-for timestamp in itertools.islice(hours_iter, 100):
-    write_deltalake(
-        "observation_data",
-        record_observations(timestamp),
-        partition_by=["date"],
-        mode="append",
-    )
-```
+=== "Python"
+
+    ```python
+    # Every hour starting at midnight on 2021-01-01
+    hours_iter = (datetime(2021, 1, 1) + timedelta(hours=i) for i in itertools.count())
+
+    # Write 100 hours worth of data
+    for timestamp in itertools.islice(hours_iter, 100):
+        write_deltalake(
+            "observation_data",
+            record_observations(timestamp),
+            partition_by=["date"],
+            mode="append",
+        )
+    ```
+
+=== "Rust"
+    ```rust
+    let mut table = DeltaOps::try_from_uri("observation_data")
+        .await?
+        .create()
+        .with_table_name("observations_data")
+        .with_columns(
+            StructType::new(vec![
+                StructField::new(
+                    "date".to_string(),
+                    DataType::Primitive(PrimitiveType::Date),
+                    false,
+                ),
+                StructField::new(
+                    "timestamp".to_string(),
+                    DataType::Primitive(PrimitiveType::Timestamp),
+                    false,
+                ),
+                StructField::new(
+                    "value".to_string(),
+                    DataType::Primitive(PrimitiveType::Double),
+                    false,
+                ),
+            ])
+            .fields()
+            .cloned(),
+        )
+        .with_partition_columns(vec!["date"])
+        .with_save_mode(SaveMode::Append)
+        .await?;
+
+    // Every hour starting at midnight on 2021-01-01
+    let hours_iter = (0..).map(|i| {
+        "2021-01-01T00:00:00Z".parse::<DateTime<Utc>>().unwrap() + chrono::Duration::hours(i)
+    });
+
+    // Write 100 hours worth of data
+    for timestamp in hours_iter.take(100) {
+        let batch = record_observations(timestamp);
+        let mut writer = deltalake::writer::RecordBatchWriter::for_table(&table)?;
+        writer.write(batch).await?;
+        writer.flush_and_commit(&mut table).await?;
+    }
+    ```
 
 This data was appended to the Delta table in 100 separate transactions, so the table will contain 100 transaction log entries and 100 data files. You can see the number of files with the `files()` method.
 
-```python
-dt = DeltaTable("observation_data")
-len(dt.files()) # 100
-```
-
+=== "Python"
+    ```python
+    dt = DeltaTable("observation_data")
+    len(dt.files()) # 100
+    ```
+
+=== "Rust"
+    ```rust
+    let table = open_table("observation_data").await?;
+    let files = table.get_files_iter()?;
+    println!("len: {}", files.count()); // len: 100
+    ```
 Here's how the files are persisted in storage.
 
 ```
@@ -101,11 +218,19 @@ Each of these Parquet files is tiny - only 10 KB. Let's see how to
 
 Let's run the optimize command to compact the existing small files into larger files:
 
-```python
-dt = DeltaTable("observation_data")
-
-dt.optimize.compact()
-```
+=== "Python"
+    ```python
+    dt = DeltaTable("observation_data")
+
+    dt.optimize.compact()
+    ```
+
+=== "Rust"
+    ```rust
+    let table = open_table("observation_data").await?;
+    let (table, metrics) = DeltaOps(table).optimize().with_type(OptimizeType::Compact).await?;
+    println!("{:?}", metrics);
+    ```
 
 Here's the output of the command:
 
@@ -137,36 +262,93 @@ Let's append some more data to the Delta table and see how we can selectively
 
 Let's append another 24 hours of data to the Delta table:
 
-```python
-for timestamp in itertools.islice(hours_iter, 24):
-    write_deltalake(
-        dt,
-        record_observations(timestamp),
-        partition_by=["date"],
-        mode="append",
-    )
-```
+=== "Python"
+    ```python
+    for timestamp in itertools.islice(hours_iter, 24):
+        write_deltalake(
+            dt,
+            record_observations(timestamp),
+            partition_by=["date"],
+            mode="append",
+        )
+    ```
+=== "Rust"
+    ```rust
+    let mut table = open_table("observation_data").await?;
+    let hours_iter = (0..).map(|i| {
+        "2021-01-01T00:00:00Z".parse::<DateTime<Utc>>().unwrap() + chrono::Duration::hours(i)
+    });
+    for timestamp in hours_iter.skip(100).take(24) {
+        let batch = record_observations(timestamp);
+        let mut writer = deltalake::writer::RecordBatchWriter::for_table(&table)?;
+        writer.write(batch).await?;
+        writer.flush_and_commit(&mut table).await?;
+    }
+    ```
 
 We can use `get_add_actions()` to introspect the table state. We can see that `2021-01-06` has only a few hours of data so far, so we don't want to optimize that yet. But `2021-01-05` has all 24 hours of data, so it's ready to be optimized.
 
-```python
-dt.get_add_actions(flatten=True).to_pandas()[
-    "partition.date"
-].value_counts().sort_index()
-
-2021-01-01     1
-2021-01-02     1
-2021-01-03     1
-2021-01-04     1
-2021-01-05    21
-2021-01-06     4
-```
+=== "Python"
+    ```python
+    dt.get_add_actions(flatten=True).to_pandas()[
+        "partition.date"
+    ].value_counts().sort_index()
+
+    2021-01-01     1
+    2021-01-02     1
+    2021-01-03     1
+    2021-01-04     1
+    2021-01-05    21
+    2021-01-06     4
+    ```
+
+=== "Rust"
+    ```rust
+    let table = open_table("observation_data").await?;
+    let batch = table.snapshot()?.add_actions_table(true)?;
+    let ctx = SessionContext::new();
+    ctx.register_batch("observations", batch.clone())?;
+    let df = ctx.sql("
+        SELECT \"partition.date\",
+               COUNT(*)
+        FROM observations
+        GROUP BY \"partition.date\"
+        ORDER BY \"partition.date\"").await?;
+    df.show().await?;
+
+    // +----------------+----------+
+    // | partition.date | count(*) |
+    // +----------------+----------+
+    // | 2021-01-01     | 1        |
+    // | 2021-01-02     | 1        |
+    // | 2021-01-03     | 1        |
+    // | 2021-01-04     | 1        |
+    // | 2021-01-05     | 21       |
+    // | 2021-01-06     | 4        |
+    // +----------------+----------+
+    ```
 
 To optimize a single partition, you can pass in a `partition_filters` argument specifying which partitions to optimize.
 
+=== "Python"
+    ```python
+    dt.optimize(partition_filters=[("date", "=", "2021-01-05")])
+    ```
+
+=== "Rust"
+    ```rust
+    let table = open_table("observation_data").await?;
+    let (table, metrics) = DeltaOps(table)
+        .optimize()
+        .with_type(OptimizeType::Compact)
+        .with_filters(&vec![("date", "=", "2021-01-05").try_into()?])
+        .await?;
+    println!("{:?}", metrics);
+    ```
 ```python
-dt.optimize(partition_filters=[("date", "=", "2021-01-05")])
-
 {'numFilesAdded': 1,
  'numFilesRemoved': 21,
  'filesAdded': {'min': 238282,
@@ -247,10 +429,22 @@ The vacuum command deletes all files from storage that are marked for removal i
 
 It's normally a good idea to have a retention period of at least 7 days. For purposes of this example, we will set the retention period to zero, just so you can see how the files get removed from storage. Adjusting the retention period in this manner isn't recommended for production use cases.
 
 Let's run the vacuum command:
 
+=== "Python"
+    ```python
+    dt.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)
+    ```
+=== "Rust"
+    ```rust
+    let table = open_table("observation_data").await?;
+    let (table, metrics) = DeltaOps(table)
+        .vacuum()
+        .with_retention_period(chrono::Duration::days(0))
+        .with_enforce_retention_duration(false)
+        .with_dry_run(false)
+        .await?;
+    println!("{:?}", metrics);
+    ```
 
-```python
-dt.vacuum(retention_hours=0, enforce_retention_duration=False, dry_run=False)
-```
 
 The command returns a list of all the files that are removed from storage:
 
diff --git a/docs/usage/querying-delta-tables.md b/docs/usage/querying-delta-tables.md
index 0d693e0f5a..848f005ba4 100644
--- a/docs/usage/querying-delta-tables.md
+++ b/docs/usage/querying-delta-tables.md
@@ -115,4 +115,23 @@ Dask Name: read-parquet, 6 tasks
 0     5  2021     12    4
 0     6  2021     12   20
 1     7  2021     12   20
+```
+
+When working with the Rust API, Apache DataFusion can be used to query data from a Delta table.
+
+```rust
+let table = deltalake::open_table("../rust/tests/data/delta-0.8.0-partitioned").await?;
+let ctx = SessionContext::new();
+ctx.register_table("simple_table", Arc::new(table.clone()))?;
+let df = ctx.sql("SELECT value FROM simple_table WHERE year = 2021").await?;
+df.show().await?;
+```
+
+Apache DataFusion also supports a DataFrame interface that can be used instead of the SQL interface:
+```rust
+let table = deltalake::open_table("../rust/tests/data/delta-0.8.0-partitioned").await?;
+let ctx = SessionContext::new();
+let dataframe = ctx.read_table(Arc::new(table.clone()))?;
+let df = dataframe.filter(col("year").eq(lit(2021)))?.select(vec![col("value")])?;
+df.show().await?;
 ```
\ No newline at end of file
diff --git a/docs/usage/read-cdf.md b/docs/usage/read-cdf.md
index afbc07a6f9..0a4e250a11 100644
--- a/docs/usage/read-cdf.md
+++ b/docs/usage/read-cdf.md
@@ -2,6 +2,8 @@
 
 Reading the CDF data from a table with change data is easy.
 
+The `delta.enableChangeDataFeed` configuration needs to be set to `true` when creating the Delta table.
+
 ## Reading CDF Log
 
 {{ code_example('read_cdf', None, []) }}
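+
+For example, here is a sketch of how the change data feed could be enabled when
+creating a table through the Rust API, assuming the `with_configuration_property`
+builder method and the `TableProperty::EnableChangeDataFeed` key exposed by
+recent versions of the crate:
+
+```rust
+let table = DeltaOps::try_from_uri("tmp/some-table")
+    .await?
+    .create()
+    .with_columns(
+        StructType::new(vec![StructField::new(
+            "num".to_string(),
+            DataType::Primitive(PrimitiveType::Integer),
+            true,
+        )])
+        .fields()
+        .cloned(),
+    )
+    // Record change data for every subsequent write to this table.
+    .with_configuration_property(TableProperty::EnableChangeDataFeed, Some("true"))
+    .await?;
+```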