
[Bug]: BigQuery Storage Write API does not write with no complaint #28168

Closed
1 of 15 tasks
onurdialpad opened this issue Aug 25, 2023 · 14 comments

Comments


onurdialpad commented Aug 25, 2023

What happened?

I wanted to test the Storage Write API with SDK 2.49.0 and tried to write some simple data on Dataflow, but the "writing" step does not do anything, and there is no logging there either.

Here is my code snippet.

  with beam.Pipeline(options=pipeline_options) as pipeline:
    ...
    result = objects_for_storage | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        table_spec,
        schema=_SCHEMA,
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API)
    _ = (result.failed_rows_with_errors
         | 'Get Errors' >> beam.Map(lambda e: {
              "destination": e[0],
              "row": json.dumps(e[1]),
              "error_message": e[2][0]['message']
            })
         | "LogElements" >> beam.ParDo(LogElements()))

Here, the WriteToBigQuery step does not produce any output:

(Screenshots from Aug 25, 2023: Dataflow UI showing the WriteToBigQuery step producing no output.)

Issue Priority

Priority: 1

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
@onurdialpad changed the title from "[Bug]: BigQuery Storage Write API doesn'…" to "[Bug]: BigQuery Storage Write API does not write with no complaint" on Aug 25, 2023
@onurdialpad
Author

onurdialpad commented Aug 28, 2023

.add-labels P1
.remove-labels P3

github-actions bot added P1 and removed P3 labels on Aug 28, 2023
@liferoad
Collaborator

Can you share more details about your job? Streaming? Source? Since this is a Dataflow job, you could open a Cloud support ticket: https://cloud.google.com/dataflow/docs/support/getting-support#file-bugs-or-feature-requests

@ahmedabu98
Contributor

Hey @onurdialpad, can you provide a reproducible snippet?

@onurdialpad
Author

onurdialpad commented Aug 28, 2023

@ahmedabu98 sure, here is a snippet. I tested this snippet as well and it did the same thing: no write, no log/error.

import sys

import apache_beam as beam
from apache_beam.options import pipeline_options as beam_pipeline_options


class CustomPipelineOptions(beam_pipeline_options.PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--tablePrefix', help='The name of the table to write data to')


if __name__ == '__main__':
  _TABLE_PREFIX = 'tablePrefix'
  _PROJECT = 'project'

  table_schema = {
    'fields': [{
      'name': 'rand', 'type': 'STRING', 'mode': 'NULLABLE'
    }]
  }

  pipeline_options = CustomPipelineOptions(flags=sys.argv, streaming=True, save_main_session=True)

  with beam.Pipeline(options=pipeline_options) as pipeline:
    options = pipeline_options.get_all_options()
    project = options[_PROJECT]
    table_spec: str = f'{project}:mydataset.{options[_TABLE_PREFIX]}'

    coll = pipeline | beam.Create([
      {
        'rand': 'Mahatma Gandhi'
      },
      {
        'rand': 'ABCD'
      },
    ])

    result = coll | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        table_spec,
        schema=table_schema,
        method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API)


@liferoad Data comes from Pub/Sub in the real environment, and the Dataflow job runs in streaming mode. It works if I use STREAMING_INSERTS as the method here instead of STORAGE_WRITE_API. No matter what I try, the STORAGE_WRITE_API method does not work for me.
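
To make the comparison concrete, the only difference between the variant that works and the one that doesn't is the method argument (a minimal sketch based on the reproducer above; table_spec and table_schema are the same as in that snippet):

# Works in my environment: legacy streaming inserts.
_ = coll | 'WriteWithStreamingInserts' >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS)

# Produces no output for me: Storage Write API.
_ = coll | 'WriteWithStorageWriteApi' >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API)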

@ahmedabu98
Contributor

@onurdialpad I tried running the snippet you provided on 2.49.0 and it worked for both the local and Dataflow runners. Can you provide any relevant logs you're seeing?

+1 to @liferoad's suggestion of opening a Dataflow ticket; it will help the internal engineers debug your pipeline better.

@onurdialpad
Author

@ahmedabu98 thanks for trying that. Can you elaborate on what you meant by "it worked"? Did you see the job write records to BQ?

Regarding opening a ticket with Dataflow, sure, I will do that. Just a note: when I run the snippet locally with the DirectRunner, it does not write anything to BQ and produces no logs. To clarify: it "works", but not as intended; it is supposed to write to BQ but does not, it just runs without doing anything.

@ahmedabu98
Contributor

I mean that it did write records to the BQ table.

However, I think it's because the snippet uses a batch source (beam.Create()). I tried again with a streaming source and I'm getting a pipeline that doesn't output anything. I'm seeing these errors too:
(Screenshot of the errors from the streaming job.)

This streaming case may be broken. I'm still investigating why.
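
For anyone trying to reproduce the streaming case, a rough variant of the snippet with a streaming source looks like this (the Pub/Sub topic name is a placeholder, and the JSON-decoding step assumes each message carries a single "rand" field):

import json

import apache_beam as beam

# Streaming variant of the reproducer: read from Pub/Sub instead of beam.Create.
messages = (
    pipeline
    | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(
        topic='projects/my-project/topics/my-topic')  # placeholder topic
    | 'DecodeJson' >> beam.Map(lambda msg: json.loads(msg.decode('utf-8'))))

_ = messages | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API)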

@ahmedabu98
Contributor

ahmedabu98 commented Aug 30, 2023

Hey @onurdialpad, I'm still digging into it, but I've narrowed it down to Runner V2 (both Java and Python jobs exhibit this behavior). I suspect a recent internal change is tripping it up.

I'll continue investigating, but for now you may be able to mitigate this by running with the legacy runner. Python Dataflow jobs default to Runner V2, but you can disable it as long as you're using a Beam version before ~~2.50.0~~ 2.45.0. Just use --experiments=disable_runner_v2_until_2023
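
For anyone who wants to try that mitigation, the experiment is passed like any other pipeline option; a sketch using the options from the reproducer above:

from apache_beam.options.pipeline_options import PipelineOptions

# Add the experiment alongside the usual flags when constructing the options.
pipeline_options = PipelineOptions(
    flags=['--experiments=disable_runner_v2_until_2023'],
    streaming=True,
    save_main_session=True)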

@ahmedabu98
Contributor

Ahh sorry, never mind; this xlang Storage Write connector was implemented in 2.47.0, so that mitigation won't work.

@onurdialpad
Author

Hey @ahmedabu98, thanks for the effort! It's interesting that the snippet I shared here doesn't produce any output on the BQ side even though it uses a batch source (beam.Create()), as you mentioned.

@ahmedabu98
Contributor

Hey @onurdialpad, we've confirmed it is a bug in Dataflow's Runner V2 that gets hit by Storage Write API with autosharding.

One workaround is to use at_least_once=True, which will use at-least-once semantics (as opposed to exactly-once). More on that here: https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics

I'm going to open a PR to also allow setting a fixed number of shards as another workaround; it may be available in Beam 2.51.0 and will work for exactly-once semantics.
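
A rough sketch of the at-least-once workaround applied to the reproducer above. Note the exact keyword may differ by SDK version; recent Python SDKs expose it as use_at_least_once, so check the WriteToBigQuery signature for your release:

# Assumed spelling of the at-least-once flag; verify against your SDK's docs.
_ = coll | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
    use_at_least_once=True)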

@kennknowles
Member

This is tagged as blocking 2.51.0, which is in progress now. This does seem like a major gap in functionality. I see follow-ups and comments on and about #28592. Is there a cherry-pick open, or is it not yet resolved?

@ahmedabu98
Contributor

Hey @kennknowles, this is resolved and a CP is ready in #28631

@ahmedabu98
Contributor

Fixed by #28618 (follow-up of #28592)
