Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V1.4.4 #512

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
18 changes: 12 additions & 6 deletions core/dbio/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -1402,7 +1402,9 @@ func SQLColumns(colTypes []ColumnType, conn Connection) (columns iop.Columns) {
col.Constraint = fc.Constraint
}

col.Stats.MaxLen = colType.Length
if colType.Length > 0 {
col.Stats.MaxLen = colType.Length
}
col.Stats.MaxDecLen = 0

// if length is provided, set as string if less than 4000
Expand All @@ -1416,12 +1418,14 @@ func SQLColumns(colTypes []ColumnType, conn Connection) (columns iop.Columns) {
}

if colType.IsSourced() || col.Sourced {
if col.IsString() && g.In(conn.GetType(), dbio.TypeDbSQLServer, dbio.TypeDbSnowflake, dbio.TypeDbOracle, dbio.TypeDbPostgres, dbio.TypeDbRedshift) {
if col.IsString() && g.In(conn.GetType(), dbio.TypeDbSQLServer, dbio.TypeDbAzure, dbio.TypeDbAzureDWH, dbio.TypeDbSnowflake, dbio.TypeDbOracle, dbio.TypeDbPostgres, dbio.TypeDbRedshift) {
col.Sourced = true
col.DbPrecision = colType.Length
if colType.Length > 0 {
col.DbPrecision = colType.Length
}
}

if col.IsNumber() && g.In(conn.GetType(), dbio.TypeDbSQLServer, dbio.TypeDbSnowflake) {
if col.IsNumber() && g.In(conn.GetType(), dbio.TypeDbSQLServer, dbio.TypeDbAzure, dbio.TypeDbAzureDWH, dbio.TypeDbSnowflake) {
col.Sourced = true
col.DbPrecision = colType.Precision
col.DbScale = colType.Scale
Expand Down Expand Up @@ -1551,6 +1555,7 @@ func (conn *BaseConn) GetTableColumns(table *Table, fields ...string) (columns i
Name: cast.ToString(rec["column_name"]),
DatabaseTypeName: cast.ToString(rec["data_type"]),
Precision: cast.ToInt(rec["precision"]),
Length: cast.ToInt(rec["maximum_length"]),
Scale: cast.ToInt(rec["scale"]),
Sourced: true,
})
Expand All @@ -1567,6 +1572,7 @@ func (conn *BaseConn) GetTableColumns(table *Table, fields ...string) (columns i
DatabaseTypeName: cast.ToString(rec["data_type"]),
Precision: cast.ToInt(rec["precision"]),
Scale: cast.ToInt(rec["scale"]),
Length: cast.ToInt(rec["maximum_length"]),
}
})

Expand Down Expand Up @@ -2813,7 +2819,7 @@ func GetOptimizeTableStatements(conn Connection, table *Table, newColumns iop.Co
oldColName := conn.Self().Quote(colNameTemp)
newColName := conn.Self().Quote(col.Name)

if g.In(conn.GetType(), dbio.TypeDbSQLServer) {
if g.In(conn.GetType(), dbio.TypeDbSQLServer, dbio.TypeDbAzure, dbio.TypeDbAzureDWH) {
tableName = conn.Unquote(table.FullName())
oldColName = colNameTemp
newColName = col.Name
Expand Down Expand Up @@ -2961,7 +2967,7 @@ func (conn *BaseConn) CompareChecksums(tableName string, columns iop.Columns) (e
} else if checksum1 != checksum2 {
if refCol.Type != col.Type {
// don't compare
} else if refCol.IsString() && conn.GetType() == dbio.TypeDbSQLServer && checksum2 >= checksum1 {
} else if refCol.IsString() && g.In(conn.GetType(), dbio.TypeDbSQLServer, dbio.TypeDbAzure, dbio.TypeDbAzureDWH) && checksum2 >= checksum1 {
// datalength can return higher counts since it counts bytes
} else if refCol.IsDatetime() && conn.GetType() == dbio.TypeDbSQLite && checksum1/1000 == checksum2 {
// sqlite can only handle timestamps up to milliseconds
Expand Down
29 changes: 21 additions & 8 deletions core/dbio/database/database_bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,39 +95,52 @@ func (conn *BigQueryConn) Init() error {
}

func (conn *BigQueryConn) getNewClient(timeOut ...int) (client *bigquery.Client, err error) {
var authOption option.ClientOption
var authOptions []option.ClientOption
var credJsonBody string

to := 15
if len(timeOut) > 0 {
to = timeOut[0]
}

// Add specified scopes for BigQuery such as:
// "https://www.googleapis.com/auth/drive"
// "https://www.googleapis.com/auth/spreadsheets"
scopes := []string{"https://www.googleapis.com/auth/bigquery"}
if val := conn.GetProp("extra_scopes"); val != "" {
var extraScopes []string
g.Unmarshal(val, &extraScopes)
scopes = append(scopes, extraScopes...)
}

if val := conn.GetProp("GC_KEY_BODY"); val != "" {
credJsonBody = val
authOption = option.WithCredentialsJSON([]byte(val))
authOptions = append(authOptions, option.WithCredentialsJSON([]byte(val)))
authOptions = append(authOptions, option.WithScopes(scopes...))
} else if val := conn.GetProp("GC_KEY_FILE"); val != "" {
authOption = option.WithCredentialsFile(val)
authOptions = append(authOptions, option.WithCredentialsFile(val))
authOptions = append(authOptions, option.WithScopes(scopes...))
b, err := os.ReadFile(val)
if err != nil {
return client, g.Error(err, "could not read google cloud key file")
}
credJsonBody = string(b)
} else if val := conn.GetProp("GC_CRED_API_KEY"); val != "" {
authOption = option.WithAPIKey(val)
authOptions = append(authOptions, option.WithAPIKey(val))
} else if val := conn.GetProp("GOOGLE_APPLICATION_CREDENTIALS"); val != "" {
authOption = option.WithCredentialsFile(val)
authOptions = append(authOptions, option.WithCredentialsFile(val))
authOptions = append(authOptions, option.WithScopes(scopes...))
b, err := os.ReadFile(val)
if err != nil {
return client, g.Error(err, "could not read google cloud key file")
}
credJsonBody = string(b)
} else {
creds, err := google.FindDefaultCredentials(conn.BaseConn.Context().Ctx)
creds, err := google.FindDefaultCredentials(conn.BaseConn.Context().Ctx, scopes...)
if err != nil {
return client, g.Error(err, "No Google credentials provided or could not find Application Default Credentials.")
}
authOption = option.WithCredentials(creds)
authOptions = append(authOptions, option.WithCredentials(creds))
}

if conn.ProjectID == "" && credJsonBody != "" {
Expand All @@ -139,7 +152,7 @@ func (conn *BigQueryConn) getNewClient(timeOut ...int) (client *bigquery.Client,
ctx, cancel := context.WithTimeout(conn.BaseConn.Context().Ctx, time.Duration(to)*time.Second)
defer cancel()

client, err = bigquery.NewClient(ctx, conn.ProjectID, authOption)
client, err = bigquery.NewClient(ctx, conn.ProjectID, authOptions...)
if err != nil {
return nil, g.Error(err, "Failed to create BigQuery client")
}
Expand Down
6 changes: 5 additions & 1 deletion core/dbio/database/database_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,11 @@ func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count ui
return 0, err
}

defer func() { env.RemoveLocalTempFile(dataPath) }()
defer func() {
file.Close()
env.RemoveLocalTempFile(dataPath)
}()

defer func() { env.RemoveLocalTempFile(logPath) }()
}

Expand Down
3 changes: 1 addition & 2 deletions core/dbio/filesys/fs_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,7 @@ func (fs *LocalFileSysClient) List(uri string) (nodes FileNodes, err error) {

files, err := os.ReadDir(path)
if err != nil {
err = g.Error(err, "Error listing "+path)
return
return nodes, nil // should not error if path doesn't exist
}

// path is dir
Expand Down
4 changes: 2 additions & 2 deletions core/dbio/iop/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2090,7 +2090,7 @@ func (ds *Datastream) NewCsvReaderChnl(sc StreamConfig) (readerChn chan *BatchRe

// new reader
pipeR, pipeW = io.Pipe()
w = csv.NewWriter(pipeW)
w = csv.NewWriterSize(pipeW, 40960*10)
w.Comma = ','
if sp.Config.Delimiter != "" {
w.Comma = []rune(sp.Config.Delimiter)[0]
Expand Down Expand Up @@ -2568,7 +2568,7 @@ func (ds *Datastream) NewCsvReader(sc StreamConfig) *io.PipeReader {
}

c := int64(0) // local counter
w := csv.NewWriter(pipeW)
w := csv.NewWriterSize(pipeW, 40960*10)
w.Comma = ','
if sp.Config.Delimiter != "" {
w.Comma = []rune(sp.Config.Delimiter)[0]
Expand Down
62 changes: 61 additions & 1 deletion core/dbio/iop/stream_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,67 @@ type Transformers struct {
func NewTransformers() Transformers {
win16be := encUnicode.UTF16(encUnicode.BigEndian, encUnicode.IgnoreBOM)
return Transformers{
Accent: transform.Chain(norm.NFD, runes.Remove(runes.In(unicode.Mn)), norm.NFC),
Accent: transform.Chain(
norm.NFD,
runes.Remove(runes.In(unicode.Mn)),
runes.Map(func(r rune) rune {
switch r {
// Polish special characters
case 'Ł', 'Ɫ':
return 'L'
case 'ł':
return 'l'
// Other special characters and their variations
case 'Æ', 'Ǽ':
return 'A'
case 'æ', 'ǽ':
return 'a'
case 'Ø', 'Ǿ':
return 'O'
case 'ø', 'ǿ':
return 'o'
case 'Þ':
return 'T'
case 'þ':
return 't'
case 'Ð':
return 'D'
case 'ð':
return 'd'
case 'ß', 'ẞ':
return 's'
case 'Œ':
return 'O'
case 'œ':
return 'o'
case 'IJ':
return 'I'
case 'ij':
return 'i'
case 'ƒ':
return 'f'
case 'Ŋ':
return 'N'
case 'ŋ':
return 'n'
case 'Ɲ':
return 'N'
case 'ɲ':
return 'n'
case 'Ƴ':
return 'Y'
case 'ƴ':
return 'y'
case 'Ɣ':
return 'G'
case 'ɣ':
return 'g'
default:
return r
}
}),
norm.NFC,
),

DecodeUTF8: encUnicode.UTF8.NewDecoder(),
DecodeUTF8BOM: encUnicode.UTF8BOM.NewDecoder(),
Expand Down
7 changes: 6 additions & 1 deletion core/dbio/templates/azuresql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ metadata:
order by table_schema, table_name

columns: |
select column_name, data_type
select
column_name,
data_type,
character_maximum_length as maximum_length,
numeric_precision as precision,
numeric_scale as scale
from INFORMATION_SCHEMA.COLUMNS
where table_schema = '{schema}'
and table_name = '{table}'
Expand Down
7 changes: 6 additions & 1 deletion core/dbio/templates/sqlserver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ metadata:
order by table_schema, table_name

columns: |
select column_name, data_type
select
column_name,
data_type,
character_maximum_length as maximum_length,
numeric_precision as precision,
numeric_scale as scale
from INFORMATION_SCHEMA.COLUMNS
where table_schema = '{schema}'
and table_name = '{table}'
Expand Down
2 changes: 1 addition & 1 deletion core/dbio/templates/types_general_to_native.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ string varchar() varchar() varchar() varchar() nvarchar() nvarchar() nvarchar()
text clob text mediumtext mediumtext nvarchar(max) nvarchar(max) nvarchar(max) varchar(65535) text text text string Nullable(String) text text varchar(65533) varchar nullable(string)
timestamp timestamp(9) timestamp datetime(6) datetime(6) datetime2 datetime2 datetime2 timestamp timestamp_ntz text text timestamp Nullable(DateTime64(6)) timestamp timestamp datetime timestamp nullable(datetime64(6))
timestampz timestamp(9) with time zone timestamptz datetime(6) datetime(6) datetimeoffset datetimeoffset datetimeoffset timestamptz timestamp_tz text text timestamp Nullable(DateTime64(6)) timestamptz timestamptz datetime timestamp with time zone nullable(datetime64(6))
float float double precision double double float float float double precision float real real float64 Nullable(Float64) float float double double nullable(float64)
float double double precision double double float float float double precision float real real float64 Nullable(Float64) double double double double nullable(float64)
time varchar() varchar() varchar() varchar() varchar() varchar() varchar() varchar(65535) varchar text text string Nullable(String) time time varchar() varchar nullable(string)
timez varchar() varchar() varchar() varchar() varchar() varchar() varchar() varchar(65535) varchar text text string Nullable(String) time time varchar() varchar nullable(string)
uuid varchar(36) uuid varchar(36) varchar(36) uniqueidentifier uniqueidentifier uniqueidentifier varchar(36) varchar(36) text text string Nullable(UUID) uuid uuid varchar(36) uuid nullable(string)
4 changes: 3 additions & 1 deletion core/sling/task_run_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func (t *TaskExecution) ReadFromDB(cfg *Config, srcConn database.Connection) (df
// get source columns to match update-key
// in case column casing needs adjustment
updateCol := sTable.Columns.GetColumn(cfg.Source.UpdateKey)
if updateCol != nil && updateCol.Name != "" {
if updateCol == nil {
return df, g.Error("did not find update_key: %s", cfg.Source.UpdateKey)
} else if updateCol.Name != "" {
cfg.Source.UpdateKey = updateCol.Name // overwrite with correct casing
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/elastic/go-elasticsearch/v8 v8.17.0
github.com/fatih/color v1.17.0
github.com/flarco/bigquery v0.0.9
github.com/flarco/g v0.1.136
github.com/flarco/g v0.1.138
github.com/getsentry/sentry-go v0.27.0
github.com/go-sql-driver/mysql v1.8.1
github.com/gobwas/glob v0.2.3
Expand Down
Loading