feat: added client-side instrumentation to all rpcs #925
base: client_side_metrics_handlers
Changes from all commits
@@ -18,10 +18,10 @@
 from typing import (
     TYPE_CHECKING,
     AsyncGenerator,
-    AsyncIterable,
     Awaitable,
     Sequence,
 )
+import time

 from google.cloud.bigtable_v2.types import ReadRowsRequest as ReadRowsRequestPB
 from google.cloud.bigtable_v2.types import ReadRowsResponse as ReadRowsResponsePB
@@ -34,13 +34,16 @@
 from google.cloud.bigtable.data.exceptions import _RowSetComplete
 from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
 from google.cloud.bigtable.data._helpers import _make_metadata
+from google.cloud.bigtable.data._helpers import backoff_generator
+from google.api_core.grpc_helpers_async import GrpcAsyncStream
 from google.cloud.bigtable.data._helpers import _retry_exception_factory

 from google.api_core import retry as retries
 from google.api_core.retry import exponential_sleep_generator

 if TYPE_CHECKING:
     from google.cloud.bigtable.data._async.client import TableAsync
+    from google.cloud.bigtable.data._metrics import ActiveOperationMetric


 class _ResetRow(Exception):
@@ -70,6 +73,7 @@ class _ReadRowsOperationAsync:
         "_metadata",
         "_last_yielded_row_key",
         "_remaining_count",
+        "_operation_metrics",
     )

     def __init__(
@@ -78,6 +82,7 @@ def __init__(
         table: "TableAsync",
         operation_timeout: float,
         attempt_timeout: float,
+        metrics: ActiveOperationMetric,
         retryable_exceptions: Sequence[type[Exception]] = (),
     ):
         self.attempt_timeout_gen = _attempt_timeout_generator(
@@ -100,17 +105,26 @@ def __init__(
         )
         self._last_yielded_row_key: bytes | None = None
         self._remaining_count: int | None = self.request.rows_limit or None
+        self._operation_metrics = metrics

     def start_operation(self) -> AsyncGenerator[Row, None]:
         """
         Start the read_rows operation, retrying on retryable errors.
         """
+        sleep_generator = backoff_generator()
+        self._operation_metrics.backoff_generator = sleep_generator
+
+        # Metrics:
+        # track attempt failures using build_wrapped_fn_handlers() for raised exceptions
+        # and operation timeouts
+        metric_fns = self._operation_metrics.build_wrapped_fn_handlers(self._predicate)
+        metric_predicate, metric_exc_factory = metric_fns
         return retries.retry_target_stream_async(
             self._read_rows_attempt,
-            self._predicate,
-            exponential_sleep_generator(0.01, 60, multiplier=2),
+            metric_fns[0],
+            sleep_generator,
             self.operation_timeout,
-            exception_factory=_retry_exception_factory,
+            exception_factory=metric_fns[1],
         )

     def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
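The retry wiring above swaps the plain retry predicate and `_retry_exception_factory` for metric-aware wrappers, and shares the backoff generator with the metrics object so retry sleeps can be attributed to the operation. `build_wrapped_fn_handlers()` itself is not part of this diff; the following is only a sketch of what such wrappers could look like. The `end_attempt_with_status` / `end_with_status` names are assumptions, and the exception-factory signature is assumed from api_core's `(exc_list, reason, timeout_val)` convention:

```python
# Sketch only: build_wrapped_fn_handlers() is not shown in this diff, so the body below
# is an assumption about what the wrappers might do. The end_attempt_with_status /
# end_with_status method names are hypothetical.
from typing import Callable, Sequence

from google.cloud.bigtable.data._helpers import _retry_exception_factory


def build_wrapped_fn_handlers_sketch(metric, predicate: Callable[[Exception], bool]):
    """Return a (predicate, exception_factory) pair that also feeds the metrics object."""

    def wrapped_predicate(exc: Exception) -> bool:
        retryable = predicate(exc)
        metric.end_attempt_with_status(exc)  # hypothetical: close out the failed attempt
        return retryable

    def wrapped_exception_factory(exc_list: Sequence[Exception], reason, timeout_val):
        if exc_list:
            metric.end_with_status(exc_list[-1])  # hypothetical: mark the operation failed
        # delegate the actual raised exception to the existing helper
        return _retry_exception_factory(exc_list, reason, timeout_val)

    return wrapped_predicate, wrapped_exception_factory
```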
@@ -120,6 +134,8 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
         which will call this function until it succeeds or
         a non-retryable error is raised.
         """
+        # register metric start
+        self._operation_metrics.start_attempt()
         # revise request keys and ranges between attempts
         if self._last_yielded_row_key is not None:
             # if this is a retry, try to trim down the request to avoid ones we've already processed
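Taken together, the calls in this file imply a fairly small surface on `ActiveOperationMetric`: per-attempt start, first-response and blocking-time hooks, metadata ingestion, and a success terminator. A stub of that interface, inferred only from the usages in this diff (the real class lives in `google.cloud.bigtable.data._metrics` and may differ):

```python
# Stub inferred only from the calls made in this diff; not the real implementation.
from dataclasses import dataclass
from typing import Any, Callable, Iterator, Optional, Tuple


@dataclass
class _ActiveAttemptStub:
    application_blocking_time_ms: float = 0.0  # time spent blocked on caller code


@dataclass
class ActiveOperationMetricStub:
    backoff_generator: Optional[Iterator[float]] = None  # shared with the retry loop
    active_attempt: Optional[_ActiveAttemptStub] = None

    def start_attempt(self) -> None:
        """Called at the start of every (re)attempt."""
        self.active_attempt = _ActiveAttemptStub()

    def attempt_first_response(self) -> None:
        """Called when the first row of an attempt reaches the caller."""

    def add_response_metadata(self, metadata: Any) -> None:
        """Receives initial + trailing metadata from the finished gRPC call."""

    def end_with_success(self) -> None:
        """Called when the stream has been fully consumed."""

    def build_wrapped_fn_handlers(
        self, predicate: Callable[[Exception], bool]
    ) -> Tuple[Callable[[Exception], bool], Callable[..., Any]]:
        """Returns a metrics-aware (predicate, exception_factory) pair for the retry loop."""
        raise NotImplementedError
```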
@@ -130,12 +146,12 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
             )
         except _RowSetComplete:
             # if we've already seen all the rows, we're done
-            return self.merge_rows(None)
+            return self.merge_rows(None, self._operation_metrics)
         # revise the limit based on number of rows already yielded
         if self._remaining_count is not None:
             self.request.rows_limit = self._remaining_count
             if self._remaining_count == 0:
-                return self.merge_rows(None)
+                return self.merge_rows(None, self._operation_metrics)
         # create and return a new row merger
         gapic_stream = self.table.client._gapic_client.read_rows(
             self.request,
@@ -144,70 +160,82 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
             retry=None,
         )
         chunked_stream = self.chunk_stream(gapic_stream)
-        return self.merge_rows(chunked_stream)
+        return self.merge_rows(chunked_stream, self._operation_metrics)

     async def chunk_stream(
-        self, stream: Awaitable[AsyncIterable[ReadRowsResponsePB]]
+        self, stream: Awaitable[GrpcAsyncStream[ReadRowsResponsePB]]
     ) -> AsyncGenerator[ReadRowsResponsePB.CellChunk, None]:
         """
         process chunks out of raw read_rows stream
         """
-        async for resp in await stream:
-            # extract proto from proto-plus wrapper
-            resp = resp._pb
+        call = await stream
+        try:
+            async for resp in call:
+                # extract proto from proto-plus wrapper
+                resp = resp._pb

-            # handle last_scanned_row_key packets, sent when server
-            # has scanned past the end of the row range
-            if resp.last_scanned_row_key:
-                if (
-                    self._last_yielded_row_key is not None
-                    and resp.last_scanned_row_key <= self._last_yielded_row_key
-                ):
-                    raise InvalidChunk("last scanned out of order")
-                self._last_yielded_row_key = resp.last_scanned_row_key
+                # handle last_scanned_row_key packets, sent when server
+                # has scanned past the end of the row range
+                if resp.last_scanned_row_key:
+                    if (
+                        self._last_yielded_row_key is not None
+                        and resp.last_scanned_row_key <= self._last_yielded_row_key
+                    ):
+                        raise InvalidChunk("last scanned out of order")
+                    self._last_yielded_row_key = resp.last_scanned_row_key

-            current_key = None
-            # process each chunk in the response
-            for c in resp.chunks:
-                if current_key is None:
-                    current_key = c.row_key
-                    if current_key is None:
-                        raise InvalidChunk("first chunk is missing a row key")
-                    elif (
-                        self._last_yielded_row_key
-                        and current_key <= self._last_yielded_row_key
-                    ):
-                        raise InvalidChunk("row keys should be strictly increasing")
+                current_key = None
+                # process each chunk in the response
+                for c in resp.chunks:
+                    if current_key is None:
+                        current_key = c.row_key
+                        if current_key is None:
+                            raise InvalidChunk("first chunk is missing a row key")
+                        elif (
+                            self._last_yielded_row_key
+                            and current_key <= self._last_yielded_row_key
+                        ):
+                            raise InvalidChunk("row keys should be strictly increasing")

-                yield c
+                    yield c

-                if c.reset_row:
-                    current_key = None
-                elif c.commit_row:
-                    # update row state after each commit
-                    self._last_yielded_row_key = current_key
-                    if self._remaining_count is not None:
-                        self._remaining_count -= 1
-                        if self._remaining_count < 0:
-                            raise InvalidChunk("emit count exceeds row limit")
-                    current_key = None
+                    if c.reset_row:
+                        current_key = None
+                    elif c.commit_row:
+                        # update row state after each commit
+                        self._last_yielded_row_key = current_key
+                        if self._remaining_count is not None:
+                            self._remaining_count -= 1
+                            if self._remaining_count < 0:
+                                raise InvalidChunk("emit count exceeds row limit")
+                        current_key = None
mutianf marked this conversation as resolved.
+        finally:
+            # ensure stream is closed
+            call.cancel()
+            # send trailing metadata to metrics
+            metadata = await call.trailing_metadata() + await call.initial_metadata()
+            self._operation_metrics.add_response_metadata(metadata)

     @staticmethod
     async def merge_rows(
-        chunks: AsyncGenerator[ReadRowsResponsePB.CellChunk, None] | None
+        chunks: AsyncGenerator[ReadRowsResponsePB.CellChunk, None] | None,
+        operation: ActiveOperationMetric,
     ):
         """
         Merge chunks into rows
         """
         if chunks is None:
+            operation.end_with_success()
             return
         it = chunks.__aiter__()
+        is_first_row = True
         # For each row
         while True:
             try:
                 c = await it.__anext__()
             except StopAsyncIteration:
                 # stream complete
+                operation.end_with_success()
Review comment: does this happen when customer cancels the read in the middle of a stream?
                 return
             row_key = c.row_key
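The finally block above guarantees that the underlying gRPC call is shut down and that its initial and trailing metadata reach the metrics object even when the caller stops consuming early or an error is raised mid-stream, which is the behavior the review thread at the end probes. A stripped-down sketch of the same pattern, assuming a grpc.aio-style call object like the one GrpcAsyncStream wraps (async-iterable, `cancel()`, awaitable `initial_metadata()` / `trailing_metadata()`):

```python
# Sketch of the cleanup pattern used in the diff above, not the PR's code verbatim.
async def consume_with_metadata(call, metric) -> None:
    """Iterate a streaming call, then forward its metadata to the metrics handler."""
    try:
        async for response in call:
            pass  # process each response here
    finally:
        # stop the server from sending more data if we exited the loop early
        call.cancel()
        # forward whatever metadata was received, mirroring the diff above
        metadata = await call.trailing_metadata() + await call.initial_metadata()
        metric.add_response_metadata(metadata)
```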
@@ -284,7 +312,17 @@ async def merge_rows(
                         Cell(value, row_key, family, qualifier, ts, list(labels))
                     )
                 if c.commit_row:
+                    if is_first_row:
+                        # record first row latency in metrics
+                        is_first_row = False
+                        operation.attempt_first_response()
+                    block_time = time.monotonic()
                     yield Row(row_key, cells)
+                    # most metric operations use setters, but this one updates
+                    # the value directly to avoid extra overhead
+                    operation.active_attempt.application_blocking_time_ms += (  # type: ignore
+                        time.monotonic() - block_time
+                    ) * 1000
Review comment: Can we record everything in nanoseconds and convert them to milliseconds instead?
                     break
                 c = await it.__anext__()
             except _ResetRow as e:
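The blocking-time measurement above brackets the yield with `time.monotonic()` and scales the float delta to milliseconds on every committed row; the reviewer's point is that accumulating integer nanoseconds and converting once avoids repeated float scaling. A small self-contained sketch of both variants (only the `application_blocking_time_ms` name comes from the diff; the rest is illustrative):

```python
import time
from dataclasses import dataclass


@dataclass
class AttemptStub:
    # attribute name from the diff; the _ns twin is the reviewer's suggested alternative
    application_blocking_time_ms: float = 0.0
    application_blocking_time_ns: int = 0


def do_application_work() -> None:
    time.sleep(0.001)  # stand-in for the caller consuming a yielded Row


def time_block_ms(attempt: AttemptStub) -> None:
    """Variant used in the diff: float seconds, scaled to ms on every row."""
    block_time = time.monotonic()
    do_application_work()
    attempt.application_blocking_time_ms += (time.monotonic() - block_time) * 1000


def time_block_ns(attempt: AttemptStub) -> None:
    """Reviewer's suggestion: accumulate integer nanoseconds, convert once at export."""
    block_time = time.monotonic_ns()
    do_application_work()
    attempt.application_blocking_time_ns += time.monotonic_ns() - block_time
    # at export time: blocking_ms = attempt.application_blocking_time_ns / 1e6
```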
Review comment: why do we need to call cancel in the end?

Reply: This is calling cancel on the grpc stream. I think this is in case we encounter a client error; we need to stop the stream so it'll give us the trailing metadata.

Reply: What client error would happen? I'm just wondering if this will introduce some bugs (like not consuming all the results or cancelling the grpc stream twice).
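For the double-cancel / unconsumed-results concern, one defensive variant is to guard the cancel with `done()`. This is a sketch for the discussion, not the PR's implementation; grpc.aio's `Call.cancel()` is expected to be a no-op on an already-finished call, so the guard mainly makes the intent explicit:

```python
# Sketch only: a more defensive version of the cleanup in the diff. The done() guard
# is an illustration for the review discussion above, not the PR's code.
async def close_stream_and_record(call, metric) -> None:
    # avoid cancelling a call that has already finished (or been cancelled) on its own
    if not call.done():
        call.cancel()
    # forward whatever metadata was received to the metrics handler
    metadata = await call.trailing_metadata() + await call.initial_metadata()
    metric.add_response_metadata(metadata)
```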