Skip to content

Commit

Permalink
Add trace span helper add_span_event to check if a span is non-None b…
Browse files Browse the repository at this point in the history
…efore setting event
  • Loading branch information
odeke-em committed Nov 21, 2024
1 parent 6f400f9 commit 9242220
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 113 deletions.
5 changes: 5 additions & 0 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,3 +113,8 @@ def get_current_span():
if not HAS_OPENTELEMETRY_INSTALLED:
return None
return trace.get_current_span()


def add_span_event(span, commentary, event_attributes=None):
if span:
span.add_event(commentary, event_attributes)
17 changes: 10 additions & 7 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@
SpannerGrpcTransport,
)
from google.cloud.spanner_v1.table import Table
from google.cloud.spanner_v1._opentelemetry_tracing import get_current_span
from google.cloud.spanner_v1._opentelemetry_tracing import (
add_span_event,
get_current_span,
)


SPANNER_DATA_SCOPE = "https://www.googleapis.com/auth/spanner.data"
Expand Down Expand Up @@ -1167,8 +1170,7 @@ def __enter__(self):
"""Begin ``with`` block."""
current_span = get_current_span()
session = self._session = self._database._pool.get()
if current_span:
current_span.add_event("Using session", {"id": session.session_id})
add_span_event(current_span, "Using session", {"id": session.session_id})
batch = self._batch = Batch(session)
if self._request_options.transaction_tag:
batch.transaction_tag = self._request_options.transaction_tag
Expand All @@ -1192,10 +1194,11 @@ def __exit__(self, exc_type, exc_val, exc_tb):
)
self._database._pool.put(self._session)
current_span = get_current_span()
if current_span:
current_span.add_event(
"Returned session to pool", {"id": self._session.session_id}
)
add_span_event(
current_span,
"Returned session to pool",
{"id": self._session.session_id},
)


class MutationGroupsCheckout(object):
Expand Down
157 changes: 80 additions & 77 deletions google/cloud/spanner_v1/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import (
add_span_event,
get_current_span,
trace_call,
)
Expand Down Expand Up @@ -214,27 +215,27 @@ def bind(self, database):
session_template=Session(creator_role=self.database_role),
)

