feat: added client-side instrumentation to all rpcs #925

Open

wants to merge 26 commits into base: client_side_metrics_handlers

Changes from all commits (26 commits)
2955411  feat: added instrumentation to all rpcs (daniel-sanche, Jan 26, 2024)
2ba1321  added system test for attempt_latencies metrics (daniel-sanche, Jan 31, 2024)
87bd1db  added tests for other latency types (daniel-sanche, Jan 31, 2024)
6f79b9a  added system tests for all latency metrics (daniel-sanche, Jan 31, 2024)
d736503  added tests for counts (daniel-sanche, Jan 31, 2024)
20260dc  Merge branch 'client_side_metrics_handlers' into client_side_metrics_… (daniel-sanche, Jan 31, 2024)
1058fad  Merge branch 'client_side_metrics_handlers' into client_side_metrics_… (daniel-sanche, Feb 1, 2024)
67b545e  catch timed out operations (daniel-sanche, Feb 2, 2024)
67a6fcd  fixed bug in retry detection (daniel-sanche, Feb 2, 2024)
ef0fdd8  reworking metric system tests (daniel-sanche, Feb 3, 2024)
7ee253e  got first test working (daniel-sanche, Feb 3, 2024)
28d162b  fixed test (daniel-sanche, Feb 5, 2024)
6caba34  added system tests (daniel-sanche, Feb 6, 2024)
8ac92d0  fixed bug in parsing bulk mutations errors (daniel-sanche, Feb 6, 2024)
7a95c4a  fixed up test (daniel-sanche, Feb 6, 2024)
c96b534  improved tests (daniel-sanche, Feb 6, 2024)
86159a4  added missing end_op (daniel-sanche, Feb 7, 2024)
89c5216  fixed test (daniel-sanche, Feb 7, 2024)
fcd3aaa  fixed lint (daniel-sanche, Feb 7, 2024)
f2528ae  combined wrapped predicate with wrapped exc factory (daniel-sanche, Feb 7, 2024)
eb3aae1  fixed blacken (daniel-sanche, Feb 8, 2024)
1a08f1a  fixed failing test (daniel-sanche, Feb 8, 2024)
fdc2e3b  improved exception parsing (daniel-sanche, Feb 8, 2024)
e428bf0  removed export interval option (daniel-sanche, Feb 9, 2024)
1bf566c  changed buckets (daniel-sanche, Feb 9, 2024)
ca0963a  Merge branch 'client_side_metrics_handlers' into client_side_metrics_… (daniel-sanche, Feb 9, 2024)
71 changes: 50 additions & 21 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
@@ -25,6 +25,7 @@
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _retry_exception_factory
from google.cloud.bigtable.data._helpers import backoff_generator

# mutate_rows requests are limited to this number of mutations
from google.cloud.bigtable.data.mutations import _MUTATE_ROWS_REQUEST_MUTATION_LIMIT
@@ -35,6 +36,7 @@
)
from google.cloud.bigtable.data.mutations import RowMutationEntry
from google.cloud.bigtable.data._async.client import TableAsync
from google.cloud.bigtable.data._metrics import ActiveOperationMetric


@dataclass
@@ -65,6 +67,7 @@ def __init__(
mutation_entries: list["RowMutationEntry"],
operation_timeout: float,
attempt_timeout: float | None,
metrics: ActiveOperationMetric,
retryable_exceptions: Sequence[type[Exception]] = (),
):
"""
@@ -75,6 +78,8 @@ def __init__(
- operation_timeout: the timeout to use for the entire operation, in seconds.
- attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds.
If not specified, the request will run until operation_timeout is reached.
- metrics: the metrics object to use for tracking the operation
- retryable_exceptions: a list of exceptions that should be retried
"""
# check that mutations are within limits
total_mutations = sum(len(entry.mutations) for entry in mutation_entries)
@@ -100,7 +105,7 @@ def __init__(
# Entry level errors
bt_exceptions._MutateRowsIncomplete,
)
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
sleep_generator = backoff_generator(0.01, 2, 60)
self._operation = retries.retry_target_async(
self._run_attempt,
self.is_retryable,
@@ -115,6 +120,9 @@ def __init__(
self.mutations = [_EntryWithProto(m, m._to_pb()) for m in mutation_entries]
self.remaining_indices = list(range(len(self.mutations)))
self.errors: dict[int, list[Exception]] = {}
# set up metrics
metrics.backoff_generator = sleep_generator
self._operation_metrics = metrics

async def start(self):
"""
@@ -136,9 +144,11 @@ async def start(self):
all_errors: list[Exception] = []
for idx, exc_list in self.errors.items():
if len(exc_list) == 0:
raise core_exceptions.ClientError(
exc = core_exceptions.ClientError(
f"Mutation {idx} failed with no associated errors"
)
self._operation_metrics.end_with_status(exc)
raise exc
elif len(exc_list) == 1:
cause_exc = exc_list[0]
else:
@@ -148,9 +158,13 @@ async def start(self):
bt_exceptions.FailedMutationEntryError(idx, entry, cause_exc)
)
if all_errors:
raise bt_exceptions.MutationsExceptionGroup(
combined_exc = bt_exceptions.MutationsExceptionGroup(
all_errors, len(self.mutations)
)
self._operation_metrics.end_with_status(combined_exc)
raise combined_exc
else:
self._operation_metrics.end_with_success()

async def _run_attempt(self):
"""
@@ -161,6 +175,8 @@ async def _run_attempt(self):
retry after the attempt is complete
- GoogleAPICallError: if the gapic rpc fails
"""
# register attempt start
self._operation_metrics.start_attempt()
request_entries = [self.mutations[idx].proto for idx in self.remaining_indices]
# track mutations in this request that have not been finalized yet
active_request_indices = {
@@ -177,34 +193,47 @@ async def _run_attempt(self):
entries=request_entries,
retry=None,
)
async for result_list in result_generator:
for result in result_list.entries:
# convert sub-request index to global index
orig_idx = active_request_indices[result.index]
entry_error = core_exceptions.from_grpc_status(
result.status.code,
result.status.message,
details=result.status.details,
)
if result.status.code != 0:
# mutation failed; update error list (and remaining_indices if retryable)
self._handle_entry_error(orig_idx, entry_error)
elif orig_idx in self.errors:
# mutation succeeded; remove from error list
del self.errors[orig_idx]
# remove processed entry from active list
del active_request_indices[result.index]
try:
async for result_list in result_generator:
for result in result_list.entries:
# convert sub-request index to global index
orig_idx = active_request_indices[result.index]
entry_error = core_exceptions.from_grpc_status(
result.status.code,
result.status.message,
details=result.status.details,
)
if result.status.code != 0:
# mutation failed; update error list (and remaining_indices if retryable)
self._handle_entry_error(orig_idx, entry_error)
elif orig_idx in self.errors:
# mutation succeeded; remove from error list
del self.errors[orig_idx]
# remove processed entry from active list
del active_request_indices[result.index]
finally:
# send trailing metadata to metrics
result_generator.cancel()
Contributor: why do we need to call cancel in the end?

Contributor (author): This is calling cancel on the grpc stream. I think this is in case we encounter a client error; we need to stop the stream so it'll give us the trailing metadata.

Contributor: What client error would happen? I'm just wondering if this will introduce some bugs (like not consuming all the results or cancelling the grpc stream twice).

metadata = (
await result_generator.trailing_metadata()
+ await result_generator.initial_metadata()
)
self._operation_metrics.add_response_metadata(metadata)
except Exception as exc:
# add this exception to list for each mutation that wasn't
# already handled, and update remaining_indices if mutation is retryable
for idx in active_request_indices.values():
self._handle_entry_error(idx, exc)
# record attempt failure metric
self._operation_metrics.end_attempt_with_status(exc)
# bubble up exception to be handled by retry wrapper
raise
# check if attempt succeeded, or needs to be retried
if self.remaining_indices:
# unfinished work; raise exception to trigger retry
raise bt_exceptions._MutateRowsIncomplete
last_exc = self.errors[self.remaining_indices[-1]][-1]
self._operation_metrics.end_attempt_with_status(last_exc)
raise bt_exceptions._MutateRowsIncomplete()

def _handle_entry_error(self, idx: int, exc: Exception):
"""
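
A minimal sketch of the pattern discussed in the review thread above (cancel the stream in a finally block, then read its metadata), assuming a started grpc.aio server-streaming call; the metrics object and handle_response callback below are hypothetical stand-ins rather than the client's real helpers:

async def drain_stream_and_record_metadata(call, metrics, handle_response):
    """Consume a grpc.aio streaming call, then report its metadata to metrics."""
    try:
        async for response in call:
            handle_response(response)
    finally:
        # If a client-side error interrupted the loop, cancel() terminates the
        # RPC so that trailing metadata becomes available; on a fully consumed
        # stream the cancel is effectively a no-op.
        call.cancel()
        metadata = await call.trailing_metadata() + await call.initial_metadata()
        metrics.add_response_metadata(metadata)
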
128 changes: 83 additions & 45 deletions google/cloud/bigtable/data/_async/_read_rows.py
@@ -18,10 +18,10 @@
from typing import (
TYPE_CHECKING,
AsyncGenerator,
AsyncIterable,
Awaitable,
Sequence,
)
import time

from google.cloud.bigtable_v2.types import ReadRowsRequest as ReadRowsRequestPB
from google.cloud.bigtable_v2.types import ReadRowsResponse as ReadRowsResponsePB
@@ -34,13 +36,16 @@
from google.cloud.bigtable.data.exceptions import _RowSetComplete
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import backoff_generator

from google.api_core.grpc_helpers_async import GrpcAsyncStream
from google.cloud.bigtable.data._helpers import _retry_exception_factory

from google.api_core import retry as retries
from google.api_core.retry import exponential_sleep_generator

if TYPE_CHECKING:
from google.cloud.bigtable.data._async.client import TableAsync
from google.cloud.bigtable.data._metrics import ActiveOperationMetric


class _ResetRow(Exception):
@@ -70,6 +73,7 @@ class _ReadRowsOperationAsync:
"_metadata",
"_last_yielded_row_key",
"_remaining_count",
"_operation_metrics",
)

def __init__(
@@ -78,6 +82,7 @@ def __init__(
table: "TableAsync",
operation_timeout: float,
attempt_timeout: float,
metrics: ActiveOperationMetric,
retryable_exceptions: Sequence[type[Exception]] = (),
):
self.attempt_timeout_gen = _attempt_timeout_generator(
@@ -100,17 +105,26 @@ def __init__(
)
self._last_yielded_row_key: bytes | None = None
self._remaining_count: int | None = self.request.rows_limit or None
self._operation_metrics = metrics

def start_operation(self) -> AsyncGenerator[Row, None]:
"""
Start the read_rows operation, retrying on retryable errors.
"""
sleep_generator = backoff_generator()
self._operation_metrics.backoff_generator = sleep_generator

# Metrics:
# track attempt failures using build_wrapped_fn_handlers() for raised exceptions
# and operation timeouts
metric_fns = self._operation_metrics.build_wrapped_fn_handlers(self._predicate)
metric_predicate, metric_exc_factory = metric_fns
return retries.retry_target_stream_async(
self._read_rows_attempt,
self._predicate,
exponential_sleep_generator(0.01, 60, multiplier=2),
metric_fns[0],
sleep_generator,
self.operation_timeout,
exception_factory=_retry_exception_factory,
exception_factory=metric_fns[1],
)

def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
@@ -120,6 +134,8 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
which will call this function until it succeeds or
a non-retryable error is raised.
"""
# register metric start
self._operation_metrics.start_attempt()
# revise request keys and ranges between attempts
if self._last_yielded_row_key is not None:
# if this is a retry, try to trim down the request to avoid ones we've already processed
@@ -130,12 +146,12 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
)
except _RowSetComplete:
# if we've already seen all the rows, we're done
return self.merge_rows(None)
return self.merge_rows(None, self._operation_metrics)
# revise the limit based on number of rows already yielded
if self._remaining_count is not None:
self.request.rows_limit = self._remaining_count
if self._remaining_count == 0:
return self.merge_rows(None)
return self.merge_rows(None, self._operation_metrics)
# create and return a new row merger
gapic_stream = self.table.client._gapic_client.read_rows(
self.request,
@@ -144,70 +160,82 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
retry=None,
)
chunked_stream = self.chunk_stream(gapic_stream)
return self.merge_rows(chunked_stream)
return self.merge_rows(chunked_stream, self._operation_metrics)

async def chunk_stream(
self, stream: Awaitable[AsyncIterable[ReadRowsResponsePB]]
self, stream: Awaitable[GrpcAsyncStream[ReadRowsResponsePB]]
) -> AsyncGenerator[ReadRowsResponsePB.CellChunk, None]:
"""
process chunks out of raw read_rows stream
"""
async for resp in await stream:
# extract proto from proto-plus wrapper
resp = resp._pb
call = await stream
try:
async for resp in call:
# extract proto from proto-plus wrapper
resp = resp._pb

# handle last_scanned_row_key packets, sent when server
# has scanned past the end of the row range
if resp.last_scanned_row_key:
if (
self._last_yielded_row_key is not None
and resp.last_scanned_row_key <= self._last_yielded_row_key
):
raise InvalidChunk("last scanned out of order")
self._last_yielded_row_key = resp.last_scanned_row_key

# handle last_scanned_row_key packets, sent when server
# has scanned past the end of the row range
if resp.last_scanned_row_key:
if (
self._last_yielded_row_key is not None
and resp.last_scanned_row_key <= self._last_yielded_row_key
):
raise InvalidChunk("last scanned out of order")
self._last_yielded_row_key = resp.last_scanned_row_key

current_key = None
# process each chunk in the response
for c in resp.chunks:
if current_key is None:
current_key = c.row_key
current_key = None
# process each chunk in the response
for c in resp.chunks:
if current_key is None:
raise InvalidChunk("first chunk is missing a row key")
elif (
self._last_yielded_row_key
and current_key <= self._last_yielded_row_key
):
raise InvalidChunk("row keys should be strictly increasing")
current_key = c.row_key
if current_key is None:
raise InvalidChunk("first chunk is missing a row key")
elif (
self._last_yielded_row_key
and current_key <= self._last_yielded_row_key
):
raise InvalidChunk("row keys should be strictly increasing")

yield c
yield c

if c.reset_row:
current_key = None
elif c.commit_row:
# update row state after each commit
self._last_yielded_row_key = current_key
if self._remaining_count is not None:
self._remaining_count -= 1
if self._remaining_count < 0:
raise InvalidChunk("emit count exceeds row limit")
current_key = None
if c.reset_row:
current_key = None
elif c.commit_row:
# update row state after each commit
self._last_yielded_row_key = current_key
if self._remaining_count is not None:
self._remaining_count -= 1
if self._remaining_count < 0:
raise InvalidChunk("emit count exceeds row limit")
current_key = None
mutianf marked this conversation as resolved.
finally:
# ensure stream is closed
call.cancel()
# send trailing metadata to metrics
metadata = await call.trailing_metadata() + await call.initial_metadata()
self._operation_metrics.add_response_metadata(metadata)

@staticmethod
async def merge_rows(
chunks: AsyncGenerator[ReadRowsResponsePB.CellChunk, None] | None
chunks: AsyncGenerator[ReadRowsResponsePB.CellChunk, None] | None,
operation: ActiveOperationMetric,
):
"""
Merge chunks into rows
"""
if chunks is None:
operation.end_with_success()
return
it = chunks.__aiter__()
is_first_row = True
# For each row
while True:
try:
c = await it.__anext__()
except StopAsyncIteration:
# stream complete
operation.end_with_success()
Contributor: does this happen when customer cancels the read in the middle of a stream?

return
row_key = c.row_key

@@ -284,7 +312,17 @@ async def merge_rows(
Cell(value, row_key, family, qualifier, ts, list(labels))
)
if c.commit_row:
if is_first_row:
# record first row latency in metrics
is_first_row = False
operation.attempt_first_response()
block_time = time.monotonic()
yield Row(row_key, cells)
# most metric operations use setters, but this one updates
# the value directly to avoid extra overhead
operation.active_attempt.application_blocking_time_ms += ( # type: ignore
time.monotonic() - block_time
) * 1000
Contributor: Can we record everything in nanoseconds and convert them to milliseconds instead?

break
c = await it.__anext__()
except _ResetRow as e:
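
For reference, a conceptual sketch of the wrapped predicate / wrapped exception factory pair that start_operation obtains from build_wrapped_fn_handlers above. This is an assumption about the shape, not the library's actual implementation: the wrapped predicate closes out the current attempt whenever a retryable error is seen, and the wrapped exception factory closes out the whole operation when retries end. All names below are illustrative.

def wrap_handlers_for_metrics(predicate, exception_factory, metrics):
    """Return a (predicate, exception_factory) pair that also feeds `metrics`."""

    def wrapped_predicate(exc):
        retryable = predicate(exc)
        if retryable:
            # the attempt failed, but the operation will be retried
            metrics.end_attempt_with_status(exc)
        return retryable

    def wrapped_exception_factory(exc_list, reason, timeout):
        # build the terminal exception as usual, then close out the operation
        final_exc, source_exc = exception_factory(exc_list, reason, timeout)
        metrics.end_with_status(source_exc or final_exc)
        return final_exc, source_exc

    return wrapped_predicate, wrapped_exception_factory
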
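And a minimal sketch of the application-blocking-time measurement that merge_rows performs around yield, written with time.monotonic_ns() as the last reviewer comment suggests and converted to milliseconds only when the value is recorded; the _AttemptMetric class here is a hypothetical stand-in for the client's attempt metric object:

import time
from dataclasses import dataclass
from typing import AsyncIterator, TypeVar

T = TypeVar("T")


@dataclass
class _AttemptMetric:
    # accumulated time the application spent processing yielded rows
    application_blocking_time_ms: float = 0.0


async def yield_with_blocking_time(
    rows: AsyncIterator[T], attempt: _AttemptMetric
) -> AsyncIterator[T]:
    """Re-yield rows, charging time spent in the consumer to the attempt metric."""
    async for row in rows:
        block_start = time.monotonic_ns()
        yield row  # control returns only when the caller asks for the next row
        attempt.application_blocking_time_ms += (
            time.monotonic_ns() - block_start
        ) / 1_000_000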