Commit cdb92c8 (1 parent: a1f924a)

docs: add apache parquet example (#476)

2 files changed: +54 −0 lines

examples/README.md (+5)

```diff
@@ -10,6 +10,11 @@
 - [write_api_callbacks.py](write_api_callbacks.py) - How to handle batch events
 - [write_structured_data.py](write_structured_data.py) - How to write structured data - [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple), [Data Classes](https://docs.python.org/3/library/dataclasses.html) - (_requires Python v3.8+_)
 - [logging_handler.py](logging_handler.py) - How to set up a python native logging handler that writes to InfluxDB
+- [import_parquet.py](import_parquet.py) - How to import [Apache Parquet](https://parquet.apache.org/) data files;
+  the example requires:
+   - manually downloading the [NYC TLC Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
+   - installing the Apache Arrow dependency: `pip install pyarrow`
+

 ## Queries
 - [query.py](query.py) - How to query data into `FluxTable`s, `Stream` and `CSV`
```

examples/import_parquet.py (new file, +49)

```python
import pyarrow.parquet as pq

from influxdb_client import InfluxDBClient, WriteOptions

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", timeout=0, debug=False) as client:
    # You can download the NYC TLC Trip Record Data parquet file from
    # https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
    table = pq.read_table('fhvhv_tripdata_2022-01.parquet')
    with client.write_api(write_options=WriteOptions(batch_size=50_000)) as write_api:
        dataframe = table.to_pandas()
        # Keep only the interesting columns
        keep_df = dataframe[
            ['dispatching_base_num', "PULocationID", "DOLocationID",
             "pickup_datetime", "dropoff_datetime", "shared_request_flag"]]
        print(keep_df.tail().to_string())

        write_api.write(bucket="my-bucket", record=keep_df,
                        data_frame_measurement_name="taxi-trip-data",
                        data_frame_tag_columns=['dispatching_base_num', "shared_request_flag"],
                        data_frame_timestamp_column="pickup_datetime")

    # Query 10 pickups from dispatching base 'B03404'
    query = '''
        from(bucket:"my-bucket")
            |> range(start: 2022-01-01T00:00:00Z, stop: now())
            |> filter(fn: (r) => r._measurement == "taxi-trip-data")
            |> filter(fn: (r) => r.dispatching_base_num == "B03404")
            |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
            |> rename(columns: {_time: "pickup_datetime"})
            |> drop(columns: ["_start", "_stop"])
            |> limit(n:10, offset: 0)
    '''

    result = client.query_api().query(query=query)

    # Process the results
    print()
    print("=== Querying 10 pickups from dispatching 'B03404' ===")
    print()
    for table in result:
        for record in table.records:
            print(f'Dispatching: {record["dispatching_base_num"]} '
                  f'pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')
```
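One detail worth calling out from the write step above: `data_frame_timestamp_column` points the client at `pickup_datetime`, and that column should carry a real pandas datetime dtype (the client typically treats naive timestamps as UTC). If the source data arrives as strings, convert it before writing. A minimal sketch using pandas only, with made-up rows standing in for the Parquet data:

```python
import pandas as pd

# Hypothetical rows standing in for the trip-record data.
df = pd.DataFrame({
    "dispatching_base_num": ["B03404", "B03404"],
    "pickup_datetime": ["2022-01-01 00:05:00", "2022-01-01 00:12:00"],
})

# Parse the timestamp column and make it timezone-aware before handing
# the frame to write_api.write(..., data_frame_timestamp_column="pickup_datetime").
df["pickup_datetime"] = pd.to_datetime(df["pickup_datetime"]).dt.tz_localize("UTC")
print(df["pickup_datetime"].dtype)  # datetime64[ns, UTC]
```

Making the timezone explicit up front avoids ambiguity about which instant each row is stored under in InfluxDB.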