if requested_session_count > 0 and span:
span.add_event(
if requested_session_count > 0:
add_span_event(
span,
f"Requesting {requested_session_count} sessions",
span_event_attributes,
)

if self._sessions.full():
if span:
span.add_event(
"Session pool is already full", span_event_attributes
)
add_span_event(
span, "Session pool is already full", span_event_attributes
)
return

returned_session_count = 0
while not self._sessions.full():
request.session_count = requested_session_count - self._sessions.qsize()
if span:
span.add_event(
f"Creating {request.session_count} sessions",
span_event_attributes,
)
add_span_event(
span,
f"Creating {request.session_count} sessions",
span_event_attributes,
)
resp = api.batch_create_sessions(
request=request,
metadata=metadata,
Expand All @@ -245,11 +246,11 @@ def bind(self, database):
self._sessions.put(session)
returned_session_count += 1

if span:
span.add_event(
f"Requested for {requested_session_count} sessions, returned {returned_session_count}",
span_event_attributes,
)
add_span_event(
span,
f"Requested for {requested_session_count} sessions, returned {returned_session_count}",
span_event_attributes,
)

def get(self, timeout=None):
"""Check a session out from the pool.
Expand All @@ -268,35 +269,34 @@ def get(self, timeout=None):
start_time = time.time()
current_span = get_current_span()
span_event_attributes = {"kind": type(self).__name__}
if current_span:
current_span.add_event("Acquiring session", span_event_attributes)
add_span_event(current_span, "Acquiring session", span_event_attributes)

session = None
try:
if current_span:
current_span.add_event(
"Waiting for a session to become available", span_event_attributes
)
add_span_event(
current_span,
"Waiting for a session to become available",
span_event_attributes,
)
session = self._sessions.get(block=True, timeout=timeout)
except queue.Empty as e:
if current_span:
current_span.add_event("No session available", span_event_attributes)
add_span_event(current_span, "No session available", span_event_attributes)
raise e

span_event_attributes["session.id"] = session._session_id

if not session.exists():
if current_span:
current_span.add_event(
"Session is not valid, recreating it", span_event_attributes
)
add_span_event(
current_span,
"Session is not valid, recreating it",
span_event_attributes,
)
session = self._database.session()
session.create()
span_event_attributes["session.id"] = session._session_id

span_event_attributes["time.elapsed"] = time.time() - start_time
if current_span:
current_span.add_event("Acquired session", span_event_attributes)
add_span_event(current_span, "Acquired session", span_event_attributes)
return session

def put(self, session):
Expand Down Expand Up @@ -371,28 +371,30 @@ def get(self):
"""
current_span = get_current_span()
span_event_attributes = {"kind": type(self).__name__}
if current_span:
current_span.add_event("Acquiring session", span_event_attributes)
add_span_event(current_span, "Acquiring session", span_event_attributes)

try:
if current_span:
current_span.add_event(
"Waiting for a session to become available", span_event_attributes
)
add_span_event(
current_span,
"Waiting for a session to become available",
span_event_attributes,
)
session = self._sessions.get_nowait()
except queue.Empty:
if current_span:
current_span.add_event(
"No session available. Creating session", span_event_attributes
)
add_span_event(
current_span,
"No session available. Creating session",
span_event_attributes,
)
session = self._new_session()
session.create()
else:
if not session.exists():
if current_span:
current_span.add_event(
"Session is not valid, recreating it", span_event_attributes
)
add_span_event(
current_span,
"Session is not valid, recreating it",
span_event_attributes,
)
session = self._new_session()
session.create()
return session
Expand Down Expand Up @@ -504,23 +506,25 @@ def bind(self, database):

requested_session_count = request.session_count
current_span = get_current_span()
if current_span:
current_span.add_event(
f"Requesting {requested_session_count} sessions", span_event_attributes
)
add_span_event(
current_span,
f"Requesting {requested_session_count} sessions",
span_event_attributes,
)

if created_session_count >= self.size:
if current_span:
current_span.add_event(
"Created no new sessions as sessionPool is full",
span_event_attributes,
)
add_span_event(
current_span,
"Created no new sessions as sessionPool is full",
span_event_attributes,
)
return

if current_span:
current_span.add_event(
f"Creating {request.session_count} sessions", span_event_attributes
)
add_span_event(
current_span,
f"Creating {request.session_count} sessions",
span_event_attributes,
)

returned_session_count = 0
while created_session_count < self.size:
Expand All @@ -536,11 +540,11 @@ def bind(self, database):

created_session_count += len(resp.session)

if current_span:
current_span.add_event(
f"Requested for {requested_session_count} sessions, return {returned_session_count}",
span_event_attributes,
)
add_span_event(
current_span,
f"Requested for {requested_session_count} sessions, return {returned_session_count}",
span_event_attributes,
)

def get(self, timeout=None):
"""Check a session out from the pool.
Expand All @@ -559,18 +563,18 @@ def get(self, timeout=None):
start_time = time.time()
span_event_attributes = {"kind": type(self).__name__}
current_span = get_current_span()
if current_span:
current_span.add_event(
"Waiting for a session to become available", span_event_attributes
)
add_span_event(
current_span,
"Waiting for a session to become available",
span_event_attributes,
)

ping_after = None
session = None
try:
ping_after, session = self._sessions.get(block=True, timeout=timeout)
except queue.Empty as e:
if current_span:
current_span.add_event("No session available", span_event_attributes)
add_span_event(current_span, "No session available", span_event_attributes)
raise e

if _NOW() > ping_after:
Expand All @@ -581,15 +585,14 @@ def get(self, timeout=None):
session = self._new_session()
session.create()

if current_span:
span_event_attributes.update(
{
"time.elapsed": time.time() - start_time,
"session.id": session._session_id,
"kind": "pinging_pool",
}
)
current_span.add_event("Acquired session", span_event_attributes)
span_event_attributes.update(
{
"time.elapsed": time.time() - start_time,
"session.id": session._session_id,
"kind": "pinging_pool",
}
)
add_span_event(current_span, "Acquired session", span_event_attributes)
return session

def put(self, session):
Expand Down
27 changes: 14 additions & 13 deletions google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import (
add_span_event,
get_current_span,
trace_call,
)
Expand Down Expand Up @@ -128,8 +129,7 @@ def create(self):
:raises ValueError: if :attr:`session_id` is already set.
"""
current_span = get_current_span()
if current_span:
current_span.add_event("Creating Session")
add_span_event(current_span, "Creating Session")

if self._session_id is not None:
raise ValueError("Session ID already set by back-end")
Expand Down Expand Up @@ -173,13 +173,14 @@ def exists(self):
"""
current_span = get_current_span()
if self._session_id is None:
current_span.add_event("Checking if Session failed due to unset session_id")
add_span_event(
current_span, "Checking if Session failed due to unset session_id"
)
return False

if current_span:
current_span.add_event(
"Checking if Session exists", {"session.id": self._session_id}
)
add_span_event(
current_span, "Checking if Session exists", {"session.id": self._session_id}
)

api = self._database.spanner_api
metadata = _metadata_with_prefix(self._database.name)
Expand Down Expand Up @@ -216,14 +217,14 @@ def delete(self):
"""
current_span = get_current_span()
if self._session_id is None:
if current_span:
current_span.add_event(
"Deleting Session failed due to unset session_id"
)
add_span_event(
current_span, "Deleting Session failed due to unset session_id"
)
raise ValueError("Session ID not set by back-end")

if current_span:
current_span.add_event("Deleting Session", {"session.id": self._session_id})
add_span_event(
current_span, "Deleting Session", {"session.id": self._session_id}
)

api = self._database.spanner_api
metadata = _metadata_with_prefix(self._database.name)
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/spanner_v1/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ def _get_streamed_result_set(
iterator = _restart_on_unavailable(
restart,
request,
"CloudSpanner.execute_sql",
"CloudSpanner.ReadWriteTransaction",
self._session,
trace_attributes,
transaction=self,
Expand Down
Loading

0 comments on commit 9242220

Please sign in to comment.