From 963962cfd81c93f651807fd74662cfafca71440c Mon Sep 17 00:00:00 2001
From: Ildar Nurislamov
Date: Tue, 27 Aug 2024 14:50:11 +0400
Subject: [PATCH] bulker: maxColumnsCount option (default: 10000)

---
 bulkerlib/implementations/sql/abstract.go         | 26 ++++---
 bulkerlib/implementations/sql/options.go          | 11 +++
 .../sql/replacepartition_stream.go                |  2 +-
 .../implementations/sql/schema_freeze_test.go     | 74 +++++++++++++++++++
 bulkerlib/implementations/sql/table.go            | 30 +++++---
 .../sql/transactional_stream.go                   |  2 +-
 6 files changed, 121 insertions(+), 24 deletions(-)

diff --git a/bulkerlib/implementations/sql/abstract.go b/bulkerlib/implementations/sql/abstract.go
index cd81b738..bce6335d 100644
--- a/bulkerlib/implementations/sql/abstract.go
+++ b/bulkerlib/implementations/sql/abstract.go
@@ -30,6 +30,7 @@ type AbstractSQLStream struct {
 	mergeWindow     int
 	omitNils        bool
 	schemaFreeze    bool
+	maxColumnsCount int
 
 	schemaOptions     types.Schema
 	schemaFromOptions *Table
@@ -82,6 +83,7 @@ func newAbstractStream(id string, p SQLAdapter, tableName string, mode bulker.Bu
 	}
 	ps.omitNils = OmitNilsOption.Get(&ps.options)
 	ps.schemaFreeze = SchemaFreezeOption.Get(&ps.options)
+	ps.maxColumnsCount = MaxColumnsCount.Get(&ps.options)
 
 	schema := bulker.SchemaOption.Get(&ps.options)
 	if !schema.IsEmpty() {
@@ -176,21 +178,21 @@ func (ps *AbstractSQLStream) adjustTableColumnTypes(currentTable, existingTable,
 		newCol := el.Value
 		var existingCol types.SQLColumn
 		ok := false
-		if existingTable != nil && existingTable.Columns != nil {
+		if existingTable.Exists() {
 			existingCol, ok = existingTable.Columns.Get(name)
 		}
 		if !ok {
-			if ps.schemaFreeze {
-				// when schemaFreeze=true all new columns values go to _unmapped_data
-				v, ok := values.Get(name)
-				if ok {
-					unmappedObj[name] = v
-				}
-				current.Delete(name)
-				values.Delete(name)
-			} else {
-				existingCol, ok = current.Get(name)
-				if !ok {
+			existingCol, ok = current.Get(name)
+			if !ok {
+				if ps.schemaFreeze || current.Len() >= ps.maxColumnsCount {
+					// when schemaFreeze=true or the column count limit is reached, all new column values go to _unmapped_data
+					v, ok := values.Get(name)
+					if ok {
+						unmappedObj[name] = v
+					}
+					values.Delete(name)
+					continue
+				} else {
 					// column doesn't exist in database and in current batch - adding as New
 					if !newCol.Override && !newCol.Important {
 						newCol.New = true
diff --git a/bulkerlib/implementations/sql/options.go b/bulkerlib/implementations/sql/options.go
index d6931696..bdb07cbd 100644
--- a/bulkerlib/implementations/sql/options.go
+++ b/bulkerlib/implementations/sql/options.go
@@ -54,6 +54,12 @@ var (
 		ParseFunc: utils.ParseBool,
 	}
 
+	MaxColumnsCount = bulker.ImplementationOption[int]{
+		Key:          "maxColumnsCount",
+		DefaultValue: 10000,
+		ParseFunc:    utils.ParseInt,
+	}
+
 	localBatchFileOption = bulker.ImplementationOption[string]{Key: "BULKER_OPTION_LOCAL_BATCH_FILE"}
 
 	s3BatchFileOption = bulker.ImplementationOption[*S3OptionConfig]{Key: "BULKER_OPTION_S3_BATCH_FILE"}
@@ -64,6 +70,7 @@ func init() {
 	bulker.RegisterOption(&ColumnTypesOption)
 	bulker.RegisterOption(&OmitNilsOption)
 	bulker.RegisterOption(&SchemaFreezeOption)
+	bulker.RegisterOption(&MaxColumnsCount)
 
 }
 
@@ -87,6 +94,10 @@ func WithSchemaFreeze() bulker.StreamOption {
 	return bulker.WithOption(&SchemaFreezeOption, true)
 }
 
+func WithMaxColumnsCount(maxColumnsCount int) bulker.StreamOption {
+	return bulker.WithOption(&MaxColumnsCount, maxColumnsCount)
+}
+
 func WithDeduplicateWindow(deduplicateWindow int) bulker.StreamOption {
 	return bulker.WithOption(&DeduplicateWindow, deduplicateWindow)
 }
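Taken together, the abstract.go and options.go hunks mean a value for a brand-new column is diverted into _unmapped_data not only when the schema is frozen but also once the table being built already holds maxColumnsCount columns; columns that already exist in the destination or in the current batch keep flowing through unchanged. A minimal, self-contained model of that routing rule (illustrative only: routeValues and its signature are invented for this sketch and are not part of the bulker API):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // routeValues models the branch added to adjustTableColumnTypes: known
    // columns pass through; new columns are accepted only while the schema is
    // unfrozen and the column budget has room; everything else is folded into
    // a JSON blob under _unmapped_data.
    func routeValues(existing map[string]bool, freeze bool, maxColumns int, event map[string]any) map[string]any {
        out := map[string]any{}
        unmapped := map[string]any{}
        for name, v := range event {
            switch {
            case existing[name]:
                out[name] = v // column already known: keep it
            case !freeze && len(existing) < maxColumns:
                existing[name] = true // new column still fits the budget
                out[name] = v
            default:
                unmapped[name] = v // overflow: diverted to _unmapped_data
            }
        }
        if len(unmapped) > 0 {
            b, _ := json.Marshal(unmapped)
            out["_unmapped_data"] = string(b)
        }
        return out
    }

    func main() {
        cols := map[string]bool{"_timestamp": true, "id": true, "name": true, "column1": true}
        event := map[string]any{"id": 6, "name": "test4", "column1": "data", "column2": "data", "column3": "data"}
        // With a cap of 4 already spent, column2/column3 land in _unmapped_data.
        fmt.Println(routeValues(cols, false, 4, event))
    }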
diff --git a/bulkerlib/implementations/sql/replacepartition_stream.go b/bulkerlib/implementations/sql/replacepartition_stream.go
index 2d317ec5..790486a8 100644
--- a/bulkerlib/implementations/sql/replacepartition_stream.go
+++ b/bulkerlib/implementations/sql/replacepartition_stream.go
@@ -34,7 +34,7 @@ func newReplacePartitionStream(id string, p SQLAdapter, tableName string, stream
 	ps.partitionId = partitionId
 	ps.existingTable, _ = ps.sqlAdapter.GetTableSchema(context.Background(), ps.namespace, ps.tableName)
 	ps.tmpTableFunc = func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table) {
-		dstTable := utils.Ternary(ps.existingTable.Exists(), ps.existingTable, tableForObject).Clone()
+		dstTable := utils.Ternary(ps.existingTable.Exists(), ps.existingTable.Clone(), tableForObject.WithoutColumns())
 		ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object)
 		if ps.schemaFromOptions != nil {
 			ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object)
diff --git a/bulkerlib/implementations/sql/schema_freeze_test.go b/bulkerlib/implementations/sql/schema_freeze_test.go
index d84c77bd..19e6bb09 100644
--- a/bulkerlib/implementations/sql/schema_freeze_test.go
+++ b/bulkerlib/implementations/sql/schema_freeze_test.go
@@ -73,3 +73,77 @@ func TestSchemaFreeze(t *testing.T) {
 		sequentialGroup.Add(1)
 	}
 }
+
+func TestMaxColumns(t *testing.T) {
+	t.Parallel()
+	tests := []bulkerTestConfig{
+		{
+			//deletes any table leftovers from previous tests
+			name:      "dummy_test_table_cleanup",
+			tableName: "max_columns_test",
+			modes:     []bulker.BulkMode{bulker.Batch, bulker.Stream},
+			dataFile:  "test_data/empty.ndjson",
+			configIds: utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
+		},
+		{
+			name:                "added_columns_first_run",
+			tableName:           "max_columns_test",
+			modes:               []bulker.BulkMode{bulker.Batch, bulker.Stream},
+			leaveResultingTable: true,
+			dataFile:            "test_data/columns_added.ndjson",
+			expectedTable: ExpectedTable{
+				Columns: justColumns("_timestamp", "id", "name", "column1", "_unmapped_data"),
+			},
+			expectedRows: []map[string]any{
+				{"_timestamp": constantTime, "id": 1, "name": "test", "column1": nil, "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 2, "name": "test2", "column1": "data", "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 3, "name": "test3", "column1": "data", "_unmapped_data": "{\"column2\": \"data\"}"},
+				{"_timestamp": constantTime, "id": 4, "name": "test2", "column1": "data", "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 5, "name": "test", "column1": nil, "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 6, "name": "test4", "column1": "data", "_unmapped_data": "{\"column2\": \"data\", \"column3\": \"data\"}"},
+			},
+			configIds:     utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
+			streamOptions: []bulker.StreamOption{WithMaxColumnsCount(4)},
+		},
+		{
+			name:                "added_columns_second_run",
+			tableName:           "max_columns_test",
+			modes:               []bulker.BulkMode{bulker.Batch, bulker.Stream},
+			leaveResultingTable: true,
+			dataFile:            "test_data/columns_added2.ndjson",
+			expectedTable: ExpectedTable{
+				Columns: justColumns("_timestamp", "id", "name", "column1", "_unmapped_data"),
+			},
+			expectedRows: []map[string]any{
+				{"_timestamp": constantTime, "id": 1, "name": "test", "column1": nil, "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 2, "name": "test2", "column1": "data", "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 3, "name": "test3", "column1": "data", "_unmapped_data": "{\"column2\": \"data\"}"},
+				{"_timestamp": constantTime, "id": 4, "name": "test2", "column1": "data", "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 5, "name": "test", "column1": nil, "_unmapped_data": nil},
+				{"_timestamp": constantTime, "id": 6, "name": "test4", "column1": "data", "_unmapped_data": "{\"column2\": \"data\", \"column3\": \"data\"}"},
+				{"_timestamp": constantTime, "id": 7, "name": "test", "column1": nil, "_unmapped_data": "{\"column4\": \"data\"}"},
+				{"_timestamp": constantTime, "id": 8, "name": "test2", "column1": nil, "_unmapped_data": "{\"column5\": \"data\"}"},
+			},
+			configIds:     utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
+			streamOptions: []bulker.StreamOption{WithMaxColumnsCount(4)},
+		},
+		{
+			name:          "dummy_test_table_cleanup",
+			tableName:     "max_columns_test",
+			modes:         []bulker.BulkMode{bulker.Batch, bulker.Stream},
+			dataFile:      "test_data/empty.ndjson",
+			configIds:     utils.ArrayIntersection(allBulkerConfigs, []string{PostgresBulkerTypeId, MySQLBulkerTypeId}),
+			streamOptions: []bulker.StreamOption{WithSchemaFreeze()},
+		},
+	}
+	sequentialGroup := sync.WaitGroup{}
+	sequentialGroup.Add(1)
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			runTestConfig(t, tt, testStream)
+			sequentialGroup.Done()
+		})
+		sequentialGroup.Wait()
+		sequentialGroup.Add(1)
+	}
+}
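The expected rows above pin down the behavior with a cap of 4: the first four keys seen (_timestamp, id, name, column1) become real columns, every later key is folded into _unmapped_data as JSON, and on the second run the budget is already spent, so column4 and column5 overflow too. The _unmapped_data service column itself evidently sits outside the budget, since the expected table has five columns. The ndjson data files are not part of the patch, but working back from the expected rows, event 6 and its stored form would look roughly like this (hypothetical reconstruction; the timestamp placeholder stands in for the tests' constantTime):

    package main

    import "fmt"

    func main() {
        // Hypothetical event 6 from test_data/columns_added.ndjson,
        // reconstructed from the expected rows (the file is not in the patch):
        input := map[string]any{
            "_timestamp": "<constantTime>", "id": 6, "name": "test4",
            "column1": "data", "column2": "data", "column3": "data",
        }
        // With WithMaxColumnsCount(4), _timestamp/id/name/column1 exhaust the
        // budget, so column2 and column3 are stored as a JSON blob instead:
        stored := map[string]any{
            "_timestamp": "<constantTime>", "id": 6, "name": "test4",
            "column1":        "data",
            "_unmapped_data": `{"column2": "data", "column3": "data"}`,
        }
        fmt.Println(input)
        fmt.Println(stored)
    }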
diff --git a/bulkerlib/implementations/sql/table.go b/bulkerlib/implementations/sql/table.go
index 92f292f6..44d3a89f 100644
--- a/bulkerlib/implementations/sql/table.go
+++ b/bulkerlib/implementations/sql/table.go
@@ -127,18 +127,28 @@ func (t *Table) CloneIfNeeded() *Table {
 	return t.Clone()
 }
 
-// Clone returns clone of current table
+func (t *Table) WithoutColumns() *Table {
+	return t.clone(true)
+}
+
 func (t *Table) Clone() *Table {
+	return t.clone(false)
+}
+
+// Clone returns clone of current table
+func (t *Table) clone(omitColumns bool) *Table {
 	clonedColumns := NewColumns()
-	for el := t.Columns.Front(); el != nil; el = el.Next() {
-		v := el.Value
-		clonedColumns.Set(el.Key, types.SQLColumn{
-			Type:     v.Type,
-			DdlType:  v.DdlType,
-			DataType: v.DataType,
-			New:      v.New,
-			Override: v.Override,
-		})
+	if !omitColumns {
+		for el := t.Columns.Front(); el != nil; el = el.Next() {
+			v := el.Value
+			clonedColumns.Set(el.Key, types.SQLColumn{
+				Type:     v.Type,
+				DdlType:  v.DdlType,
+				DataType: v.DataType,
+				New:      v.New,
+				Override: v.Override,
+			})
+		}
 	}
 
 	clonedPkFields := t.PKFields.Clone()
diff --git a/bulkerlib/implementations/sql/transactional_stream.go b/bulkerlib/implementations/sql/transactional_stream.go
index 5ca88bc5..3a26aebc 100644
--- a/bulkerlib/implementations/sql/transactional_stream.go
+++ b/bulkerlib/implementations/sql/transactional_stream.go
@@ -25,7 +25,7 @@ func newTransactionalStream(id string, p SQLAdapter, tableName string, streamOpt
 	}
 	ps.existingTable, _ = ps.sqlAdapter.GetTableSchema(context.Background(), ps.namespace, ps.tableName)
 	ps.tmpTableFunc = func(ctx context.Context, tableForObject *Table, object types.Object) (table *Table) {
-		dstTable := utils.Ternary(ps.existingTable.Exists(), ps.existingTable, tableForObject).Clone()
+		dstTable := utils.Ternary(ps.existingTable.Exists(), ps.existingTable.Clone(), tableForObject.WithoutColumns())
 		ps.adjustTableColumnTypes(dstTable, ps.existingTable, tableForObject, object)
 		if ps.schemaFromOptions != nil {
 			ps.adjustTableColumnTypes(dstTable, ps.existingTable, ps.schemaFromOptions, object)
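Both stream constructors now clone explicitly on each Ternary branch: when the destination table already exists its schema is copied, and otherwise the per-object table contributes only its metadata via the new WithoutColumns. The likely intent is that every incoming column must then re-enter the destination through adjustTableColumnTypes, so the freeze and maxColumnsCount checks see an accurate column count instead of a table pre-populated from the first object. A stripped-down model of the Clone/WithoutColumns pair from table.go (the struct and field names here are invented for the sketch):

    package main

    import "fmt"

    // A minimal stand-in for the patch's Table: both public methods delegate
    // to one private clone that optionally skips copying the column set.
    type table struct {
        name    string
        columns []string
    }

    func (t *table) clone(omitColumns bool) *table {
        c := &table{name: t.name}
        if !omitColumns {
            c.columns = append([]string(nil), t.columns...) // deep-copy columns
        }
        return c
    }

    func (t *table) Clone() *table          { return t.clone(false) }
    func (t *table) WithoutColumns() *table { return t.clone(true) }

    func main() {
        src := &table{name: "events", columns: []string{"id", "name"}}
        fmt.Println(src.Clone().columns)          // [id name]
        fmt.Println(src.WithoutColumns().columns) // [] – metadata kept, columns dropped
    }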