bulker: maxColumnsCount option (default: 10000)
absorbb committed Aug 27, 2024
1 parent e180c26 commit 963962c
Showing 6 changed files with 121 additions and 24 deletions.
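
This commit adds a maxColumnsCount stream option that caps how many columns a stream may create in the destination table; once the working table reaches the cap, values for further new columns are diverted to the _unmapped_data column, the same path used by schemaFreeze. A minimal usage sketch, not taken from this commit: the stream id, table name, cap value, and import aliases are illustrative, and the CreateStream signature is assumed from this repository's bulker interface.

    import (
        bulker "github.com/jitsucom/bulker/bulkerlib"
        sql "github.com/jitsucom/bulker/bulkerlib/implementations/sql"
    )

    // b is an existing bulker.Bulker instance (hypothetical setup).
    // Attach the new option when creating a stream:
    stream, err := b.CreateStream("events-stream", "events", bulker.Batch,
        sql.WithMaxColumnsCount(500), // override the default of 10000
    )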
26 changes: 14 additions & 12 deletions bulkerlib/implementations/sql/abstract.go
@@ -30,6 +30,7 @@ type AbstractSQLStream struct {
 	mergeWindow       int
 	omitNils          bool
 	schemaFreeze      bool
+	maxColumnsCount   int
 	schemaOptions     types.Schema
 	schemaFromOptions *Table
 
@@ -82,6 +83,7 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu
 	}
 	ps.omitNils = OmitNilsOption.Get(&ps.options)
 	ps.schemaFreeze = SchemaFreezeOption.Get(&ps.options)
+	ps.maxColumnsCount = MaxColumnsCount.Get(&ps.options)
 
 	schema := bulker.SchemaOption.Get(&ps.options)
 	if !schema.IsEmpty() {
@@ -176,21 +178,21 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable,
 		newCol := el.Value
 		var existingCol types.SQLColumn
 		ok := false
-		if existingTable != nil && existingTable.Columns != nil {
+		if existingTable.Exists() {
 			existingCol, ok = existingTable.Columns.Get(name)
 		}
 		if !ok {
-			if ps.schemaFreeze {
-				// when schemaFreeze=true all new columns values go to _unmapped_data
-				v, ok := values.Get(name)
-				if ok {
-					unmappedObj[name] = v
-				}
-				current.Delete(name)
-				values.Delete(name)
-			} else {
-				existingCol, ok = current.Get(name)
-				if !ok {
+			existingCol, ok = current.Get(name)
+			if !ok {
+				if ps.schemaFreeze || current.Len() >= ps.maxColumnsCount {
+					// when schemaFreeze=true all new columns values go to _unmapped_data
+					v, ok := values.Get(name)
+					if ok {
+						unmappedObj[name] = v
+					}
+					values.Delete(name)
+					continue
+				} else {
 					// column doesn't exist in database and in current batch - adding as New
 					if !newCol.Override && !newCol.Important {
 						newCol.New = true
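
The behavioral core of the change is the branch above: a column absent from both the database and the current batch is admitted as new only while the working table is below maxColumnsCount; over the cap (or with schemaFreeze on) its value is stashed under _unmapped_data instead. A standalone sketch of that rule, using plain maps in place of the repository's ordered-map types:

    // Simplified model of the overflow rule in adjustTableColumnTypes.
    func placeColumn(current map[string]string, values, unmapped map[string]any,
        name, sqlType string, schemaFreeze bool, maxColumns int) {
        if _, known := current[name]; known {
            return // column already present in this batch
        }
        if schemaFreeze || len(current) >= maxColumns {
            // frozen schema or cap reached: divert the value to _unmapped_data
            if v, ok := values[name]; ok {
                unmapped[name] = v
            }
            delete(values, name)
            return
        }
        current[name] = sqlType // below the cap: add as a new column
    }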
11 changes: 11 additions & 0 deletions bulkerlib/implementations/sql/options.go
@@ -54,6 +54,12 @@ var (
 		ParseFunc: utils.ParseBool,
 	}
 
+	MaxColumnsCount = bulker.ImplementationOption[int]{
+		Key:          "maxColumnsCount",
+		DefaultValue: 10000,
+		ParseFunc:    utils.ParseInt,
+	}
+
 	localBatchFileOption = bulker.ImplementationOption[string]{Key: "BULKER_OPTION_LOCAL_BATCH_FILE"}
 
 	s3BatchFileOption = bulker.ImplementationOption[*S3OptionConfig]{Key: "BULKER_OPTION_S3_BATCH_FILE"}
@@ -64,6 +70,7 @@ func init() {
 	bulker.RegisterOption(&ColumnTypesOption)
 	bulker.RegisterOption(&OmitNilsOption)
 	bulker.RegisterOption(&SchemaFreezeOption)
+	bulker.RegisterOption(&MaxColumnsCount)
 
 }

@@ -87,6 +94,10 @@ func WithSchemaFreeze() bulker.StreamOption {
 	return bulker.WithOption(&SchemaFreezeOption, true)
 }
 
+func WithMaxColumnsCount(maxColumnsCount int) bulker.StreamOption {
+	return bulker.WithOption(&MaxColumnsCount, maxColumnsCount)
+}
+
 func WithDeduplicateWindow(deduplicateWindow int) bulker.StreamOption {
 	return bulker.WithOption(&DeduplicateWindow, deduplicateWindow)
 }
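
For context on the option machinery: ImplementationOption.Get returns DefaultValue when a stream was created without the option, and ParseFunc (utils.ParseInt here) lets the same option arrive as a string from external configuration. A reduced sketch of that pattern, with hypothetical types rather than the repository's actual generics:

    // Minimal model of a typed option with a default value.
    type Option[V any] struct {
        Key          string
        DefaultValue V
    }

    func (o *Option[V]) Get(opts map[string]any) V {
        if v, ok := opts[o.Key].(V); ok {
            return v // explicitly set on the stream
        }
        return o.DefaultValue // e.g. 10000 for maxColumnsCount
    }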
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/replacepartition_stream.go
@@ -34,7 +34,7 @@ func newReplacePartitionStream(id string, p SQLAdapter, tableName string, stream
 	ps.partitionId = partitionId
 	ps.existingTable, _ = ps.sqlAdapter.GetTableSchema(context.Background(), ps.namespace, ps.tableName)
 	ps.tmpTableFunc = func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table) {
-		dstTable := utils.Ternary(ps.existingTable.Exists(), ps.existingTable, tableForObject).Clone()
+		dstTable := utils.Ternary(ps.existingTable.Exists(), ps.existingTable.Clone(), tableForObject.WithoutColumns())
 		ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object)
 		if ps.schemaFromOptions != nil {
 			ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object)
74 changes: 74 additions & 0 deletions bulkerlib/implementations/sql/schema_freeze_test.go
@@ -73,3 +73,77 @@ func TestSchemaFreeze(t *testing.T) {
 		sequentialGroup.Add(1)
 	}
 }
+
+func TestMaxColumns(t *testing.T) {
+	t.Parallel()
+	tests := []bulkerTestConfig{
+		{
+			//deletes any table leftovers from previous tests
+			name:      "dummy_test_table_cleanup",
+			tableName: "max_columns_test",
+			modes:     []bulker.BulkMode{bulker.Batch, bulker.Stream},
+			dataFile:  "test_data/empty.ndjson",
+			configIds: utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
+		},
+		{
+			name:                "added_columns_first_run",
+			tableName:           "max_columns_test",
+			modes:               []bulker.BulkMode{bulker.Batch, bulker.Stream},
+			leaveResultingTable: true,
+			dataFile:            "test_data/columns_added.ndjson",
+			expectedTable: ExpectedTable{
+				Columns: justColumns("_timestamp", "id", "name", "column1", "_unmapped_data"),
+			},
+			expectedRows: []map[string]any{
+				{"_timestamp": constantTime, "id": 1, "name": "test", "column1": nil, "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 2, "name": "test2", "column1": "data", "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 3, "name": "test3", "column1": "data", "_unmapped_data": "{\"column2\": \"data\"}"},
+				{"_timestamp": constantTime, "id": 4, "name": "test2", "column1": "data", "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 5, "name": "test", "column1": nil, "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 6, "name": "test4", "column1": "data", "_unmapped_data": "{\"column2\": \"data\", \"column3\": \"data\"}"},
+			},
+			configIds:     utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
+			streamOptions: []bulker.StreamOption{WithMaxColumnsCount(4)},
+		},
+		{
+			name:                "added_columns_second_run",
+			tableName:           "max_columns_test",
+			modes:               []bulker.BulkMode{bulker.Batch, bulker.Stream},
+			leaveResultingTable: true,
+			dataFile:            "test_data/columns_added2.ndjson",
+			expectedTable: ExpectedTable{
+				Columns: justColumns("_timestamp", "id", "name", "column1", "_unmapped_data"),
+			},
+			expectedRows: []map[string]any{
+				{"_timestamp": constantTime, "id": 1, "name": "test", "column1": nil, "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 2, "name": "test2", "column1": "data", "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 3, "name": "test3", "column1": "data", "_unmapped_data": "{\"column2\": \"data\"}"},
+				{"_timestamp": constantTime, "id": 4, "name": "test2", "column1": "data", "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 5, "name": "test", "column1": nil, "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 6, "name": "test4", "column1": "data", "_unmapped_data": "{\"column2\": \"data\", \"column3\": \"data\"}"},
+				{"_timestamp": constantTime, "id": 7, "name": "test", "column1": nil, "_unmapped_data": "{\"column4\": \"data\"}"},
+				{"_timestamp": constantTime, "id": 8, "name": "test2", "column1": nil, "_unmapped_data": "{\"column5\": \"data\"}"},
+			},
+			configIds:     utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
+			streamOptions: []bulker.StreamOption{WithMaxColumnsCount(4)},
+		},
+		{
+			name:          "dummy_test_table_cleanup",
+			tableName:     "max_columns_test",
+			modes:         []bulker.BulkMode{bulker.Batch, bulker.Stream},
+			dataFile:      "test_data/empty.ndjson",
+			configIds:     utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
+			streamOptions: []bulker.StreamOption{WithSchemaFreeze()},
+		},
+	}
+	sequentialGroup := sync.WaitGroup{}
+	sequentialGroup.Add(1)
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			runTestConfig(t, tt, testStream)
+			sequentialGroup.Done()
+		})
+		sequentialGroup.Wait()
+		sequentialGroup.Add(1)
+	}
+}
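
Note the sequentialGroup choreography at the end of the test: each t.Run must finish before the loop starts the next one, because all four configs write to the same max_columns_test table, while the top-level t.Parallel() still lets TestMaxColumns run alongside other test functions.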
30 changes: 20 additions & 10 deletions bulkerlib/implementations/sql/table.go
@@ -127,18 +127,28 @@ func (t *Table) CloneIfNeeded() *Table {
 	return t.Clone()
 }
 
-// Clone returns clone of current table
+func (t *Table) WithoutColumns() *Table {
+	return t.clone(true)
+}
+
 func (t *Table) Clone() *Table {
+	return t.clone(false)
+}
+
+// Clone returns clone of current table
+func (t *Table) clone(omitColumns bool) *Table {
 	clonedColumns := NewColumns()
-	for el := t.Columns.Front(); el != nil; el = el.Next() {
-		v := el.Value
-		clonedColumns.Set(el.Key, types.SQLColumn{
-			Type:     v.Type,
-			DdlType:  v.DdlType,
-			DataType: v.DataType,
-			New:      v.New,
-			Override: v.Override,
-		})
+	if !omitColumns {
+		for el := t.Columns.Front(); el != nil; el = el.Next() {
+			v := el.Value
+			clonedColumns.Set(el.Key, types.SQLColumn{
+				Type:     v.Type,
+				DdlType:  v.DdlType,
+				DataType: v.DataType,
+				New:      v.New,
+				Override: v.Override,
+			})
+		}
 	}
 
 	clonedPkFields := t.PKFields.Clone()
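
Why the clone variants matter for the cap: the tmpTableFunc changes in replacepartition_stream.go (above) and transactional_stream.go (below) start the working table either from existingTable.Clone(), a private copy that keeps the cached schema from being mutated across batches, or, when the table does not exist yet, from tableForObject.WithoutColumns(). Starting with an empty column set lets adjustTableColumnTypes admit columns one at a time against the current.Len() >= ps.maxColumnsCount check; a full clone of tableForObject would already contain every incoming column, so the cap could never reject any of them. A brief note on the contract (illustrative, not from the source):

    // t.Clone()          -> independent copy, columns included
    // t.WithoutColumns() -> same table metadata (name, PK fields, etc.), zero columns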
2 changes: 1 addition & 1 deletion bulkerlib/implementations/sql/transactional_stream.go
@@ -25,7 +25,7 @@ func newTransactionalStream(id string, p SQLAdapter, tableName string, streamOpt
 	}
 	ps.existingTable, _ = ps.sqlAdapter.GetTableSchema(context.Background(), ps.namespace, ps.tableName)
 	ps.tmpTableFunc = func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table) {
-		dstTable := utils.Ternary(ps.existingTable.Exists(), ps.existingTable, tableForObject).Clone()
+		dstTable := utils.Ternary(ps.existingTable.Exists(), ps.existingTable.Clone(), tableForObject.WithoutColumns())
 		ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object)
 		if ps.schemaFromOptions != nil {
 			ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object)
