Google Big Query Write append_rows does not respect retry configuration #593
Comments
I'm seeing a similar issue with In my case I'm not using an async iterator though. Instead, I'm calling the More context:
Let me know if I can be of any help (e.g. by providing additional details).
Thank you @kamilglod and @abannura. Could you provide a minimum code snippet that can reproduce the error?
@Linchin it's not easy to reproduce as it happens only occasionally; the easiest way to reproduce it is probably to set up the simplest streaming example, run it for longer than 1 day, and wait for
@kamilglod Could you tell me which exact class you are using? Still, a code snippet would help a lot, it reduces lots of ambiguities.
@Linchin sure, I can simplify my code to something like:

```python
import logging
from typing import AsyncIterator, Sequence

from google.api_core import exceptions as core_exceptions
from google.api_core import retry as retries
from google.cloud.bigquery_storage_v1.services.big_query_write.async_client import (
    BigQueryWriteAsyncClient,
)
from google.cloud.bigquery_storage_v1.types import (
    AppendRowsRequest,
    CreateWriteStreamRequest,
    WriteStream,
)
from google.protobuf.message import Message
from google.rpc.code_pb2 import Code

logger = logging.getLogger(__name__)

# same as the default, but with more exceptions in the predicate
DEFAULT_RETRY = retries.Retry(
    initial=0.1,
    maximum=60.0,
    multiplier=1.3,
    timeout=86400.0,
    predicate=retries.if_exception_type(
        core_exceptions.ServiceUnavailable,
        core_exceptions.Aborted,
        core_exceptions.InternalServerError,
        core_exceptions.BadGateway,
        core_exceptions.GatewayTimeout,
    ),
    on_error=lambda exc: logger.warning("BQ stream retriable error.", exc_info=exc),
)


async def stream(table_path: str, messages: AsyncIterator[Sequence[Message]]):
    client = BigQueryWriteAsyncClient()
    write_stream = await client.create_write_stream(
        CreateWriteStreamRequest(
            parent=table_path,
            write_stream=WriteStream(type_=WriteStream.Type.COMMITTED),
        )
    )
    stream = await client.append_rows(
        _inner_stream(messages, write_stream.name), retry=DEFAULT_RETRY
    )
    async for result in stream:
        if result.error.code != Code.OK:
            raise Exception(
                f"Unexpected result {result.error.code} {result.error.message}"
            )


async def _inner_stream(
    messages: AsyncIterator[Sequence[Message]], stream_name: str
) -> AsyncIterator[AppendRowsRequest]:
    ...
```

... and when the
I am able to reproduce the issue by intentionally making the append rows request invalid:
@Linchin please try to reproduce it by
Here are some other errors that I logged in the last couple of days that should be retried based on the stream
After some learning I think the retry is actually working as intended here. The

There is some work going on in the core client library to support retries in streaming: googleapis/python-api-core#495. However, this will only support server streaming, not client streaming or bidirectional streaming. Thanks to @daniel-sanche @leahecole @shollyman for helping me understand the situation.
@Linchin what do you mean by server streaming, client streaming and bidirectional streaming?
Here are the official definitions in gRPC: https://grpc.io/docs/what-is-grpc/core-concepts/#server-streaming-rpc tl;dr: server streaming is when data is streamed from the server to the client (like YouTube), and client streaming is the other way around. In your case we are streaming data to the server, so it's client streaming.
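The three shapes differ only in which side sends an iterator. The toy functions below illustrate the request/response cardinality of each; none of this is the real BigQuery API, just the pattern (ReadRows is server streaming, AppendRows is bidirectional):

```python
from typing import Iterator


def server_streaming(request: str) -> Iterator[str]:
    # one request in, a stream of responses out (e.g. ReadRows)
    for i in range(3):
        yield f"{request}-chunk{i}"


def client_streaming(requests: Iterator[str]) -> str:
    # a stream of requests in, a single response out
    return f"received {sum(1 for _ in requests)} requests"


def bidi_streaming(requests: Iterator[str]) -> Iterator[str]:
    # streams in both directions (e.g. AppendRows)
    for req in requests:
        yield f"ack:{req}"
```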
@Linchin thanks, now it's clear. Do you have any best practice for handling this server error? I handled it by simply restarting the stream (I'm using tenacity to retry in case of an error), but the problem is that I'm probably losing one row that should be inserted (it might be consumed from the iterator by the stream), and I'm not sure how I can check whether it was inserted into the table or not. The server might fail before or after handling the row, and it's not idempotent to send the same message on a different stream.
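The restart-the-whole-stream workaround described here can be sketched with the stdlib alone (a stand-in for tenacity; the function and exception names are hypothetical, with `ConnectionError`/`TimeoutError` standing in for the retriable `google.api_core` exceptions):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

# Stand-ins for retriable errors such as ServiceUnavailable.
RETRIABLE = (ConnectionError, TimeoutError)


async def run_with_restart(run_stream, max_attempts=5, base_delay=0.1):
    # Re-run the whole append_rows coroutine on a retriable error,
    # with exponential backoff capped at 60s.
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return await run_stream()
        except RETRIABLE:
            if attempt == max_attempts:
                raise
            logger.warning("stream attempt %d failed, restarting", attempt)
            await asyncio.sleep(delay)
            delay = min(delay * 2, 60.0)
```

Note this does not solve the at-most-once/at-least-once ambiguity the commenter raises: a row consumed from the iterator just before the failure may or may not have been committed, and this wrapper cannot tell.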
The read client handles reconnection to the server in the following way:
I don't think we implemented anything similar for the write API yet.

Edit: Potentially, we might want to add "reopen" or something like that to our send method, since we can't do a plain loop in the way that we did for reads. The complication with the write API is that we might have a bunch of requests that we're waiting for a response on. So we might need to resend that backlog queue of requests.
Perhaps @yirutang can comment on this? I see that https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java#L707 always resends any in-flight requests. |
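The resend-on-reconnect idea from the Java ConnectionWorker can be sketched as a small backlog structure (all names here are hypothetical; it relies on AppendRows responses arriving in request order, which is what lets the oldest in-flight request be acknowledged first):

```python
from collections import deque


class InflightBacklog:
    # Keep every sent request until its response arrives, and replay
    # the whole remaining backlog after reopening the connection.

    def __init__(self):
        self._inflight = deque()

    def sent(self, request):
        # remember the request until the server acknowledges it
        self._inflight.append(request)

    def acked(self):
        # responses arrive in order, so acknowledge the oldest request
        return self._inflight.popleft()

    def pending(self):
        # on reconnect, everything still unacked must be resent
        return list(self._inflight)
```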
Default configuration for the append_rows() call (here and here) specifies that, by default, the request will be retried in case of a google.api_core.exceptions.ServiceUnavailable exception, with a timeout of 1 day. However, I observed that in case of this server response it raises this exception without retrying the call.

Environment details
- Python version: 3.11.2
- pip version: 22.3.1
- google-cloud-bigquery-storage version: 2.19.0
Steps to reproduce
- append_rows()
- predicate

Stack trace