materializations: don't persist a spec in the destination system #2262

Merged · merged 7 commits · Jan 23, 2025

Changes from all commits
14 changes: 7 additions & 7 deletions materialize-bigquery/.snapshots/TestValidateAndApply
@@ -82,34 +82,34 @@ Big Schema Changed Types Constraints:
 {"Field":"boolField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'boolField' is already being materialized as endpoint type 'BOOLEAN' but endpoint type 'INTEGER' is required by its schema '{ type: [integer] }'"}
 {"Field":"flow_document","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is the document in the current materialization"}
 {"Field":"flow_published_at","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
-{"Field":"intField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'intField' is already being materialized as endpoint type 'INTEGER' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"}
+{"Field":"intField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"key","Type":1,"TypeString":"FIELD_REQUIRED","Reason":"This field is a key in the current materialization"}
 {"Field":"multipleField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"nullField","Type":4,"TypeString":"FIELD_OPTIONAL","Reason":"Object fields may be materialized"}
 {"Field":"numField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'numField' is already being materialized as endpoint type 'FLOAT' but endpoint type 'BOOLEAN' is required by its schema '{ type: [boolean] }'"}
 {"Field":"objField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
-{"Field":"stringDateField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringDateField' is already being materialized as endpoint type 'DATE' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"}
-{"Field":"stringDateTimeField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringDateTimeField' is already being materialized as endpoint type 'TIMESTAMP' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"}
+{"Field":"stringDateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
+{"Field":"stringDateTimeField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringDurationField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringEmailField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringField' is already being materialized as endpoint type 'STRING' but endpoint type 'INTEGER' is required by its schema '{ type: [integer] }'"}
 {"Field":"stringHostnameField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringIdnEmailField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringIdnHostnameField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
-{"Field":"stringIntegerField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringIntegerField' is already being materialized as endpoint type 'BIGNUMERIC' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"}
+{"Field":"stringIntegerField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringIpv4Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringIpv6Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringIriField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringIriReferenceField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringJsonPointerField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringMacAddr8Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringMacAddrField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
-{"Field":"stringNumberField","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringNumberField' is already being materialized as endpoint type 'FLOAT' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"}
+{"Field":"stringNumberField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringRegexField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringRelativeJsonPointerField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringTimeField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
-{"Field":"stringUint32Field","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringUint32Field' is already being materialized as endpoint type 'BIGNUMERIC' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"}
-{"Field":"stringUint64Field","Type":6,"TypeString":"UNSATISFIABLE","Reason":"Field 'stringUint64Field' is already being materialized as endpoint type 'BIGNUMERIC' but endpoint type 'STRING' is required by its schema '{ type: [string] }'"}
+{"Field":"stringUint32Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
+{"Field":"stringUint64Field","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringUriField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringUriReferenceField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
 {"Field":"stringUriTemplateField","Type":3,"TypeString":"LOCATION_RECOMMENDED","Reason":"This location is part of the current materialization"}
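
As an aside for readers of the snapshot above: the numeric `Type` codes pair with the `TypeString` values as 1 = FIELD_REQUIRED, 3 = LOCATION_RECOMMENDED, 4 = FIELD_OPTIONAL, and 6 = UNSATISFIABLE. A minimal sketch of that mapping follows — the Go names and standalone program are illustrative only; just the number-to-string pairing is taken from the snapshot lines:

```go
package main

import "fmt"

// ConstraintType mirrors the numeric "Type" codes observed in the snapshot
// above. The enum name and this standalone program are illustrative; only the
// number-to-string pairing comes from the snapshot itself.
type ConstraintType int

const (
	FieldRequired       ConstraintType = 1 // FIELD_REQUIRED
	LocationRecommended ConstraintType = 3 // LOCATION_RECOMMENDED
	FieldOptional       ConstraintType = 4 // FIELD_OPTIONAL
	Unsatisfiable       ConstraintType = 6 // UNSATISFIABLE
)

func main() {
	// The -/+ pairs above show fields like intField moving from code 6
	// (UNSATISFIABLE against the previously persisted spec) to code 3
	// (LOCATION_RECOMMENDED) now that no spec is stored in the destination.
	fmt.Println(Unsatisfiable, LocationRecommended) // prints: 6 3
}
```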
6 changes: 1 addition & 5 deletions materialize-bigquery/bigquery.go
@@ -154,14 +154,10 @@ func newBigQueryDriver() *sql.Driver {
 				"bucket_path": cfg.BucketPath,
 			}).Info("creating bigquery endpoint")

-			var metaBase sql.TablePath = []string{cfg.ProjectID, cfg.Dataset}
-			var metaSpecs, metaCheckpoints = sql.MetaTables(metaBase)
-
 			return &sql.Endpoint{
 				Config:              cfg,
 				Dialect:             bqDialect,
-				MetaSpecs:           &metaSpecs,
-				MetaCheckpoints:     &metaCheckpoints,
+				MetaCheckpoints:     sql.FlowCheckpointsTable([]string{cfg.ProjectID, cfg.Dataset}),
 				NewClient:           newClient,
 				CreateTableTemplate: tplCreateTargetTable,
 				NewResource:         newTableConfig,
23 changes: 3 additions & 20 deletions materialize-bigquery/bigquery_test.go
@@ -14,7 +14,6 @@ import (
 	"github.com/bradleyjkemp/cupaloy"
 	boilerplate "github.com/estuary/connectors/materialize-boilerplate"
 	sql "github.com/estuary/connectors/materialize-sql"
-	pf "github.com/estuary/flow/go/protocols/flow"
 	pm "github.com/estuary/flow/go/protocols/materialize"
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/require"
@@ -73,18 +72,12 @@ func TestValidateAndApply(t *testing.T) {
 			t.Helper()
 			return dumpSchema(t, ctx, client, cfg, resourceConfig)
 		},
-		func(t *testing.T, materialization pf.Materialization) {
+		func(t *testing.T) {
 			t.Helper()
-
 			_, _ = client.query(ctx, fmt.Sprintf(
 				"drop table %s;",
 				bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table),
 			))
-
-			_, _ = client.query(ctx, fmt.Sprintf(
-				"delete from %s where materialization = 'test/sqlite'",
-				bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, sql.DefaultFlowMaterializations),
-			))
 		},
 	)
 }
@@ -129,7 +122,7 @@ func TestValidateAndApplyMigrations(t *testing.T) {

 	// bigquery does not support more than 6 fractional second precision, and will fail if we try
 	// to insert a value with 9
-	for i, _ := range values {
+	for i := range values {
 		if keys[i] == "datetimeValue" {
 			values[i] = "'2024-01-01 01:01:01.111111'"
 		}
@@ -186,18 +179,12 @@ func TestValidateAndApplyMigrations(t *testing.T) {

 			return b.String()
 		},
-		func(t *testing.T, materialization pf.Materialization) {
+		func(t *testing.T) {
 			t.Helper()
-
 			_, _ = client.query(ctx, fmt.Sprintf(
 				"drop table %s;",
 				bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, resourceConfig.Table),
 			))
-
-			_, _ = client.query(ctx, fmt.Sprintf(
-				"delete from %s where materialization = 'test/sqlite'",
-				bqDialect.Identifier(cfg.ProjectID, cfg.Dataset, sql.DefaultFlowMaterializations),
-			))
 		},
 	)
 }
@@ -287,10 +274,6 @@ func TestFencingCases(t *testing.T) {
 }

 func TestPrereqs(t *testing.T) {
-	// These tests assume that the configuration obtained from environment variables forms a valid
-	// config that could be used to materialize into Bigquery. Various parameters of the
-	// configuration are then manipulated to test assertions for incorrect configs.
-
 	// Due to the nature of configuring the connector with a JSON service account key and the
 	// difficulties in discriminating between error responses from BigQuery there's only a handful
 	// of cases that can be explicitly tested.
37 changes: 0 additions & 37 deletions materialize-bigquery/client.go
@@ -2,7 +2,6 @@ package connector

 import (
 	"context"
-	dbSql "database/sql"
 	"encoding/base64"
 	"errors"
 	"fmt"
@@ -16,7 +15,6 @@ import (
 	storage "cloud.google.com/go/storage"
 	boilerplate "github.com/estuary/connectors/materialize-boilerplate"
 	sql "github.com/estuary/connectors/materialize-sql"
-	pf "github.com/estuary/flow/go/protocols/flow"
 	"github.com/google/uuid"
 	log "github.com/sirupsen/logrus"
 	"golang.org/x/sync/errgroup"
@@ -113,11 +111,6 @@ func (c *client) InfoSchema(ctx context.Context, resourcePaths [][]string) (*boi
 	return is, nil
 }

-func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) error {
-	_, err := c.query(ctx, updateSpec.ParameterizedQuery, updateSpec.Parameters...)
-	return err
-}
-
 func (c *client) CreateTable(ctx context.Context, tc sql.TableCreate) error {
 	_, err := c.query(ctx, tc.TableCreateSql)
 	return err
@@ -316,36 +309,6 @@ func preReqs(ctx context.Context, conf any, tenant string) *sql.PrereqErr {
 	return errs
 }

-func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization pf.Materialization) (specB64, version string, err error) {
-	job, err := c.query(ctx, fmt.Sprintf(
-		"SELECT version, spec FROM %s WHERE materialization=%s;",
-		specs.Identifier,
-		specs.Keys[0].Placeholder,
-	), materialization.String())
-	if err != nil {
-		return "", "", err
-	}
-
-	var data struct {
-		Version string `bigquery:"version"`
-		SpecB64 string `bigquery:"spec"`
-	}
-
-	if err := c.fetchOne(ctx, job, &data); err == errNotFound {
-		return "", "", dbSql.ErrNoRows
-	} else if err != nil {
-		return "", "", err
-	}
-
-	log.WithFields(log.Fields{
-		"table":           specs.Identifier,
-		"materialization": materialization.String(),
-		"version":         data.Version,
-	}).Info("existing materialization spec loaded")
-
-	return data.SpecB64, data.Version, nil
-}
-
 func (c *client) ExecStatements(ctx context.Context, statements []string) error {
 	_, err := c.query(ctx, strings.Join(statements, "\n"))
 	return err
138 changes: 85 additions & 53 deletions materialize-bigquery/query.go
@@ -50,11 +50,29 @@ func (c client) runQuery(ctx context.Context, query *bigquery.Query) (*bigquery.
 	var job *bigquery.Job
 	var err error
 	for attempt := 0; attempt < maxAttempts; attempt++ {
+		backoff *= math.Pow(2, 1+rand.Float64())
+		if backoff > maxBackoff {
+			backoff = maxBackoff
+		}
+		retryDelay := time.Duration(backoff * float64(time.Millisecond))
+
 		job, err = query.Run(ctx)
 		if err != nil {
 			return nil, fmt.Errorf("run: %w", err)
 		}
+
+		// Some queries may immediately fail, such as table alteration rate
+		// limits. For these, `job.Wait` will hang forever, so we must check to
+		// see if the job has already failed.
+		if initialStatus, err := job.Status(ctx); err != nil {
+			return nil, fmt.Errorf("getting initialStatus for job: %w", err)
+		} else if err := initialStatus.Err(); err != nil {
+			if err := maybeRetry(ctx, err, attempt, retryDelay, initialStatus); err != nil {
+				return nil, err
+			}
+			continue
+		}
+
 		// Weirdness ahead: if `err != nil`, then `status` might be nil. But if `err == nil`, then
 		// there might still have been an error reported by `status.Err()`. We always want both the
 		// err and the status so that we can check both. When `err != nil`, the status may still
@@ -67,61 +85,11 @@ func (c client) runQuery(ctx context.Context, query *bigquery.Query) (*bigquery.
 		if err == nil {
 			err = status.Err()
 		}

 		if err != nil {
-			// Is this a terminal error?
-
-			// We need to retry errors due to concurrent updates to the same table, but
-			// unfortunately there's no good way to identify such errors. The status code of that
-			// error is 400 and the status is "invalidQuery" (see
-			// https://cloud.google.com/bigquery/docs/error-messages), which also applies to several
-			// other scenarios like the instance being fenced off (from our use of RAISE), a table
-			// being referenced by the query not existing (which is for some reason not a 404), etc.
-
-			// Because of this we match on substrings in the error message to determine if a retry
-			// should be attempted. The two errors that may be encountered from concurrent shards
-			// operating in the same dataset have the error strings "Transaction is aborted due to
-			// concurrent update against table ..." (most common, seemingly), or "Could not
-			// serialize access to table ...". We retry only if the the error string contains these
-			// strings.
-
-			// Short term rate limit errors can also be retried using the same exponential backoff
-			// strategy. These kinds of errors can always be identified by their "Reason" being
-			// "jobRateLimitExceeded".
-			if e, ok := err.(*googleapi.Error); ok {
-				if strings.Contains(err.Error(), "Transaction is aborted due to concurrent update against table") ||
-					strings.Contains(err.Error(), "Could not serialize access to table") ||
-					strings.Contains(err.Error(), "The job encountered an error during execution. Retrying the job may solve the problem.") ||
-					(len(e.Errors) == 1 && e.Errors[0].Reason == "jobRateLimitExceeded") {
-					backoff *= math.Pow(2, 1+rand.Float64())
-					if backoff > maxBackoff {
-						backoff = maxBackoff
-					}
-					delay := time.Duration(backoff * float64(time.Millisecond))
-
-					ll := log.WithFields(log.Fields{
-						"attempt":   attempt,
-						"jobStatus": status,
-						"error":     err,
-						"delay":     delay.String(),
-					})
-
-					if attempt > 10 {
-						ll.Info("job failed (will retry)")
-					} else {
-						ll.Debug("job failed (will retry)")
-					}
-
-					select {
-					case <-ctx.Done():
-						return nil, ctx.Err()
-					case <-time.After(delay):
-						continue
-					}
-				}
-			}
-
-			return nil, err
+			if err := maybeRetry(ctx, err, attempt, retryDelay, status); err != nil {
+				return nil, err
+			}
+			continue
 		}

 		// I think this is just documenting the assumption that the job must always be Done after
Expand All @@ -136,6 +104,70 @@ func (c client) runQuery(ctx context.Context, query *bigquery.Query) (*bigquery.
return job, fmt.Errorf("exhausted retries: %w", err)
}

// maybeRetry will return a `nil` error after a period of time if the provided
// `err` is retryable. Otherwise it will return the provided error immediately.
func maybeRetry(ctx context.Context, err error, attempt int, delay time.Duration, status *bigquery.JobStatus) error {
doDelay := func() error {
ll := log.WithFields(log.Fields{
"attempt": attempt,
"jobStatus": status,
"error": err,
"delay": delay.String(),
})

if attempt > 10 {
ll.Info("job failed (will retry)")
} else {
ll.Debug("job failed (will retry)")
}

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(delay):
return nil
}
}

// We need to retry errors due to concurrent updates to the same table, but
// unfortunately there's no good way to identify such errors. The status code of that
// error is 400 and the status is "invalidQuery" (see
// https://cloud.google.com/bigquery/docs/error-messages), which also applies to several
// other scenarios like the instance being fenced off (from our use of RAISE), a table
// being referenced by the query not existing (which is for some reason not a 404), etc.

// Because of this we match on substrings in the error message to determine if a retry
// should be attempted. The two errors that may be encountered from concurrent shards
// operating in the same dataset have the error strings "Transaction is aborted due to
// concurrent update against table ..." (most common, seemingly), or "Could not
// serialize access to table ...". We retry only if the the error string contains these
// strings.

// Short term rate limit errors can also be retried using the same exponential backoff
// strategy. These kinds of errors can always be identified by their "Reason" being
// "jobRateLimitExceeded".
if e, ok := err.(*googleapi.Error); ok {
if strings.Contains(err.Error(), "Transaction is aborted due to concurrent update against table") ||
strings.Contains(err.Error(), "Could not serialize access to table") ||
strings.Contains(err.Error(), "The job encountered an error during execution. Retrying the job may solve the problem.") ||
(len(e.Errors) == 1 && e.Errors[0].Reason == "jobRateLimitExceeded") {
return doDelay()
}
}

// A *bigquery.Error is returned if the job immediately has an error in its
// status. The only way I have found this to happen from our usage is from
// table alterations exceeding a rate limit.
if e, ok := err.(*bigquery.Error); ok {
if e.Reason == "rateLimitExceeded" {
return doDelay()
}
}

// Not a retryable error.
return err
}

// fetchOne will fetch one row of data from a job and scan it into dest.
func (c client) fetchOne(ctx context.Context, job *bigquery.Job, dest interface{}) error {

Expand Down
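
To make the retry refactor above easier to follow: `runQuery` now computes the jittered, capped backoff once per loop iteration, and `maybeRetry` decides whether an error is retryable and performs the wait. Below is a self-contained sketch of the same pattern — the function names, constants, and classifier here are illustrative stand-ins under stated assumptions, not the connector's actual API:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"time"
)

// runWithRetry repeatedly invokes op, sleeping between attempts with
// exponential backoff plus random jitter, capped at a maximum delay. It gives
// up on the first error that retryable classifies as terminal, mirroring the
// shape of runQuery/maybeRetry above. The constants are invented for this
// sketch; the connector's real values may differ.
func runWithRetry(ctx context.Context, op func() error, retryable func(error) bool) error {
	const (
		initialBackoffMs = 200.0
		maxBackoffMs     = 60_000.0
		maxAttempts      = 30
	)

	backoff := initialBackoffMs
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		// Multiply by a random factor in [2, 4), then cap, matching the
		// backoff *= math.Pow(2, 1+rand.Float64()) line in runQuery.
		backoff *= math.Pow(2, 1+rand.Float64())
		if backoff > maxBackoffMs {
			backoff = maxBackoffMs
		}
		delay := time.Duration(backoff * float64(time.Millisecond))

		if err = op(); err == nil {
			return nil
		} else if !retryable(err) {
			return err // terminal error: fail immediately, as maybeRetry does
		}

		// Wait out the backoff, aborting early if the context is cancelled.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
	}
	return fmt.Errorf("exhausted retries: %w", err)
}

func main() {
	calls := 0
	err := runWithRetry(context.Background(),
		func() error {
			calls++
			if calls < 3 {
				return errors.New("jobRateLimitExceeded")
			}
			return nil
		},
		func(err error) bool { return true }, // treat everything as retryable
	)
	fmt.Println(calls, err) // prints: 3 <nil>
}
```

Separating the wait (`doDelay`'s select on `ctx.Done` and `time.After`) from the error classification is what the extraction of `maybeRetry` achieves, and it lets both the immediate-failure check on `job.Status` and the post-`Wait` check share one retry policy.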
24 changes: 6 additions & 18 deletions materialize-boilerplate/.snapshots/TestApply
@@ -1,30 +1,18 @@
 * new materialization:
-create meta tables
-create resource for collection "key/value"
-put spec with version "aVersion"
+create resource for ["key_value"]

 * remove required field:
-create meta tables
-update resource for collection "key/value" [new projections: 0, newly nullable fields: 1, newly delta updates: false]
-put spec with version "aVersion"
+update resource for ["key_value"] [new projections: 0, newly nullable fields: 1, newly delta updates: false]

 * add required field:
-create meta tables
-update resource for collection "key/value" [new projections: 1, newly nullable fields: 0, newly delta updates: false]
-put spec with version "aVersion"
+update resource for ["key_value"] [new projections: 1, newly nullable fields: 0, newly delta updates: false]

 * add binding:
-create meta tables
-create resource for collection "extra/collection"
-put spec with version "aVersion"
+create resource for ["extra_collection"]

 * replace binding:
-create meta tables
 delete resource ["key_value"]
-create resource for collection "key/value"
-put spec with version "aVersion"
+create resource for ["key_value"]

 * field is newly nullable:
-create meta tables
-update resource for collection "key/value" [new projections: 0, newly nullable fields: 1, newly delta updates: false]
-put spec with version "aVersion"
+update resource for ["key_value"] [new projections: 0, newly nullable fields: 1, newly delta updates: false]