From ecd004302f68dc398ababf032935f8cbdd8a9142 Mon Sep 17 00:00:00 2001 From: Arne Bahlo Date: Mon, 23 Sep 2024 11:47:30 +0200 Subject: [PATCH 01/13] fix: Add a threading.Timer to ensure logs are flushed --- src/axiom_py/logging.py | 23 +++++++++++++++++------ tests/test_logger.py | 29 ++++++++++++++++++++--------- 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/src/axiom_py/logging.py b/src/axiom_py/logging.py index f6d0aac..dedcfae 100644 --- a/src/axiom_py/logging.py +++ b/src/axiom_py/logging.py @@ -2,6 +2,7 @@ import time import atexit +from threading import Timer from logging import Handler, NOTSET, getLogger, WARNING from .client import Client @@ -15,13 +16,14 @@ class AxiomHandler(Handler): buffer: list interval: int last_run: float + timer: Timer def __init__(self, client: Client, dataset: str, level=NOTSET, interval=1): super().__init__() - # set urllib3 logging level to warning, check: + # Set urllib3 logging level to warning, check: # https://github.com/axiomhq/axiom-py/issues/23 - # This is a temp solution that would stop requests - # library from flooding the logs with debug messages + # This is a temp solution that would stop requests library from + # flooding the logs with debug messages getLogger("urllib3").setLevel(WARNING) self.client = client self.dataset = dataset @@ -29,20 +31,29 @@ def __init__(self, client: Client, dataset: str, level=NOTSET, interval=1): self.last_run = time.monotonic() self.interval = interval - # register flush on exit, + # We use a threading.Timer to make sure we flush every second, even + # if no more logs are emitted. + self.timer = Timer(self.interval, self.flush) + + # Register flush on exit, atexit.register(self.flush) def emit(self, record): - """emit sends a log to Axiom.""" + """Emit sends a log to Axiom.""" self.buffer.append(record.__dict__) if ( len(self.buffer) >= 1000 or time.monotonic() - self.last_run > self.interval ): self.flush() + else: + self.timer.cancel() + self.timer = Timer(self.interval, self.flush) + self.timer.start() def flush(self): - """flush sends all logs in the logcache to Axiom.""" + """Flush sends all logs in the buffer to Axiom.""" + self.timer.cancel() self.last_run = time.monotonic() if len(self.buffer) == 0: return diff --git a/tests/test_logger.py b/tests/test_logger.py index e60cc6f..e8de5ab 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -3,6 +3,8 @@ import os import logging import unittest +import time + from .helpers import get_random_name from axiom_py import Client from axiom_py.logging import AxiomHandler @@ -10,35 +12,44 @@ class TestLogger(unittest.TestCase): def test_log(self): - """Tests a simple log""" + """Tests the logger""" client = Client( os.getenv("AXIOM_TOKEN"), os.getenv("AXIOM_ORG_ID"), os.getenv("AXIOM_URL"), ) - # create a dataset for that purpose + # Create a dataset for that purpose dataset_name = get_random_name() client.datasets.create( - dataset_name, "a dataset to test axiom-py logger" + dataset_name, "A dataset to test axiom-py logger" ) - axiom_handler = AxiomHandler(client, dataset_name) + axiom_handler = AxiomHandler(client, dataset_name, interval=0.4) logger = logging.getLogger() logger.addHandler(axiom_handler) - logger.warning("foo") + logger.info("This is a log!") - # this log shouldn't be ingested yet + # This log shouldn't be ingested yet res = client.apl_query(dataset_name) self.assertEqual(0, res.status.rowsExamined) - # flush events + # Flush events axiom_handler.flush() - # now we should have a log + # Now we should have a log res = client.apl_query(dataset_name) self.assertEqual(1, res.status.rowsExamined) - # cleanup created dataset + logger.info("This log should be ingested without any subsequent call") + + # Sleep a bit to wait for the background flush. + time.sleep(0.5) + + # Now we should have two logs + res = client.apl_query(dataset_name) + self.assertEqual(2, res.status.rowsExamined) + + # Cleanup created dataset client.datasets.delete(dataset_name) From 8005357ea227097623758ae4807833fbe5cdcd1b Mon Sep 17 00:00:00 2001 From: Arne Bahlo Date: Mon, 23 Sep 2024 17:31:58 +0200 Subject: [PATCH 02/13] test: Add a sleep --- tests/test_logger.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/test_logger.py b/tests/test_logger.py index e8de5ab..2ecebd9 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -38,6 +38,9 @@ def test_log(self): # Flush events axiom_handler.flush() + # Wait a bit (???) + time.sleep(0.2) + # Now we should have a log res = client.apl_query(dataset_name) self.assertEqual(1, res.status.rowsExamined) From c20c870aefaeb9a0d9e0a19a1c41e527b7c8645d Mon Sep 17 00:00:00 2001 From: Arne Bahlo Date: Tue, 24 Sep 2024 17:27:54 +0200 Subject: [PATCH 03/13] test: Remove sleep --- tests/test_logger.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/test_logger.py b/tests/test_logger.py index 2ecebd9..e8de5ab 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -38,9 +38,6 @@ def test_log(self): # Flush events axiom_handler.flush() - # Wait a bit (???) - time.sleep(0.2) - # Now we should have a log res = client.apl_query(dataset_name) self.assertEqual(1, res.status.rowsExamined) From 069d756d265320b2138270baaf2f21a1192c83df Mon Sep 17 00:00:00 2001 From: Arne Bahlo Date: Wed, 25 Sep 2024 11:09:47 +0200 Subject: [PATCH 04/13] fix: Remove monotonic time check, improve shutdown logic Change logger tests to warning Co-authored-by: Darach --- src/axiom_py/client.py | 13 +++++++++---- src/axiom_py/logging.py | 29 +++++++++++------------------ tests/test_logger.py | 9 ++++++--- 3 files changed, 26 insertions(+), 25 deletions(-) diff --git a/src/axiom_py/client.py b/src/axiom_py/client.py index 830ef24..dc4bb8d 100644 --- a/src/axiom_py/client.py +++ b/src/axiom_py/client.py @@ -7,7 +7,7 @@ import os from enum import Enum from humps import decamelize -from typing import Optional, List, Dict +from typing import Optional, List, Dict, Callable from logging import getLogger from dataclasses import dataclass, field, asdict from datetime import datetime @@ -130,7 +130,8 @@ class Client: # pylint: disable=R0903 datasets: DatasetsClient users: UsersClient annotations: AnnotationsClient - is_closed: bool # track if the client has been closed (for tests) + is_closed: bool = False # track if the client has been closed (for tests) + before_shutdown_funcs: List[Callable] = [] def __init__( self, @@ -179,10 +180,14 @@ def __init__( self.annotations = AnnotationsClient(self.session, self.logger) # wrap shutdown hook in a lambda passing in self as a ref - atexit.register(lambda: self.shutdown_hook()) - self.is_closed = False + atexit.register(self.shutdown_hook) + + def before_shutdown(self, func: Callable): + self.before_shutdown_funcs.append(func) def shutdown_hook(self): + for func in self.before_shutdown_funcs: + func() self.session.close() self.is_closed = True diff --git a/src/axiom_py/logging.py b/src/axiom_py/logging.py index dedcfae..3786999 100644 --- a/src/axiom_py/logging.py +++ b/src/axiom_py/logging.py @@ -1,7 +1,5 @@ """Logging contains the AxiomHandler and related methods to do with logging.""" -import time -import atexit from threading import Timer from logging import Handler, NOTSET, getLogger, WARNING @@ -15,7 +13,6 @@ class AxiomHandler(Handler): dataset: str buffer: list interval: int - last_run: float timer: Timer def __init__(self, client: Client, dataset: str, level=NOTSET, interval=1): @@ -28,34 +25,30 @@ def __init__(self, client: Client, dataset: str, level=NOTSET, interval=1): self.client = client self.dataset = dataset self.buffer = [] - self.last_run = time.monotonic() self.interval = interval # We use a threading.Timer to make sure we flush every second, even # if no more logs are emitted. self.timer = Timer(self.interval, self.flush) - # Register flush on exit, - atexit.register(self.flush) + # Make sure we flush before the client shuts down + client.before_shutdown(lambda: self.flush(False)) def emit(self, record): """Emit sends a log to Axiom.""" self.buffer.append(record.__dict__) - if ( - len(self.buffer) >= 1000 - or time.monotonic() - self.last_run > self.interval - ): + if len(self.buffer) >= 1000: self.flush() - else: - self.timer.cancel() - self.timer = Timer(self.interval, self.flush) - self.timer.start() - def flush(self): + def flush(self, restart_timer=True): """Flush sends all logs in the buffer to Axiom.""" self.timer.cancel() - self.last_run = time.monotonic() + if restart_timer: + self.timer = Timer(self.interval, self.flush) + self.timer.start() + if len(self.buffer) == 0: return - self.client.ingest_events(self.dataset, self.buffer) - self.buffer = [] + + local_buffer, self.buffer = self.buffer, [] + self.client.ingest_events(self.dataset, local_buffer) diff --git a/tests/test_logger.py b/tests/test_logger.py index e8de5ab..442594d 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -29,7 +29,7 @@ def test_log(self): logger = logging.getLogger() logger.addHandler(axiom_handler) - logger.info("This is a log!") + logger.warning("This is a log!") # This log shouldn't be ingested yet res = client.apl_query(dataset_name) @@ -40,12 +40,15 @@ def test_log(self): # Now we should have a log res = client.apl_query(dataset_name) + print(res) self.assertEqual(1, res.status.rowsExamined) - logger.info("This log should be ingested without any subsequent call") + logger.warning( + "This log should be ingested without any subsequent call" + ) # Sleep a bit to wait for the background flush. - time.sleep(0.5) + time.sleep(1) # Now we should have two logs res = client.apl_query(dataset_name) From 77ffb9a12140c0285cede15facbb9525f8546cef Mon Sep 17 00:00:00 2001 From: Arne Bahlo Date: Wed, 25 Sep 2024 11:19:59 +0200 Subject: [PATCH 05/13] fix: Move timer restart to emit func --- src/axiom_py/logging.py | 13 +++++++------ tests/test_logger.py | 5 ++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/axiom_py/logging.py b/src/axiom_py/logging.py index 3786999..2325189 100644 --- a/src/axiom_py/logging.py +++ b/src/axiom_py/logging.py @@ -32,7 +32,7 @@ def __init__(self, client: Client, dataset: str, level=NOTSET, interval=1): self.timer = Timer(self.interval, self.flush) # Make sure we flush before the client shuts down - client.before_shutdown(lambda: self.flush(False)) + client.before_shutdown(self.flush) def emit(self, record): """Emit sends a log to Axiom.""" @@ -40,12 +40,13 @@ def emit(self, record): if len(self.buffer) >= 1000: self.flush() - def flush(self, restart_timer=True): - """Flush sends all logs in the buffer to Axiom.""" + # Restart timer self.timer.cancel() - if restart_timer: - self.timer = Timer(self.interval, self.flush) - self.timer.start() + self.timer = Timer(self.interval, self.flush) + self.timer.start() + + def flush(self): + """Flush sends all logs in the buffer to Axiom.""" if len(self.buffer) == 0: return diff --git a/tests/test_logger.py b/tests/test_logger.py index 442594d..fe92f9a 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -24,7 +24,7 @@ def test_log(self): dataset_name, "A dataset to test axiom-py logger" ) - axiom_handler = AxiomHandler(client, dataset_name, interval=0.4) + axiom_handler = AxiomHandler(client, dataset_name, interval=0.5) logger = logging.getLogger() logger.addHandler(axiom_handler) @@ -40,7 +40,6 @@ def test_log(self): # Now we should have a log res = client.apl_query(dataset_name) - print(res) self.assertEqual(1, res.status.rowsExamined) logger.warning( @@ -48,7 +47,7 @@ def test_log(self): ) # Sleep a bit to wait for the background flush. - time.sleep(1) + time.sleep(1.0) # Now we should have two logs res = client.apl_query(dataset_name) From 240b32baf45a717d9c6f34daf82bebda4fabd0ff Mon Sep 17 00:00:00 2001 From: Arne Bahlo Date: Wed, 25 Sep 2024 11:29:48 +0200 Subject: [PATCH 06/13] fix: Cancel the active timer on shutdown --- src/axiom_py/logging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/axiom_py/logging.py b/src/axiom_py/logging.py index 2325189..1d72357 100644 --- a/src/axiom_py/logging.py +++ b/src/axiom_py/logging.py @@ -32,7 +32,7 @@ def __init__(self, client: Client, dataset: str, level=NOTSET, interval=1): self.timer = Timer(self.interval, self.flush) # Make sure we flush before the client shuts down - client.before_shutdown(self.flush) + client.before_shutdown(lambda: self.flush(), self.timer.cancel()) def emit(self, record): """Emit sends a log to Axiom.""" From c33459ca53f3ec0e2c05cda3772c9d9dc6e82428 Mon Sep 17 00:00:00 2001 From: Arne Bahlo Date: Wed, 25 Sep 2024 13:19:15 +0200 Subject: [PATCH 07/13] chore: Remove sdk info logs --- src/axiom_py/annotations.py | 6 +----- src/axiom_py/client.py | 9 ++------- src/axiom_py/datasets.py | 8 +------- 3 files changed, 4 insertions(+), 19 deletions(-) diff --git a/src/axiom_py/annotations.py b/src/axiom_py/annotations.py index 91e056c..9f966f7 100644 --- a/src/axiom_py/annotations.py +++ b/src/axiom_py/annotations.py @@ -1,7 +1,6 @@ """This package provides annotation models and methods as well as an AnnotationsClient""" import ujson -from logging import Logger from requests import Session from typing import List, Optional from dataclasses import dataclass, asdict, field @@ -55,9 +54,8 @@ class AnnotationsClient: # pylint: disable=R0903 session: Session - def __init__(self, session: Session, logger: Logger): + def __init__(self, session: Session): self.session = session - self.logger = logger def get(self, id: str) -> Annotation: """ @@ -79,7 +77,6 @@ def create(self, req: AnnotationCreateRequest) -> Annotation: path = "/v2/annotations" res = self.session.post(path, data=ujson.dumps(asdict(req))) annotation = from_dict(Annotation, res.json()) - self.logger.info(f"created new annotation: {annotation.id}") return annotation def list( @@ -120,7 +117,6 @@ def update(self, id: str, req: AnnotationUpdateRequest) -> Annotation: path = "/v2/annotations/%s" % id res = self.session.put(path, data=ujson.dumps(asdict(req))) annotation = from_dict(Annotation, res.json()) - self.logger.info(f"updated annotation({annotation.id})") return annotation def delete(self, id: str): diff --git a/src/axiom_py/client.py b/src/axiom_py/client.py index dc4bb8d..9ca3094 100644 --- a/src/axiom_py/client.py +++ b/src/axiom_py/client.py @@ -8,7 +8,6 @@ from enum import Enum from humps import decamelize from typing import Optional, List, Dict, Callable -from logging import getLogger from dataclasses import dataclass, field, asdict from datetime import datetime from requests_toolbelt.sessions import BaseUrlSession @@ -147,7 +146,6 @@ def __init__( if url_base is None: url_base = AXIOM_URL - self.logger = getLogger() # set exponential retries retries = Retry( total=3, backoff_factor=2, status_forcelist=[500, 502, 503, 504] @@ -172,12 +170,11 @@ def __init__( # if there is an organization id passed, # set it in the header if org_id: - self.logger.info("found organization id: %s" % org_id) self.session.headers.update({"X-Axiom-Org-Id": org_id}) - self.datasets = DatasetsClient(self.session, self.logger) + self.datasets = DatasetsClient(self.session) self.users = UsersClient(self.session, is_personal_token(token)) - self.annotations = AnnotationsClient(self.session, self.logger) + self.annotations = AnnotationsClient(self.session) # wrap shutdown hook in a lambda passing in self as a ref atexit.register(self.shutdown_hook) @@ -264,7 +261,6 @@ def query_legacy( result = from_dict(QueryLegacyResult, res.json()) self.logger.debug(f"query result: {result}") query_id = res.headers.get("X-Axiom-History-Query-Id") - self.logger.info(f"received query result with query_id: {query_id}") result.savedQueryID = query_id return result @@ -297,7 +293,6 @@ def query( result = from_dict(QueryResult, res.json()) self.logger.debug(f"apl query result: {result}") query_id = res.headers.get("X-Axiom-History-Query-Id") - self.logger.info(f"received query result with query_id: {query_id}") result.savedQueryID = query_id return result diff --git a/src/axiom_py/datasets.py b/src/axiom_py/datasets.py index c293b75..0f46c0f 100644 --- a/src/axiom_py/datasets.py +++ b/src/axiom_py/datasets.py @@ -3,7 +3,6 @@ """ import ujson -from logging import Logger from requests import Session from typing import List from dataclasses import dataclass, asdict, field @@ -52,9 +51,8 @@ class DatasetsClient: # pylint: disable=R0903 session: Session - def __init__(self, session: Session, logger: Logger): + def __init__(self, session: Session): self.session = session - self.logger = logger def get(self, id: str) -> Dataset: """ @@ -86,7 +84,6 @@ def create(self, name: str, description: str = "") -> Dataset: ), ) ds = from_dict(Dataset, res.json()) - self.logger.info(f"created new dataset: {ds.name}") return ds def get_list(self) -> List[Dataset]: @@ -123,9 +120,6 @@ def update(self, id: str, new_description: str) -> Dataset: ), ) ds = from_dict(Dataset, res.json()) - self.logger.info( - f"updated dataset({ds.name}) with new desc: {ds.description}" - ) return ds def delete(self, id: str): From eb325df64596a6fa7ebed2e9f7122f80ea31c419 Mon Sep 17 00:00:00 2001 From: Arne Bahlo Date: Mon, 30 Sep 2024 15:20:09 +0200 Subject: [PATCH 08/13] fix: Add condition to flush after 1s This fixes a bug where if you emit an event every 900ms it'll never flush (until you get to 1000 events). --- src/axiom_py/logging.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/axiom_py/logging.py b/src/axiom_py/logging.py index 1d72357..8af0944 100644 --- a/src/axiom_py/logging.py +++ b/src/axiom_py/logging.py @@ -1,8 +1,9 @@ """Logging contains the AxiomHandler and related methods to do with logging.""" from threading import Timer - from logging import Handler, NOTSET, getLogger, WARNING +import time + from .client import Client @@ -13,6 +14,7 @@ class AxiomHandler(Handler): dataset: str buffer: list interval: int + last_flush: float timer: Timer def __init__(self, client: Client, dataset: str, level=NOTSET, interval=1): @@ -37,7 +39,10 @@ def __init__(self, client: Client, dataset: str, level=NOTSET, interval=1): def emit(self, record): """Emit sends a log to Axiom.""" self.buffer.append(record.__dict__) - if len(self.buffer) >= 1000: + if ( + len(self.buffer) >= 1000 + or time.monotonic() - self.last_run > self.interval + ): self.flush() # Restart timer @@ -48,6 +53,8 @@ def emit(self, record): def flush(self): """Flush sends all logs in the buffer to Axiom.""" + self.last_flush = time.monotonic() + if len(self.buffer) == 0: return From 7221766f5e60eeb813aa124cc903ac9d9b0db6df Mon Sep 17 00:00:00 2001 From: Arne Bahlo Date: Mon, 30 Sep 2024 15:27:05 +0200 Subject: [PATCH 09/13] fix: Store before_shutdown hook in def --- src/axiom_py/logging.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/axiom_py/logging.py b/src/axiom_py/logging.py index 8af0944..ea998e9 100644 --- a/src/axiom_py/logging.py +++ b/src/axiom_py/logging.py @@ -34,7 +34,11 @@ def __init__(self, client: Client, dataset: str, level=NOTSET, interval=1): self.timer = Timer(self.interval, self.flush) # Make sure we flush before the client shuts down - client.before_shutdown(lambda: self.flush(), self.timer.cancel()) + def before_shutdown(): + self.flush() + self.timer.cancel() + + client.before_shutdown(before_shutdown) def emit(self, record): """Emit sends a log to Axiom.""" From 54dd6319f7504255cd08bc51e3b154d7d59adb53 Mon Sep 17 00:00:00 2001 From: Arne Bahlo Date: Mon, 30 Sep 2024 15:28:25 +0200 Subject: [PATCH 10/13] fix: Wrong var name --- src/axiom_py/logging.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/axiom_py/logging.py b/src/axiom_py/logging.py index ea998e9..f911708 100644 --- a/src/axiom_py/logging.py +++ b/src/axiom_py/logging.py @@ -45,7 +45,7 @@ def emit(self, record): self.buffer.append(record.__dict__) if ( len(self.buffer) >= 1000 - or time.monotonic() - self.last_run > self.interval + or time.monotonic() - self.last_flush > self.interval ): self.flush() From 86791a88fab4e8d9bd958986331160913b97ab03 Mon Sep 17 00:00:00 2001 From: Arne Bahlo Date: Mon, 30 Sep 2024 15:30:12 +0200 Subject: [PATCH 11/13] fix: Initialize last_flush --- src/axiom_py/logging.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/axiom_py/logging.py b/src/axiom_py/logging.py index f911708..ae7e024 100644 --- a/src/axiom_py/logging.py +++ b/src/axiom_py/logging.py @@ -28,6 +28,7 @@ def __init__(self, client: Client, dataset: str, level=NOTSET, interval=1): self.dataset = dataset self.buffer = [] self.interval = interval + self.last_flush = time.monotonic() # We use a threading.Timer to make sure we flush every second, even # if no more logs are emitted. From 9cabec7150d8ace2b15354e972f9b504f8b5c766 Mon Sep 17 00:00:00 2001 From: Arne Bahlo Date: Mon, 30 Sep 2024 15:32:59 +0200 Subject: [PATCH 12/13] fix: Remove more sdk log statements --- src/axiom_py/client.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/axiom_py/client.py b/src/axiom_py/client.py index 9ca3094..885abbf 100644 --- a/src/axiom_py/client.py +++ b/src/axiom_py/client.py @@ -255,11 +255,9 @@ def query_legacy( path = "/v1/datasets/%s/query" % id payload = ujson.dumps(asdict(query), default=handle_json_serialization) - self.logger.debug("sending query %s" % payload) params = self._prepare_query_options(opts) res = self.session.post(path, data=payload, params=params) result = from_dict(QueryLegacyResult, res.json()) - self.logger.debug(f"query result: {result}") query_id = res.headers.get("X-Axiom-History-Query-Id") result.savedQueryID = query_id return result @@ -287,11 +285,9 @@ def query( self._prepare_apl_payload(apl, opts), default=handle_json_serialization, ) - self.logger.debug("sending query %s" % payload) params = self._prepare_apl_options(opts) res = self.session.post(path, data=payload, params=params) result = from_dict(QueryResult, res.json()) - self.logger.debug(f"apl query result: {result}") query_id = res.headers.get("X-Axiom-History-Query-Id") result.savedQueryID = query_id return result From 4d34bf36349800435cfbf6c2adfa63c55679f8e6 Mon Sep 17 00:00:00 2001 From: Arne Bahlo Date: Mon, 30 Sep 2024 16:49:44 +0200 Subject: [PATCH 13/13] test: Adjust timings for test_logger.py --- tests/test_logger.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/test_logger.py b/tests/test_logger.py index fe92f9a..d244ce1 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -24,7 +24,7 @@ def test_log(self): dataset_name, "A dataset to test axiom-py logger" ) - axiom_handler = AxiomHandler(client, dataset_name, interval=0.5) + axiom_handler = AxiomHandler(client, dataset_name, interval=1.0) logger = logging.getLogger() logger.addHandler(axiom_handler) @@ -38,6 +38,9 @@ def test_log(self): # Flush events axiom_handler.flush() + # Wait a bit for the ingest to finish + time.sleep(0.5) + # Now we should have a log res = client.apl_query(dataset_name) self.assertEqual(1, res.status.rowsExamined) @@ -46,8 +49,8 @@ def test_log(self): "This log should be ingested without any subsequent call" ) - # Sleep a bit to wait for the background flush. - time.sleep(1.0) + # Wait for the background flush. + time.sleep(1.5) # Now we should have two logs res = client.apl_query(dataset_name)