diff --git a/src/axiom_py/annotations.py b/src/axiom_py/annotations.py index 91e056c..9f966f7 100644 --- a/src/axiom_py/annotations.py +++ b/src/axiom_py/annotations.py @@ -1,7 +1,6 @@ """This package provides annotation models and methods as well as an AnnotationsClient""" import ujson -from logging import Logger from requests import Session from typing import List, Optional from dataclasses import dataclass, asdict, field @@ -55,9 +54,8 @@ class AnnotationsClient: # pylint: disable=R0903 session: Session - def __init__(self, session: Session, logger: Logger): + def __init__(self, session: Session): self.session = session - self.logger = logger def get(self, id: str) -> Annotation: """ @@ -79,7 +77,6 @@ def create(self, req: AnnotationCreateRequest) -> Annotation: path = "/v2/annotations" res = self.session.post(path, data=ujson.dumps(asdict(req))) annotation = from_dict(Annotation, res.json()) - self.logger.info(f"created new annotation: {annotation.id}") return annotation def list( @@ -120,7 +117,6 @@ def update(self, id: str, req: AnnotationUpdateRequest) -> Annotation: path = "/v2/annotations/%s" % id res = self.session.put(path, data=ujson.dumps(asdict(req))) annotation = from_dict(Annotation, res.json()) - self.logger.info(f"updated annotation({annotation.id})") return annotation def delete(self, id: str): diff --git a/src/axiom_py/client.py b/src/axiom_py/client.py index 830ef24..885abbf 100644 --- a/src/axiom_py/client.py +++ b/src/axiom_py/client.py @@ -7,8 +7,7 @@ import os from enum import Enum from humps import decamelize -from typing import Optional, List, Dict -from logging import getLogger +from typing import Optional, List, Dict, Callable from dataclasses import dataclass, field, asdict from datetime import datetime from requests_toolbelt.sessions import BaseUrlSession @@ -130,7 +129,8 @@ class Client: # pylint: disable=R0903 datasets: DatasetsClient users: UsersClient annotations: AnnotationsClient - is_closed: bool # track if the client has been closed (for tests) + is_closed: bool = False # track if the client has been closed (for tests) + before_shutdown_funcs: List[Callable] = [] def __init__( self, @@ -146,7 +146,6 @@ def __init__( if url_base is None: url_base = AXIOM_URL - self.logger = getLogger() # set exponential retries retries = Retry( total=3, backoff_factor=2, status_forcelist=[500, 502, 503, 504] @@ -171,18 +170,21 @@ def __init__( # if there is an organization id passed, # set it in the header if org_id: - self.logger.info("found organization id: %s" % org_id) self.session.headers.update({"X-Axiom-Org-Id": org_id}) - self.datasets = DatasetsClient(self.session, self.logger) + self.datasets = DatasetsClient(self.session) self.users = UsersClient(self.session, is_personal_token(token)) - self.annotations = AnnotationsClient(self.session, self.logger) + self.annotations = AnnotationsClient(self.session) # wrap shutdown hook in a lambda passing in self as a ref - atexit.register(lambda: self.shutdown_hook()) - self.is_closed = False + atexit.register(self.shutdown_hook) + + def before_shutdown(self, func: Callable): + self.before_shutdown_funcs.append(func) def shutdown_hook(self): + for func in self.before_shutdown_funcs: + func() self.session.close() self.is_closed = True @@ -253,13 +255,10 @@ def query_legacy( path = "/v1/datasets/%s/query" % id payload = ujson.dumps(asdict(query), default=handle_json_serialization) - self.logger.debug("sending query %s" % payload) params = self._prepare_query_options(opts) res = self.session.post(path, data=payload, params=params) result = from_dict(QueryLegacyResult, res.json()) - self.logger.debug(f"query result: {result}") query_id = res.headers.get("X-Axiom-History-Query-Id") - self.logger.info(f"received query result with query_id: {query_id}") result.savedQueryID = query_id return result @@ -286,13 +285,10 @@ def query( self._prepare_apl_payload(apl, opts), default=handle_json_serialization, ) - self.logger.debug("sending query %s" % payload) params = self._prepare_apl_options(opts) res = self.session.post(path, data=payload, params=params) result = from_dict(QueryResult, res.json()) - self.logger.debug(f"apl query result: {result}") query_id = res.headers.get("X-Axiom-History-Query-Id") - self.logger.info(f"received query result with query_id: {query_id}") result.savedQueryID = query_id return result diff --git a/src/axiom_py/datasets.py b/src/axiom_py/datasets.py index c293b75..0f46c0f 100644 --- a/src/axiom_py/datasets.py +++ b/src/axiom_py/datasets.py @@ -3,7 +3,6 @@ """ import ujson -from logging import Logger from requests import Session from typing import List from dataclasses import dataclass, asdict, field @@ -52,9 +51,8 @@ class DatasetsClient: # pylint: disable=R0903 session: Session - def __init__(self, session: Session, logger: Logger): + def __init__(self, session: Session): self.session = session - self.logger = logger def get(self, id: str) -> Dataset: """ @@ -86,7 +84,6 @@ def create(self, name: str, description: str = "") -> Dataset: ), ) ds = from_dict(Dataset, res.json()) - self.logger.info(f"created new dataset: {ds.name}") return ds def get_list(self) -> List[Dataset]: @@ -123,9 +120,6 @@ def update(self, id: str, new_description: str) -> Dataset: ), ) ds = from_dict(Dataset, res.json()) - self.logger.info( - f"updated dataset({ds.name}) with new desc: {ds.description}" - ) return ds def delete(self, id: str): diff --git a/src/axiom_py/logging.py b/src/axiom_py/logging.py index f6d0aac..ae7e024 100644 --- a/src/axiom_py/logging.py +++ b/src/axiom_py/logging.py @@ -1,9 +1,9 @@ """Logging contains the AxiomHandler and related methods to do with logging.""" +from threading import Timer +from logging import Handler, NOTSET, getLogger, WARNING import time -import atexit -from logging import Handler, NOTSET, getLogger, WARNING from .client import Client @@ -14,37 +14,54 @@ class AxiomHandler(Handler): dataset: str buffer: list interval: int - last_run: float + last_flush: float + timer: Timer def __init__(self, client: Client, dataset: str, level=NOTSET, interval=1): super().__init__() - # set urllib3 logging level to warning, check: + # Set urllib3 logging level to warning, check: # https://github.com/axiomhq/axiom-py/issues/23 - # This is a temp solution that would stop requests - # library from flooding the logs with debug messages + # This is a temp solution that would stop requests library from + # flooding the logs with debug messages getLogger("urllib3").setLevel(WARNING) self.client = client self.dataset = dataset self.buffer = [] - self.last_run = time.monotonic() self.interval = interval + self.last_flush = time.monotonic() + + # We use a threading.Timer to make sure we flush every second, even + # if no more logs are emitted. + self.timer = Timer(self.interval, self.flush) + + # Make sure we flush before the client shuts down + def before_shutdown(): + self.flush() + self.timer.cancel() - # register flush on exit, - atexit.register(self.flush) + client.before_shutdown(before_shutdown) def emit(self, record): - """emit sends a log to Axiom.""" + """Emit sends a log to Axiom.""" self.buffer.append(record.__dict__) if ( len(self.buffer) >= 1000 - or time.monotonic() - self.last_run > self.interval + or time.monotonic() - self.last_flush > self.interval ): self.flush() + # Restart timer + self.timer.cancel() + self.timer = Timer(self.interval, self.flush) + self.timer.start() + def flush(self): - """flush sends all logs in the logcache to Axiom.""" - self.last_run = time.monotonic() + """Flush sends all logs in the buffer to Axiom.""" + + self.last_flush = time.monotonic() + if len(self.buffer) == 0: return - self.client.ingest_events(self.dataset, self.buffer) - self.buffer = [] + + local_buffer, self.buffer = self.buffer, [] + self.client.ingest_events(self.dataset, local_buffer) diff --git a/tests/test_logger.py b/tests/test_logger.py index e60cc6f..d244ce1 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -3,6 +3,8 @@ import os import logging import unittest +import time + from .helpers import get_random_name from axiom_py import Client from axiom_py.logging import AxiomHandler @@ -10,35 +12,49 @@ class TestLogger(unittest.TestCase): def test_log(self): - """Tests a simple log""" + """Tests the logger""" client = Client( os.getenv("AXIOM_TOKEN"), os.getenv("AXIOM_ORG_ID"), os.getenv("AXIOM_URL"), ) - # create a dataset for that purpose + # Create a dataset for that purpose dataset_name = get_random_name() client.datasets.create( - dataset_name, "a dataset to test axiom-py logger" + dataset_name, "A dataset to test axiom-py logger" ) - axiom_handler = AxiomHandler(client, dataset_name) + axiom_handler = AxiomHandler(client, dataset_name, interval=1.0) logger = logging.getLogger() logger.addHandler(axiom_handler) - logger.warning("foo") + logger.warning("This is a log!") - # this log shouldn't be ingested yet + # This log shouldn't be ingested yet res = client.apl_query(dataset_name) self.assertEqual(0, res.status.rowsExamined) - # flush events + # Flush events axiom_handler.flush() - # now we should have a log + # Wait a bit for the ingest to finish + time.sleep(0.5) + + # Now we should have a log res = client.apl_query(dataset_name) self.assertEqual(1, res.status.rowsExamined) - # cleanup created dataset + logger.warning( + "This log should be ingested without any subsequent call" + ) + + # Wait for the background flush. + time.sleep(1.5) + + # Now we should have two logs + res = client.apl_query(dataset_name) + self.assertEqual(2, res.status.rowsExamined) + + # Cleanup created dataset client.datasets.delete(dataset_name)