Skip to content

Commit

Permalink
bulker: toSameCase option
Browse files Browse the repository at this point in the history
  • Loading branch information
absorbb committed Aug 15, 2024
1 parent e82741f commit 260d183
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 45 deletions.
2 changes: 1 addition & 1 deletion bulkerlib/implementations/file_storage/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (ps *AbstractFileStorageStream) init(ctx context.Context) error {

func (ps *AbstractFileStorageStream) preprocess(object types2.Object) (types2.Object, error) {
if ps.flatten {
flatObject, err := implementations2.NewFlattener(false, false).FlattenObject(object, nil)
flatObject, err := implementations2.NewFlattener(nil, false, false).FlattenObject(object, nil)
if err != nil {
return nil, err
} else {
Expand Down
17 changes: 13 additions & 4 deletions bulkerlib/implementations/flattener.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,20 @@ type Flattener interface {
}

type FlattenerImpl struct {
omitNilValues bool
nameTransformer func(string) string
omitNilValues bool
// stringifyObjects: when true, object-like values (JSON objects, arrays) are stringified before being sent to the warehouse (the warehouse parses them back)
stringifyObjects bool
}

func NewFlattener(omitNilValues, stringifyObjects bool) Flattener {
func NewFlattener(nameTransformer func(string) string, omitNilValues, stringifyObjects bool) Flattener {
if nameTransformer == nil {
nameTransformer = func(s string) string {
return s
}
}
return &FlattenerImpl{
nameTransformer: nameTransformer,
omitNilValues: omitNilValues,
stringifyObjects: stringifyObjects,
}
Expand Down Expand Up @@ -64,9 +71,11 @@ func (f *FlattenerImpl) flatten(key string, value types.Object, destination type
return nil
}
for el := value.Front(); el != nil; el = el.Next() {
newKey := el.Key
var newKey string
if key != "" {
newKey = key + "_" + newKey
newKey = key + "_" + f.nameTransformer(el.Key)
} else {
newKey = f.nameTransformer(el.Key)
}
elv := el.Value
if elv == nil {
Expand Down
12 changes: 11 additions & 1 deletion bulkerlib/implementations/sql/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/jitsucom/bulker/jitsubase/logging"
types2 "github.com/jitsucom/bulker/jitsubase/types"
"github.com/jitsucom/bulker/jitsubase/utils"
"strings"
"time"
)

Expand All @@ -24,6 +25,7 @@ type AbstractSQLStream struct {
options bulker.StreamOptions
tableName string
namespace string
nameTransformer func(string) string
merge bool
mergeWindow int
omitNils bool
Expand All @@ -49,6 +51,14 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu
for _, option := range streamOptions {
ps.options.Add(option)
}
if bulker.ToSameCaseOption.Get(&ps.options) {
if p.Type() == SnowflakeBulkerTypeId {
ps.nameTransformer = strings.ToUpper
} else {
ps.nameTransformer = strings.ToLower
}
ps.tableName = ps.nameTransformer(tableName)
}
ps.merge = bulker.DeduplicateOption.Get(&ps.options)
pkColumns := bulker.PrimaryKeyOption.Get(&ps.options)
if ps.merge && pkColumns.Empty() {
Expand Down Expand Up @@ -99,7 +109,7 @@ func (ps *AbstractSQLStream) preprocess(object types.Object) (*Table, types.Obje
}
}
}
batchHeader, processedObject, err := ProcessEvents(ps.tableName, object, ps.customTypes, ps.omitNils, ps.sqlAdapter.StringifyObjects(), notFlatteningKeys)
batchHeader, processedObject, err := ProcessEvents(ps.tableName, object, ps.customTypes, ps.nameTransformer, ps.omitNils, ps.sqlAdapter.StringifyObjects(), notFlatteningKeys)
if err != nil {
return nil, nil, err
}
Expand Down
71 changes: 36 additions & 35 deletions bulkerlib/implementations/sql/bulker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,13 @@ type bulkerTestConfig struct {
frozenTime time.Time
}

func (c *bulkerTestConfig) getIdAndTableName(mode bulker.BulkMode) (id, tableName string) {
func (c *bulkerTestConfig) getIdAndTableName(mode bulker.BulkMode) (id, tableName, expectedTableName string) {
tableName = c.tableName
if tableName == "" {
tableName = c.name
}
tableName = tableName + "_" + strings.ToLower(string(mode))
expectedTableName = utils.DefaultString(c.expectedTable.Name, tableName)
id = fmt.Sprintf("%s_%s", c.config.BulkerType, tableName)
return
}
Expand Down Expand Up @@ -494,19 +495,19 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode)
sqlAdapter, ok := blk.(SQLAdapter)
reqr.True(ok)
ctx := context.Background()
id, tableName := testConfig.getIdAndTableName(mode)
id, tableName, expectedTableName := testConfig.getIdAndTableName(mode)
namespace := utils.DefaultString(testConfig.namespace, sqlAdapter.DefaultNamespace())
err = sqlAdapter.InitDatabase(ctx)
PostStep("init_database", testConfig, mode, reqr, err)
//clean up in case of previous test failure
if !testConfig.leaveResultingTable && !forceLeaveResultingTables {
_ = sqlAdapter.DropTable(ctx, namespace, tableName, true)
_ = sqlAdapter.DropTable(ctx, namespace, expectedTableName, true)
//PostStep("pre_cleanup", testConfig, mode, reqr, err)
}
//clean up after test run
if !testConfig.leaveResultingTable && !forceLeaveResultingTables {
defer func() {
_ = sqlAdapter.DropTable(ctx, namespace, tableName, true)
_ = sqlAdapter.DropTable(ctx, namespace, expectedTableName, true)
}()
}
stream, err := blk.CreateStream(id, tableName, mode, testConfig.streamOptions...)
Expand Down Expand Up @@ -565,7 +566,7 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode)
//PostStep("state_lasterror", testConfig, mode, reqr, state.LastError)
if testConfig.expectedTable.Columns.Len() > 0 {
//Check table schema
table, err := sqlAdapter.GetTableSchema(ctx, namespace, tableName)
table, err := sqlAdapter.GetTableSchema(ctx, namespace, expectedTableName)
PostStep("get_table", testConfig, mode, reqr, err)
switch testConfig.expectedTableTypeChecking {
case TypeCheckingDisabled:
Expand All @@ -590,53 +591,53 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode)
el.Value = types2.SQLColumn{Type: el.Value.Type}
}
}
expectedPKFields := types.NewOrderedSet[string]()
if len(testConfig.expectedTable.PKFields) > 0 {
expectedPKFields.PutAll(testConfig.expectedTable.PKFields)
}

table.PrimaryKeyName = ""
expectedTable := &Table{
Name: expectedTableName,
Namespace: utils.Nvl(testConfig.expectedTable.Namespace, namespace),
PrimaryKeyName: "",
PKFields: expectedPKFields,
Columns: testConfig.expectedTable.Columns,
// don't check this yet
Partition: table.Partition,
// don't check this yet
TimestampColumn: table.TimestampColumn,
}
// don't check table name if not explicitly set
if testConfig.expectedTable.Name == "" {
table.Name = ""
expectedTable.Name = ""
}
if !testConfig.expectedTableCaseChecking {
newColumns := NewColumns()
for el := testConfig.expectedTable.Columns.Front(); el != nil; el = el.Next() {
for el := expectedTable.Columns.Front(); el != nil; el = el.Next() {
newColumns.Set(strings.ToLower(el.Key), el.Value)
}
testConfig.expectedTable.Columns = newColumns
expectedTable.Columns = newColumns
newColumns = NewColumns()
for el := table.Columns.Front(); el != nil; el = el.Next() {
newColumns.Set(strings.ToLower(el.Key), el.Value)
}
table.Columns = newColumns

newPKFields := types.NewOrderedSet[string]()
for _, k := range testConfig.expectedTable.PKFields {
expectedTable.PKFields.ForEach(func(k string) {
newPKFields.Put(strings.ToLower(k))
}
testConfig.expectedTable.PKFields = newPKFields.ToSlice()
})
expectedTable.PKFields = newPKFields
newPKFields = types.NewOrderedSet[string]()
table.PKFields.ForEach(func(k string) {
newPKFields.Put(strings.ToLower(k))
})
table.PKFields = newPKFields

testConfig.expectedTable.Name = strings.ToLower(testConfig.expectedTable.Name)
table.Name = strings.ToLower(table.Name)

table.PrimaryKeyName = strings.ToLower(table.PrimaryKeyName)
}
expectedPKFields := types.NewOrderedSet[string]()
if len(testConfig.expectedTable.PKFields) > 0 {
expectedPKFields.PutAll(testConfig.expectedTable.PKFields)
}
// don't check table name if not explicitly set
if testConfig.expectedTable.Name == "" {
table.Name = ""
}
table.PrimaryKeyName = ""
expectedTable := &Table{
Name: testConfig.expectedTable.Name,
Namespace: utils.Nvl(testConfig.expectedTable.Namespace, namespace),
PrimaryKeyName: "",
PKFields: expectedPKFields,
Columns: testConfig.expectedTable.Columns,
// don't check this yet
Partition: table.Partition,
// don't check this yet
TimestampColumn: table.TimestampColumn,
expectedTable.Name = strings.ToLower(expectedTable.Name)
}
actualColumns := table.Columns
expectedColumns := expectedTable.Columns
Expand All @@ -649,7 +650,7 @@ func testStream(t *testing.T, testConfig bulkerTestConfig, mode bulker.BulkMode)
if testConfig.expectedRowsCount != nil || testConfig.expectedRows != nil {
time.Sleep(1 * time.Second)
//Check rows count and rows data when provided
rows, err := sqlAdapter.Select(ctx, namespace, tableName, nil, testConfig.orderBy)
rows, err := sqlAdapter.Select(ctx, namespace, expectedTableName, nil, testConfig.orderBy)
PostStep("select_result", testConfig, mode, reqr, err)
if testConfig.expectedRows == nil {
reqr.Equal(testConfig.expectedRowsCount, len(rows))
Expand Down Expand Up @@ -694,7 +695,7 @@ func adaptConfig(t *testing.T, testConfig *bulkerTestConfig, mode bulker.BulkMod
t.Fatalf("test config error: expected table must have a 'name' column of string type to guess what type to expect for %s column", PartitonIdKeyword)
}
}
newExpectedTable := ExpectedTable{Columns: NewColumns(), PKFields: slices.Clone(testConfig.expectedTable.PKFields)}
newExpectedTable := ExpectedTable{Name: testConfig.expectedTable.Name, Columns: NewColumns(), PKFields: slices.Clone(testConfig.expectedTable.PKFields)}
newExpectedTable.Columns.Set(PartitonIdKeyword, textColumn)
newExpectedTable.Columns.SetAll(testConfig.expectedTable.Columns)
testConfig.expectedTable = newExpectedTable
Expand Down
28 changes: 28 additions & 0 deletions bulkerlib/implementations/sql/naming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,34 @@ func TestNaming(t *testing.T) {
},
configIds: []string{SnowflakeBulkerTypeId},
},
{
name: "naming_test1_postgres_samecase",
tableName: "StrangeTableName",
modes: []bulker.BulkMode{bulker.ReplaceTable},
dataFile: "test_data/identifiers.ndjson",
expectedRowsCount: 1,
expectedTableCaseChecking: true,
expectedTable: ExpectedTable{
Name: "strangetablename_replace_table",
Columns: justColumns("id", "name", "_timestamp", "column_c16da609b86c01f16a2c609eac4ccb0c", "column_12b241e808ae6c964a5bb9f1c012e63d", "秒速_センチメートル", "université français", "странное имя", "test name_ drop database public_ select 1 from dual_", "test name", "1test_name", "2", "lorem_ipsum_dolor_sit_amet_consectetur_adipiscing_elit_sed_do_e", "camelcase", "int", "user", "select", "__root__", "hash", "_unnamed"),
},
streamOptions: []bulker.StreamOption{bulker.WithToSameCase()},
configIds: []string{PostgresBulkerTypeId},
},
{
name: "naming_test1_snowflake_samecase",
tableName: "StrangeTableName",
modes: []bulker.BulkMode{bulker.Batch},
dataFile: "test_data/identifiers.ndjson",
expectedTableCaseChecking: true,
expectedRowsCount: 1,
expectedTable: ExpectedTable{
Name: "STRANGETABLENAME_BATCH",
Columns: justColumns("ID", "NAME", "_TIMESTAMP", "COLUMN_C16DA609B86C01F16A2C609EAC4CCB0C", "COLUMN_12B241E808AE6C964A5BB9F1C012E63D", "秒速_センチメートル", "UNIVERSITÉ FRANÇAIS", "СТРАННОЕ ИМЯ", "TEST NAME_ DROP DATABASE PUBLIC_ SELECT 1 FROM DUAL_", "TEST NAME", "1TEST_NAME", "2", "LOREM_IPSUM_DOLOR_SIT_AMET_CONSECTETUR_ADIPISCING_ELIT_SED_DO_EIUSMOD_TEMPOR_INCIDIDUNT_UT_LABORE_ET_DOLORE_MAGNA_ALIQUA_UT_ENIM_AD_MINIM_VENIAM_QUIS_NOSTRUD_EXERCITATION_ULLAMCO_LABORIS_NISI_UT_ALIQUIP_EX_EA_COMMODO_CONSEQUAT", "CAMELCASE", "INT", "USER", "SELECT", "__ROOT__", "HASH", "_UNNAMED"),
},
configIds: []string{SnowflakeBulkerTypeId},
streamOptions: []bulker.StreamOption{bulker.WithToSameCase()},
},
{
name: "naming_test1_case_bigquery",
tableName: "Strange Table Name; DROP DATABASE public;",
Expand Down
4 changes: 2 additions & 2 deletions bulkerlib/implementations/sql/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// ProcessEvents processes event objects without applying mapping rules.
// It returns the table header and an array of processed objects,
// or an error if at least one occurred.
func ProcessEvents(tableName string, event types.Object, customTypes types.SQLTypes, omitNils bool, stringifyObjects bool, notFlatteningKeys types2.Set[string]) (*TypesHeader, types.Object, error) {
func ProcessEvents(tableName string, event types.Object, customTypes types.SQLTypes, nameTransformer func(string) string, omitNils bool, stringifyObjects bool, notFlatteningKeys types2.Set[string]) (*TypesHeader, types.Object, error) {
sqlTypesHints, err := extractSQLTypesHints(event)
if err != nil {
return nil, nil, err
Expand All @@ -28,7 +28,7 @@ func ProcessEvents(tableName string, event types.Object, customTypes types.SQLTy
notFlatteningKeys.Put(key)
}
}
flatObject, err := implementations.NewFlattener(omitNils, stringifyObjects).FlattenObject(event, notFlatteningKeys)
flatObject, err := implementations.NewFlattener(nameTransformer, omitNils, stringifyObjects).FlattenObject(event, notFlatteningKeys)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/replacepartition_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func newReplacePartitionStream(id string, p SQLAdapter, tableName string, stream
if ps.schemaFromOptions != nil {
ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object)
}
tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(tableName, 47), time.Now().Format("060102150405"))
tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(ps.tableName, 47), time.Now().Format("060102150405"))
return &Table{
Namespace: p.TmpNamespace(ps.namespace),
Name: tmpTableName,
Expand Down
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/transactional_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func newTransactionalStream(id string, p SQLAdapter, tableName string, streamOpt
if ps.schemaFromOptions != nil {
ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object)
}
tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(tableName, 47), time.Now().Format("060102150405"))
tmpTableName := fmt.Sprintf("%s_tmp%s", utils.ShortenString(ps.tableName, 47), time.Now().Format("060102150405"))
return &Table{
Namespace: p.TmpNamespace(ps.namespace),
Name: tmpTableName,
Expand Down
11 changes: 11 additions & 0 deletions bulkerlib/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ var (
ParseFunc: utils.ParseString,
}

// ToSameCaseOption - when true, all field and table names are converted to the same case (lowercase for most databases, uppercase for Snowflake)
ToSameCaseOption = ImplementationOption[bool]{
Key: "toSameCase",
DefaultValue: false,
ParseFunc: utils.ParseBool,
}

// DiscriminatorFieldOption - an array representing the path to an object property. When deduplicate is true and multiple rows have the same primary key, the row with the highest value of this field is selected.
DiscriminatorFieldOption = ImplementationOption[[]string]{
Key: "discriminatorField",
Expand Down Expand Up @@ -274,3 +281,7 @@ func WithSchema(schema types.Schema) StreamOption {
func WithNamespace(namespace string) StreamOption {
return WithOption(&NamespaceOption, namespace)
}

func WithToSameCase() StreamOption {
return WithOption(&ToSameCaseOption, true)
}

0 comments on commit 260d183

Please sign in to comment.