
fix: multiprocessing example
bednar committed Aug 12, 2024
1 parent 0dcc35b commit 52d4d6a
Showing 1 changed file with 61 additions and 61 deletions.
examples/import_data_set_multiprocessing.py (122 changes: 61 additions & 61 deletions)
@@ -4,6 +4,7 @@
 https://github.com/toddwschneider/nyc-taxi-data
 """
 import concurrent.futures
+import gzip
 import io
 import multiprocessing
 from collections import OrderedDict
@@ -92,10 +93,10 @@ def parse_row(row: OrderedDict):
 
     return Point("taxi-trip-data") \
         .tag("dispatching_base_num", row['dispatching_base_num']) \
-        .tag("PULocationID", row['PULocationID']) \
-        .tag("DOLocationID", row['DOLocationID']) \
+        .tag("PULocationID", row['PUlocationID']) \
+        .tag("DOLocationID", row['DOlocationID']) \
         .tag("SR_Flag", row['SR_Flag']) \
-        .field("dropoff_datetime", row['dropoff_datetime']) \
+        .field("dropoff_datetime", row['dropOff_datetime']) \
         .time(row['pickup_datetime']) \
         .to_line_protocol()

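For context: the replacement dataset's CSV headers use the mixed-case names 'PUlocationID', 'DOlocationID' and 'dropOff_datetime', so only the dictionary keys change above; the tag and field names written to InfluxDB stay the same. A minimal sketch (made-up row values, not part of the commit) of what the corrected parse_row builds, using the influxdb_client Point API exactly as the example does:

    # Sketch only: illustrative row values for the new dataset's column names.
    from collections import OrderedDict

    from influxdb_client import Point

    row = OrderedDict([('dispatching_base_num', 'B00008'),
                       ('pickup_datetime', '2019-01-01 00:30:00'),
                       ('dropOff_datetime', '2019-01-01 00:45:00'),
                       ('PUlocationID', '264'),
                       ('DOlocationID', '127'),
                       ('SR_Flag', '')])

    line = Point("taxi-trip-data") \
        .tag("dispatching_base_num", row['dispatching_base_num']) \
        .tag("PULocationID", row['PUlocationID']) \
        .tag("DOLocationID", row['DOlocationID']) \
        .tag("SR_Flag", row['SR_Flag']) \
        .field("dropoff_datetime", row['dropOff_datetime']) \
        .time(row['pickup_datetime']) \
        .to_line_protocol()
    print(line)  # one line-protocol record, ready for the writer process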
@@ -141,80 +142,79 @@ def init_counter(counter, progress, queue):
     progress_ = Value('i', 0)
     startTime = datetime.now()
 
-    url = "https://s3.amazonaws.com/nyc-tlc/trip+data/fhv_tripdata_2019-01.csv"
-    # url = "file:///Users/bednar/Developer/influxdata/influxdb-client-python/examples/fhv_tripdata_2019-01.csv"
+    url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-01.csv.gz"
 
     """
     Open URL and for stream data
     """
     response = urlopen(url)
     if response.headers:
         content_length = response.headers['Content-length']
-    io_wrapper = ProgressTextIOWrapper(response)
-    io_wrapper.progress = progress_
 
     """
-    Start writer as a new process
+    Open GZIP stream
     """
-    writer = InfluxDBWriter(queue_)
-    writer.start()
+    with gzip.open(response, 'rb') as stream:
+        io_wrapper = ProgressTextIOWrapper(stream, encoding='utf-8')
+        io_wrapper.progress = progress_
 
-    """
-    Create process pool for parallel encoding into LineProtocol
-    """
-    cpu_count = multiprocessing.cpu_count()
-    with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
-                                                initargs=(counter_, progress_, queue_)) as executor:
         """
-        Converts incoming HTTP stream into sequence of LineProtocol
+        Start writer as a new process
         """
-        data = rx \
-            .from_iterable(DictReader(io_wrapper)) \
-            .pipe(ops.buffer_with_count(10_000),
-                  # Parse 10_000 rows into LineProtocol on subprocess
-                  ops.flat_map(lambda rows: executor.submit(parse_rows, rows, content_length)))
+        writer = InfluxDBWriter(queue_)
+        writer.start()
 
         """
-        Write data into InfluxDB
+        Create process pool for parallel encoding into LineProtocol
        """
-        data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))
 
-    """
-    Terminate Writer
-    """
-    queue_.put(None)
-    queue_.join()
+        cpu_count = multiprocessing.cpu_count()
+        with concurrent.futures.ProcessPoolExecutor(cpu_count, initializer=init_counter,
+                                                    initargs=(counter_, progress_, queue_)) as executor:
+            """
+            Converts incoming HTTP stream into sequence of LineProtocol
+            """
+            data = rx \
+                .from_iterable(DictReader(io_wrapper)) \
+                .pipe(ops.buffer_with_count(10_000),
+                      # Parse 10_000 rows into LineProtocol on subprocess
+                      ops.map(lambda rows: executor.submit(parse_rows, rows, content_length)))
 
+            """
+            Write data into InfluxDB
+            """
+            data.subscribe(on_next=lambda x: None, on_error=lambda ex: print(f'Unexpected error: {ex}'))
 
-    print()
-    print(f'Import finished in: {datetime.now() - startTime}')
-    print()
 
-    """
-    Querying 10 pickups from dispatching 'B00008'
-    """
-    query = 'from(bucket:"my-bucket")' \
-            '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
-            '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
-            '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
-            '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
-            '|> rename(columns: {_time: "pickup_datetime"})' \
-            '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'
 
-    client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False)
-    result = client.query_api().query(query=query)
+        """
+        Terminate Writer
+        """
+        queue_.put(None)
+        queue_.join()
 
-    """
-    Processing results
-    """
-    print()
-    print("=== Querying 10 pickups from dispatching 'B00008' ===")
-    print()
-    for table in result:
-        for record in table.records:
-            print(
-                f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')
+    print()
+    print(f'Import finished in: {datetime.now() - startTime}')
+    print()
 
-    """
-    Close client
-    """
-    client.close()
+    """
+    Querying 10 pickups from dispatching 'B00008'
+    """
+    query = 'from(bucket:"my-bucket")' \
+            '|> range(start: 2019-01-01T00:00:00Z, stop: now()) ' \
+            '|> filter(fn: (r) => r._measurement == "taxi-trip-data")' \
+            '|> filter(fn: (r) => r.dispatching_base_num == "B00008")' \
+            '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' \
+            '|> rename(columns: {_time: "pickup_datetime"})' \
+            '|> drop(columns: ["_start", "_stop"])|> limit(n:10, offset: 0)'
 
+    with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client:
+        result = client.query_api().query(query=query)
 
+        """
+        Processing results
+        """
+        print()
+        print("=== Querying 10 pickups from dispatching 'B00008' ===")
+        print()
+        for table in result:
+            for record in table.records:
+                print(
+                    f'Dispatching: {record["dispatching_base_num"]} pickup: {record["pickup_datetime"]} dropoff: {record["dropoff_datetime"]}')
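The core of the fix is in the hunk above: the replacement download is a .csv.gz archive, so the HTTP response is wrapped in a gzip stream before the CSV reader consumes it, and the import pipeline moves inside that with block. A minimal standalone sketch of the same streaming pattern using only the standard library (the URL comes from the diff; the three-row peek is illustrative):

    import gzip
    import io
    from csv import DictReader
    from urllib.request import urlopen

    url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-01.csv.gz"

    response = urlopen(url)
    # gzip.open accepts the file-like HTTP response and decompresses lazily,
    # so the download is never fully buffered in memory.
    with gzip.open(response, 'rb') as stream:
        # Decode the binary stream to text for the CSV reader.
        reader = DictReader(io.TextIOWrapper(stream, encoding='utf-8'))
        for i, row in enumerate(reader):
            print(row['dispatching_base_num'], row['pickup_datetime'], row['dropOff_datetime'])
            if i == 2:
                break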

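The example keeps RxPY's buffer_with_count(10_000) so that each executor task encodes 10,000 rows at once instead of paying inter-process overhead per row. The same batch-and-submit idea, sketched with the standard library alone; parse_batch is a hypothetical stand-in for the example's parse_rows:

    import concurrent.futures
    import itertools
    import multiprocessing


    def parse_batch(rows):
        # Hypothetical stand-in for the example's parse_rows: encode each
        # row dict into a line-protocol string.
        return [f"taxi-trip-data,dispatching_base_num={r['dispatching_base_num']}" for r in rows]


    def batches(iterable, size):
        # Yield consecutive lists of up to `size` items from any iterable.
        it = iter(iterable)
        while chunk := list(itertools.islice(it, size)):
            yield chunk


    if __name__ == '__main__':
        rows = ({'dispatching_base_num': f'B{i:05d}'} for i in range(25_000))
        with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count()) as executor:
            # Each batch is encoded on a subprocess, mirroring the example's
            # ops.buffer_with_count(10_000) + executor.submit(parse_rows, ...).
            for encoded in executor.map(parse_batch, batches(rows, 10_000)):
                print(f'encoded batch of {len(encoded)} rows')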