Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove custom routing metadata #1036

Merged
merged 4 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from google.api_core import retry as retries
import google.cloud.bigtable_v2.types.bigtable as types_pb
import google.cloud.bigtable.data.exceptions as bt_exceptions
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _retry_exception_factory

Expand Down Expand Up @@ -84,14 +83,10 @@ def __init__(
f"all entries. Found {total_mutations}."
)
# create partial function to pass to trigger rpc call
metadata = _make_metadata(
table.table_name, table.app_profile_id, instance_name=None
)
self._gapic_fn = functools.partial(
gapic_client.mutate_rows,
table_name=table.table_name,
app_profile_id=table.app_profile_id,
metadata=metadata,
retry=None,
)
# create predicate for determining which errors are retryable
Expand Down
6 changes: 0 additions & 6 deletions google/cloud/bigtable/data/_async/_read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from google.cloud.bigtable.data.exceptions import InvalidChunk
from google.cloud.bigtable.data.exceptions import _RowSetComplete
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _retry_exception_factory

from google.api_core import retry as retries
Expand Down Expand Up @@ -74,7 +73,6 @@ class _ReadRowsOperationAsync:
"request",
"table",
"_predicate",
"_metadata",
"_last_yielded_row_key",
"_remaining_count",
)
Expand All @@ -101,9 +99,6 @@ def __init__(
self.request = query._to_pb(table)
self.table = table
self._predicate = retries.if_exception_type(*retryable_exceptions)
self._metadata = _make_metadata(
table.table_name, table.app_profile_id, instance_name=None
)
self._last_yielded_row_key: bytes | None = None
self._remaining_count: int | None = self.request.rows_limit or None

Expand Down Expand Up @@ -152,7 +147,6 @@ def _read_rows_attempt(self) -> AsyncGenerator[Row, None]:
gapic_stream = self.table.client._gapic_client.read_rows(
self.request,
timeout=next(self.attempt_timeout_gen),
metadata=self._metadata,
retry=None,
)
chunked_stream = self.chunk_stream(gapic_stream)
Expand Down
52 changes: 12 additions & 40 deletions google/cloud/bigtable/data/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
_get_error_type,
_get_retryable_errors,
_get_timeouts,
_make_metadata,
_retry_exception_factory,
_validate_timeouts,
_WarmedInstanceKey,
Expand Down Expand Up @@ -262,19 +261,18 @@ async def _ping_and_warm_instances(
request_serializer=PingAndWarmRequest.serialize,
)
# prepare list of coroutines to run
tasks = [
ping_rpc(
request={"name": instance_name, "app_profile_id": app_profile_id},
metadata=[
(
"x-goog-request-params",
f"name={instance_name}&app_profile_id={app_profile_id}",
)
],
wait_for_ready=True,
tasks = []
for instance_name, table_name, app_profile_id in instance_list:
metadata_str = f"name={instance_name}"
if app_profile_id is not None:
metadata_str = f"{metadata_str}&app_profile_id={app_profile_id}"
tasks.append(
ping_rpc(
request={"name": instance_name, "app_profile_id": app_profile_id},
metadata=[("x-goog-request-params", metadata_str)],
wait_for_ready=True,
)
)
for (instance_name, table_name, app_profile_id) in instance_list
]
# execute coroutines in parallel
result_list = await asyncio.gather(*tasks, return_exceptions=True)
# return None in place of empty successful responses
Expand Down Expand Up @@ -508,24 +506,14 @@ async def execute_query(
"proto_format": {},
}

# app_profile_id should be set to an empty string for ExecuteQueryRequest only
app_profile_id_for_metadata = app_profile_id or ""

req_metadata = _make_metadata(
table_name=None,
app_profile_id=app_profile_id_for_metadata,
instance_name=instance_name,
)

return ExecuteQueryIteratorAsync(
self,
instance_id,
app_profile_id,
request_body,
attempt_timeout,
operation_timeout,
req_metadata,
retryable_excs,
retryable_excs=retryable_excs,
)

async def __aenter__(self):
Expand Down Expand Up @@ -1005,16 +993,11 @@ async def sample_row_keys(
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)

# prepare request
metadata = _make_metadata(
self.table_name, self.app_profile_id, instance_name=None
)

async def execute_rpc():
results = await self.client._gapic_client.sample_row_keys(
table_name=self.table_name,
app_profile_id=self.app_profile_id,
timeout=next(attempt_timeout_gen),
metadata=metadata,
retry=None,
)
return [(s.row_key, s.offset_bytes) async for s in results]
Expand Down Expand Up @@ -1143,9 +1126,6 @@ async def mutate_row(
table_name=self.table_name,
app_profile_id=self.app_profile_id,
timeout=attempt_timeout,
metadata=_make_metadata(
self.table_name, self.app_profile_id, instance_name=None
),
retry=None,
)
return await retries.retry_target_async(
Expand Down Expand Up @@ -1263,17 +1243,13 @@ async def check_and_mutate_row(
):
false_case_mutations = [false_case_mutations]
false_case_list = [m._to_pb() for m in false_case_mutations or []]
metadata = _make_metadata(
self.table_name, self.app_profile_id, instance_name=None
)
result = await self.client._gapic_client.check_and_mutate_row(
true_mutations=true_case_list,
false_mutations=false_case_list,
predicate_filter=predicate._to_pb() if predicate is not None else None,
row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
table_name=self.table_name,
app_profile_id=self.app_profile_id,
metadata=metadata,
timeout=operation_timeout,
retry=None,
)
Expand Down Expand Up @@ -1316,15 +1292,11 @@ async def read_modify_write_row(
rules = [rules]
if not rules:
raise ValueError("rules must contain at least one item")
metadata = _make_metadata(
self.table_name, self.app_profile_id, instance_name=None
)
result = await self.client._gapic_client.read_modify_write_row(
rules=[rule._to_pb() for rule in rules],
row_key=row_key.encode("utf-8") if isinstance(row_key, str) else row_key,
table_name=self.table_name,
app_profile_id=self.app_profile_id,
metadata=metadata,
timeout=operation_timeout,
retry=None,
)
Expand Down
25 changes: 0 additions & 25 deletions google/cloud/bigtable/data/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,31 +59,6 @@ class TABLE_DEFAULT(enum.Enum):
MUTATE_ROWS = "MUTATE_ROWS_DEFAULT"


def _make_metadata(
table_name: str | None, app_profile_id: str | None, instance_name: str | None
) -> list[tuple[str, str]]:
"""
Create properly formatted gRPC metadata for requests.
"""
params = []

if table_name is not None and instance_name is not None:
raise ValueError("metadata can't contain both instance_name and table_name")

if table_name is not None:
params.append(f"table_name={table_name}")
if instance_name is not None:
params.append(f"name={instance_name}")
if app_profile_id is not None:
params.append(f"app_profile_id={app_profile_id}")
if len(params) == 0:
raise ValueError(
"At least one of table_name and app_profile_id should be not None."
)
params_str = "&".join(params)
return [("x-goog-request-params", params_str)]


def _attempt_timeout_generator(
per_request_timeout: float | None, operation_timeout: float
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
Any,
AsyncIterator,
Dict,
List,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -83,8 +82,8 @@ def __init__(
request_body: Dict[str, Any],
attempt_timeout: float | None,
operation_timeout: float,
req_metadata: Sequence[Tuple[str, str]],
retryable_excs: List[type[Exception]],
req_metadata: Sequence[Tuple[str, str]] = (),
retryable_excs: Sequence[type[Exception]] = (),
) -> None:
self._table_name = None
self._app_profile_id = app_profile_id
Expand All @@ -99,6 +98,7 @@ def __init__(
self._attempt_timeout_gen = _attempt_timeout_generator(
attempt_timeout, operation_timeout
)
retryable_excs = retryable_excs or []
self._async_stream = retries.retry_target_stream_async(
self._make_request_with_resume_token,
retries.if_exception_type(*retryable_excs),
Expand Down
24 changes: 10 additions & 14 deletions google/cloud/bigtable_v2/services/bigtable/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1286,13 +1286,11 @@ def generate_initial_change_stream_partitions(

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata)
if all(m[0] != gapic_v1.routing_header.ROUTING_METADATA_KEY for m in metadata):
metadata += (
gapic_v1.routing_header.to_grpc_metadata(
(("table_name", request.table_name),)
),
)
metadata = tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata(
(("table_name", request.table_name),)
),
)

# Validate the universe domain.
self._client._validate_universe_domain()
Expand Down Expand Up @@ -1390,13 +1388,11 @@ def read_change_stream(

# Certain fields should be provided within the metadata header;
# add these here.
metadata = tuple(metadata)
if all(m[0] != gapic_v1.routing_header.ROUTING_METADATA_KEY for m in metadata):
metadata += (
gapic_v1.routing_header.to_grpc_metadata(
(("table_name", request.table_name),)
),
)
metadata = tuple(metadata) + (
gapic_v1.routing_header.to_grpc_metadata(
(("table_name", request.table_name),)
),
)

# Validate the universe domain.
self._client._validate_universe_domain()
Expand Down
12 changes: 0 additions & 12 deletions owlbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,6 @@ def insert(file, before_line, insert_line, after_line, escape=None):
escape='"'
)

# ----------------------------------------------------------------------------
# Patch duplicate routing header: https://github.com/googleapis/gapic-generator-python/issues/2078
# ----------------------------------------------------------------------------
for file in ["async_client.py"]:
s.replace(
f"google/cloud/bigtable_v2/services/bigtable/{file}",
"metadata \= tuple\(metadata\) \+ \(",
"""metadata = tuple(metadata)
if all(m[0] != gapic_v1.routing_header.ROUTING_METADATA_KEY for m in metadata):
metadata += ("""
)

# ----------------------------------------------------------------------------
# Samples templates
# ----------------------------------------------------------------------------
Expand Down
7 changes: 1 addition & 6 deletions tests/unit/data/_async/test__mutate_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,10 @@ def test_ctor(self):
assert client.mutate_rows.call_count == 1
# gapic_fn should call with table details
inner_kwargs = client.mutate_rows.call_args[1]
assert len(inner_kwargs) == 4
assert len(inner_kwargs) == 3
assert inner_kwargs["table_name"] == table.table_name
assert inner_kwargs["app_profile_id"] == table.app_profile_id
assert inner_kwargs["retry"] is None
metadata = inner_kwargs["metadata"]
assert len(metadata) == 1
assert metadata[0][0] == "x-goog-request-params"
assert str(table.table_name) in metadata[0][1]
assert str(table.app_profile_id) in metadata[0][1]
# entries should be passed down
entries_w_pb = [_EntryWithProto(e, e._to_pb()) for e in entries]
assert instance.mutations == entries_w_pb
Expand Down
6 changes: 0 additions & 6 deletions tests/unit/data/_async/test__read_rows.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,6 @@ def test_ctor(self):
assert instance._remaining_count == row_limit
assert instance.operation_timeout == expected_operation_timeout
assert client.read_rows.call_count == 0
assert instance._metadata == [
(
"x-goog-request-params",
"table_name=test_table&app_profile_id=test_profile",
)
]
assert instance.request.table_name == table.table_name
assert instance.request.app_profile_id == table.app_profile_id
assert instance.request.rows_limit == row_limit
Expand Down
27 changes: 1 addition & 26 deletions tests/unit/data/_async/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2176,11 +2176,10 @@ async def test_sample_row_keys_gapic_params(self):
await table.sample_row_keys(attempt_timeout=expected_timeout)
args, kwargs = sample_row_keys.call_args
assert len(args) == 0
assert len(kwargs) == 5
assert len(kwargs) == 4
assert kwargs["timeout"] == expected_timeout
assert kwargs["app_profile_id"] == expected_profile
assert kwargs["table_name"] == table.table_name
assert kwargs["metadata"] is not None
assert kwargs["retry"] is None

@pytest.mark.parametrize(
Expand Down Expand Up @@ -2375,30 +2374,6 @@ async def test_mutate_row_non_retryable_errors(self, non_retryable_exception):
"row_key", mutation, operation_timeout=0.2
)

@pytest.mark.parametrize("include_app_profile", [True, False])
@pytest.mark.asyncio
async def test_mutate_row_metadata(self, include_app_profile):
"""request should attach metadata headers"""
profile = "profile" if include_app_profile else None
async with _make_client() as client:
async with client.get_table("i", "t", app_profile_id=profile) as table:
with mock.patch.object(
client._gapic_client, "mutate_row", AsyncMock()
) as read_rows:
await table.mutate_row("rk", mock.Mock())
kwargs = read_rows.call_args_list[0].kwargs
metadata = kwargs["metadata"]
goog_metadata = None
for key, value in metadata:
if key == "x-goog-request-params":
goog_metadata = value
assert goog_metadata is not None, "x-goog-request-params not found"
assert "table_name=" + table.table_name in goog_metadata
if include_app_profile:
assert "app_profile_id=profile" in goog_metadata
else:
assert "app_profile_id=" not in goog_metadata

@pytest.mark.parametrize("mutations", [[], None])
@pytest.mark.asyncio
async def test_mutate_row_no_mutations(self, mutations):
Expand Down
Loading
Loading