Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(observability): more descriptive and value adding spans #1241

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 87 additions & 10 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,11 @@ def get_tracer(tracer_provider=None):
return tracer_provider.get_tracer(TRACER_NAME, TRACER_VERSION)


@contextmanager
def trace_call(name, session=None, extra_attributes=None, observability_options=None):
if session:
session._last_use_time = datetime.now()

if not (HAS_OPENTELEMETRY_INSTALLED and name):
# Empty context manager. Users will have to check if the generated value is None or a span
yield None
return
def _make_tracer_and_span_attributes(
session=None, extra_attributes=None, observability_options=None
):
if not HAS_OPENTELEMETRY_INSTALLED:
return None, None

tracer_provider = None

Expand Down Expand Up @@ -103,9 +99,77 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=

if not enable_extended_tracing:
attributes.pop("db.statement", False)
attributes.pop("sql", False)
else:
# Otherwise there are places where the annotated sql was inserted
# directly from the arguments as "sql", and transform those into "db.statement".
db_statement = attributes.get("db.statement", None)
if not db_statement:
sql = attributes.get("sql", None)
if sql:
attributes = attributes.copy()
attributes.pop("sql", False)
attributes["db.statement"] = sql

return tracer, attributes


def trace_call_end_lazily(
name, session=None, extra_attributes=None, observability_options=None
):
"""
trace_call_end_lazily is used in situations where you don't want a context managed
span in a with statement to end as soon as a block exits. This is useful for example
after a Database.batch or Database.snapshot but without a context manager.
If you need to directly invoke tracing with a context manager, please invoke
`trace_call` with which you can invoke
 `with trace_call(...) as span:`
It is the caller's responsibility to explicitly invoke the returned ending function.
"""
if not name:
return None

tracer, span_attributes = _make_tracer_and_span_attributes(
session, extra_attributes, observability_options
)
if not tracer:
return None

span = tracer.start_span(
name, kind=trace.SpanKind.CLIENT, attributes=span_attributes
)
ctx_manager = trace.use_span(span, end_on_exit=True, record_exception=True)
ctx_manager.__enter__()

def discard(exc_type=None, exc_value=None, exc_traceback=None):
if not exc_type:
span.set_status(Status(StatusCode.OK))

ctx_manager.__exit__(exc_type, exc_value, exc_traceback)

return discard


@contextmanager
def trace_call(name, session=None, extra_attributes=None, observability_options=None):
"""
 trace_call is used in situations where you need to end a span with a context manager
 or after a scope is exited. If you need to keep a span alive and lazily end it, please
 invoke `trace_call_end_lazily`.
"""
if not name:
yield None
return

tracer, span_attributes = _make_tracer_and_span_attributes(
session, extra_attributes, observability_options
)
if not tracer:
yield None
return

with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT, attributes=attributes
name, kind=trace.SpanKind.CLIENT, attributes=span_attributes
) as span:
try:
yield span
Expand Down Expand Up @@ -135,3 +199,16 @@ def get_current_span():
def add_span_event(span, event_name, event_attributes=None):
if span:
span.add_event(event_name, event_attributes)


def add_event_on_current_span(event_name, event_attributes=None, span=None):
if not span:
span = get_current_span()

add_span_event(span, event_name, event_attributes)


def record_span_exception_and_status(span, exc):
if span:
span.set_status(Status(StatusCode.ERROR, str(exc)))
span.record_exception(exc)
42 changes: 40 additions & 2 deletions google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@
_metadata_with_prefix,
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1._opentelemetry_tracing import (
add_event_on_current_span,
trace_call,
trace_call_end_lazily,
)
from google.cloud.spanner_v1 import RequestOptions
from google.cloud.spanner_v1._helpers import _retry
from google.cloud.spanner_v1._helpers import _check_rst_stream_error
Expand All @@ -46,6 +50,12 @@ class _BatchBase(_SessionWrapper):
def __init__(self, session):
super(_BatchBase, self).__init__(session)
self._mutations = []
self.__base_discard_span = trace_call_end_lazily(
f"CloudSpanner.{type(self).__name__}",
self._session,
None,
getattr(self._session._database, "observability_options", None),
)

def _check_state(self):
"""Helper for :meth:`commit` et al.
Expand All @@ -69,6 +79,10 @@ def insert(self, table, columns, values):
:type values: list of lists
:param values: Values to be modified.
"""
add_event_on_current_span(
"insert mutations added",
dict(table=table, columns=columns),
)
self._mutations.append(Mutation(insert=_make_write_pb(table, columns, values)))
# TODO: Decide if we should add a span event per mutation:
# https://github.com/googleapis/python-spanner/issues/1269
Expand Down Expand Up @@ -137,6 +151,17 @@ def delete(self, table, keyset):
# TODO: Decide if we should add a span event per mutation:
# https://github.com/googleapis/python-spanner/issues/1269

def _discard_on_end(self, exc_type=None, exc_val=None, exc_traceback=None):
if self.__base_discard_span:
self.__base_discard_span(exc_type, exc_val, exc_traceback)
self.__base_discard_span = None

def __exit__(self, exc_type=None, exc_value=None, exc_traceback=None):
self._discard_on_end(exc_type, exc_val, exc_traceback)

def __enter__(self):
return self


class Batch(_BatchBase):
"""Accumulate mutations for transmission during :meth:`commit`."""
Expand Down Expand Up @@ -233,18 +258,31 @@ def commit(
)
self.committed = response.commit_timestamp
self.commit_stats = response.commit_stats
self._discard_on_end()
return self.committed

def __enter__(self):
"""Begin ``with`` block."""
self._check_state()
observability_options = getattr(
self._session._database, "observability_options", None
)
self.__discard_span = trace_call_end_lazily(
"CloudSpanner.Batch",
self._session,
observability_options=observability_options,
)

return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""End ``with`` block."""
if exc_type is None:
self.commit()
if self.__discard_span:
self.__discard_span(exc_type, exc_val, exc_tb)
self.__discard_span = None
self._discard_on_end()


class MutationGroup(_BatchBase):
Expand Down Expand Up @@ -336,7 +374,7 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
)
observability_options = getattr(database, "observability_options", None)
with trace_call(
"CloudSpanner.BatchWrite",
"CloudSpanner.batch_write",
self._session,
trace_attributes,
observability_options=observability_options,
Expand Down
Loading