Skip to content

Commit 873cc18

Browse files
committed
observability: add updated span events + traace more methods
This change carves out parts of PR googleapis#1241 in smaller pieces to ease with smaller reviews. This change adds more span events, updates important spans to make them more distinct like changing: "CloudSpanner.ReadWriteTransaction" to more direct and more pointed spans like: * CloudSpanner.Transaction.execute_streaming_sql Also added important spans: * CloudSpanner.Database.run_in_transaction * CloudSpanner.Session.run_in_transaction
1 parent a6811af commit 873cc18

15 files changed

+489
-140
lines changed

google/cloud/spanner_v1/_opentelemetry_tracing.py

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,36 +56,40 @@ def get_tracer(tracer_provider=None):
5656

5757

5858
@contextmanager
59-
def trace_call(name, session, extra_attributes=None, observability_options=None):
60-
if session:
61-
session._last_use_time = datetime.now()
62-
63-
if not HAS_OPENTELEMETRY_INSTALLED or not session:
59+
def trace_call(name, session=None, extra_attributes=None, observability_options=None):
60+
if not (HAS_OPENTELEMETRY_INSTALLED and name):
6461
# Empty context manager. Users will have to check if the generated value is None or a span
6562
yield None
6663
return
6764

65+
if session:
66+
session._last_use_time = datetime.now()
67+
6868
tracer_provider = None
6969

7070
# By default enable_extended_tracing=True because in a bid to minimize
7171
# breaking changes and preserve legacy behavior, we are keeping it turned
7272
# on by default.
7373
enable_extended_tracing = True
7474

75+
db_name = ""
76+
if session and getattr(session, "_database", None):
77+
db_name = session._database.name
78+
7579
if isinstance(observability_options, dict): # Avoid false positives with mock.Mock
7680
tracer_provider = observability_options.get("tracer_provider", None)
7781
enable_extended_tracing = observability_options.get(
7882
"enable_extended_tracing", enable_extended_tracing
7983
)
84+
db_name = observability_options.get("db_name", db_name)
8085

8186
tracer = get_tracer(tracer_provider)
8287

