
Commit 38d57ea

Authored by yigal-rozenberg and kevinjqliu
Added support for Polars DataFrame and LazyFrame (#1614)
Polars (https://pola.rs) is an open-source library for data manipulation, known for being one of the fastest data processing solutions on a single machine. It features a well-structured, typed API that is both expressive and easy to use. This change is a simple `to_polars` addition to the table API:

    iceberg_table = catalog.load_table('data.data_points')
    pdf = iceberg_table.scan().to_polars()
    print(pdf)

---------

Co-authored-by: yigal.rozenberg <[email protected]>
Co-authored-by: Kevin Liu <[email protected]>
Co-authored-by: Kevin Liu <[email protected]>
1 parent 8e56ebb commit 38d57ea

File tree: 5 files changed (+1203, -1703 lines)


mkdocs/docs/api.md

Lines changed: 136 additions & 0 deletions
@@ -1546,3 +1546,139 @@ df.show(2)

### Polars

PyIceberg interfaces closely with Polars DataFrames and LazyFrames, which provide a fully lazily optimized query engine on top of PyIceberg tables.

<!-- prettier-ignore-start -->

!!! note "Requirements"
    This requires [`polars` to be installed](index.md).

```python
pip install pyiceberg['polars']
```

<!-- prettier-ignore-end -->

PyIceberg data can be analyzed and accessed through Polars using either a DataFrame or a LazyFrame.
If your code uses the Apache Iceberg data scanning and retrieval API and then analyzes the resulting DataFrame in Polars, use the `table.scan().to_polars()` API.
If the intent is to use Polars' high-performance filtering and retrieval functionality, use the LazyFrame exported from the Iceberg table with the `table.to_polars()` API.

```python
# Get LazyFrame
iceberg_table.to_polars()

# Get DataFrame
iceberg_table.scan().to_polars()
```

#### Working with Polars DataFrame

PyIceberg makes it easy to filter out data from a huge table and pull it into a Polars DataFrame locally. This will only fetch the relevant Parquet files for the query and apply the filter. This will reduce IO and therefore improve performance and reduce cost.
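
The example below assumes a `catalog` object obtained earlier in this document. A minimal sketch of loading one (the catalog name `default` and its configuration are assumptions, not part of this change):

```python
from pyiceberg.catalog import load_catalog

# Load a catalog configured in .pyiceberg.yaml or via environment variables;
# the name "default" is only a placeholder.
catalog = load_catalog("default")
```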

```python
import pyarrow as pa

from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType, TimestampType

schema = Schema(
    NestedField(field_id=1, name='ticket_id', field_type=LongType(), required=True),
    NestedField(field_id=2, name='customer_id', field_type=LongType(), required=True),
    NestedField(field_id=3, name='issue', field_type=StringType(), required=False),
    NestedField(field_id=4, name='created_at', field_type=TimestampType(), required=True),
)

iceberg_table = catalog.create_table(
    identifier='default.product_support_issues',
    schema=schema
)

# created_at values are microseconds since the Unix epoch (timestamp[us])
pa_table_data = pa.Table.from_pylist(
    [
        {'ticket_id': 1, 'customer_id': 546, 'issue': 'User Login issue', 'created_at': 1650020000000000},
        {'ticket_id': 2, 'customer_id': 547, 'issue': 'Payment not going through', 'created_at': 1650028640000000},
        {'ticket_id': 3, 'customer_id': 548, 'issue': 'Error on checkout', 'created_at': 1650037280000000},
        {'ticket_id': 4, 'customer_id': 549, 'issue': 'Unable to reset password', 'created_at': 1650045920000000},
        {'ticket_id': 5, 'customer_id': 550, 'issue': 'Account locked', 'created_at': 1650054560000000},
        {'ticket_id': 6, 'customer_id': 551, 'issue': 'Order not received', 'created_at': 1650063200000000},
        {'ticket_id': 7, 'customer_id': 552, 'issue': 'Refund not processed', 'created_at': 1650071840000000},
        {'ticket_id': 8, 'customer_id': 553, 'issue': 'Shipping address issue', 'created_at': 1650080480000000},
        {'ticket_id': 9, 'customer_id': 554, 'issue': 'Product damaged', 'created_at': 1650089120000000},
        {'ticket_id': 10, 'customer_id': 555, 'issue': 'Unable to apply discount code', 'created_at': 1650097760000000},
        {'ticket_id': 11, 'customer_id': 556, 'issue': 'Website not loading', 'created_at': 1650106400000000},
        {'ticket_id': 12, 'customer_id': 557, 'issue': 'Incorrect order received', 'created_at': 1650115040000000},
        {'ticket_id': 13, 'customer_id': 558, 'issue': 'Unable to track order', 'created_at': 1650123680000000},
        {'ticket_id': 14, 'customer_id': 559, 'issue': 'Order delayed', 'created_at': 1650132320000000},
        {'ticket_id': 15, 'customer_id': 560, 'issue': 'Product not as described', 'created_at': 1650140960000000},
        {'ticket_id': 16, 'customer_id': 561, 'issue': 'Unable to contact support', 'created_at': 1650149600000000},
        {'ticket_id': 17, 'customer_id': 562, 'issue': 'Duplicate charge', 'created_at': 1650158240000000},
        {'ticket_id': 18, 'customer_id': 563, 'issue': 'Unable to update profile', 'created_at': 1650166880000000},
        {'ticket_id': 19, 'customer_id': 564, 'issue': 'App crashing', 'created_at': 1650175520000000},
        {'ticket_id': 20, 'customer_id': 565, 'issue': 'Unable to download invoice', 'created_at': 1650184160000000},
        {'ticket_id': 21, 'customer_id': 566, 'issue': 'Incorrect billing amount', 'created_at': 1650192800000000},
    ], schema=iceberg_table.schema().as_arrow()
)

iceberg_table.append(
    df=pa_table_data
)

iceberg_table.scan(
    row_filter="ticket_id > 10",
).to_polars()
```

This will return a Polars DataFrame:

```python
shape: (11, 4)
┌───────────┬─────────────┬────────────────────────────┬─────────────────────┐
│ ticket_id ┆ customer_id ┆ issue                      ┆ created_at          │
│ ---       ┆ ---         ┆ ---                        ┆ ---                 │
│ i64       ┆ i64         ┆ str                        ┆ datetime[μs]        │
╞═══════════╪═════════════╪════════════════════════════╪═════════════════════╡
│ 11        ┆ 556         ┆ Website not loading        ┆ 2022-04-16 10:53:20 │
│ 12        ┆ 557         ┆ Incorrect order received   ┆ 2022-04-16 13:17:20 │
│ 13        ┆ 558         ┆ Unable to track order      ┆ 2022-04-16 15:41:20 │
│ 14        ┆ 559         ┆ Order delayed              ┆ 2022-04-16 18:05:20 │
│ 15        ┆ 560         ┆ Product not as described   ┆ 2022-04-16 20:29:20 │
│ …         ┆ …           ┆ …                          ┆ …                   │
│ 17        ┆ 562         ┆ Duplicate charge           ┆ 2022-04-17 01:17:20 │
│ 18        ┆ 563         ┆ Unable to update profile   ┆ 2022-04-17 03:41:20 │
│ 19        ┆ 564         ┆ App crashing               ┆ 2022-04-17 06:05:20 │
│ 20        ┆ 565         ┆ Unable to download invoice ┆ 2022-04-17 08:29:20 │
│ 21        ┆ 566         ┆ Incorrect billing amount   ┆ 2022-04-17 10:53:20 │
└───────────┴─────────────┴────────────────────────────┴─────────────────────┘
```
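
The same scan-then-convert pattern can push down more than a row filter. As a sketch (not part of this change), the scan's `selected_fields` and `limit` parameters can cut down the data handed to Polars even further:

```python
# Fetch only two columns and at most five matching rows
# before materializing the Polars DataFrame.
df = iceberg_table.scan(
    row_filter="ticket_id > 10",
    selected_fields=("ticket_id", "issue"),
    limit=5,
).to_polars()
print(df)
```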

#### Working with Polars LazyFrame

PyIceberg supports creating a Polars LazyFrame from an Iceberg table.

Using the table created in the code example above:

```python
import polars as pl

lf = iceberg_table.to_polars().filter(pl.col("ticket_id") > 10)
print(lf.collect())
```

The code snippet above returns a Polars LazyFrame and defines a filter that Polars executes when the frame is collected:

```python
shape: (11, 4)
┌───────────┬─────────────┬────────────────────────────┬─────────────────────┐
│ ticket_id ┆ customer_id ┆ issue                      ┆ created_at          │
│ ---       ┆ ---         ┆ ---                        ┆ ---                 │
│ i64       ┆ i64         ┆ str                        ┆ datetime[μs]        │
╞═══════════╪═════════════╪════════════════════════════╪═════════════════════╡
│ 11        ┆ 556         ┆ Website not loading        ┆ 2022-04-16 10:53:20 │
│ 12        ┆ 557         ┆ Incorrect order received   ┆ 2022-04-16 13:17:20 │
│ 13        ┆ 558         ┆ Unable to track order      ┆ 2022-04-16 15:41:20 │
│ 14        ┆ 559         ┆ Order delayed              ┆ 2022-04-16 18:05:20 │
│ 15        ┆ 560         ┆ Product not as described   ┆ 2022-04-16 20:29:20 │
│ …         ┆ …           ┆ …                          ┆ …                   │
│ 17        ┆ 562         ┆ Duplicate charge           ┆ 2022-04-17 01:17:20 │
│ 18        ┆ 563         ┆ Unable to update profile   ┆ 2022-04-17 03:41:20 │
│ 19        ┆ 564         ┆ App crashing               ┆ 2022-04-17 06:05:20 │
│ 20        ┆ 565         ┆ Unable to download invoice ┆ 2022-04-17 08:29:20 │
│ 21        ┆ 566         ┆ Incorrect billing amount   ┆ 2022-04-17 10:53:20 │
└───────────┴─────────────┴────────────────────────────┴─────────────────────┘
```
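
Because `to_polars()` returns a LazyFrame, further Polars operations can be chained and only run on `collect()`. A small sketch of such a chain (the aggregation is illustrative only and assumes a recent Polars version):

```python
import polars as pl

# Count tickets per customer among the filtered rows;
# nothing executes until collect() is called.
summary = (
    iceberg_table.to_polars()
    .filter(pl.col("ticket_id") > 10)
    .group_by("customer_id")
    .agg(pl.len().alias("ticket_count"))
    .sort("customer_id")
)
print(summary.collect())
```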

mkdocs/docs/index.md

Lines changed: 4 additions & 3 deletions

@@ -40,9 +40,9 @@ pip install "pyiceberg[s3fs,hive]"
 
 You can mix and match optional dependencies depending on your needs:
 
-| Key | Description: |
-|---------------|---------------------------------------------------------------------------|
-| hive | Support for the Hive metastore |
+| Key | Description: |
+| ------------ | ------------------------------------------------------------------------- |
+| hive | Support for the Hive metastore |
 | hive-kerberos | Support for Hive metastore in Kerberos environment |
 | glue | Support for AWS Glue |
 | dynamodb | Support for AWS DynamoDB |
@@ -53,6 +53,7 @@ You can mix and match optional dependencies depending on your needs:
 | duckdb | Installs both PyArrow and DuckDB |
 | ray | Installs PyArrow, Pandas, and Ray |
 | daft | Installs Daft |
+| polars | Installs Polars |
 | s3fs | S3FS as a FileIO implementation to interact with the object store |
 | adlfs | ADLFS as a FileIO implementation to interact with the object store |
 | snappy | Support for snappy Avro compression |
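
As the hunk header above shows, extras can be combined in a single install; for example, to pair the new `polars` extra with `s3fs` (adjust the extras to your environment):

```python
pip install "pyiceberg[polars,s3fs]"
```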
