Skip to content

Commit

Permalink
Use add_event_on_current_span helper more
Browse files Browse the repository at this point in the history
  • Loading branch information
odeke-em committed Dec 2, 2024
1 parent 47ffdfc commit 54bed9c
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 45 deletions.
8 changes: 5 additions & 3 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ def get_current_span():
return trace.get_current_span()


def add_event_on_current_span(self, event_name, attributes=None):
current_span = get_current_span()
def add_event_on_current_span(self, event_name, attributes=None, current_span=None):
if not current_span:
current_span = get_current_span()

if current_span:
current_span.add_event(event_commentary, attributes)
current_span.add_event(event_name, attributes)
32 changes: 18 additions & 14 deletions google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,11 @@ class _BatchBase(_SessionWrapper):
def __init__(self, session):
super(_BatchBase, self).__init__(session)
self._mutations = []
observability_options = getattr(
self._session._database, "observability_options", None
)
self.__span = trace_call_end_lazily(
"CloudSpannerX." + type(self).__name__,
f"CloudSpanner.{type(self).__name}",
self._session,
observability_options=observability_options,
None,
getattr(self._session._database, "observability_options", None),
)

def _check_state(self):
Expand All @@ -81,8 +79,10 @@ def insert(self, table, columns, values):
:type values: list of lists
:param values: Values to be modified.
"""
add_event_on_span(
self.__span, "insert mutations added", dict(table=table, columns=columns)
add_event_on_current_span(
"insert mutations added",
dict(table=table, columns=columns),
self.__span,
)
self._mutations.append(Mutation(insert=_make_write_pb(table, columns, values)))

Expand All @@ -99,8 +99,10 @@ def update(self, table, columns, values):
:param values: Values to be modified.
"""
self._mutations.append(Mutation(update=_make_write_pb(table, columns, values)))
add_event_on_span(
self.__span, "update mutations added", dict(table=table, columns=columns)
add_event_on_current_span(
"update mutations added",
dict(table=table, columns=columns),
self.__span,
)

def insert_or_update(self, table, columns, values):
Expand All @@ -118,10 +120,10 @@ def insert_or_update(self, table, columns, values):
self._mutations.append(
Mutation(insert_or_update=_make_write_pb(table, columns, values))
)
add_event_on_span(
self.__span,
add_event_on_current_span(
"insert_or_update mutations added",
dict(table=table, columns=columns),
self.__span,
)

def replace(self, table, columns, values):
Expand All @@ -137,8 +139,8 @@ def replace(self, table, columns, values):
:param values: Values to be modified.
"""
self._mutations.append(Mutation(replace=_make_write_pb(table, columns, values)))
add_event_on_span(
self.__span, "replace mutations added", dict(table=table, columns=columns)
add_event_on_current_span(
"replace mutations added", dict(table=table, columns=columns), self.__span
)

def delete(self, table, keyset):
Expand All @@ -152,7 +154,9 @@ def delete(self, table, keyset):
"""
delete = Mutation.Delete(table=table, key_set=keyset._to_pb())
self._mutations.append(Mutation(delete=delete))
add_event_on_span(self.__span, "delete mutations added", dict(table=table))
add_event_on_current_span(
"delete mutations added", dict(table=table), self.__span
)


class Batch(_BatchBase):
Expand Down
20 changes: 11 additions & 9 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import (
trace_call,
trace_call_end_lazily,
add_event_on_current_span,
set_span_status_error,
set_span_status_ok,
trace_call,
trace_call_end_lazily,
)
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.batch import MutationGroups
Expand Down Expand Up @@ -701,16 +702,17 @@ def execute_partitioned_dml(

def execute_pdml():
def do_execute_pdml(session, span):
if span:
span.add_event("Starting BeginTransaction")
add_event_on_current_span(
"Starting BeginTransaction", current_span=span
)
txn = api.begin_transaction(
session=session.name, options=txn_options, metadata=metadata
)
if span:
span.add_event(
"Completed BeginTransaction", {"transaction.id": txn.id}
)

add_event_on_current_span(
"Completed BeginTransaction",
{"transaction.id": txn.id},
current_span=span,
)
txn_selector = TransactionSelector(id=txn.id)

request = ExecuteSqlRequest(
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/spanner_v1/merged_result_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@

if TYPE_CHECKING:
from google.cloud.spanner_v1.database import BatchSnapshot
from google.cloud.spanner_v1._opentelemetry_tracing import (
trace_call,
)

QUEUE_SIZE_PER_WORKER = 32
MAX_PARALLELISM = 16
Expand Down
41 changes: 23 additions & 18 deletions google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
_metadata_with_prefix,
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1._opentelemetry_tracing import (
add_event_on_current_span,
trace_call,
)
from google.cloud.spanner_v1.batch import Batch
from google.cloud.spanner_v1.snapshot import Snapshot
from google.cloud.spanner_v1.transaction import Transaction
Expand Down Expand Up @@ -431,8 +434,7 @@ def run_in_transaction(self, func, *args, **kw):
) as span:
while True:
if self._transaction is None:
if span:
span.add_event("Creating Transaction")
add_event_on_current_span("Creating Transaction", span=span)
txn = self.transaction()
txn.transaction_tag = transaction_tag
txn.exclude_txn_from_change_streams = (
Expand All @@ -445,8 +447,7 @@ def run_in_transaction(self, func, *args, **kw):

txn_id = getattr(txn, "_transaction_id", None) or ""
span_attributes = {"transaction.id": txn_id, "attempt": attempts}
if span:
span.add_event("Using Transaction", span_attributes)
add_event_on_current_span("Using Transaction", span_attributes, span)

try:
return_value = func(txn, *args, **kw)
Expand All @@ -456,24 +457,26 @@ def run_in_transaction(self, func, *args, **kw):
delay_seconds = _get_retry_delay(exc.errors[0], attempts)
attributes = dict(delay_seconds=delay_seconds)
attributes.update(span_attributes)
span.add_event("Transaction was aborted, retrying", attributes)
add_event_on_current_span(
"Transaction was aborted, retrying", attributes, span
)

_delay_until_retry(exc, deadline, attempts)
continue
except GoogleAPICallError:
del self._transaction
if span:
span.add_event(
"Transaction.commit failed due to GoogleAPICallError, not retrying",
span_attributes,
)
add_event_on_current_span(
"Transaction.commit failed due to GoogleAPICallError, not retrying",
span_attributes,
span,
)
raise
except Exception:
if span:
span.add_event(
"Invoking Transaction.rollback(), not retrying",
span_attributes,
)
add_event_on_current_span(
"Invoking Transaction.rollback(), not retrying",
span_attributes,
span,
)
txn.rollback()
raise

Expand All @@ -489,18 +492,20 @@ def run_in_transaction(self, func, *args, **kw):
delay_seconds = _get_retry_delay(exc.errors[0], attempts)
attributes = dict(delay_seconds=delay_seconds)
attributes.update(span_attributes)
span.add_event(
add_event_on_current_span(
"Transaction.commit was aborted, retrying afresh",
attributes,
span,
)

_delay_until_retry(exc, deadline, attempts)
except GoogleAPICallError:
del self._transaction
if span:
span.add_event(
add_event_on_current_span(
"Transaction.commit failed due to GoogleAPICallError, not retrying",
span_attributes,
span,
)
raise
else:
Expand Down
1 change: 0 additions & 1 deletion google/cloud/spanner_v1/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,6 @@ def execute_sql(
)
else:
return self._get_streamed_result_set(
span_name,
restart,
request,
trace_attributes,
Expand Down

0 comments on commit 54bed9c

Please sign in to comment.