8388
# Set base attributes that we know for every trace created
84-
db = session._database
8589
attributes = {
8690
"db.type": "spanner",
8791
"db.url": SpannerClient.DEFAULT_ENDPOINT,
88-
"db.instance": "" if not db else db.name,
92+
"db.instance": db_name,
8993
"net.host.name": SpannerClient.DEFAULT_ENDPOINT,
9094
OTEL_SCOPE_NAME: TRACER_NAME,
9195
OTEL_SCOPE_VERSION: TRACER_VERSION,
@@ -99,6 +103,17 @@ def trace_call(name, session, extra_attributes=None, observability_options=None)
99103

100104
if not enable_extended_tracing:
101105
attributes.pop("db.statement", False)
106+
attributes.pop("sql", False)
107+
else:
108+
# Otherwise there are places where the annotated sql was inserted
109+
# directly from the arguments as "sql", and transform those into "db.statement".
110+
db_statement = attributes.get("db.statement", None)
111+
if not db_statement:
112+
sql = attributes.get("sql", None)
113+
if sql:
114+
attributes = attributes.copy()
115+
attributes.pop("sql", False)
116+
attributes["db.statement"] = sql
102117

103118
with tracer.start_as_current_span(
104119
name, kind=trace.SpanKind.CLIENT, attributes=attributes
@@ -131,3 +146,16 @@ def get_current_span():
131146
def add_span_event(span, event_name, event_attributes=None):
132147
if span:
133148
span.add_event(event_name, event_attributes)
149+
150+
151+
def add_event_on_current_span(event_name, event_attributes=None, span=None):
152+
if not span:
153+
span = get_current_span()
154+
155+
add_span_event(span, event_name, event_attributes)
156+
157+
158+
def record_span_exception_and_status(span, exc):
159+
if span:
160+
span.set_status(Status(StatusCode.ERROR, str(exc)))
161+
span.record_exception(exc)

google/cloud/spanner_v1/batch.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
_metadata_with_prefix,
2727
_metadata_with_leader_aware_routing,
2828
)
29-
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
29+
from google.cloud.spanner_v1._opentelemetry_tracing import (
30+
add_event_on_current_span,
31+
trace_call,
32+
)
3033
from google.cloud.spanner_v1 import RequestOptions
3134
from google.cloud.spanner_v1._helpers import _retry
3235
from google.cloud.spanner_v1._helpers import _check_rst_stream_error
@@ -70,6 +73,10 @@ def insert(self, table, columns, values):
7073
:param values: Values to be modified.
7174
"""
7275
self._mutations.append(Mutation(insert=_make_write_pb(table, columns, values)))
76+
add_event_on_current_span(
77+
"insert mutations added",
78+
dict(table=table, columns=columns),
79+
)
7380

7481
def update(self, table, columns, values):
7582
"""Update one or more existing table rows.
@@ -84,6 +91,10 @@ def update(self, table, columns, values):
8491
:param values: Values to be modified.
8592
"""
8693
self._mutations.append(Mutation(update=_make_write_pb(table, columns, values)))
94+
add_event_on_current_span(
95+
"update mutations added",
96+
dict(table=table, columns=columns),
97+
)
8798

8899
def insert_or_update(self, table, columns, values):
89100
"""Insert/update one or more table rows.
@@ -100,6 +111,10 @@ def insert_or_update(self, table, columns, values):
100111
self._mutations.append(
101112
Mutation(insert_or_update=_make_write_pb(table, columns, values))
102113
)
114+
add_event_on_current_span(
115+
"insert_or_update mutations added",
116+
dict(table=table, columns=columns),
117+
)
103118

104119
def replace(self, table, columns, values):
105120
"""Replace one or more table rows.
@@ -114,6 +129,10 @@ def replace(self, table, columns, values):
114129
:param values: Values to be modified.
115130
"""
116131
self._mutations.append(Mutation(replace=_make_write_pb(table, columns, values)))
132+
add_event_on_current_span(
133+
"replace mutations added",
134+
dict(table=table, columns=columns),
135+
)
117136

118137
def delete(self, table, keyset):
119138
"""Delete one or more table rows.
@@ -126,6 +145,10 @@ def delete(self, table, keyset):
126145
"""
127146
delete = Mutation.Delete(table=table, key_set=keyset._to_pb())
128147
self._mutations.append(Mutation(delete=delete))
148+
add_event_on_current_span(
149+
"delete mutations added",
150+
dict(table=table),
151+
)
129152

130153

131154
class Batch(_BatchBase):
@@ -207,7 +230,7 @@ def commit(
207230
)
208231
observability_options = getattr(database, "observability_options", None)
209232
with trace_call(
210-
"CloudSpanner.Commit",
233+
f"CloudSpanner.{type(self).__name__}.commit",
211234
self._session,
212235
trace_attributes,
213236
observability_options=observability_options,

google/cloud/spanner_v1/database.py

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
from google.cloud.spanner_v1._opentelemetry_tracing import (
7171
add_span_event,
7272
get_current_span,
73+
trace_call,
7374
)
7475

7576

@@ -720,6 +721,7 @@ def execute_pdml():
720721

721722
iterator = _restart_on_unavailable(
722723
method=method,
724+
trace_name="CloudSpanner.ExecuteStreamingSql",
723725
request=request,
724726
transaction_selector=txn_selector,
725727
observability_options=self.observability_options,
@@ -881,20 +883,25 @@ def run_in_transaction(self, func, *args, **kw):
881883
:raises Exception:
882884
reraises any non-ABORT exceptions raised by ``func``.
883885
"""
884-
# Sanity check: Is there a transaction already running?
885-
# If there is, then raise a red flag. Otherwise, mark that this one
886-
# is running.
887-
if getattr(self._local, "transaction_running", False):
888-
raise RuntimeError("Spanner does not support nested transactions.")
889-
self._local.transaction_running = True
890-
891-
# Check out a session and run the function in a transaction; once
892-
# done, flip the sanity check bit back.
893-
try:
894-
with SessionCheckout(self._pool) as session:
895-
return session.run_in_transaction(func, *args, **kw)
896-
finally:
897-
self._local.transaction_running = False
886+
observability_options = getattr(self, "observability_options", None)
887+
with trace_call(
888+
"CloudSpanner.Database.run_in_transaction",
889+
observability_options=observability_options,
890+
):
891+
# Sanity check: Is there a transaction already running?
892+
# If there is, then raise a red flag. Otherwise, mark that this one
893+
# is running.
894+
if getattr(self._local, "transaction_running", False):
895+
raise RuntimeError("Spanner does not support nested transactions.")
896+
self._local.transaction_running = True
897+
898+
# Check out a session and run the function in a transaction; once
899+
# done, flip the sanity check bit back.
900+
try:
901+
with SessionCheckout(self._pool) as session:
902+
return session.run_in_transaction(func, *args, **kw)
903+
finally:
904+
self._local.transaction_running = False
898905

899906
def restore(self, source):
900907
"""Restore from a backup to this database.
@@ -1120,7 +1127,12 @@ def observability_options(self):
11201127
if not (self._instance and self._instance._client):
11211128
return None
11221129

1123-
return getattr(self._instance._client, "observability_options", None)
1130+
opts = getattr(self._instance._client, "observability_options", None)
1131+
if not opts:
1132+
opts = dict()
1133+
1134+
opts["db_name"] = self.name
1135+
return opts
11241136

11251137

11261138
class BatchCheckout(object):

google/cloud/spanner_v1/pool.py

Lines changed: 54 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from google.cloud.spanner_v1._opentelemetry_tracing import (
2929
add_span_event,
3030
get_current_span,
31+
trace_call,
3132
)
3233
from warnings import warn
3334

@@ -237,29 +238,41 @@ def bind(self, database):
237238
session_template=Session(creator_role=self.database_role),
238239
)
239240

240-
returned_session_count = 0
241-
while not self._sessions.full():
242-
request.session_count = requested_session_count - self._sessions.qsize()
241+
observability_options = getattr(self._database, "observability_options", None)
242+
with trace_call(
243+
"Cloudspanner.FixedPool.BatchCreateSessions",
244+
observability_options=observability_options,
245+
) as span:
246+
returned_session_count = 0
247+
while not self._sessions.full():
248+
request.session_count = requested_session_count - self._sessions.qsize()
249+
add_span_event(
250+
span,
251+
f"Creating {request.session_count} sessions",
252+
span_event_attributes,
253+
)
254+
resp = api.batch_create_sessions(
255+
request=request,
256+
metadata=metadata,
257+
)
258+
259+
add_span_event(
260+
span,
261+
"Created sessions",
262+
dict(count=len(resp.session)),
263+
)
264+
265+
for session_pb in resp.session:
266+
session = self._new_session()
267+
session._session_id = session_pb.name.split("/")[-1]
268+
self._sessions.put(session)
269+
returned_session_count += 1
270+
243271
add_span_event(
244272
span,
245-
f"Creating {request.session_count} sessions",
273+
f"Requested for {requested_session_count} sessions, returned {returned_session_count}",
246274
span_event_attributes,
247275
)
248-
resp = api.batch_create_sessions(
249-
request=request,
250-
metadata=metadata,
251-
)
252-
for session_pb in resp.session:
253-
session = self._new_session()
254-
session._session_id = session_pb.name.split("/")[-1]
255-
self._sessions.put(session)
256-
returned_session_count += 1
257-
258-
add_span_event(
259-
span,
260-
f"Requested for {requested_session_count} sessions, returned {returned_session_count}",
261-
span_event_attributes,
262-
)
263276

264277
def get(self, timeout=None):
265278
"""Check a session out from the pool.
@@ -550,25 +563,30 @@ def bind(self, database):
550563
span_event_attributes,
551564
)
552565

553-
returned_session_count = 0
554-
while created_session_count < self.size:
555-
resp = api.batch_create_sessions(
556-
request=request,
557-
metadata=metadata,
558-
)
559-
for session_pb in resp.session:
560-
session = self._new_session()
561-
session._session_id = session_pb.name.split("/")[-1]
562-
self.put(session)
563-
returned_session_count += 1
566+
observability_options = getattr(self._database, "observability_options", None)
567+
with trace_call(
568+
"Cloudspanner.PingingPool.BatchCreateSessions",
569+
observability_options=observability_options,
570+
) as span:
571+
returned_session_count = 0
572+
while created_session_count < self.size:
573+
resp = api.batch_create_sessions(
574+
request=request,
575+
metadata=metadata,
576+
)
577+
for session_pb in resp.session:
578+
session = self._new_session()
579+
session._session_id = session_pb.name.split("/")[-1]
580+
self.put(session)
581+
returned_session_count += 1
564582

565-
created_session_count += len(resp.session)
583+
created_session_count += len(resp.session)
566584

567-
add_span_event(
568-
current_span,
569-
f"Requested for {requested_session_count} sessions, return {returned_session_count}",
570-
span_event_attributes,
571-
)
585+
add_span_event(
586+
span,
587+
f"Requested for {requested_session_count} sessions, return {returned_session_count}",
588+
span_event_attributes,
589+
)
572590

573591
def get(self, timeout=None):
574592
"""Check a session out from the pool.

0 commit comments

Comments
 (0)