Skip to content

Commit

Permalink
snowpipe: review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
rockwotj committed Jan 7, 2025
1 parent 9572cd1 commit 620afd2
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 56 deletions.
19 changes: 10 additions & 9 deletions internal/impl/snowflake/output_snowflake_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ The input to this mapping is an object with the value and the name of the new co
service.NewStringField(ssoFieldChannelPrefix).
Description(`The prefix to use when creating a channel name.
Duplicate channel names will result in errors and prevent multiple instances of Redpanda Connect from writing at the same time.
By default if neither `+"`"+ssoFieldChannelPrefix+"`or `"+ssoFieldChannelName+` is specified then the output will create a channel name that is based on the table FQN so there will only be a single stream per table.
By default if neither `+"`"+ssoFieldChannelPrefix+"` or `"+ssoFieldChannelName+` is specified then the output will create a channel name that is based on the table FQN so there will only be a single stream per table.
At most `+"`max_in_flight`"+` channels will be opened.
Expand Down Expand Up @@ -191,7 +191,7 @@ input:
count: 50000
period: 45s
# Prevent multiple batches from being in flight at once, so that we never send
# batch while another batch is being retried, this is important to ensure that
# a batch while another batch is being retried, this is important to ensure that
# the Snowflake Snowpipe Streaming channel does not see older data - as it will
# assume that the older data is already committed.
checkpoint_limit: 1
Expand All @@ -201,7 +201,7 @@ output:
# only upload data exactly once, these are already lexicographically
# ordered.
offset_token: "${!@lsn}"
# Since we're sending a single ordered log, we can only send on thing
# Since we're sending a single ordered log, we can only send one thing
# at a time to ensure that we're properly incrementing our offset_token
# and only using a single channel at a time.
max_in_flight: 1
Expand All @@ -217,8 +217,8 @@ output:
"Ingesting data exactly once from Redpanda",
`How to ingest data from Redpanda with consumer groups, decode the schema using the schema registry, then write the corresponding data into Snowflake exactly once.
NOTE: If attempting to do exactly-once it's important that records are delivered in order to the output and correctly partitioned. Be sure to read the documentation for
channel_name and offset_token first. Removing the offset_token is a safer option that will instruct Redpanda Connect to use it's default at-least-once delivery model instead.`,
NOTE: If attempting to do exactly-once its important that records are delivered in order to the output and correctly partitioned. Be sure to read the documentation for
channel_name and offset_token first. Removing the offset_token is a safer option that will instruct Redpanda Connect to use its default at-least-once delivery model instead.`,
`
input:
redpanda_common:
Expand All @@ -240,7 +240,7 @@ pipeline:
output:
fallback:
- snowflake_streaming:
# To ensure that we write an ordered stream each partition in kafka gets it's own
# To ensure that we write an ordered stream each partition in kafka gets its own
# channel.
channel_name: "partition-${!@kafka_partition}"
# Ensure that our offsets are lexicographically sorted in string form by padding with
Expand Down Expand Up @@ -737,7 +737,7 @@ func (o *snowpipeStreamingOutput) WriteBatch(ctx context.Context, batch service.
}
continue // If creating the table succeeded, retry
}
needsMigrationErr := schemaMigrationNeededError{}
needsMigrationErr := &schemaMigrationNeededError{}
if !errors.As(err, &needsMigrationErr) {
return err
}
Expand All @@ -763,7 +763,8 @@ func (o *snowpipeStreamingOutput) createTable(ctx context.Context, batch service
return nil
}

