diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1511f9f9..e7d08c81 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,8 +1,10 @@
 ## 1.47.0 [unreleased]
 
 ### Bug Fixes
-1. [#672](https://github.com/influxdata/influxdb-client-python/pull/672): Add type validation to url attribute in client object
-1. [#674](https://github.com/influxdata/influxdb-client-python/pull/674): Add type linting to client.flux_table.FluxTable, remove duplicated `from pathlib import Path` at setup.py
+
+1. [#672](https://github.com/influxdata/influxdb-client-python/pull/672): Add type validation to url attribute in client object
+2. [#674](https://github.com/influxdata/influxdb-client-python/pull/674): Add type linting to client.flux_table.FluxTable, remove duplicated `from pathlib import Path` at setup.py
+3. [#675](https://github.com/influxdata/influxdb-client-python/pull/675): Ensure WritePrecision in Point is preferred to `DEFAULT_PRECISION`
 
 ## 1.46.0 [2024-09-13]
 
diff --git a/influxdb_client/client/write_api_async.py b/influxdb_client/client/write_api_async.py
index e9e2018b..38937eca 100644
--- a/influxdb_client/client/write_api_async.py
+++ b/influxdb_client/client/write_api_async.py
@@ -1,5 +1,6 @@
 """Collect and async write time series data to InfluxDB Cloud or InfluxDB OSS."""
 import logging
+from asyncio import ensure_future, gather
 from collections import defaultdict
 from typing import Union, Iterable, NamedTuple
 
@@ -114,12 +115,20 @@ async def write(self, bucket: str, org: str = None,
             self._append_default_tags(record)
 
         payloads = defaultdict(list)
-        self._serialize(record, write_precision, payloads, precision_from_point=False, **kwargs)
-
-        # joint list by \n
-        body = b'\n'.join(payloads[write_precision])
-        response = await self._write_service.post_write_async(org=org, bucket=bucket, body=body,
-                                                               precision=write_precision, async_req=False,
-                                                               _return_http_data_only=False,
-                                                               content_type="text/plain; charset=utf-8")
-        return response[1] in (201, 204)
+        self._serialize(record, write_precision, payloads, precision_from_point=True, **kwargs)
+
+        futures = []
+        for payload_precision, payload_lines in payloads.items():
+            futures.append(ensure_future(
+                self._write_service.post_write_async(org=org, bucket=bucket,
+                                                     body=b'\n'.join(payload_lines),
+                                                     precision=payload_precision, async_req=False,
+                                                     _return_http_data_only=False,
+                                                     content_type="text/plain; charset=utf-8")))
+
+        results = await gather(*futures, return_exceptions=True)
+        for result in results:
+            if isinstance(result, Exception):
+                raise result
+
+        return all(result[1] in (201, 204) for result in results)
diff --git a/tests/test_InfluxDBClientAsync.py b/tests/test_InfluxDBClientAsync.py
index 7f8c6214..cb0586b9 100644
--- a/tests/test_InfluxDBClientAsync.py
+++ b/tests/test_InfluxDBClientAsync.py
@@ -1,11 +1,15 @@
 import asyncio
+import dateutil.parser
 import logging
+import math
 import re
+import time
 import unittest
 import os
 from datetime import datetime, timezone
 from io import StringIO
 
+import pandas
 import pytest
 import warnings
 from aioresponses import aioresponses
@@ -199,30 +203,151 @@ async def test_write_empty_data(self):
 
         self.assertEqual(True, response)
 
+    def gen_fractional_utc(self, nano, precision) -> str:
+        raw_sec = nano / 1_000_000_000
+        if precision == WritePrecision.NS:
+            rem = f"{nano % 1_000_000_000}".rjust(9, "0").rstrip("0")
+            return (datetime.fromtimestamp(math.floor(raw_sec), tz=timezone.utc)
+                    .isoformat()
+                    .replace("+00:00", "") + f".{rem}Z")
+        elif precision == WritePrecision.US:
+            # rem = f"{round(nano / 1_000) % 1_000_000}"  # .ljust(6, "0")
+            return (datetime.fromtimestamp(round(raw_sec, 6), tz=timezone.utc)
+                    .isoformat()
+                    .replace("+00:00", "")
+                    .strip("0") + "Z"
+                    )
+        elif precision == WritePrecision.MS:
+            # rem = f"{round(nano / 1_000_000) % 1_000}".rjust(3, "0")
+            return (datetime.fromtimestamp(round(raw_sec, 3), tz=timezone.utc)
+                    .isoformat()
+                    .replace("+00:00", "")
+                    .strip("0") + "Z"
+                    )
+        elif precision == WritePrecision.S:
+            return (datetime.fromtimestamp(round(raw_sec), tz=timezone.utc)
+                    .isoformat()
+                    .replace("+00:00", "Z"))
+        else:
+            raise ValueError(f"Unknown precision: {precision}")
+
     @async_test
     async def test_write_points_different_precision(self):
+        now_ns = time.time_ns()
+        now_us = now_ns / 1_000
+        now_ms = now_us / 1_000
+        now_s = now_ms / 1_000
+
+        now_date_s = self.gen_fractional_utc(now_ns, WritePrecision.S)
+        now_date_ms = self.gen_fractional_utc(now_ns, WritePrecision.MS)
+        now_date_us = self.gen_fractional_utc(now_ns, WritePrecision.US)
+        now_date_ns = self.gen_fractional_utc(now_ns, WritePrecision.NS)
+
+        points = {
+            WritePrecision.S: [],
+            WritePrecision.MS: [],
+            WritePrecision.US: [],
+            WritePrecision.NS: []
+        }
+
+        expected = {}
+
         measurement = generate_name("measurement")
-        _point1 = Point(measurement).tag("location", "Prague").field("temperature", 25.3) \
-            .time(datetime.fromtimestamp(0, tz=timezone.utc), write_precision=WritePrecision.S)
-        _point2 = Point(measurement).tag("location", "New York").field("temperature", 24.3) \
-            .time(datetime.fromtimestamp(1, tz=timezone.utc), write_precision=WritePrecision.MS)
-        _point3 = Point(measurement).tag("location", "Berlin").field("temperature", 24.3) \
-            .time(datetime.fromtimestamp(2, tz=timezone.utc), write_precision=WritePrecision.NS)
-        await self.client.write_api().write(bucket="my-bucket", record=[_point1, _point2, _point3],
+        # basic date-time value
+        points[WritePrecision.S].append(Point(measurement).tag("method", "SecDateTime").field("temperature", 25.3) \
+            .time(datetime.fromtimestamp(round(now_s), tz=timezone.utc), write_precision=WritePrecision.S))
+        expected['SecDateTime'] = now_date_s
+        points[WritePrecision.MS].append(Point(measurement).tag("method", "MilDateTime").field("temperature", 24.3) \
+            .time(datetime.fromtimestamp(round(now_s, 3), tz=timezone.utc), write_precision=WritePrecision.MS))
+        expected['MilDateTime'] = now_date_ms
+        points[WritePrecision.US].append(Point(measurement).tag("method", "MicDateTime").field("temperature", 24.3) \
+            .time(datetime.fromtimestamp(round(now_s, 6), tz=timezone.utc), write_precision=WritePrecision.US))
+        expected['MicDateTime'] = now_date_us
+        # N.B. datetime does not handle nanoseconds
+        # points[WritePrecision.NS].append(Point(measurement).tag("method", "NanDateTime").field("temperature", 24.3) \
+        #     .time(datetime.fromtimestamp(now_s, tz=timezone.utc), write_precision=WritePrecision.NS))
+
+        # long timestamps based on POSIX time
+        points[WritePrecision.S].append(Point(measurement).tag("method", "SecPosix").field("temperature", 24.3) \
+            .time(round(now_s), write_precision=WritePrecision.S))
+        expected['SecPosix'] = now_date_s
+        points[WritePrecision.MS].append(Point(measurement).tag("method", "MilPosix").field("temperature", 24.3) \
+            .time(round(now_ms), write_precision=WritePrecision.MS))
+        expected['MilPosix'] = now_date_ms
+        points[WritePrecision.US].append(Point(measurement).tag("method", "MicPosix").field("temperature", 24.3) \
+            .time(round(now_us), write_precision=WritePrecision.US))
+        expected['MicPosix'] = now_date_us
+        points[WritePrecision.NS].append(Point(measurement).tag("method", "NanPosix").field("temperature", 24.3) \
+            .time(now_ns, write_precision=WritePrecision.NS))
+        expected['NanPosix'] = now_date_ns
+
+        # ISO Zulu datetime with ms, us and ns e.g. "2024-09-27T13:17:16.412399728Z"
+        points[WritePrecision.S].append(Point(measurement).tag("method", "SecDTZulu").field("temperature", 24.3) \
+            .time(now_date_s, write_precision=WritePrecision.S))
+        expected['SecDTZulu'] = now_date_s
+        points[WritePrecision.MS].append(Point(measurement).tag("method", "MilDTZulu").field("temperature", 24.3) \
+            .time(now_date_ms, write_precision=WritePrecision.MS))
+        expected['MilDTZulu'] = now_date_ms
+        points[WritePrecision.US].append(Point(measurement).tag("method", "MicDTZulu").field("temperature", 24.3) \
+            .time(now_date_us, write_precision=WritePrecision.US))
+        expected['MicDTZulu'] = now_date_us
+        # This keeps resulting in microsecond resolution in the response
+        # points[WritePrecision.NS].append(Point(measurement).tag("method", "NanDTZulu").field("temperature", 24.3) \
+        #     .time(now_date_ns, write_precision=WritePrecision.NS))
+
+        recs = list(points.values())
+
+        await self.client.write_api().write(bucket="my-bucket", record=recs,
                                             write_precision=WritePrecision.NS)
 
         query = f'''
             from(bucket:"my-bucket")
                 |> range(start: 0)
                 |> filter(fn: (r) => r["_measurement"] == "{measurement}")
-                |> keep(columns: ["_time"])
+                |> keep(columns: ["method", "_time"])
         '''
         query_api = self.client.query_api()
+
+        # ensure calls fully processed on server
+        await asyncio.sleep(1)
+
         raw = await query_api.query_raw(query)
-        self.assertEqual(8, len(raw.splitlines()))
-        self.assertEqual(',,0,1970-01-01T00:00:02Z', raw.splitlines()[4])
-        self.assertEqual(',,0,1970-01-01T00:00:01Z', raw.splitlines()[5])
-        self.assertEqual(',,0,1970-01-01T00:00:00Z', raw.splitlines()[6])
+        lines_raw = raw.splitlines()[4:]
+
+        lines = []
+        for lnr in lines_raw:
+            lines.append(lnr[2:].split(","))
+
+        def get_time_for_method(lines, method):
+            for line in lines:
+                if line[2] == method:
+                    return line[1]
+            return ""
+
+        self.assertEqual(15, len(raw.splitlines()))
+
+        for key in expected:
+            t = get_time_for_method(lines, key)
+            comp_time = dateutil.parser.isoparse(t)
+            target_time = dateutil.parser.isoparse(expected[key])
+            self.assertEqual(target_time.date(), comp_time.date())
+            self.assertEqual(target_time.hour, comp_time.hour)
+            self.assertEqual(target_time.minute, comp_time.minute)
+            self.assertEqual(target_time.second, comp_time.second)
+            dif = abs(target_time.microsecond - comp_time.microsecond)
+            if key[:3] == "Sec":
+                # Already tested
+                pass
+            elif key[:3] == "Mil":
+                # may be slight rounding differences
+                self.assertLess(dif, 1500, f"failed to match timestamp for {key} {target_time} != {comp_time}")
+            elif key[:3] == "Mic":
+                # may be slight rounding differences
+                self.assertLess(dif, 150, f"failed to match timestamp for {key} {target_time} != {comp_time}")
+            elif key[:3] == "Nan":
+                self.assertEqual(expected[key], t)
+            else:
+                raise Exception(f"Unhandled key {key}")
 
     @async_test
     async def test_delete_api(self):
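Usage note: with this change, `WriteApiAsync.write()` serializes each `Point` with its own `WritePrecision` (`precision_from_point=True`), groups the resulting line-protocol batches by precision, posts them concurrently, and returns `True` only when every batch is accepted with 201/204. Below is a minimal sketch of how a caller might rely on per-point precision; the `url`, `token`, `org`, `bucket`, and measurement names are placeholders, not part of this PR.

```python
import asyncio
import time
from datetime import datetime, timezone

from influxdb_client import Point, WritePrecision
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    # Placeholder connection settings for illustration only.
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
        # Two points carrying different precisions in the same write call.
        p_s = Point("precision_demo").tag("method", "sec").field("temperature", 25.3) \
            .time(datetime.now(tz=timezone.utc), write_precision=WritePrecision.S)
        p_ns = Point("precision_demo").tag("method", "ns").field("temperature", 24.3) \
            .time(time.time_ns(), write_precision=WritePrecision.NS)

        # Each point is now written with its own precision; previously both were
        # serialized and posted using the call-level precision (ns by default).
        ok = await client.write_api().write(bucket="my-bucket", record=[p_s, p_ns])
        print(ok)  # True only if every per-precision batch returned 201 or 204


asyncio.run(main())
```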