Skip to content

Commit

Permalink
materialize-sql: create the checkpoints table after creating schemas
Browse files Browse the repository at this point in the history
In case the metadata schema doesn't exist, wait until after the missing schemas
have been created to try to create the checkpoints table. Otherwise creation of
the checkpoints table may fail if the metadata schema doesn't yet exist.
  • Loading branch information
williamhbaker committed Jan 23, 2025
1 parent 05ec092 commit a637d82
Showing 1 changed file with 17 additions and 16 deletions.
33 changes: 17 additions & 16 deletions materialize-sql/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,22 +189,6 @@ func (d *Driver) Apply(ctx context.Context, req *pm.Request_Apply) (*pm.Response
return nil, err
}

if endpoint.MetaCheckpoints != nil && !is.HasResource(endpoint.MetaCheckpoints.Path) {
if resolved, err := ResolveTable(*endpoint.MetaCheckpoints, endpoint.Dialect); err != nil {
return nil, err
} else if createStatement, err := RenderTableTemplate(resolved, endpoint.CreateTableTemplate); err != nil {
return nil, err
} else if err := client.CreateTable(ctx, TableCreate{
Table: resolved,
TableCreateSql: createStatement,
ResourceConfigJson: nil, // not applicable for meta tables
}); err != nil {
return nil, fmt.Errorf("creating checkpoints table: %w", err)
} else {
log.WithField("table", resolved.Identifier).Info("created checkpoints table")
}
}

if sm, ok := client.(SchemaManager); ok {
// Create any schemas that don't already exist, if the endpoint supports schemas.
existingSchemas, err := sm.ListSchemas(ctx)
Expand All @@ -227,6 +211,23 @@ func (d *Driver) Apply(ctx context.Context, req *pm.Request_Apply) (*pm.Response
}
}

if endpoint.MetaCheckpoints != nil && !is.HasResource(endpoint.MetaCheckpoints.Path) {
// Create the checkpoints table if it doesn't already exist.
if resolved, err := ResolveTable(*endpoint.MetaCheckpoints, endpoint.Dialect); err != nil {
return nil, err
} else if createStatement, err := RenderTableTemplate(resolved, endpoint.CreateTableTemplate); err != nil {
return nil, err
} else if err := client.CreateTable(ctx, TableCreate{
Table: resolved,
TableCreateSql: createStatement,
ResourceConfigJson: nil, // not applicable for meta tables
}); err != nil {
return nil, fmt.Errorf("creating checkpoints table: %w", err)
} else {
log.WithField("table", resolved.Identifier).Info("created checkpoints table")
}
}

return boilerplate.ApplyChanges(ctx, req, newSqlApplier(client, is, endpoint, constrainter{dialect: endpoint.Dialect}), is, endpoint.ConcurrentApply)
}

Expand Down

0 comments on commit a637d82

Please sign in to comment.