func (o *snowpipeStreamingOutput) runMigration(ctx context.Context, needsMigrationErr schemaMigrationNeededError) error {
// runMigration requires the migration lock being held.
func (o *snowpipeStreamingOutput) runMigration(ctx context.Context, needsMigrationErr *schemaMigrationNeededError) error {
if err := needsMigrationErr.runMigration(ctx, o.schemaEvolver); err != nil {
return err
}
Expand Down Expand Up @@ -999,7 +1000,7 @@ func preprocessForExactlyOnce(
}

func wrapInsertError(err error) error {
if errors.Is(err, streaming.InvalidTimestampFormatError{}) {
if errors.Is(err, &streaming.InvalidTimestampFormatError{}) {
return fmt.Errorf("%w; if a custom format is required use a `%s` and bloblang functions `ts_parse` or `ts_strftime` to convert a custom format into a timestamp", err, ssoFieldMapping)
}
return err
Expand Down
4 changes: 2 additions & 2 deletions internal/impl/snowflake/pool/indexed.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

type (
// Indexed is essentially a pool where each object in the pool is explicitly retrivied by name.
// Indexed is essentially a pool where each object in the pool is explicitly retrieved by name.
Indexed[T any] interface {
// Acquire gets a named object T out of the pool if available, otherwise will create a new
// item using the given name.
Expand Down Expand Up @@ -103,9 +103,9 @@ func (p *indexedImpl[T]) Reset() {
func (p *indexedImpl[T]) Keys() []string {
keys := []string{}
_ = p.lock(context.Background())
defer p.unlock()
for k := range p.items {
keys = append(keys, k)
}
p.unlock()
return keys
}
26 changes: 13 additions & 13 deletions internal/impl/snowflake/schema_evolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,32 @@ type schemaMigrationNeededError struct {
runMigration func(ctx context.Context, evolver *snowpipeSchemaEvolver) error
}

func (schemaMigrationNeededError) Error() string {
func (*schemaMigrationNeededError) Error() string {
return "schema migration was required and the operation needs to be retried after the migration"
}

func asSchemaMigrationError(err error) (schemaMigrationNeededError, bool) {
nullColumnErr := streaming.NonNullColumnError{}
func asSchemaMigrationError(err error) (*schemaMigrationNeededError, bool) {
var nullColumnErr *streaming.NonNullColumnError
if errors.As(err, &nullColumnErr) {
// Return an error so that we release our read lock and can take the write lock
// to forcibly reopen all our channels to get a new schema.
return schemaMigrationNeededError{
return &schemaMigrationNeededError{
runMigration: func(ctx context.Context, evolver *snowpipeSchemaEvolver) error {
return evolver.MigrateNotNullColumn(ctx, nullColumnErr)
},
}, true
}
missingColumnErr := streaming.MissingColumnError{}
var missingColumnErr *streaming.MissingColumnError
if errors.As(err, &missingColumnErr) {
return schemaMigrationNeededError{
return &schemaMigrationNeededError{
runMigration: func(ctx context.Context, evolver *snowpipeSchemaEvolver) error {
return evolver.MigrateMissingColumn(ctx, missingColumnErr)
},
}, true
}
batchErr := streaming.BatchSchemaMismatchError[streaming.MissingColumnError]{}
var batchErr *streaming.BatchSchemaMismatchError[*streaming.MissingColumnError]
if errors.As(err, &batchErr) {
return schemaMigrationNeededError{
return &schemaMigrationNeededError{
runMigration: func(ctx context.Context, evolver *snowpipeSchemaEvolver) error {
for _, missingCol := range batchErr.Errors {
// TODO(rockwood): Consider a batch SQL statement that adds N columns at a time
Expand All @@ -62,7 +62,7 @@ func asSchemaMigrationError(err error) (schemaMigrationNeededError, bool) {
},
}, true
}
return schemaMigrationNeededError{}, false
return nil, false
}

type snowpipeSchemaEvolver struct {
Expand All @@ -73,7 +73,7 @@ type snowpipeSchemaEvolver struct {
db, schema, table, role string
}

func (o *snowpipeSchemaEvolver) ComputeMissingColumnType(col streaming.MissingColumnError) (string, error) {
func (o *snowpipeSchemaEvolver) ComputeMissingColumnType(col *streaming.MissingColumnError) (string, error) {
msg := service.NewMessage(nil)
msg.SetStructuredMut(map[string]any{
"name": col.RawName(),
Expand All @@ -94,7 +94,7 @@ func (o *snowpipeSchemaEvolver) ComputeMissingColumnType(col streaming.MissingCo
return columnType, nil
}

func (o *snowpipeSchemaEvolver) MigrateMissingColumn(ctx context.Context, col streaming.MissingColumnError) error {
func (o *snowpipeSchemaEvolver) MigrateMissingColumn(ctx context.Context, col *streaming.MissingColumnError) error {
columnType, err := o.ComputeMissingColumnType(col)
if err != nil {
return err
Expand All @@ -113,12 +113,12 @@ func (o *snowpipeSchemaEvolver) MigrateMissingColumn(ctx context.Context, col st
),
)
if err != nil {
o.logger.Warnf("unable to add new column, this maybe due to a race with another request, error: %s", err)
o.logger.Warnf("unable to add new column %s, this maybe due to a race with another request, error: %s", col.ColumnName(), err)
}
return nil
}

func (o *snowpipeSchemaEvolver) MigrateNotNullColumn(ctx context.Context, col streaming.NonNullColumnError) error {
func (o *snowpipeSchemaEvolver) MigrateNotNullColumn(ctx context.Context, col *streaming.NonNullColumnError) error {
o.logger.Infof("identified new schema - attempting to alter table to remove null constraint on column: %s", col.ColumnName())
err := o.RunSQLMigration(
ctx,
Expand Down
6 changes: 3 additions & 3 deletions internal/impl/snowflake/streaming/api_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ type APIError struct {
Message string `json:"message"`
}

var _ error = APIError{}
var _ error = &APIError{}

// Error statisfies the Error interface
func (e APIError) Error() string {
func (e *APIError) Error() string {
msg := e.Message
if msg == "" {
msg = "(no message)"
Expand All @@ -34,6 +34,6 @@ func (e APIError) Error() string {

// IsTableNotExistsError returns true if the table does not exist (or the user is not authorized to see it).
func IsTableNotExistsError(err error) bool {
var restErr APIError
var restErr *APIError
return errors.As(err, &restErr) && restErr.StatusCode == responseTableNotExist
}
8 changes: 4 additions & 4 deletions internal/impl/snowflake/streaming/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ func messageToRow(msg *service.Message, out []any, nameToPosition map[string]int
if !ok {
return fmt.Errorf("expected object, got: %T", v)
}
var missingColumns []MissingColumnError
var missingColumns []*MissingColumnError
for k, v := range row {
idx, ok := nameToPosition[normalizeColumnName(k)]
if !ok {
if !allowExtraProperties && v != nil {
missingColumns = append(missingColumns, MissingColumnError{columnName: k, val: v})
missingColumns = append(missingColumns, &MissingColumnError{columnName: k, val: v})
}
continue
}
out[idx] = v
}
if len(missingColumns) > 0 {
return BatchSchemaMismatchError[MissingColumnError]{missingColumns}
return &BatchSchemaMismatchError[*MissingColumnError]{missingColumns}
}
return nil
}
Expand Down Expand Up @@ -94,7 +94,7 @@ func constructRowGroup(
err = t.converter.ValidateAndConvert(s, v, b)
if err != nil {
if errors.Is(err, errNullValue) {
return nil, nil, NonNullColumnError{t.column.Name}
return nil, nil, &NonNullColumnError{t.column.Name}
}
// There is not special typed error for a validation error, there really isn't
// anything we can do about it.
Expand Down
6 changes: 4 additions & 2 deletions internal/impl/snowflake/streaming/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
responseTableNotExist = 4
responseErrQueueFull = 7
responseErrRetryRequest = 10

partnerID = "RedpandaConnect_SnowpipeStreamingSDK"
)

type (
Expand Down Expand Up @@ -448,7 +450,7 @@ func (c *SnowflakeRestClient) doPost(ctx context.Context, url string, req any, r
}
httpReq.Header.Set("Content-Type", "application/json")
httpReq.Header.Set("Accept", "application/json")
httpReq.Header.Set("User-Agent", fmt.Sprintf("RedpandaConnect_SnowpipeStreamingSDK/%v", c.version))
httpReq.Header.Set("User-Agent", fmt.Sprintf(partnerID+"/%v", c.version))
httpReq.Header.Set("X-Snowflake-Authorization-Token-Type", "KEYPAIR_JWT")
httpReq.Header.Set("Authorization", "Bearer "+c.cachedJWT.Load())
r, err := c.client.Do(httpReq)
Expand All @@ -467,7 +469,7 @@ func (c *SnowflakeRestClient) doPost(ctx context.Context, url string, req any, r
if r.StatusCode != 200 {
var restErr APIError
if unmarshalErr := json.Unmarshal(respBody, &restErr); unmarshalErr == nil && restErr.StatusCode != responseSuccess {
return nil, restErr
return nil, &restErr
}
return nil, fmt.Errorf("non successful status code (%d): %s", r.StatusCode, respBody)
}
Expand Down
32 changes: 16 additions & 16 deletions internal/impl/snowflake/streaming/schema_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@ type SchemaMismatchError interface {
Value() any
}

var _ error = BatchSchemaMismatchError[SchemaMismatchError]{}
var _ error = &BatchSchemaMismatchError[SchemaMismatchError]{}

// BatchSchemaMismatchError is when multiple schema mismatch errors happen at once
type BatchSchemaMismatchError[T SchemaMismatchError] struct {
Errors []T
}

// Error implements the error interface
func (e BatchSchemaMismatchError[T]) Error() string {
func (e *BatchSchemaMismatchError[T]) Error() string {
errs := []error{}
for _, err := range e.Errors {
errs = append(errs, err)
}
return errors.Join(errs...).Error()
}

var _ error = NonNullColumnError{}
var _ SchemaMismatchError = NonNullColumnError{}
var _ error = &NonNullColumnError{}
var _ SchemaMismatchError = &NonNullColumnError{}

// NonNullColumnError occurs when a column with a NOT NULL constraint
// gets a value with a `NULL` value.
Expand All @@ -51,23 +51,23 @@ type NonNullColumnError struct {
}

// ColumnName returns the column name with the NOT NULL constraint
func (e NonNullColumnError) ColumnName() string {
func (e *NonNullColumnError) ColumnName() string {
// This name comes directly from the Snowflake API so I hope this is properly quoted...
return e.columnName
}

// Value returns nil
func (e NonNullColumnError) Value() any {
func (e *NonNullColumnError) Value() any {
return nil
}

// Error implements the error interface
func (e NonNullColumnError) Error() string {
func (e *NonNullColumnError) Error() string {
return fmt.Sprintf("column %q has a NOT NULL constraint and recieved a nil value", e.columnName)
}

var _ error = MissingColumnError{}
var _ SchemaMismatchError = MissingColumnError{}
var _ error = &MissingColumnError{}
var _ SchemaMismatchError = &MissingColumnError{}

// MissingColumnError occurs when a column that is not in the table is
// found on a record
Expand All @@ -77,31 +77,31 @@ type MissingColumnError struct {
}

// NewMissingColumnError creates a new MissingColumnError object
func NewMissingColumnError(rawName string, val any) MissingColumnError {
return MissingColumnError{rawName, val}
func NewMissingColumnError(rawName string, val any) *MissingColumnError {
return &MissingColumnError{rawName, val}
}

// ColumnName returns the column name of the data that was not in the table
//
// NOTE this is escaped, so it's valid to use this directly in a SQL statement
// but I wish that Snowflake would just allow `identifier` for ALTER column.
func (e MissingColumnError) ColumnName() string {
func (e *MissingColumnError) ColumnName() string {
return quoteColumnName(e.columnName)
}

// RawName is the unquoted name of the new column - DO NOT USE IN SQL!
// This is the more intutitve name for users in the mapping function
func (e MissingColumnError) RawName() string {
func (e *MissingColumnError) RawName() string {
return e.columnName
}

// Value returns the value that was associated with the missing column
func (e MissingColumnError) Value() any {
func (e *MissingColumnError) Value() any {
return e.val
}

// Error implements the error interface
func (e MissingColumnError) Error() string {
func (e *MissingColumnError) Error() string {
return fmt.Sprintf("new data %+v with the name %q does not have an associated column", e.val, e.columnName)
}

Expand All @@ -112,6 +112,6 @@ type InvalidTimestampFormatError struct {
}

// Error implements the error interface
func (e InvalidTimestampFormatError) Error() string {
func (e *InvalidTimestampFormatError) Error() string {
return fmt.Sprintf("unable to parse %s value from %q - string time values must be in RFC 3339 format", e.columnType, e.val)
}
5 changes: 1 addition & 4 deletions internal/impl/snowflake/streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,11 +441,8 @@ func (c *SnowflakeIngestionChannel) InsertRows(ctx context.Context, batch servic
fullMD5Hash := md5.Sum(part.parquetFile)
err = backoff.Retry(func() error {
return uploader.upload(ctx, blobPath, part.parquetFile, fullMD5Hash[:], map[string]string{
"ingestclientname": "RedpandaConnect_SnowpipeStreamingSDK",
"ingestclientname": partnerID,
"ingestclientkey": c.clientPrefix,
// TODO(rockwood): It's not clear what this digest is used for,
// so we omit it so that we don't have to compute both the sha and md5
// "sfc-digest": string(sha256.Sum256(part.parquetFile)[:]),
})
}, backoff.WithMaxRetries(backoff.NewConstantBackOff(time.Second), 3))
if err != nil {
Expand Down
Loading

0 comments on commit 620afd2

Please sign in to comment.