diff --git a/bulkerlib/implementations/file_storage/abstract.go b/bulkerlib/implementations/file_storage/abstract.go index 63081ee..a2efe6a 100644 --- a/bulkerlib/implementations/file_storage/abstract.go +++ b/bulkerlib/implementations/file_storage/abstract.go @@ -124,7 +124,7 @@ func (ps *AbstractFileStorageStream) init(ctx context.Context) error { func (ps *AbstractFileStorageStream) preprocess(object types2.Object) (types2.Object, error) { if ps.flatten { - flatObject, err := implementations2.NewFlattener(false, false).FlattenObject(object, nil) + flatObject, err := implementations2.NewFlattener(nil, false, false).FlattenObject(object, nil) if err != nil { return nil, err } else { diff --git a/bulkerlib/implementations/flattener.go b/bulkerlib/implementations/flattener.go index fbe6a80..d36399a 100644 --- a/bulkerlib/implementations/flattener.go +++ b/bulkerlib/implementations/flattener.go @@ -16,13 +16,20 @@ type Flattener interface { } type FlattenerImpl struct { - omitNilValues bool + nameTransformer func(string) string + omitNilValues bool // stringifyObjects objects types like JSON, array will be stringified before sent to warehouse (warehouse will parse them back) stringifyObjects bool } -func NewFlattener(omitNilValues, stringifyObjects bool) Flattener { +func NewFlattener(nameTransformer func(string) string, omitNilValues, stringifyObjects bool) Flattener { + if nameTransformer == nil { + nameTransformer = func(s string) string { + return s + } + } return &FlattenerImpl{ + nameTransformer: nameTransformer, omitNilValues: omitNilValues, stringifyObjects: stringifyObjects, } @@ -64,9 +71,11 @@ func (f *FlattenerImpl) flatten(key string, value types.Object, destination type return nil } for el := value.Front(); el != nil; el = el.Next() { - newKey := el.Key + var newKey string if key != "" { - newKey = key + "_" + newKey + newKey = key + "_" + f.nameTransformer(el.Key) + } else { + newKey = f.nameTransformer(el.Key) } elv := el.Value if elv == nil { diff 
--git a/bulkerlib/implementations/sql/abstract.go b/bulkerlib/implementations/sql/abstract.go index 8cd63c1..4019ffb 100644 --- a/bulkerlib/implementations/sql/abstract.go +++ b/bulkerlib/implementations/sql/abstract.go @@ -9,6 +9,7 @@ import ( "github.com/jitsucom/bulker/jitsubase/logging" types2 "github.com/jitsucom/bulker/jitsubase/types" "github.com/jitsucom/bulker/jitsubase/utils" + "strings" "time" ) @@ -24,6 +25,7 @@ type AbstractSQLStream struct { options bulker.StreamOptions tableName string namespace string + nameTransformer func(string) string merge bool mergeWindow int omitNils bool @@ -49,6 +51,14 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu for _, option := range streamOptions { ps.options.Add(option) } + if bulker.ToSameCaseOption.Get(&ps.options) { + if p.Type() == SnowflakeBulkerTypeId { + ps.nameTransformer = strings.ToUpper + } else { + ps.nameTransformer = strings.ToLower + } + ps.tableName = ps.nameTransformer(tableName) + } ps.merge = bulker.DeduplicateOption.Get(&ps.options) pkColumns := bulker.PrimaryKeyOption.Get(&ps.options) if ps.merge && pkColumns.Empty() { @@ -99,7 +109,7 @@ func (ps *AbstractSQLStream) preprocess(object types.Object) (*Table, types.Obje } } } - batchHeader, processedObject, err := ProcessEvents(ps.tableName, object, ps.customTypes, ps.omitNils, ps.sqlAdapter.StringifyObjects(), notFlatteningKeys) + batchHeader, processedObject, err := ProcessEvents(ps.tableName, object, ps.customTypes, ps.nameTransformer, ps.omitNils, ps.sqlAdapter.StringifyObjects(), notFlatteningKeys) if err != nil { return nil, nil, err } diff --git a/bulkerlib/implementations/sql/bulker_test.go b/bulkerlib/implementations/sql/bulker_test.go index 6138e91..5ab4394 100644 --- a/bulkerlib/implementations/sql/bulker_test.go +++ b/bulkerlib/implementations/sql/bulker_test.go @@ -254,12 +254,13 @@ type bulkerTestConfig struct { frozenTime time.Time } -func (c *bulkerTestConfig) getIdAndTableName(mode 
bulker.BulkMode) (id, tableName string) { +func (c *bulkerTestConfig) getIdAndTableName(mode bulker.BulkMode) (id, tableName, expectedTableName string) { tableName = c.tableName if tableName == "" { tableName = c.name } tableName = tableName + "_" + strings.ToLower(string(mode)) + expectedTableName = utils.DefaultString(c.expectedTable.Name, tableName) id = fmt.Sprintf("%s_%s", c.config.BulkerType, tableName) return } @@ -494,19 +495,19 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode) sqlAdapter, ok := blk.(SQLAdapter) reqr.True(ok) ctx := context.Background() - id, tableName := testConfig.getIdAndTableName(mode) + id, tableName, expectedTableName := testConfig.getIdAndTableName(mode) namespace := utils.DefaultString(testConfig.namespace, sqlAdapter.DefaultNamespace()) err = sqlAdapter.InitDatabase(ctx) PostStep("init_database", testConfig, mode, reqr, err) //clean up in case of previous test failure if !testConfig.leaveResultingTable && !forceLeaveResultingTables { - _ = sqlAdapter.DropTable(ctx, namespace, tableName, true) + _ = sqlAdapter.DropTable(ctx, namespace, expectedTableName, true) //PostStep("pre_cleanup", testConfig, mode, reqr, err) } //clean up after test run if !testConfig.leaveResultingTable && !forceLeaveResultingTables { defer func() { - _ = sqlAdapter.DropTable(ctx, namespace, tableName, true) + _ = sqlAdapter.DropTable(ctx, namespace, expectedTableName, true) }() } stream, err := blk.CreateStream(id, tableName, mode, testConfig.streamOptions...) 
@@ -565,7 +566,7 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode) //PostStep("state_lasterror", testConfig, mode, reqr, state.LastError) if testConfig.expectedTable.Columns.Len() > 0 { //Check table schema - table, err := sqlAdapter.GetTableSchema(ctx, namespace, tableName) + table, err := sqlAdapter.GetTableSchema(ctx, namespace, expectedTableName) PostStep("get_table", testConfig, mode, reqr, err) switch testConfig.expectedTableTypeChecking { case TypeCheckingDisabled: @@ -590,12 +591,34 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode) el.Value = types2.SQLColumn{Type: el.Value.Type} } } + expectedPKFields := types.NewOrderedSet[string]() + if len(testConfig.expectedTable.PKFields) > 0 { + expectedPKFields.PutAll(testConfig.expectedTable.PKFields) + } + + table.PrimaryKeyName = "" + expectedTable := &Table{ + Name: expectedTableName, + Namespace: utils.Nvl(testConfig.expectedTable.Namespace, namespace), + PrimaryKeyName: "", + PKFields: expectedPKFields, + Columns: testConfig.expectedTable.Columns, + // don't check this yet + Partition: table.Partition, + // don't check this yet + TimestampColumn: table.TimestampColumn, + } + // don't check table name if not explicitly set + if testConfig.expectedTable.Name == "" { + table.Name = "" + expectedTable.Name = "" + } if !testConfig.expectedTableCaseChecking { newColumns := NewColumns() - for el := testConfig.expectedTable.Columns.Front(); el != nil; el = el.Next() { + for el := expectedTable.Columns.Front(); el != nil; el = el.Next() { newColumns.Set(strings.ToLower(el.Key), el.Value) } - testConfig.expectedTable.Columns = newColumns + expectedTable.Columns = newColumns newColumns = NewColumns() for el := table.Columns.Front(); el != nil; el = el.Next() { newColumns.Set(strings.ToLower(el.Key), el.Value) @@ -603,40 +626,18 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode) table.Columns = newColumns newPKFields := 
types.NewOrderedSet[string]() - for _, k := range testConfig.expectedTable.PKFields { + expectedTable.PKFields.ForEach(func(k string) { newPKFields.Put(strings.ToLower(k)) - } - testConfig.expectedTable.PKFields = newPKFields.ToSlice() + }) + expectedTable.PKFields = newPKFields newPKFields = types.NewOrderedSet[string]() table.PKFields.ForEach(func(k string) { newPKFields.Put(strings.ToLower(k)) }) table.PKFields = newPKFields - testConfig.expectedTable.Name = strings.ToLower(testConfig.expectedTable.Name) table.Name = strings.ToLower(table.Name) - - table.PrimaryKeyName = strings.ToLower(table.PrimaryKeyName) - } - expectedPKFields := types.NewOrderedSet[string]() - if len(testConfig.expectedTable.PKFields) > 0 { - expectedPKFields.PutAll(testConfig.expectedTable.PKFields) - } - // don't check table name if not explicitly set - if testConfig.expectedTable.Name == "" { - table.Name = "" - } - table.PrimaryKeyName = "" - expectedTable := &Table{ - Name: testConfig.expectedTable.Name, - Namespace: utils.Nvl(testConfig.expectedTable.Namespace, namespace), - PrimaryKeyName: "", - PKFields: expectedPKFields, - Columns: testConfig.expectedTable.Columns, - // don't check this yet - Partition: table.Partition, - // don't check this yet - TimestampColumn: table.TimestampColumn, + expectedTable.Name = strings.ToLower(expectedTable.Name) } actualColumns := table.Columns expectedColumns := expectedTable.Columns @@ -649,7 +650,7 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode) if testConfig.expectedRowsCount != nil || testConfig.expectedRows != nil { time.Sleep(1 * time.Second) //Check rows count and rows data when provided - rows, err := sqlAdapter.Select(ctx, namespace, tableName, nil, testConfig.orderBy) + rows, err := sqlAdapter.Select(ctx, namespace, expectedTableName, nil, testConfig.orderBy) PostStep("select_result", testConfig, mode, reqr, err) if testConfig.expectedRows == nil { reqr.Equal(testConfig.expectedRowsCount, len(rows)) @@ 
-694,7 +695,7 @@ func adaptConfig(t *testing.T, testConfig *bulkerTestConfig, mode bulker.BulkMod t.Fatalf("test config error: expected table must have a 'name' column of string type to guess what type to expect for %s column", PartitonIdKeyword) } } - newExpectedTable := ExpectedTable{Columns: NewColumns(), PKFields: slices.Clone(testConfig.expectedTable.PKFields)} + newExpectedTable := ExpectedTable{Name: testConfig.expectedTable.Name, Columns: NewColumns(), PKFields: slices.Clone(testConfig.expectedTable.PKFields)} newExpectedTable.Columns.Set(PartitonIdKeyword, textColumn) newExpectedTable.Columns.SetAll(testConfig.expectedTable.Columns) testConfig.expectedTable = newExpectedTable diff --git a/bulkerlib/implementations/sql/naming_test.go b/bulkerlib/implementations/sql/naming_test.go index 8705be2..029ded2 100644 --- a/bulkerlib/implementations/sql/naming_test.go +++ b/bulkerlib/implementations/sql/naming_test.go @@ -53,6 +53,34 @@ func TestNaming(t *testing.T) { }, configIds: []string{SnowflakeBulkerTypeId}, }, + { + name: "naming_test1_postgres_samecase", + tableName: "StrangeTableName", + modes: []bulker.BulkMode{bulker.ReplaceTable}, + dataFile: "test_data/identifiers.ndjson", + expectedRowsCount: 1, + expectedTableCaseChecking: true, + expectedTable: ExpectedTable{ + Name: "strangetablename_replace_table", + Columns: justColumns("id", "name", "_timestamp", "column_c16da609b86c01f16a2c609eac4ccb0c", "column_12b241e808ae6c964a5bb9f1c012e63d", "秒速_センチメートル", "université français", "странное имя", "test name_ drop database public_ select 1 from dual_", "test name", "1test_name", "2", "lorem_ipsum_dolor_sit_amet_consectetur_adipiscing_elit_sed_do_e", "camelcase", "int", "user", "select", "__root__", "hash", "_unnamed"), + }, + streamOptions: []bulker.StreamOption{bulker.WithToSameCase()}, + configIds: []string{PostgresBulkerTypeId}, + }, + { + name: "naming_test1_snowflake_samecase", + tableName: "StrangeTableName", + modes: []bulker.BulkMode{bulker.Batch}, + 
dataFile: "test_data/identifiers.ndjson", + expectedTableCaseChecking: true, + expectedRowsCount: 1, + expectedTable: ExpectedTable{ + Name: "STRANGETABLENAME_BATCH", + Columns: justColumns("ID", "NAME", "_TIMESTAMP", "COLUMN_C16DA609B86C01F16A2C609EAC4CCB0C", "COLUMN_12B241E808AE6C964A5BB9F1C012E63D", "秒速_センチメートル", "UNIVERSITÉ FRANÇAIS", "СТРАННОЕ ИМЯ", "TEST NAME_ DROP DATABASE PUBLIC_ SELECT 1 FROM DUAL_", "TEST NAME", "1TEST_NAME", "2", "LOREM_IPSUM_DOLOR_SIT_AMET_CONSECTETUR_ADIPISCING_ELIT_SED_DO_EIUSMOD_TEMPOR_INCIDIDUNT_UT_LABORE_ET_DOLORE_MAGNA_ALIQUA_UT_ENIM_AD_MINIM_VENIAM_QUIS_NOSTRUD_EXERCITATION_ULLAMCO_LABORIS_NISI_UT_ALIQUIP_EX_EA_COMMODO_CONSEQUAT", "CAMELCASE", "INT", "USER", "SELECT", "__ROOT__", "HASH", "_UNNAMED"), + }, + configIds: []string{SnowflakeBulkerTypeId}, + streamOptions: []bulker.StreamOption{bulker.WithToSameCase()}, + }, { name: "naming_test1_case_bigquery", tableName: "Strange Table Name; DROP DATABASE public;", diff --git a/bulkerlib/implementations/sql/processor.go b/bulkerlib/implementations/sql/processor.go index 64acdec..435ce83 100644 --- a/bulkerlib/implementations/sql/processor.go +++ b/bulkerlib/implementations/sql/processor.go @@ -12,7 +12,7 @@ import ( // ProcessEvents processes events objects without applying mapping rules // returns table headerm array of processed objects // or error if at least 1 was occurred -func ProcessEvents(tableName string, event types.Object, customTypes types.SQLTypes, omitNils bool, stringifyObjects bool, notFlatteningKeys types2.Set[string]) (*TypesHeader, types.Object, error) { +func ProcessEvents(tableName string, event types.Object, customTypes types.SQLTypes, nameTransformer func(string) string, omitNils bool, stringifyObjects bool, notFlatteningKeys types2.Set[string]) (*TypesHeader, types.Object, error) { sqlTypesHints, err := extractSQLTypesHints(event) if err != nil { return nil, nil, err @@ -28,7 +28,7 @@ func ProcessEvents(tableName string, event types.Object, customTypes 
types.SQLTy notFlatteningKeys.Put(key) } } - flatObject, err := implementations.NewFlattener(omitNils, stringifyObjects).FlattenObject(event, notFlatteningKeys) + flatObject, err := implementations.NewFlattener(nameTransformer, omitNils, stringifyObjects).FlattenObject(event, notFlatteningKeys) if err != nil { return nil, nil, err } diff --git a/bulkerlib/implementations/sql/replacepartition_stream.go b/bulkerlib/implementations/sql/replacepartition_stream.go index ea2f7f2..2d317ec 100644 --- a/bulkerlib/implementations/sql/replacepartition_stream.go +++ b/bulkerlib/implementations/sql/replacepartition_stream.go @@ -39,7 +39,7 @@ func newReplacePartitionStream(id string, p SQLAdapter, tableName string, stream if ps.schemaFromOptions != nil { ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object) } - tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(tableName, 47), time.Now().Format("060102150405")) + tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(ps.tableName, 47), time.Now().Format("060102150405")) return &Table{ Namespace: p.TmpNamespace(ps.namespace), Name: tmpTableName, diff --git a/bulkerlib/implementations/sql/transactional_stream.go b/bulkerlib/implementations/sql/transactional_stream.go index 2ef71b5..5ca88bc 100644 --- a/bulkerlib/implementations/sql/transactional_stream.go +++ b/bulkerlib/implementations/sql/transactional_stream.go @@ -30,7 +30,7 @@ func newTransactionalStream(id string, p SQLAdapter, tableName string, streamOpt if ps.schemaFromOptions != nil { ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object) } - tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(tableName, 47), time.Now().Format("060102150405")) + tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(ps.tableName, 47), time.Now().Format("060102150405")) return &Table{ Namespace: p.TmpNamespace(ps.namespace), Name: tmpTableName, diff --git a/bulkerlib/options.go b/bulkerlib/options.go 
index a17b241..4715498 100644 --- a/bulkerlib/options.go +++ b/bulkerlib/options.go @@ -99,6 +99,13 @@ var ( ParseFunc: utils.ParseString, } + // ToSameCaseOption - when true, all field and table names will be converted to the same case (lowercase for most databases, uppercase for Snowflake) + ToSameCaseOption = ImplementationOption[bool]{ + Key: "toSameCase", + DefaultValue: false, + ParseFunc: utils.ParseBool, + } + // DiscriminatorFieldOption - array represents path to the object property. when deduplicate is true, and multiple rows has the same primary key, row with the highest value of this field will be selected DiscriminatorFieldOption = ImplementationOption[[]string]{ Key: "discriminatorField", @@ -274,3 +281,7 @@ func WithSchema(schema types.Schema) StreamOption { func WithNamespace(namespace string) StreamOption { return WithOption(&NamespaceOption, namespace) } + +func WithToSameCase() StreamOption { + return WithOption(&ToSameCaseOption, true) +}