Skip to content

Commit

Permalink
Merge pull request #226 from treeverse/iceberg-add-row-level-diff
Browse files Browse the repository at this point in the history
Added Data Diff in Iceberg notebooks
  • Loading branch information
kesarwam authored Aug 27, 2024
2 parents 50ec28b + 9fb2f78 commit bf0d177
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 6 deletions.
1 change: 1 addition & 0 deletions 00_notebooks/00_index.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"* [**Data Collaboration**](./data-collaboration.ipynb)\n",
"* **lakeFS and Apache Iceberg**\n",
" * [Basic example](./iceberg-lakefs-basic.ipynb)\n",
" * [Book Sales example with multiple Iceberg tables](./iceberg-books.ipynb)\n",
" * [NYC Film Permits example](./iceberg-lakefs-nyc.ipynb)\n",
" * [What happens if you use Iceberg without the lakeFS support](./iceberg-lakefs-default.ipynb)\n",
"* **Using R with lakeFS**\n",
Expand Down
33 changes: 31 additions & 2 deletions 00_notebooks/iceberg-books.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@
"source": [
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder.appName(\"Iceberg / Jupyter\") \\\n",
" .config(\"spark.jars.packages\", \"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,io.lakefs:lakefs-iceberg:0.1.3\") \\\n",
" .config(\"spark.jars.packages\", \"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,io.lakefs:lakefs-iceberg:0.1.3,io.lakefs:lakefs-spark-extensions_2.12:0.0.3\") \\\n",
" .config(\"spark.hadoop.fs.s3a.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\") \\\n",
" .config(\"spark.hadoop.fs.s3a.endpoint\", lakefsEndPoint) \\\n",
" .config(\"spark.hadoop.fs.s3a.path.style.access\", \"true\") \\\n",
Expand All @@ -243,7 +243,7 @@
" .config(\"spark.sql.catalog.lakefs.uri\", lakefsEndPoint) \\\n",
" .config(\"spark.sql.catalog.lakefs.cache-enabled\", \"false\") \\\n",
" .config(\"spark.sql.defaultCatalog\", \"lakefs\") \\\n",
" .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\") \\\n",
" .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.lakefs.iceberg.extension.LakeFSSparkSessionExtensions\") \\\n",
" .getOrCreate()\n",
"spark.sparkContext.setLogLevel(\"INFO\")\n",
"\n",
Expand Down Expand Up @@ -629,6 +629,35 @@
"\n"
]
},
{
"cell_type": "markdown",
"id": "dcb1e2d4-aeb1-4ce1-9d45-c9c99a8a7b99",
"metadata": {},
"source": [
"## `Data diff`\n",
"refs_data_diff is an SQL table-valued function (TVF). The expression:\n",
"##### `refs_data_diff(PREFIX, FROM_SCHEMA, TO_SCHEMA, TABLE)`\n",
"yields a relation that compares the \"from\" table PREFIX.FROM_SCHEMA.TABLE with the \"to\" table PREFIX.TO_SCHEMA.TABLE. Its output is the difference: a relation (like a view) that adds a single column lakefs_change to the table schema.\n",
"\n",
"* Rows that appear only in the first version of the table (in the example, on branch main) appear in the difference with lakefs_change==’-’.\n",
"* Rows that appear only in the second version of the table (in the example, on branch dev) appear in the difference with lakefs_change==’+’.\n",
"* Rows that appear in both versions of the table do not appear in the difference."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4f2a2788-9fea-4eac-9f8c-257ed96c916c",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%%sql\n",
"\n",
"SELECT * FROM refs_data_diff('lakefs', 'main', 'dev', 'lakefs_demo.book_sales');"
]
},
{
"cell_type": "markdown",
"id": "b0cacc8c-32e0-4eb7-8271-89136e82bc64",
Expand Down
77 changes: 75 additions & 2 deletions 00_notebooks/iceberg-lakefs-basic.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@
"source": [
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder.appName(\"Iceberg / Jupyter\") \\\n",
" .config(\"spark.jars.packages\", \"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,io.lakefs:lakefs-iceberg:0.1.3\") \\\n",
" .config(\"spark.jars.packages\", \"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,io.lakefs:lakefs-iceberg:0.1.3,io.lakefs:lakefs-spark-extensions_2.12:0.0.3\") \\\n",
" .config(\"spark.hadoop.fs.s3a.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\") \\\n",
" .config(\"spark.hadoop.fs.s3a.endpoint\", lakefsEndPoint) \\\n",
" .config(\"spark.hadoop.fs.s3a.path.style.access\", \"true\") \\\n",
Expand All @@ -245,7 +245,7 @@
" .config(\"spark.sql.catalog.lakefs.uri\", lakefsEndPoint) \\\n",
" .config(\"spark.sql.catalog.lakefs.cache-enabled\", \"false\") \\\n",
" .config(\"spark.sql.defaultCatalog\", \"lakefs\") \\\n",
" .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\") \\\n",
" .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.lakefs.iceberg.extension.LakeFSSparkSessionExtensions\") \\\n",
" .getOrCreate()\n",
"spark.sparkContext.setLogLevel(\"INFO\")\n",
"\n",
Expand Down Expand Up @@ -539,6 +539,35 @@
"SELECT * FROM `dev`.lakefs_demo.my_table EXCEPT SELECT * FROM `dev@`.lakefs_demo.my_table "
]
},
{
"cell_type": "markdown",
"id": "9d57536c-a931-416d-9060-7fb74612308f",
"metadata": {},
"source": [
"## `Data diff`\n",
"refs_data_diff is an SQL table-valued function (TVF). The expression:\n",
"##### `refs_data_diff(PREFIX, FROM_SCHEMA, TO_SCHEMA, TABLE)`\n",
"yields a relation that compares the \"from\" table PREFIX.FROM_SCHEMA.TABLE with the \"to\" table PREFIX.TO_SCHEMA.TABLE. Its output is the difference: a relation (like a view) that adds a single column lakefs_change to the table schema.\n",
"\n",
"* Rows that appear only in the first version of the table (in the example, on branch main) appear in the difference with lakefs_change==’-’.\n",
"* Rows that appear only in the second version of the table (in the example, on branch dev) appear in the difference with lakefs_change==’+’.\n",
"* Rows that appear in both versions of the table do not appear in the difference."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "19230643-b084-4ada-9013-962c377f24c5",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%%sql\n",
"\n",
"SELECT * FROM refs_data_diff('lakefs', 'main', 'dev', 'lakefs_demo.my_table');"
]
},
{
"cell_type": "markdown",
"id": "945baddf-61bf-4260-8cc7-e362a1a4ee50",
Expand Down Expand Up @@ -817,6 +846,28 @@
"SELECT * FROM `dev`.lakefs_demo.my_table EXCEPT SELECT * FROM `dev^1`.lakefs_demo.my_table "
]
},
{
"cell_type": "markdown",
"id": "bc8b1436-db2e-45fb-bdd7-68ac3042b750",
"metadata": {},
"source": [
"## `Data diff between main and dev`"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "90cf61b2-d173-4992-b5f7-f600b803f841",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%%sql\n",
"\n",
"SELECT * FROM refs_data_diff('lakefs', 'main', 'dev', 'lakefs_demo.my_table');"
]
},
{
"cell_type": "markdown",
"id": "a9dbca95-1081-445f-9fd6-5710d90bf17d",
Expand Down Expand Up @@ -925,6 +976,28 @@
"SELECT * FROM dev.lakefs_demo.my_table EXCEPT SELECT * FROM `dev-tag-01`.lakefs_demo.my_table"
]
},
{
"cell_type": "markdown",
"id": "ad00da04-574e-4288-952b-3f489d069001",
"metadata": {},
"source": [
"## `Data diff between main and dev`"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "04391123-15c3-4371-9ad3-01b9a5a022e6",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%%sql\n",
"\n",
"SELECT * FROM refs_data_diff('lakefs', 'main', 'dev', 'lakefs_demo.my_table');"
]
},
{
"cell_type": "markdown",
"id": "7c5982ee-c99d-4914-a836-e33cb70ca3b3",
Expand Down
91 changes: 89 additions & 2 deletions 00_notebooks/iceberg-lakefs-nyc.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@
"source": [
"from pyspark.sql import SparkSession\n",
"spark = SparkSession.builder.appName(\"Iceberg / Jupyter\") \\\n",
" .config(\"spark.jars.packages\", \"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,io.lakefs:lakefs-iceberg:0.1.1\") \\\n",
" .config(\"spark.jars.packages\", \"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,io.lakefs:lakefs-iceberg:0.1.1,io.lakefs:lakefs-spark-extensions_2.12:0.0.3\") \\\n",
" .config(\"spark.hadoop.fs.s3.impl\", \"org.apache.hadoop.fs.s3a.S3AFileSystem\") \\\n",
" .config(\"spark.hadoop.fs.s3a.endpoint\", lakefsEndPoint) \\\n",
" .config(\"spark.hadoop.fs.s3a.path.style.access\", \"true\") \\\n",
Expand All @@ -244,7 +244,7 @@
" .config(\"spark.sql.catalog.lakefs.uri\", lakefsEndPoint) \\\n",
" .config(\"spark.sql.catalog.lakefs.cache-enabled\", \"false\") \\\n",
" .config(\"spark.sql.defaultCatalog\", \"lakefs\") \\\n",
" .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\") \\\n",
" .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.lakefs.iceberg.extension.LakeFSSparkSessionExtensions\") \\\n",
" .getOrCreate()\n",
"spark.sparkContext.setLogLevel(\"INFO\")\n",
"\n",
Expand Down Expand Up @@ -728,6 +728,93 @@
"GROUP BY borough"
]
},
{
"cell_type": "markdown",
"id": "678319c2-7c8a-4542-89e0-fa028dd6d789",
"metadata": {},
"source": [
"## `Data diff`\n",
"refs_data_diff is an SQL table-valued function (TVF). The expression:\n",
"##### `refs_data_diff(PREFIX, FROM_SCHEMA, TO_SCHEMA, TABLE)`\n",
"yields a relation that compares the \"from\" table PREFIX.FROM_SCHEMA.TABLE with the \"to\" table PREFIX.TO_SCHEMA.TABLE. Its output is the difference: a relation (like a view) that adds a single column lakefs_change to the table schema.\n",
"\n",
"* Rows that appear only in the first version of the table (in the example, on branch main) appear in the difference with lakefs_change==’-’.\n",
"* Rows that appear only in the second version of the table (in the example, on branch dev) appear in the difference with lakefs_change==’+’.\n",
"* Rows that appear in both versions of the table do not appear in the difference."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "78f52d4f-6c0e-4da3-9d36-2ef4cdf6ad59",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%%sql\n",
"\n",
"SELECT * FROM refs_data_diff('lakefs', 'main', 'dev', 'nyc.permits') LIMIT 5;"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "11768471-5f67-419f-80f5-3a1ac6314b19",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%%sql\n",
"\n",
"SELECT lakefs_change, borough, count(*) AS permit_diffs_cnt\n",
"FROM refs_data_diff('lakefs', 'main', 'dev', 'nyc.permits')\n",
"GROUP BY lakefs_change, borough;"
]
},
{
"cell_type": "markdown",
"id": "4da9f250-ad50-4655-ae96-a52208f2ab06",
"metadata": {},
"source": [
"# Partition the data in the `dev` branch"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "07434883-1ce5-4acc-afea-65afaaed428a",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%%sql\n",
"\n",
"CREATE TABLE lakefs.dev.nyc.permits_partitioned\n",
"USING iceberg\n",
"PARTITIONED BY (borough)\n",
"AS SELECT * FROM lakefs.dev.nyc.permits\n",
"ORDER BY borough;"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "096d031f-5fc5-47d0-9b29-05a4b6222441",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%%sql\n",
"\n",
"SELECT borough, count(*) permit_cnt\n",
"FROM lakefs.dev.nyc.permits_partitioned\n",
"GROUP BY borough"
]
},
{
"cell_type": "markdown",
"id": "fa771262-2dde-4ecc-b1a1-3dac9f580cfa",
Expand Down

0 comments on commit bf0d177

Please sign in to comment.