Skip to content

Commit

Permalink
modified TableFieldSchemaCoder to mimic TableRowJsonCoder which encod…
Browse files Browse the repository at this point in the history
…es element byte size correctly and passes the IT
  • Loading branch information
dedocibula committed Sep 23, 2024
1 parent f4a6a93 commit 6b89160
Showing 1 changed file with 37 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
Expand All @@ -45,6 +45,7 @@
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -225,25 +226,52 @@ private static Dialect getDialect(SpannerConfig spannerConfig) {

/**
* {@link TableFieldSchemaCoder} provides custom coder for TableFieldSchema with deterministic
* serialization.
* serialization. This coder is only used within the context of this file where TableFieldSchema
* objects are created in {@link BigQueryDynamicDestinations#getFields(TableRow)} method
* deterministically.
*/
private static class TableFieldSchemaCoder extends CustomCoder<TableFieldSchema> {

private static final ObjectMapper OBJECT_MAPPER =
new ObjectMapper().enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
private static class TableFieldSchemaCoder extends AtomicCoder<TableFieldSchema> {

private static final ObjectMapper OBJECT_MAPPER;
private static final TypeDescriptor<TableFieldSchema> TYPE_DESCRIPTOR;
private static final TableFieldSchemaCoder INSTANCE;

static {
OBJECT_MAPPER =

Check warning on line 240 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L240

Added line #L240 was not covered by tests
new ObjectMapper()
.enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS)
.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
TYPE_DESCRIPTOR = new TypeDescriptor<TableFieldSchema>() {};
INSTANCE = new TableFieldSchemaCoder();
}

Check warning on line 246 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L242-L246

Added lines #L242 - L246 were not covered by tests

private static TableFieldSchemaCoder of() {
return new TableFieldSchemaCoder();
return INSTANCE;

Check warning on line 249 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L249

Added line #L249 was not covered by tests
}

private TableFieldSchemaCoder() {}

@Override
public void encode(TableFieldSchema value, OutputStream outStream) throws IOException {
OBJECT_MAPPER.writeValue(outStream, value);
String strValue = OBJECT_MAPPER.writeValueAsString(value);
StringUtf8Coder.of().encode(strValue, outStream);
}

Check warning on line 258 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L256-L258

Added lines #L256 - L258 were not covered by tests

@Override
public TableFieldSchema decode(InputStream inStream) throws IOException {
return OBJECT_MAPPER.readValue(inStream, TableFieldSchema.class);
String strValue = StringUtf8Coder.of().decode(inStream);
return OBJECT_MAPPER.readValue(strValue, TableFieldSchema.class);

Check warning on line 263 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L262-L263

Added lines #L262 - L263 were not covered by tests
}

@Override
public long getEncodedElementByteSize(TableFieldSchema value) throws Exception {
String strValue = OBJECT_MAPPER.writeValueAsString(value);
return StringUtf8Coder.of().getEncodedElementByteSize(strValue);

Check warning on line 269 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L268-L269

Added lines #L268 - L269 were not covered by tests
}

@Override
public TypeDescriptor<TableFieldSchema> getEncodedTypeDescriptor() {
return TYPE_DESCRIPTOR;

Check warning on line 274 in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java

View check run for this annotation

Codecov / codecov/patch

v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/spannerchangestreamstobigquery/BigQueryDynamicDestinations.java#L274

Added line #L274 was not covered by tests
}

@Override
Expand Down

0 comments on commit 6b89160

Please sign in to comment.