Modified BigQueryDynamicDestinations to support Write API with exactly-once semantics #1885

Merged · 3 commits into GoogleCloudPlatform:main · Sep 24, 2024

Conversation

dedocibula (Contributor) commented:

The Spanner change streams to BigQuery template fails to launch with the error "ParDo requires a deterministic key coder in order to use state and timers" when useStorageWriteApi=true and useStorageWriteApiAtLeastOnce=false are set:

com.google.cloud.teleport.v2.common.UncaughtExceptionLogger - The template launch failed.
java.lang.IllegalArgumentException: ParDo requires a deterministic key coder in order to use state and timers, the reason is:
 ShardedKey$Coder(KvCoder(SerializableCoder(com.google.cloud.bigquery.TableId),TableRowJsonCoder)) is not deterministic because:
	Key coder must be deterministic
	at org.apache.beam.sdk.transforms.ParDo.validateStateApplicableForInput(ParDo.java:448)
	at org.apache.beam.sdk.transforms.ParDo.access$300(ParDo.java:394)
	at org.apache.beam.sdk.transforms.ParDo$MultiOutput.expand(ParDo.java:953)
	at org.apache.beam.sdk.transforms.ParDo$MultiOutput.expand(ParDo.java:872)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:559)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
	at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:352)
	at org.apache.beam.sdk.transforms.ParDo$SingleOutput.expand(ParDo.java:797)
	at org.apache.beam.sdk.transforms.ParDo$SingleOutput.expand(ParDo.java:712)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:559)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
	at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:352)
	at org.apache.beam.sdk.transforms.GroupIntoBatches.expand(GroupIntoBatches.java:338)
	at org.apache.beam.sdk.transforms.GroupIntoBatches.expand(GroupIntoBatches.java:103)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:559)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:508)
	at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:365)
	at org.apache.beam.sdk.io.gcp.bigquery.StorageApiLoads.expandTriggered(StorageApiLoads.java:231)
	at org.apache.beam.sdk.io.gcp.bigquery.StorageApiLoads.expand(StorageApiLoads.java:125)
	at org.apache.beam.sdk.io.gcp.bigquery.StorageApiLoads.expand(StorageApiLoads.java:46)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:559)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:508)
	at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:365)
	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.continueExpandTyped(BigQueryIO.java:3718)
	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:3502)
	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:3374)
	at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:2214)
	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:559)
	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:508)
	at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:365)
	at com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.SpannerChangeStreamsToBigQuery.run(SpannerChangeStreamsToBigQuery.java:382)
	at com.google.cloud.teleport.v2.templates.spannerchangestreamstobigquery.SpannerChangeStreamsToBigQuery.main(SpannerChangeStreamsToBigQuery.java:198)

Given that TableRowJsonCoder's verifyDeterministic method always throws, we need to implement a custom deterministic coder for the value.
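
A minimal sketch of one way such a coder could look, assuming the TableRow destination values are always built by the template with a stable field order. The class name DeterministicTableRowCoder and the delegate-and-override approach are illustrative, not necessarily the exact change merged in this PR:

```java
import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;

/**
 * Delegates encoding to TableRowJsonCoder but declares itself deterministic.
 * This is safe only under the assumption that the TableRows used inside the
 * destination key are always constructed with the same field insertion order,
 * so equal rows always encode to equal bytes.
 */
class DeterministicTableRowCoder extends AtomicCoder<TableRow> {
  private static final TableRowJsonCoder DELEGATE = TableRowJsonCoder.of();

  public static DeterministicTableRowCoder of() {
    return new DeterministicTableRowCoder();
  }

  @Override
  public void encode(TableRow value, OutputStream outStream) throws IOException {
    DELEGATE.encode(value, outStream);
  }

  @Override
  public TableRow decode(InputStream inStream) throws IOException {
    return DELEGATE.decode(inStream);
  }

  @Override
  public void verifyDeterministic() {
    // Unlike TableRowJsonCoder, do not throw NonDeterministicException here;
    // determinism is guaranteed by construction (see class comment).
  }
}
```

With such a coder in place, BigQueryDynamicDestinations#getDestinationCoder could return KvCoder.of(SerializableCoder.of(TableId.class), DeterministicTableRowCoder.of()), which would satisfy the deterministic-key requirement that ShardedKey imposes in the stack trace above.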

Commit (title truncated): "…es element byte size correctly and passes the IT"

codecov bot commented Sep 23, 2024

Codecov Report

Attention: Patch coverage is 0% with 22 lines in your changes missing coverage. Please review.

Project coverage is 43.98%. Comparing base (4f45d13) to head (10d11ac).
Report is 6 commits behind head on main.

Files with missing lines                                Patch %   Lines
...streamstobigquery/BigQueryDynamicDestinations.java   0.00%     22 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1885      +/-   ##
============================================
+ Coverage     43.95%   43.98%   +0.02%     
- Complexity     3448     3457       +9     
============================================
  Files           827      827              
  Lines         49001    49053      +52     
  Branches       5136     5141       +5     
============================================
+ Hits          21540    21574      +34     
- Misses        25810    25831      +21     
+ Partials       1651     1648       -3     
Components                         Coverage Δ
spanner-templates                  63.58% <ø> (+0.11%) ⬆️
spanner-import-export              63.82% <ø> (+0.03%) ⬆️
spanner-live-forward-migration     75.24% <ø> (+0.19%) ⬆️
spanner-live-reverse-replication   68.40% <ø> (+0.18%) ⬆️
spanner-bulk-migration             84.29% <ø> (+0.17%) ⬆️

Files with missing lines                                Coverage Δ
...streamstobigquery/BigQueryDynamicDestinations.java   0.00% <0.00%> (ø)

... and 12 files with indirect coverage changes

@damondouglas (Contributor) left a comment:

Approved and merging:

  • The failed "Java PR / JUnit Test Report (pull_request)" checks are unrelated to this PR.
  • The Dataflow Integration Test failures are the same failures already present in the JUnit Test Report.

@damondouglas merged commit 5eb232a into GoogleCloudPlatform:main on Sep 24, 2024
10 of 13 checks passed