Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

materialize-bigquery: don't serialize JSON nulls #2323

Merged
merged 3 commits
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package boilerplate
package common

import "strings"

Expand Down
21 changes: 16 additions & 5 deletions go/encrow/encrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (

// A Shape represents the structure of a particular document.
type Shape struct {
arity int
prefixes []string
swizzle []int
flags json.AppendFlags
arity int
prefixes []string
swizzle []int
flags json.AppendFlags
skipNulls bool
}

// NewShape constructs a new Shape corresponding to the provided field names.
Expand Down Expand Up @@ -47,6 +48,11 @@ func (s *Shape) SetFlags(flags json.AppendFlags) {
s.flags = flags
}

// SkipNulls will cause serialized results to omit fields with a `nil` value.
// When enabled, Encode skips both the field's prefix and its value entirely,
// rather than emitting a JSON `null` for it. The flag is one-way: there is no
// corresponding method to turn it back off on an existing Shape.
func (s *Shape) SkipNulls() {
	// Encode consults this flag per value and `continue`s past nil entries.
	s.skipNulls = true
}

func generatePrefixes(fields []string) []string {
var prefixes []string
for idx, fieldName := range fields {
Expand Down Expand Up @@ -76,8 +82,13 @@ func (s *Shape) Encode(buf []byte, values []any) ([]byte, error) {

buf = buf[:0]
for idx, vidx := range s.swizzle {
v := values[vidx]
if s.skipNulls && v == nil {
continue
}

buf = append(buf, s.prefixes[idx]...)
buf, err = json.Append(buf, values[vidx], s.flags)
buf, err = json.Append(buf, v, s.flags)
if err != nil {
return nil, err
}
Expand Down
13 changes: 13 additions & 0 deletions materialize-bigquery/.snapshots/TestSpecification
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,19 @@
],
"title": "dbt Cloud Job Trigger",
"description": "Trigger a dbt job when new data is available"
},
"advanced": {
"properties": {
"feature_flags": {
"type": "string",
"title": "Feature Flags",
"description": "This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."
}
},
"additionalProperties": false,
"type": "object",
"title": "Advanced Options",
"description": "Options for advanced users. You should not typically need to modify these."
}
},
"type": "object",
Expand Down
19 changes: 18 additions & 1 deletion materialize-bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"cloud.google.com/go/bigquery"
storage "cloud.google.com/go/storage"
"github.com/estuary/connectors/go/common"
"github.com/estuary/connectors/go/dbt"
boilerplate "github.com/estuary/connectors/materialize-boilerplate"
sql "github.com/estuary/connectors/materialize-sql"
Expand All @@ -16,7 +17,12 @@ import (
"google.golang.org/api/option"
)

// Config represents the endpoint configuration for BigQuery.
var featureFlagDefaults = map[string]bool{
// When set, object and array field types will be materialized as JSON
// columns, instead of the historical behavior of strings.
"objects_and_arrays_as_json": false,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is fairly self-explanatory in this case, but I've been trying to maintain a convention of describing the impact of a flag here in the defaults map.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea, I added a comment here to explain what the flag does.

}

type config struct {
ProjectID string `json:"project_id" jsonschema:"title=Project ID,description=Google Cloud Project ID that owns the BigQuery dataset." jsonschema_extras:"order=0"`
CredentialsJSON string `json:"credentials_json" jsonschema:"title=Service Account JSON,description=The JSON credentials of the service account to use for authorization." jsonschema_extras:"secret=true,multiline=true,order=1"`
Expand All @@ -28,6 +34,12 @@ type config struct {
HardDelete bool `json:"hardDelete,omitempty" jsonschema:"title=Hard Delete,description=If this option is enabled items deleted in the source will also be deleted from the destination. By default is disabled and _meta/op in the destination will signify whether rows have been deleted (soft-delete).,default=false" jsonschema_extras:"order=7"`
Schedule boilerplate.ScheduleConfig `json:"syncSchedule,omitempty" jsonschema:"title=Sync Schedule,description=Configure schedule of transactions for the materialization."`
DBTJobTrigger dbt.JobConfig `json:"dbt_job_trigger,omitempty" jsonschema:"title=dbt Cloud Job Trigger,description=Trigger a dbt job when new data is available"`

Advanced advancedConfig `json:"advanced,omitempty" jsonschema:"title=Advanced Options,description=Options for advanced users. You should not typically need to modify these." jsonschema_extra:"advanced=true"`
}

type advancedConfig struct {
FeatureFlags string `json:"feature_flags,omitempty" jsonschema:"title=Feature Flags,description=This property is intended for Estuary internal use. You should only modify this field as directed by Estuary support."`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not an advanced property specifically for materializing objects and arrays as strings without an exposed JSONSchema? 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had this same thought initially, and there was some internal discussion about it here.

The TL;DR is that there's a desire to standardize around using this feature flag convention for this sort of thing, as we have been & will be doing more of for changing behaviors of SQL captures. More specifically what I mean re: "this sort of thing" is controlling behaviors that we don't expect users to ever interact with directly, either for preserving existing compatibility, or selectively enabling new code paths.

}

func (c *config) Validate() error {
Expand Down Expand Up @@ -162,6 +174,11 @@ func newBigQueryDriver() *sql.Driver {
"bucket_path": cfg.BucketPath,
}).Info("creating bigquery endpoint")

var featureFlags = common.ParseFeatureFlags(cfg.Advanced.FeatureFlags, featureFlagDefaults)
if cfg.Advanced.FeatureFlags != "" {
log.WithField("flags", featureFlags).Info("parsed feature flags")
}

return &sql.Endpoint{
Config: cfg,
Dialect: bqDialect,
Expand Down
2 changes: 1 addition & 1 deletion materialize-bigquery/staged_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (f *stagedFile) start() {
func (f *stagedFile) newFile(ctx context.Context) {
fName := uuid.NewString()
writer := f.client.Bucket(f.bucket).Object(path.Join(f.prefix, fName)).NewWriter(ctx)
f.encoder = enc.NewJsonEncoder(writer, f.fields)
f.encoder = enc.NewJsonEncoder(writer, f.fields, enc.WithJsonSkipNulls())
f.uploaded = append(f.uploaded, fName)
}

Expand Down
10 changes: 10 additions & 0 deletions materialize-boilerplate/stream-encode/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (

type jsonConfig struct {
disableCompression bool
skipNulls bool
}

type JsonOption func(*jsonConfig)
Expand All @@ -36,6 +37,12 @@ func WithJsonDisableCompression() JsonOption {
}
}

// WithJsonSkipNulls returns a JsonOption that makes the encoder omit fields
// whose value is nil instead of serializing them as JSON nulls.
func WithJsonSkipNulls() JsonOption {
	enableSkip := func(cfg *jsonConfig) { cfg.skipNulls = true }
	return enableSkip
}

type JsonEncoder struct {
w io.Writer // will be set to `gz` for compressed writes or `cwc` if compression is disabled
cwc *countingWriteCloser
Expand Down Expand Up @@ -77,6 +84,9 @@ func NewJsonEncoder(w io.WriteCloser, fields []string, opts ...JsonOption) *Json
// serialized as JSON, and escaping HTML is not desired so as to avoid escaping values like
// <, >, &, etc. if they are present in the materialized collection's data.
enc.shape.SetFlags(json.TrustRawMessage)
if cfg.skipNulls {
enc.shape.SkipNulls()
}
}

return enc
Expand Down
5 changes: 3 additions & 2 deletions source-redshift-batch/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"text/template"
"time"

"github.com/estuary/connectors/go/common"
"github.com/estuary/connectors/go/encrow"
"github.com/estuary/connectors/go/schedule"
schemagen "github.com/estuary/connectors/go/schema-gen"
Expand Down Expand Up @@ -175,7 +176,7 @@ func (drv *BatchSQLDriver) Discover(ctx context.Context, req *pc.Request_Discove
}
cfg.SetDefaults()

var featureFlags = boilerplate.ParseFeatureFlags(cfg.Advanced.FeatureFlags, featureFlagDefaults)
var featureFlags = common.ParseFeatureFlags(cfg.Advanced.FeatureFlags, featureFlagDefaults)
log.WithField("flags", featureFlags).Info("parsed feature flags")

var db, err = drv.Connect(ctx, &cfg)
Expand Down Expand Up @@ -632,7 +633,7 @@ func (drv *BatchSQLDriver) Pull(open *pc.Request_Open, stream *boilerplate.PullO
}
cfg.SetDefaults()

var featureFlags = boilerplate.ParseFeatureFlags(cfg.Advanced.FeatureFlags, featureFlagDefaults)
var featureFlags = common.ParseFeatureFlags(cfg.Advanced.FeatureFlags, featureFlagDefaults)
log.WithField("flags", featureFlags).Info("parsed feature flags")

var db, err = drv.Connect(stream.Context(), &cfg)
Expand Down
3 changes: 2 additions & 1 deletion source-sqlserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/estuary/connectors/go/common"
cerrors "github.com/estuary/connectors/go/connector-errors"
networkTunnel "github.com/estuary/connectors/go/network-tunnel"
"github.com/estuary/connectors/go/schedule"
Expand Down Expand Up @@ -171,7 +172,7 @@ func connectSQLServer(ctx context.Context, name string, cfg json.RawMessage) (sq
}
config.SetDefaults()

var featureFlags = boilerplate.ParseFeatureFlags(config.Advanced.FeatureFlags, featureFlagDefaults)
var featureFlags = common.ParseFeatureFlags(config.Advanced.FeatureFlags, featureFlagDefaults)
if config.Advanced.FeatureFlags != "" {
log.WithField("flags", featureFlags).Info("parsed feature flags")
}
Expand Down
6 changes: 3 additions & 3 deletions tests/materialize/materialize-bigquery/snapshot.json
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@
"flow_document": "{\"_meta\":{\"uuid\":\"81ca0080-1dda-11b2-8000-071353030311\"},\"array_int\":[81,82],\"bool_field\":true,\"float_field\":88.88,\"id\":8,\"multiple\":null,\"nested\":{\"id\":\"i8\"},\"nullable_int\":8,\"str_field\":\"str8 v2\"}",
"flow_published_at": "1970-01-01T01:00:21Z",
"id": 8,
"multiple": "null",
"multiple": null,
"nested": "{\"id\":\"i8\"}",
"nullable_int": 8,
"str_field": "str8 v2"
Expand All @@ -235,7 +235,7 @@
"flow_document": "{\"_meta\":{\"uuid\":\"82629700-1dda-11b2-8000-071353030311\"},\"array_int\":[91,92],\"binary_field\":\"YWxvaGEK\",\"bool_field\":false,\"float_field\":99.99,\"id\":9,\"nested\":{\"id\":\"i9\"},\"nullable_int\":null,\"str_field\":\"str9 v2\"}",
"flow_published_at": "1970-01-01T01:00:22Z",
"id": 9,
"multiple": "null",
"multiple": null,
"nested": "{\"id\":\"i9\"}",
"nullable_int": null,
"str_field": "str9 v2"
Expand All @@ -248,7 +248,7 @@
"flow_document": "{\"_meta\":{\"uuid\":\"82fb2d80-1dda-11b2-8000-071353030311\"},\"array_int\":[1,2],\"binary_field\":\"c2F5xY1uYXJhCg==\",\"bool_field\":true,\"float_field\":1010.101,\"id\":10,\"nested\":{\"id\":\"i10\"},\"nullable_int\":10,\"str_field\":\"str10 v2\"}",
"flow_published_at": "1970-01-01T01:00:23Z",
"id": 10,
"multiple": "null",
"multiple": null,
"nested": "{\"id\":\"i10\"}",
"nullable_int": 10,
"str_field": "str10 v2"
Expand Down
Loading