Skip to content

Commit

Permalink
fix: async write prec where DEFAULT_PRECISION should not be used (#675)
Browse files Browse the repository at this point in the history
* fix: (WIP) issue 669 write precision to default in async API

* chore: fix lint issues

* docs: update CHANGELOG.md

* chore: improve indexing of range
  • Loading branch information
karel-rehor authored Oct 9, 2024
1 parent 28a4a04 commit 7e01edb
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 23 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
## 1.47.0 [unreleased]

### Bug Fixes
1. [#672](https://github.com/influxdata/influxdb-client-python/pull/672): Add type validation to url attribute in client object
1. [#674](https://github.com/influxdata/influxdb-client-python/pull/674): Add type linting to client.flux_table.FluxTable, remove duplicated `from pathlib import Path` at setup.py

1. [#672](https://github.com/influxdata/influxdb-client-python/pull/672): Adding type validation to url attribute in client object
2. [#674](https://github.com/influxdata/influxdb-client-python/pull/674): Add type linting to client.flux_table.FluxTable, remove duplicated `from pathlib import Path` at setup.py
3. [#675](https://github.com/influxdata/influxdb-client-python/pull/675): Ensures WritePrecision in Point is preferred to `DEFAULT_PRECISION`

## 1.46.0 [2024-09-13]

Expand Down
27 changes: 18 additions & 9 deletions influxdb_client/client/write_api_async.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Collect and async write time series data to InfluxDB Cloud or InfluxDB OSS."""
import logging
from asyncio import ensure_future, gather
from collections import defaultdict
from typing import Union, Iterable, NamedTuple

Expand Down Expand Up @@ -114,12 +115,20 @@ async def write(self, bucket: str, org: str = None,
self._append_default_tags(record)

payloads = defaultdict(list)
self._serialize(record, write_precision, payloads, precision_from_point=False, **kwargs)

# joint list by \n
body = b'\n'.join(payloads[write_precision])
response = await self._write_service.post_write_async(org=org, bucket=bucket, body=body,
precision=write_precision, async_req=False,
_return_http_data_only=False,
content_type="text/plain; charset=utf-8")
return response[1] in (201, 204)
self._serialize(record, write_precision, payloads, precision_from_point=True, **kwargs)

futures = []
for payload_precision, payload_line in payloads.items():
futures.append(ensure_future
(self._write_service.post_write_async(org=org, bucket=bucket,
body=b'\n'.join(payload_line),
precision=payload_precision, async_req=False,
_return_http_data_only=False,
content_type="text/plain; charset=utf-8")))

results = await gather(*futures, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
raise result

return False not in [re[1] in (201, 204) for re in results]
149 changes: 137 additions & 12 deletions tests/test_InfluxDBClientAsync.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import asyncio
import dateutil.parser
import logging
import math
import re
import time
import unittest
import os
from datetime import datetime, timezone
from io import StringIO

import pandas
import pytest
import warnings
from aioresponses import aioresponses
Expand Down Expand Up @@ -199,30 +203,151 @@ async def test_write_empty_data(self):

self.assertEqual(True, response)

def gen_fractional_utc(self, nano, precision) -> str:
raw_sec = nano / 1_000_000_000
if precision == WritePrecision.NS:
rem = f"{nano % 1_000_000_000}".rjust(9,"0").rstrip("0")
return (datetime.fromtimestamp(math.floor(raw_sec), tz=timezone.utc)
.isoformat()
.replace("+00:00", "") + f".{rem}Z")
#f".{rem}Z"))
elif precision == WritePrecision.US:
# rem = f"{round(nano / 1_000) % 1_000_000}"#.ljust(6,"0")
return (datetime.fromtimestamp(round(raw_sec,6), tz=timezone.utc)
.isoformat()
.replace("+00:00","")
.strip("0") + "Z"
)
elif precision == WritePrecision.MS:
#rem = f"{round(nano / 1_000_000) % 1_000}".rjust(3, "0")
return (datetime.fromtimestamp(round(raw_sec,3), tz=timezone.utc)
.isoformat()
.replace("+00:00","")
.strip("0") + "Z"
)
elif precision == WritePrecision.S:
return (datetime.fromtimestamp(round(raw_sec), tz=timezone.utc)
.isoformat()
.replace("+00:00","Z"))
else:
raise ValueError(f"Unknown precision: {precision}")


@async_test
async def test_write_points_different_precision(self):
now_ns = time.time_ns()
now_us = now_ns / 1_000
now_ms = now_us / 1_000
now_s = now_ms / 1_000

now_date_s = self.gen_fractional_utc(now_ns, WritePrecision.S)
now_date_ms = self.gen_fractional_utc(now_ns, WritePrecision.MS)
now_date_us = self.gen_fractional_utc(now_ns, WritePrecision.US)
now_date_ns = self.gen_fractional_utc(now_ns, WritePrecision.NS)

points = {
WritePrecision.S: [],
WritePrecision.MS: [],
WritePrecision.US: [],
WritePrecision.NS: []
}

expected = {}

measurement = generate_name("measurement")
_point1 = Point(measurement).tag("location", "Prague").field("temperature", 25.3) \
.time(datetime.fromtimestamp(0, tz=timezone.utc), write_precision=WritePrecision.S)
_point2 = Point(measurement).tag("location", "New York").field("temperature", 24.3) \
.time(datetime.fromtimestamp(1, tz=timezone.utc), write_precision=WritePrecision.MS)
_point3 = Point(measurement).tag("location", "Berlin").field("temperature", 24.3) \
.time(datetime.fromtimestamp(2, tz=timezone.utc), write_precision=WritePrecision.NS)
await self.client.write_api().write(bucket="my-bucket", record=[_point1, _point2, _point3],
# basic date-time value
points[WritePrecision.S].append(Point(measurement).tag("method", "SecDateTime").field("temperature", 25.3) \
.time(datetime.fromtimestamp(round(now_s), tz=timezone.utc), write_precision=WritePrecision.S))
expected['SecDateTime'] = now_date_s
points[WritePrecision.MS].append(Point(measurement).tag("method", "MilDateTime").field("temperature", 24.3) \
.time(datetime.fromtimestamp(round(now_s,3), tz=timezone.utc), write_precision=WritePrecision.MS))
expected['MilDateTime'] = now_date_ms
points[WritePrecision.US].append(Point(measurement).tag("method", "MicDateTime").field("temperature", 24.3) \
.time(datetime.fromtimestamp(round(now_s,6), tz=timezone.utc), write_precision=WritePrecision.US))
expected['MicDateTime'] = now_date_us
# N.B. datetime does not handle nanoseconds
# points[WritePrecision.NS].append(Point(measurement).tag("method", "NanDateTime").field("temperature", 24.3) \
# .time(datetime.fromtimestamp(now_s, tz=timezone.utc), write_precision=WritePrecision.NS))

# long timestamps based on POSIX time
points[WritePrecision.S].append(Point(measurement).tag("method", "SecPosix").field("temperature", 24.3) \
.time(round(now_s), write_precision=WritePrecision.S))
expected['SecPosix'] = now_date_s
points[WritePrecision.MS].append(Point(measurement).tag("method", "MilPosix").field("temperature", 24.3) \
.time(round(now_ms), write_precision=WritePrecision.MS))
expected['MilPosix'] = now_date_ms
points[WritePrecision.US].append(Point(measurement).tag("method", "MicPosix").field("temperature", 24.3) \
.time(round(now_us), write_precision=WritePrecision.US))
expected['MicPosix'] = now_date_us
points[WritePrecision.NS].append(Point(measurement).tag("method", "NanPosix").field("temperature", 24.3) \
.time(now_ns, write_precision=WritePrecision.NS))
expected['NanPosix'] = now_date_ns

# ISO Zulu datetime with ms, us and ns e.g. "2024-09-27T13:17:16.412399728Z"
points[WritePrecision.S].append(Point(measurement).tag("method", "SecDTZulu").field("temperature", 24.3) \
.time(now_date_s, write_precision=WritePrecision.S))
expected['SecDTZulu'] = now_date_s
points[WritePrecision.MS].append(Point(measurement).tag("method", "MilDTZulu").field("temperature", 24.3) \
.time(now_date_ms, write_precision=WritePrecision.MS))
expected['MilDTZulu'] = now_date_ms
points[WritePrecision.US].append(Point(measurement).tag("method", "MicDTZulu").field("temperature", 24.3) \
.time(now_date_us, write_precision=WritePrecision.US))
expected['MicDTZulu'] = now_date_us
# This keeps resulting in micro second resolution in response
# points[WritePrecision.NS].append(Point(measurement).tag("method", "NanDTZulu").field("temperature", 24.3) \
# .time(now_date_ns, write_precision=WritePrecision.NS))

recs = [x for x in [v for v in points.values()]]

await self.client.write_api().write(bucket="my-bucket", record=recs,
write_precision=WritePrecision.NS)
query = f'''
from(bucket:"my-bucket")
|> range(start: 0)
|> filter(fn: (r) => r["_measurement"] == "{measurement}")
|> keep(columns: ["_time"])
|> keep(columns: ["method","_time"])
'''
query_api = self.client.query_api()

# ensure calls fully processed on server
await asyncio.sleep(1)

raw = await query_api.query_raw(query)
self.assertEqual(8, len(raw.splitlines()))
self.assertEqual(',,0,1970-01-01T00:00:02Z', raw.splitlines()[4])
self.assertEqual(',,0,1970-01-01T00:00:01Z', raw.splitlines()[5])
self.assertEqual(',,0,1970-01-01T00:00:00Z', raw.splitlines()[6])
linesRaw = raw.splitlines()[4:]

lines = []
for lnr in linesRaw:
lines.append(lnr[2:].split(","))

def get_time_for_method(lines, method):
for l in lines:
if l[2] == method:
return l[1]
return ""

self.assertEqual(15, len(raw.splitlines()))

for key in expected:
t = get_time_for_method(lines,key)
comp_time = dateutil.parser.isoparse(get_time_for_method(lines,key))
target_time = dateutil.parser.isoparse(expected[key])
self.assertEqual(target_time.date(), comp_time.date())
self.assertEqual(target_time.hour, comp_time.hour)
self.assertEqual(target_time.second,comp_time.second)
dif = abs(target_time.microsecond - comp_time.microsecond)
if key[:3] == "Sec":
# Already tested
pass
elif key[:3] == "Mil":
# may be slight rounding differences
self.assertLess(dif, 1500, f"failed to match timestamp for {key} {target_time} != {comp_time}")
elif key[:3] == "Mic":
# may be slight rounding differences
self.assertLess(dif, 150, f"failed to match timestamp for {key} {target_time} != {comp_time}")
elif key[:3] == "Nan":
self.assertEqual(expected[key], get_time_for_method(lines, key))
else:
raise Exception(f"Unhandled key {key}")

@async_test
async def test_delete_api(self):
Expand Down

0 comments on commit 7e01edb

Please sign in to comment.