From 685da10ea6a2adb27524a60c1c9f25a0092118e2 Mon Sep 17 00:00:00 2001
From: Alisha
Date: Wed, 11 Dec 2024 17:07:27 -0800
Subject: [PATCH 1/9] test date and time postgres

---
 .../neosync/sync/sync_integration_test.go     | 84 ++++++++++++++++---
 .../postgres/alltypes/create-tables.sql       | 60 ++++++-------
 .../pkg/benthos/sql/processor_neosync_pgx.go  | 33 ++++++++
 3 files changed, 137 insertions(+), 40 deletions(-)

diff --git a/cli/internal/cmds/neosync/sync/sync_integration_test.go b/cli/internal/cmds/neosync/sync/sync_integration_test.go
index d24aa3a16b..7aa6fe1b13 100644
--- a/cli/internal/cmds/neosync/sync/sync_integration_test.go
+++ b/cli/internal/cmds/neosync/sync/sync_integration_test.go
@@ -110,21 +110,53 @@ func Test_Sync(t *testing.T) {
 		err := sync.configureAndRunSync()
 		require.NoError(t, err)
 
-		rows := postgres.Target.DB.QueryRow(ctx, "select count(*) from humanresources.employees;")
+		row := postgres.Target.DB.QueryRow(ctx, "select count(*) from humanresources.employees;")
 		var rowCount int
-		err = rows.Scan(&rowCount)
+		err = row.Scan(&rowCount)
 		require.NoError(t, err)
 		require.Greater(t, rowCount, 1)
 
-		rows = postgres.Target.DB.QueryRow(ctx, "select count(*) from humanresources.generated_table;")
-		err = rows.Scan(&rowCount)
+		row = postgres.Target.DB.QueryRow(ctx, "select count(*) from humanresources.generated_table;")
+		err = row.Scan(&rowCount)
 		require.NoError(t, err)
 		require.Greater(t, rowCount, 1)
 
-		rows = postgres.Target.DB.QueryRow(ctx, "select count(*) from alltypes.all_data_types;")
-		err = rows.Scan(&rowCount)
+		row = postgres.Target.DB.QueryRow(ctx, "select count(*) from alltypes.all_data_types;")
+		err = row.Scan(&rowCount)
 		require.NoError(t, err)
 		require.Greater(t, rowCount, 1)
+
+		row = postgres.Target.DB.QueryRow(ctx, "select count(*) from alltypes.time_time;")
+		err = row.Scan(&rowCount)
+		require.NoError(t, err)
+		require.Greater(t, rowCount, 0)
+
+		rows, err := postgres.Target.DB.Query(ctx, "select timestamp_col::text, date_col::text from alltypes.time_time;")
+		require.NoError(t, err)
+		defer rows.Close()
+
+		expectedTimestamps := [][]byte{
+			[]byte("2024-03-18 10:30:00"),
+			// []byte("0001-01-01 00:00:00 BC"),
+		}
+		expectedDates := [][]byte{
+			[]byte("2024-03-18"),
+			// []byte("0001-01-01 BC"),
+		}
+		var actualTimestamps [][]byte
+		var actualDates [][]byte
+
+		for rows.Next() {
+			var timestampCol, dateCol []byte
+			err = rows.Scan(&timestampCol, &dateCol)
+			require.NoError(t, err)
+			actualTimestamps = append(actualTimestamps, timestampCol)
+			actualDates = append(actualDates, dateCol)
+		}
+
+		require.NoError(t, rows.Err())
+		require.ElementsMatch(t, expectedTimestamps, actualTimestamps, "Expected timestamp_col values to match")
+		require.ElementsMatch(t, expectedDates, actualDates, "Expected date_col values to match")
 	})
 
 	t.Run("S3_end_to_end", func(t *testing.T) {
@@ -217,15 +249,47 @@ func Test_Sync(t *testing.T) {
 		})
 
 		var rowCount int
-		rows := postgres.Target.DB.QueryRow(ctx, fmt.Sprintf("select count(*) from %s.all_data_types;", alltypesSchema))
-		err = rows.Scan(&rowCount)
+		row := postgres.Target.DB.QueryRow(ctx, fmt.Sprintf("select count(*) from %s.all_data_types;", alltypesSchema))
+		err = row.Scan(&rowCount)
 		require.NoError(t, err)
 		require.Greater(t, rowCount, 1)
 
-		rows = postgres.Target.DB.QueryRow(ctx, fmt.Sprintf("select count(*) from %s.json_data;", alltypesSchema))
-		err = rows.Scan(&rowCount)
+		row = postgres.Target.DB.QueryRow(ctx, fmt.Sprintf("select count(*) from %s.json_data;", alltypesSchema))
+		err = row.Scan(&rowCount)
 		require.NoError(t, err)
 		require.Greater(t, rowCount, 1)
+
+		row = postgres.Target.DB.QueryRow(ctx, fmt.Sprintf("select count(*) from %s.time_time;", alltypesSchema))
+		err = row.Scan(&rowCount)
+		require.NoError(t, err)
+		require.Greater(t, rowCount, 0)
+
+		rows, err := postgres.Target.DB.Query(ctx, fmt.Sprintf("select timestamp_col::text, date_col::text from %s.time_time;", alltypesSchema))
+		require.NoError(t, err)
+		defer rows.Close()
+
+		expectedTimestamps := [][]byte{
+			[]byte("2024-03-18 10:30:00"),
+			// []byte("0001-01-01 00:00:00 BC"),
+		}
+		expectedDates := [][]byte{
+			[]byte("2024-03-18"),
+			// []byte("0001-01-01 BC"),
+		}
+		var actualTimestamps [][]byte
+		var actualDates [][]byte
+
+		for rows.Next() {
+			var timestampCol, dateCol []byte
+			err = rows.Scan(&timestampCol, &dateCol)
+			require.NoError(t, err)
+			actualTimestamps = append(actualTimestamps, timestampCol)
+			actualDates = append(actualDates, dateCol)
+		}
+
+		require.NoError(t, rows.Err())
+		require.ElementsMatch(t, expectedTimestamps, actualTimestamps, "Expected timestamp_col values to match")
+		require.ElementsMatch(t, expectedDates, actualDates, "Expected date_col values to match")
 	})
 
 	t.Cleanup(func() {
diff --git a/internal/testutil/testdata/postgres/alltypes/create-tables.sql b/internal/testutil/testdata/postgres/alltypes/create-tables.sql
index 1821d4df7e..dd410cb4e1 100644
--- a/internal/testutil/testdata/postgres/alltypes/create-tables.sql
+++ b/internal/testutil/testdata/postgres/alltypes/create-tables.sql
@@ -26,8 +26,8 @@ CREATE TABLE IF NOT EXISTS all_data_types (
     timestamp_col TIMESTAMP,
     timestamptz_col TIMESTAMPTZ,
     date_col DATE,
-    -- time_col TIME,
-    -- timetz_col TIMETZ,
+    time_col TIME,
+    timetz_col TIMETZ,
     interval_col INTERVAL,
 
     -- Boolean Type
@@ -100,8 +100,8 @@ INSERT INTO all_data_types (
     timestamp_col,
     timestamptz_col,
     date_col,
-    -- time_col,
-    -- timetz_col,
+    time_col,
+    timetz_col,
     interval_col,
     boolean_col,
     uuid_col,
@@ -149,8 +149,8 @@ INSERT INTO all_data_types (
     '2024-01-01 12:34:56',    -- timestamp_col
     '2024-01-01 12:34:56+00', -- timestamptz_col
     '2024-01-01',             -- date_col
-    -- '12:34:56',            -- time_col
-    -- '12:34:56+00',         -- timetz_col
+    '12:34:56',               -- time_col
+    '12:34:56+00',            -- timetz_col
     '1 day',                  -- interval_col
     TRUE,                     -- boolean_col
     '123e4567-e89b-12d3-a456-426614174000', -- uuid_col
@@ -188,32 +188,32 @@ INSERT INTO all_data_types (
 );
 
 
--- CREATE TABLE IF NOT EXISTS time_time (
---     id SERIAL PRIMARY KEY,
---     timestamp_col TIMESTAMP,
---     timestamptz_col TIMESTAMPTZ,
---     date_col DATE
--- );
+CREATE TABLE IF NOT EXISTS time_time (
+    id SERIAL PRIMARY KEY,
+    timestamp_col TIMESTAMP,
+    timestamptz_col TIMESTAMPTZ,
+    date_col DATE
+);
 
--- INSERT INTO time_time (
---     timestamp_col,
---     timestamptz_col,
---     date_col
--- )
--- VALUES (
---     '2024-03-18 10:30:00',
---     '2024-03-18 10:30:00+00',
---     '2024-03-18'
--- );
+INSERT INTO time_time (
+    timestamp_col,
+    timestamptz_col,
+    date_col
+)
+VALUES (
+    '2024-03-18 10:30:00',
+    '2024-03-18 10:30:00+00',
+    '2024-03-18'
+);
 
 -- INSERT INTO time_time (
 --     timestamp_col,
---     timestamptz_col,
+-- --     timestamptz_col,
 --     date_col
 -- )
 -- VALUES (
 --     '0001-01-01 00:00:00 BC',
---     '0001-01-01 00:00:00+00 BC',
+-- --     '0001-01-01 00:00:00+00 BC',
 --     '0001-01-01 BC'
 -- );
 
@@ -233,7 +233,7 @@ CREATE TABLE IF NOT EXISTS array_types (
     -- "time_array" _time,
     -- "timestamp_array" _timestamp,
     -- "timestamptz_array" _timestamptz,
-    "interval_array" _interval
+    "interval_array" _interval,
     -- "inet_array" _inet, // broken
     -- "cidr_array" _cidr,
     -- "point_array" _point,
@@ -243,7 +243,7 @@ CREATE TABLE IF NOT EXISTS array_types (
     -- "path_array" _path,
     -- "polygon_array" _polygon,
    -- "circle_array" _circle,
-    -- "uuid_array" _uuid,
+    "uuid_array" _uuid
     -- "json_array" _json,
     -- "jsonb_array" _jsonb,
     -- "bit_array" _bit,
@@ -263,12 +263,12 @@ INSERT INTO array_types (
     -- text_array, varchar_array, char_array, boolean_array,
     -- date_array,
     -- time_array, timestamp_array, timestamptz_array,
-    interval_array
+    interval_array,
     -- inet_array, cidr_array,
     -- point_array, line_array, lseg_array,
     -- box_array,
     -- path_array, polygon_array, circle_array,
-    -- uuid_array,
+    uuid_array
     -- json_array, jsonb_array,
     -- bit_array, varbit_array, numeric_array,
     -- money_array,
@@ -289,7 +289,7 @@ INSERT INTO array_types (
     -- ARRAY['12:00:00'::time, '13:00:00'::time],
     -- ARRAY['2023-01-01 12:00:00'::timestamp, '2023-01-02 13:00:00'::timestamp],
     -- ARRAY['2023-01-01 12:00:00+00'::timestamptz, '2023-01-02 13:00:00+00'::timestamptz],
-    ARRAY['1 day'::interval, '2 hours'::interval]
+    ARRAY['1 day'::interval, '2 hours'::interval],
     -- ARRAY['192.168.0.1'::inet, '10.0.0.1'::inet],
     -- ARRAY['192.168.0.0/24'::cidr, '10.0.0.0/8'::cidr],
     -- ARRAY['(1,1)'::point, '(2,2)'::point],
@@ -299,7 +299,7 @@ INSERT INTO array_types (
     -- ARRAY['((1,1),(2,2),(3,3))'::path, '((4,4),(5,5),(6,6))'::path],
     -- ARRAY['((1,1),(2,2),(3,3))'::polygon, '((4,4),(5,5),(6,6))'::polygon],
     -- ARRAY['<(1,1),1>'::circle, '<(2,2),2>'::circle],
-    -- ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid, 'b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid],
+    ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid, 'b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid]
     -- ARRAY['{"key": "value1"}'::json, '{"key": "value2"}'::json],
     -- ARRAY['{"key": "value1"}'::jsonb, '{"key": "value2"}'::jsonb],
     -- ARRAY['101'::bit(3), '110'::bit(3)],
diff --git a/worker/pkg/benthos/sql/processor_neosync_pgx.go b/worker/pkg/benthos/sql/processor_neosync_pgx.go
index 0a3e572da4..5c8a99eef1 100644
--- a/worker/pkg/benthos/sql/processor_neosync_pgx.go
+++ b/worker/pkg/benthos/sql/processor_neosync_pgx.go
@@ -8,6 +8,7 @@ import (
 	"slices"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/doug-martin/goqu/v9"
 	"github.com/lib/pq"
@@ -119,6 +120,11 @@ func transformNeosyncToPgx(
 		}
 		colDefaults := columnDefaultProperties[col]
 		datatype := columnDataTypes[col]
+		// if col == "date_col" {
+		// 	fmt.Println()
+		// 	fmt.Println("datatype", datatype, "val", val, string(val.([]byte)))
+		// 	fmt.Println()
+		// }
 		newVal, err := getPgxValue(val, colDefaults, datatype)
 		if err != nil {
 			logger.Warn(err.Error())
@@ -206,6 +212,22 @@ func handlePgxByteSlice(v []byte, datatype string) (any, error) {
 			return nil, fmt.Errorf("unable to get valid json: %w", err)
 		}
 		return validJson, nil
+	case "time":
+		fmt.Println()
+		fmt.Println("time", string(v))
+		fmt.Println()
+		return v, nil
+	case "date":
+		dateStr := string(v)
+		t, err := time.Parse(time.RFC3339, dateStr)
+		if err != nil {
+			// Try parsing as just date if not RFC3339 format
+			t, err = time.Parse("2006-01-02", dateStr)
+			if err != nil {
+				return dateStr, nil
+			}
+		}
+		return convertTimeForPostgres(t), nil
 	case "money", "uuid", "time with time zone", "timestamp with time zone":
 		// Convert UUID []byte to string before inserting since postgres driver stores uuid bytes in different order
 		return string(v), nil
@@ -213,6 +235,17 @@ func handlePgxByteSlice(v []byte, datatype string) (any, error) {
 	return v, nil
 }
 
+func convertTimeForPostgres(t time.Time) string {
+	// Handle BC dates (negative years in Go)
+	if t.Year() <= 0 {
+		// In Go, year 0 = 1 BC, year -1 = 2 BC, etc.
+		// add 1 to get the correct BC year
+		bcYear := -t.Year() + 1
+		return fmt.Sprintf("%04d-%02d-%02d BC", bcYear, t.Month(), t.Day())
+	}
+	return t.Format("2006-01-02")
+}
+
 // this expects the bits to be in the form [1,2,3]
 func processPgArrayFromJson(bits []byte, datatype string) (any, error) {
 	var pgarray []any
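Aside: Go's time package numbers years astronomically, so year 0 is 1 BC and year -1 is 2 BC, while Postgres expects literals like '0001-01-01 BC'. Below is a standalone sketch, not part of the patch, showing the same year mapping convertTimeForPostgres applies:

package main

import (
	"fmt"
	"time"
)

// toPostgresBC mirrors the patch's bcYear := -t.Year() + 1 mapping:
// Go year 0 -> "1 BC", Go year -1 -> "2 BC".
func toPostgresBC(t time.Time) string {
	if t.Year() > 0 {
		return t.Format("2006-01-02")
	}
	bcYear := -t.Year() + 1
	return fmt.Sprintf("%04d-%02d-%02d BC", bcYear, t.Month(), t.Day())
}

func main() {
	fmt.Println(toPostgresBC(time.Date(0, 1, 1, 0, 0, 0, 0, time.UTC)))  // 0001-01-01 BC
	fmt.Println(toPostgresBC(time.Date(-1, 1, 1, 0, 0, 0, 0, time.UTC))) // 0002-01-01 BC
}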
From 7cf8b275bbc6360319fa3378812b99de0624b1e1 Mon Sep 17 00:00:00 2001
From: Alisha
Date: Thu, 12 Dec 2024 11:32:35 -0800
Subject: [PATCH 2/9] fix postgres types from db and s3

---
 .../neosync/sync/sync_integration_test.go     |  8 +--
 .../postgres/alltypes/create-tables.sql       | 20 +++---
 .../benthos/json/processor_neosync_json.go    |  2 +-
 .../pkg/benthos/sql/processor_neosync_pgx.go  | 61 +++++++++++--------
 4 files changed, 52 insertions(+), 39 deletions(-)

diff --git a/cli/internal/cmds/neosync/sync/sync_integration_test.go b/cli/internal/cmds/neosync/sync/sync_integration_test.go
index 7aa6fe1b13..8cd666ebc0 100644
--- a/cli/internal/cmds/neosync/sync/sync_integration_test.go
+++ b/cli/internal/cmds/neosync/sync/sync_integration_test.go
@@ -137,11 +137,11 @@ func Test_Sync(t *testing.T) {
 
 		expectedTimestamps := [][]byte{
 			[]byte("2024-03-18 10:30:00"),
-			// []byte("0001-01-01 00:00:00 BC"),
+			[]byte("0001-01-01 00:00:00 BC"),
 		}
 		expectedDates := [][]byte{
 			[]byte("2024-03-18"),
-			// []byte("0001-01-01 BC"),
+			[]byte("0001-01-01 BC"),
 		}
 		var actualTimestamps [][]byte
 		var actualDates [][]byte
@@ -270,11 +270,11 @@ func Test_Sync(t *testing.T) {
 
 		expectedTimestamps := [][]byte{
 			[]byte("2024-03-18 10:30:00"),
-			// []byte("0001-01-01 00:00:00 BC"),
+			[]byte("0001-01-01 00:00:00 BC"),
 		}
 		expectedDates := [][]byte{
 			[]byte("2024-03-18"),
-			// []byte("0001-01-01 BC"),
+			[]byte("0001-01-01 BC"),
 		}
 		var actualTimestamps [][]byte
 		var actualDates [][]byte
diff --git a/internal/testutil/testdata/postgres/alltypes/create-tables.sql b/internal/testutil/testdata/postgres/alltypes/create-tables.sql
index dd410cb4e1..b671b5c5e7 100644
--- a/internal/testutil/testdata/postgres/alltypes/create-tables.sql
+++ b/internal/testutil/testdata/postgres/alltypes/create-tables.sql
@@ -206,16 +206,16 @@ VALUES (
     '2024-03-18'
 );
 
--- INSERT INTO time_time (
---     timestamp_col,
--- --     timestamptz_col,
---     date_col
--- )
-- VALUES (
---     '0001-01-01 00:00:00 BC',
-- --     '0001-01-01 00:00:00+00 BC',
---     '0001-01-01 BC'
-- );
+INSERT INTO time_time (
+    timestamp_col,
+    -- timestamptz_col,
+    date_col
+)
+VALUES (
+    '0001-01-01 00:00:00 BC',
+    -- '0001-01-01 00:00:00+00 BC',
+    '0001-01-01 BC'
+);
 
 
 CREATE TABLE IF NOT EXISTS array_types (
diff --git a/worker/pkg/benthos/json/processor_neosync_json.go b/worker/pkg/benthos/json/processor_neosync_json.go
index b1bc0a4682..98547a3364 100644
--- a/worker/pkg/benthos/json/processor_neosync_json.go
+++ b/worker/pkg/benthos/json/processor_neosync_json.go
@@ -71,7 +71,7 @@ func transform(root any) any {
 		}
 		return newSlice
 	case time.Time:
-		return v.Format(time.DateTime)
+		return v.Format(time.RFC3339)
 	case []uint8:
 		return string(v)
 	// TODO this should be neosync bit type
diff --git a/worker/pkg/benthos/sql/processor_neosync_pgx.go b/worker/pkg/benthos/sql/processor_neosync_pgx.go
index 5c8a99eef1..98a429302b 100644
--- a/worker/pkg/benthos/sql/processor_neosync_pgx.go
+++ b/worker/pkg/benthos/sql/processor_neosync_pgx.go
@@ -120,11 +120,6 @@ func transformNeosyncToPgx(
 		}
 		colDefaults := columnDefaultProperties[col]
 		datatype := columnDataTypes[col]
-		// if col == "date_col" {
-		// 	fmt.Println()
-		// 	fmt.Println("datatype", datatype, "val", val, string(val.([]byte)))
-		// 	fmt.Println()
-		// }
 		newVal, err := getPgxValue(val, colDefaults, datatype)
 		if err != nil {
 			logger.Warn(err.Error())
@@ -212,38 +207,56 @@ func handlePgxByteSlice(v []byte, datatype string) (any, error) {
 		}
 		return validJson, nil
-	case "time":
-		fmt.Println()
-		fmt.Println("time", string(v))
-		fmt.Println()
-		return v, nil
 	case "date":
-		dateStr := string(v)
-		t, err := time.Parse(time.RFC3339, dateStr)
+		t, err := convertBitsToTime(v)
 		if err != nil {
-			// Try parsing as just date if not RFC3339 format
-			t, err = time.Parse("2006-01-02", dateStr)
-			if err != nil {
-				return dateStr, nil
-			}
+			return string(v), nil
 		}
-		return convertTimeForPostgres(t), nil
-	case "money", "uuid", "time with time zone", "timestamp with time zone":
+		return convertDateForPostgres(t), nil
+	case "timestamp", "timestamp without time zone", "timestamp with time zone":
+		t, err := convertBitsToTime(v)
+		if err != nil {
+			return string(v), nil
+		}
+		return convertTimestampForPostgres(t), nil
+	case "money", "uuid", "time with time zone":
 		// Convert UUID []byte to string before inserting since postgres driver stores uuid bytes in different order
+		// For time with time zone, we keep as string since the timezone info is already properly formatted
 		return string(v), nil
 	}
 	return v, nil
 }
 
+func convertBitsToTime(bits []byte) (time.Time, error) {
+	timeStr := string(bits)
+	t, err := time.Parse(time.RFC3339, timeStr)
+	if err != nil {
+		// Try parsing as DateTime format if not RFC3339 format
+		t, err = time.Parse(time.DateTime, timeStr)
+		if err != nil {
+			return time.Time{}, err
+		}
+	}
+	return t, nil
+}
+
-func convertTimeForPostgres(t time.Time) string {
+func convertDateForPostgres(t time.Time) string {
+	return convertTimeForPostgres(t, time.DateOnly)
+}
+
+func convertTimestampForPostgres(t time.Time) string {
+	return convertTimeForPostgres(t, time.DateTime)
+}
+
+func convertTimeForPostgres(t time.Time, layout string) string {
 	// Handle BC dates (negative years in Go)
+	// year 0 is 1 BC, year -1 is 2 BC, etc.
 	if t.Year() <= 0 {
-		// In Go, year 0 = 1 BC, year -1 = 2 BC, etc.
 		// add 1 to get the correct BC year
-		bcYear := -t.Year() + 1
-		return fmt.Sprintf("%04d-%02d-%02d BC", bcYear, t.Month(), t.Day())
+		t = t.AddDate(1, 0, 0)
+		return fmt.Sprintf("%s BC", t.Format(layout))
 	}
-	return t.Format("2006-01-02")
+	return t.Format(layout)
 }
 
 // this expects the bits to be in the form [1,2,3]
 func processPgArrayFromJson(bits []byte, datatype string) (any, error) {
 	var pgarray []any
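Aside: this patch funnels incoming values through convertBitsToTime, trying RFC3339 first (the format the JSON processor now emits) and falling back to Go's DateTime layout; the MySQL fix that follows reuses the same parser and re-renders with time.DateOnly for DATE columns and time.DateTime for DATETIME/TIMESTAMP columns. A standalone sketch of that parse-then-format flow, separate from the patches:

package main

import (
	"fmt"
	"time"
)

// parseFlexible tries RFC3339 first and falls back to time.DateTime
// ("2006-01-02 15:04:05"), the same two layouts the patch uses.
func parseFlexible(s string) (time.Time, error) {
	t, err := time.Parse(time.RFC3339, s)
	if err != nil {
		return time.Parse(time.DateTime, s)
	}
	return t, nil
}

func main() {
	for _, s := range []string{"2024-03-18T10:30:00Z", "2024-03-18 10:30:00"} {
		t, err := parseFlexible(s)
		if err != nil {
			panic(err)
		}
		fmt.Println(t.Format(time.DateOnly)) // 2024-03-18
		fmt.Println(t.Format(time.DateTime)) // 2024-03-18 10:30:00
	}
}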
From 6f1bdce4c604828ab365ab05b13ea7c2818f3cd8 Mon Sep 17 00:00:00 2001
From: Alisha
Date: Thu, 12 Dec 2024 11:49:53 -0800
Subject: [PATCH 3/9] fix mysql date and time

---
 .../pkg/benthos/sql/processor_neosync_mysql.go | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/worker/pkg/benthos/sql/processor_neosync_mysql.go b/worker/pkg/benthos/sql/processor_neosync_mysql.go
index 481638ffc8..fae2500670 100644
--- a/worker/pkg/benthos/sql/processor_neosync_mysql.go
+++ b/worker/pkg/benthos/sql/processor_neosync_mysql.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"time"
 
 	"github.com/doug-martin/goqu/v9"
 	mysqlutil "github.com/nucleuscloud/neosync/internal/mysql"
@@ -150,18 +151,31 @@ func getMysqlValue(value any, colDefaults *neosync_benthos.ColumnDefaultProperti
 }
 
 func handleMysqlByteSlice(v []byte, datatype string) (any, error) {
-	if datatype == "bit" {
+	switch datatype {
+	case "bit":
 		bit, err := convertStringToBit(string(v))
 		if err != nil {
 			return nil, fmt.Errorf("unable to convert bit string to SQL bit []byte: %w", err)
 		}
 		return bit, nil
-	} else if mysqlutil.IsJsonDataType(datatype) {
+	case "json":
 		validJson, err := getValidJson(v)
 		if err != nil {
 			return nil, fmt.Errorf("unable to get valid json: %w", err)
 		}
 		return validJson, nil
+	case "date":
+		t, err := convertBitsToTime(v)
+		if err != nil {
+			return string(v), nil
+		}
+		return t.Format(time.DateOnly), nil
+	case "timestamp", "datetime":
+		t, err := convertBitsToTime(v)
+		if err != nil {
+			return string(v), nil
+		}
+		return t.Format(time.DateTime), nil
 	}
 	return v, nil
 }

From c6c5becc584a7e44b694b8266b9c6e444d0e3f0a Mon Sep 17 00:00:00 2001
From: Alisha
Date: Thu, 12 Dec 2024 11:59:06 -0800
Subject: [PATCH 4/9] add timestamptz to test

---
 .../testutil/testdata/postgres/alltypes/create-tables.sql | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/internal/testutil/testdata/postgres/alltypes/create-tables.sql b/internal/testutil/testdata/postgres/alltypes/create-tables.sql
index b671b5c5e7..ebdabc6d1c 100644
--- a/internal/testutil/testdata/postgres/alltypes/create-tables.sql
+++ b/internal/testutil/testdata/postgres/alltypes/create-tables.sql
@@ -208,12 +208,12 @@ VALUES (
 
 INSERT INTO time_time (
     timestamp_col,
-    -- timestamptz_col,
+    timestamptz_col,
     date_col
 )
 VALUES (
     '0001-01-01 00:00:00 BC',
-    -- '0001-01-01 00:00:00+00 BC',
+    '0001-01-01 00:00:00+00 BC',
     '0001-01-01 BC'
 );
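Aside: before the next patch, it helps to see what Go's formatter actually emits for BC dates — year 0 renders as "0000-…" and year -1 as "-0001-…", which is exactly the string shape the upcoming fix sniffs for. A quick standalone check, not part of the series:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Go renders astronomical years directly; RFC3339 itself does not
	// define negative years, but Format does not validate that.
	fmt.Println(time.Date(0, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339))  // 0000-01-01T00:00:00Z
	fmt.Println(time.Date(-1, 1, 1, 0, 0, 0, 0, time.UTC).Format(time.RFC3339)) // -0001-01-01T00:00:00Z
}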
From b36de5a5d0bdcaef01b2c342dbdcff58d3ae146a Mon Sep 17 00:00:00 2001
From: Alisha
Date: Fri, 13 Dec 2024 12:36:01 -0800
Subject: [PATCH 5/9] fix bc dates

---
 .../neosync/sync/sync_integration_test.go     | 144 +++++++++---------
 .../testutil/testcontainers/mysql/mysql.go    |  10 ++
 .../testcontainers/postgres/postgres.go       |  10 ++
 .../postgres/alltypes/create-tables.sql       |  12 ++
 .../pkg/benthos/sql/processor_neosync_pgx.go  |  87 ++++++++---
 .../benthos/sql/processor_neosync_pgx_test.go | 142 +++++++++++++++++
 6 files changed, 304 insertions(+), 101 deletions(-)

diff --git a/cli/internal/cmds/neosync/sync/sync_integration_test.go b/cli/internal/cmds/neosync/sync/sync_integration_test.go
index 8cd666ebc0..cfdd822dff 100644
--- a/cli/internal/cmds/neosync/sync/sync_integration_test.go
+++ b/cli/internal/cmds/neosync/sync/sync_integration_test.go
@@ -6,6 +6,7 @@ import (
 	"testing"
 
 	"connectrpc.com/connect"
+	"github.com/jackc/pgx/v5/pgxpool"
 	mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
 	tcneosyncapi "github.com/nucleuscloud/neosync/backend/pkg/integration-test"
 	"github.com/nucleuscloud/neosync/cli/internal/output"
@@ -110,53 +111,32 @@ func Test_Sync(t *testing.T) {
 		err := sync.configureAndRunSync()
 		require.NoError(t, err)
 
-		row := postgres.Target.DB.QueryRow(ctx, "select count(*) from humanresources.employees;")
-		var rowCount int
-		err = row.Scan(&rowCount)
+		rowCount, err := postgres.Target.GetTableRowCount(ctx, "humanresources", "employees")
 		require.NoError(t, err)
 		require.Greater(t, rowCount, 1)
 
-		row = postgres.Target.DB.QueryRow(ctx, "select count(*) from humanresources.generated_table;")
-		err = row.Scan(&rowCount)
+		rowCount, err = postgres.Target.GetTableRowCount(ctx, "humanresources", "generated_table")
 		require.NoError(t, err)
 		require.Greater(t, rowCount, 1)
 
-		row = postgres.Target.DB.QueryRow(ctx, "select count(*) from alltypes.all_data_types;")
-		err = row.Scan(&rowCount)
+		rowCount, err = postgres.Target.GetTableRowCount(ctx, "alltypes", "all_data_types")
 		require.NoError(t, err)
 		require.Greater(t, rowCount, 1)
 
-		row = postgres.Target.DB.QueryRow(ctx, "select count(*) from alltypes.time_time;")
-		err = row.Scan(&rowCount)
+		rowCount, err = postgres.Target.GetTableRowCount(ctx, "alltypes", "time_time")
 		require.NoError(t, err)
 		require.Greater(t, rowCount, 0)
 
-		rows, err := postgres.Target.DB.Query(ctx, "select timestamp_col::text, date_col::text from alltypes.time_time;")
+		var tsvectorVal, jsonVal, jsonbVal string
+		row := postgres.Target.DB.QueryRow(ctx, "select tsvector_col::text, json_col::text, jsonb_col::text from alltypes.all_data_types where tsvector_col is not null and json_col is not null;")
+		err = row.Scan(&tsvectorVal, &jsonVal, &jsonbVal)
 		require.NoError(t, err)
-		defer rows.Close()
+		require.Equal(t, "'example' 'tsvector'", tsvectorVal)
+		require.Equal(t, `{"name": "John", "age": 30}`, jsonVal)
+		require.Equal(t, `{"age": 30, "name": "John"}`, jsonbVal) // Note: JSONB reorders keys
 
-		expectedTimestamps := [][]byte{
-			[]byte("2024-03-18 10:30:00"),
-			[]byte("0001-01-01 00:00:00 BC"),
-		}
-		expectedDates := [][]byte{
-			[]byte("2024-03-18"),
-			[]byte("0001-01-01 BC"),
-		}
-		var actualTimestamps [][]byte
-		var actualDates [][]byte
-
-		for rows.Next() {
-			var timestampCol, dateCol []byte
-			err = rows.Scan(&timestampCol, &dateCol)
-			require.NoError(t, err)
-			actualTimestamps = append(actualTimestamps, timestampCol)
-			actualDates = append(actualDates, dateCol)
-		}
-
-		require.NoError(t, rows.Err())
-		require.ElementsMatch(t, expectedTimestamps, actualTimestamps, "Expected timestamp_col values to match")
-		require.ElementsMatch(t, expectedDates, actualDates, "Expected date_col values to match")
+		err = verifyPostgresTimeTimeTableValues(t, ctx, postgres.Target.DB)
+		require.NoError(t, err)
 	})
 
 	t.Run("S3_end_to_end", func(t *testing.T) {
@@ -248,48 +228,28 @@ func Test_Sync(t *testing.T) {
 			require.NoError(t, err)
 		})
 
-		var rowCount int
-		row := postgres.Target.DB.QueryRow(ctx, fmt.Sprintf("select count(*) from %s.all_data_types;", alltypesSchema))
-		err = row.Scan(&rowCount)
+		rowCount, err := postgres.Target.GetTableRowCount(ctx, alltypesSchema, "all_data_types")
 		require.NoError(t, err)
 		require.Greater(t, rowCount, 1)
 
-		row = postgres.Target.DB.QueryRow(ctx, fmt.Sprintf("select count(*) from %s.json_data;", alltypesSchema))
-		err = row.Scan(&rowCount)
+		rowCount, err = postgres.Target.GetTableRowCount(ctx, alltypesSchema, "json_data")
 		require.NoError(t, err)
 		require.Greater(t, rowCount, 1)
 
-		row = postgres.Target.DB.QueryRow(ctx, fmt.Sprintf("select count(*) from %s.time_time;", alltypesSchema))
-		err = row.Scan(&rowCount)
+		rowCount, err = postgres.Target.GetTableRowCount(ctx, alltypesSchema, "time_time")
 		require.NoError(t, err)
 		require.Greater(t, rowCount, 0)
 
-		rows, err := postgres.Target.DB.Query(ctx, fmt.Sprintf("select timestamp_col::text, date_col::text from %s.time_time;", alltypesSchema))
+		var tsvectorVal, jsonVal, jsonbVal string
+		row := postgres.Target.DB.QueryRow(ctx, fmt.Sprintf("select tsvector_col::text, json_col::text, jsonb_col::text from %s.all_data_types where tsvector_col is not null and json_col is not null;", alltypesSchema))
+		err = row.Scan(&tsvectorVal, &jsonVal, &jsonbVal)
 		require.NoError(t, err)
-		defer rows.Close()
+		require.Equal(t, "'example' 'tsvector'", tsvectorVal)
+		require.Equal(t, `{"age":30,"name":"John"}`, jsonVal)
+		require.Equal(t, `{"age": 30, "name": "John"}`, jsonbVal) // Note: JSONB reorders keys
 
-		expectedTimestamps := [][]byte{
-			[]byte("2024-03-18 10:30:00"),
-			[]byte("0001-01-01 00:00:00 BC"),
-		}
-		expectedDates := [][]byte{
-			[]byte("2024-03-18"),
-			[]byte("0001-01-01 BC"),
-		}
-		var actualTimestamps [][]byte
-		var actualDates [][]byte
-
-		for rows.Next() {
-			var timestampCol, dateCol []byte
-			err = rows.Scan(&timestampCol, &dateCol)
-			require.NoError(t, err)
-			actualTimestamps = append(actualTimestamps, timestampCol)
-			actualDates = append(actualDates, dateCol)
-		}
-
-		require.NoError(t, rows.Err())
-		require.ElementsMatch(t, expectedTimestamps, actualTimestamps, "Expected timestamp_col values to match")
-		require.ElementsMatch(t, expectedDates, actualDates, "Expected date_col values to match")
+		err = verifyPostgresTimeTimeTableValues(t, ctx, postgres.Target.DB)
+		require.NoError(t, err)
 	})
 
 	t.Cleanup(func() {
@@ -354,19 +314,15 @@ func Test_Sync(t *testing.T) {
 			err := sync.configureAndRunSync()
 			require.NoError(t, err)
 
-			rows := mysql.Target.DB.QueryRowContext(ctx, "select count(*) from humanresources.locations;")
-			var rowCount int
-			err = rows.Scan(&rowCount)
+			rowCount, err := mysql.Target.GetTableRowCount(ctx, "humanresources", "locations")
 			require.NoError(t, err)
 			require.Greater(t, rowCount, 1)
 
-			rows = mysql.Target.DB.QueryRowContext(ctx, "select count(*) from humanresources.generated_table;")
-			err = rows.Scan(&rowCount)
+			rowCount, err = mysql.Target.GetTableRowCount(ctx, "humanresources", "generated_table")
 			require.NoError(t, err)
 			require.Greater(t, rowCount, 1)
 
-			rows = mysql.Target.DB.QueryRowContext(ctx, "select count(*) from alltypes.all_data_types;")
-			err = rows.Scan(&rowCount)
+			rowCount, err = mysql.Target.GetTableRowCount(ctx, "alltypes", "all_data_types")
 			require.NoError(t, err)
 			require.Greater(t, rowCount, 1)
 		})
@@ -459,14 +415,11 @@ func Test_Sync(t *testing.T) {
 			require.NoError(t, err)
 		})
 
-		var rowCount int
-		rows := mysql.Target.DB.QueryRowContext(ctx, fmt.Sprintf("select count(*) from %s.all_data_types;", alltypesSchema))
-		err = rows.Scan(&rowCount)
+		rowCount, err := mysql.Target.GetTableRowCount(ctx, alltypesSchema, "all_data_types")
 		require.NoError(t, err)
 		require.Greater(t, rowCount, 1)
 
-		rows = mysql.Target.DB.QueryRowContext(ctx, fmt.Sprintf("select count(*) from %s.json_data;", alltypesSchema))
-		err = rows.Scan(&rowCount)
+		rowCount, err = mysql.Target.GetTableRowCount(ctx, alltypesSchema, "json_data")
 		require.NoError(t, err)
 		require.Greater(t, rowCount, 1)
 	})
@@ -486,3 +439,42 @@ func Test_Sync(t *testing.T) {
 		}
 	})
 }
+
+func verifyPostgresTimeTimeTableValues(t *testing.T, ctx context.Context, db *pgxpool.Pool) error {
+	rows, err := db.Query(ctx, "select timestamp_col::text, date_col::text from alltypes.time_time;")
+	if err != nil {
+		return err
+	}
+	defer rows.Close()
+
+	expectedTimestamps := [][]byte{
+		[]byte("2024-03-18 10:30:00"),
+		[]byte("0001-01-01 00:00:00 BC"),
+		[]byte("0002-01-01 00:00:00 BC"),
+	}
+	expectedDates := [][]byte{
+		[]byte("2024-03-18"),
+		[]byte("0001-01-01 BC"),
+		[]byte("0002-01-01 BC"),
+	}
+	var actualTimestamps [][]byte
+	var actualDates [][]byte
+
+	for rows.Next() {
+		var timestampCol, dateCol []byte
+		err = rows.Scan(&timestampCol, &dateCol)
+		if err != nil {
+			return err
+		}
+		actualTimestamps = append(actualTimestamps, timestampCol)
+		actualDates = append(actualDates, dateCol)
+	}
+
+	if err = rows.Err(); err != nil {
+		return err
+	}
+
+	require.ElementsMatch(t, expectedTimestamps, actualTimestamps, "Expected timestamp_col values to match")
+	require.ElementsMatch(t, expectedDates, actualDates, "Expected date_col values to match")
+	return nil
+}
diff --git a/internal/testutil/testcontainers/mysql/mysql.go b/internal/testutil/testcontainers/mysql/mysql.go
index 90433d21c4..70dd0f9e39 100644
--- a/internal/testutil/testcontainers/mysql/mysql.go
+++ b/internal/testutil/testcontainers/mysql/mysql.go
@@ -301,3 +301,13 @@ func (m *MysqlTestContainer) CreateDatabases(ctx context.Context, schemas []stri
 	}
 	return nil
 }
+
+func (m *MysqlTestContainer) GetTableRowCount(ctx context.Context, schema, table string) (int, error) {
+	rows := m.DB.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s`;", schema, table))
+	var count int
+	err := rows.Scan(&count)
+	if err != nil {
+		return 0, err
+	}
+	return count, nil
+}
diff --git a/internal/testutil/testcontainers/postgres/postgres.go b/internal/testutil/testcontainers/postgres/postgres.go
index 15e64723bc..82d3a42b56 100644
--- a/internal/testutil/testcontainers/postgres/postgres.go
+++ b/internal/testutil/testcontainers/postgres/postgres.go
@@ -305,3 +305,13 @@ func (p *PostgresTestContainer) CreateSchemas(ctx context.Context, schemas []str
 	}
 	return nil
 }
+
+func (p *PostgresTestContainer) GetTableRowCount(ctx context.Context, schema, table string) (int, error) {
+	rows := p.DB.QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %q.%q;", schema, table))
+	var count int
+	err := rows.Scan(&count)
+	if err != nil {
+		return 0, err
+	}
+	return count, nil
+}
diff --git a/internal/testutil/testdata/postgres/alltypes/create-tables.sql b/internal/testutil/testdata/postgres/alltypes/create-tables.sql
index ebdabc6d1c..803a21a864 100644
--- a/internal/testutil/testdata/postgres/alltypes/create-tables.sql
+++ b/internal/testutil/testdata/postgres/alltypes/create-tables.sql
@@ -217,6 +217,18 @@ VALUES (
     '0001-01-01 BC'
 );
 
+INSERT INTO time_time (
+    timestamp_col,
+    timestamptz_col,
+    date_col
+)
+VALUES (
+    '0002-01-01 00:00:00 BC',
+    '0002-01-01 00:00:00+00 BC',
+    '0002-01-01 BC'
+);
+
+
 CREATE TABLE IF NOT EXISTS array_types (
     "id" BIGINT NOT NULL PRIMARY KEY,
diff --git a/worker/pkg/benthos/sql/processor_neosync_pgx.go b/worker/pkg/benthos/sql/processor_neosync_pgx.go
index 98a429302b..c7647f2e83 100644
--- a/worker/pkg/benthos/sql/processor_neosync_pgx.go
+++ b/worker/pkg/benthos/sql/processor_neosync_pgx.go
@@ -120,10 +120,12 @@ func transformNeosyncToPgx(
 		}
 		colDefaults := columnDefaultProperties[col]
 		datatype := columnDataTypes[col]
+
 		newVal, err := getPgxValue(val, colDefaults, datatype)
 		if err != nil {
 			logger.Warn(err.Error())
 		}
+
 		newMap[col] = newVal
 	}
 
@@ -208,20 +210,13 @@ func handlePgxByteSlice(v []byte, datatype string) (any, error) {
 		}
 		return validJson, nil
 	case "date":
-		t, err := convertBitsToTime(v)
-		if err != nil {
-			return string(v), nil
-		}
-		return convertDateForPostgres(t), nil
-	case "timestamp", "timestamp without time zone", "timestamp with time zone":
-		t, err := convertBitsToTime(v)
-		if err != nil {
-			return string(v), nil
-		}
-		return convertTimestampForPostgres(t), nil
-	case "money", "uuid", "time with time zone":
+		return convertDateForPostgres(v)
+	case "timestamp with time zone":
+		return convertTimestampWithTimezoneForPostgres(v), nil
+	case "timestamp", "timestamp without time zone":
+		return convertTimestampForPostgres(v)
+	case "money", "uuid", "tsvector", "time with time zone":
 		// Convert UUID []byte to string before inserting since postgres driver stores uuid bytes in different order
-		// For time with time zone, we keep as string since the timezone info is already properly formatted
 		return string(v), nil
 	}
 	return v, nil
@@ -237,26 +232,68 @@ func convertBitsToTime(bits []byte) (time.Time, error) {
 		if err != nil {
 			return time.Time{}, err
 		}
 	}
+
 	return t, nil
 }
 
-func convertDateForPostgres(t time.Time) string {
-	return convertTimeForPostgres(t, time.DateOnly)
+func convertDateForPostgres(input []byte) (string, error) {
+	return convertTimeForPostgres(input, time.DateOnly)
 }
 
-func convertTimestampForPostgres(t time.Time) string {
-	return convertTimeForPostgres(t, time.DateTime)
+func convertTimestampForPostgres(input []byte) (string, error) {
+	return convertTimeForPostgres(input, time.DateTime)
 }
 
-func convertTimeForPostgres(t time.Time, layout string) string {
-	// Handle BC dates (negative years in Go)
-	// year 0 is 1 BC, year -1 is 2 BC, etc.
-	if t.Year() <= 0 {
-		// add 1 to get the correct BC year
-		t = t.AddDate(1, 0, 0)
-		return fmt.Sprintf("%s BC", t.Format(layout))
+// pgtypes does not handle BC dates correctly
+// convertTimeForPostgres handles BC dates properly
+func convertTimeForPostgres(timebits []byte, layout string) (string, error) {
+	if strings.HasPrefix(string(timebits), "-") {
+		t, err := time.Parse("-2006-01-02T15:04:05Z", string(timebits))
+		if err != nil {
+			return "", err
+		}
+		// For negative years, add 1 to get correct BC year
+		// year -1 is 2 BC, year -2 is 3 BC, etc.
+		yearsToAdd := t.Year() + 1
+
+		newT := time.Date(yearsToAdd, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), t.Location())
+		return fmt.Sprintf("%s BC", newT.Format(layout)), nil
 	}
-	return t.Format(layout)
+
+	t, err := convertBitsToTime(timebits)
+	if err != nil {
+		return "", err
+	}
+	// Handle BC dates year 0
+	// year 0 is 1 BC,
+	if t.Year() == 0 {
+		newT := time.Date(1, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), t.Location())
+		return fmt.Sprintf("%s BC", newT.Format(layout)), nil
+	}
+	return t.Format(layout), nil
+}
+
+// pgtype.Timestamptz does not support BC dates, so we need to reformat them
+func convertTimestampWithTimezoneForPostgres(input []byte) string {
+	// Remove the 'T'
+	withoutT := strings.Replace(string(input), "T", " ", 1)
+
+	// Handle year 0000 case (should become 0001 BC)
+	if strings.HasPrefix(withoutT, "0000") {
+		rest := withoutT[4:]
+		return "0001" + rest[:len(rest)-6] + rest[len(rest)-6:] + " BC"
+	}
+
+	// If starts with '-', remove it and add BC before timezone
+	if strings.HasPrefix(withoutT, "-") {
+		yearNum, _ := strconv.Atoi(withoutT[1:5])
+		yearNum++ // Add 1 to convert from Go BC year to Postgres BC year
+		year := fmt.Sprintf("%04d", yearNum)
+		rest := withoutT[5:]
+		return year + rest[:len(rest)-6] + rest[len(rest)-6:] + " BC"
+	}
+
+	return string(input)
 }
 
 // this expects the bits to be in the form [1,2,3]
diff --git a/worker/pkg/benthos/sql/processor_neosync_pgx_test.go b/worker/pkg/benthos/sql/processor_neosync_pgx_test.go
index a29f58e0c3..f825d66a81 100644
--- a/worker/pkg/benthos/sql/processor_neosync_pgx_test.go
+++ b/worker/pkg/benthos/sql/processor_neosync_pgx_test.go
@@ -5,6 +5,7 @@ import (
 	"database/sql/driver"
 	"encoding/json"
 	"testing"
+	"time"
 
 	"github.com/doug-martin/goqu/v9"
 	"github.com/lib/pq"
@@ -556,3 +557,144 @@ column_default_properties:
 
 	require.NoError(t, proc.Close(context.Background()))
 }
+
+func TestConvertBitsToTime(t *testing.T) {
+	// Test RFC3339 format
+	input := []byte("2023-01-01T00:00:00Z")
+	want := time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
+	got, err := convertBitsToTime(input)
+	require.NoError(t, err)
+	require.Equal(t, want, got)
+
+	// Test DateTime format
+	input = []byte("2023-01-01 00:00:00")
+	want = time.Date(2023, 1, 1, 0, 0, 0, 0, time.UTC)
+	got, err = convertBitsToTime(input)
+	require.NoError(t, err)
+	require.Equal(t, want, got)
+
+	// Test Invalid format
+	input = []byte("invalid-date")
+	got, err = convertBitsToTime(input)
+	require.Error(t, err)
+	require.Equal(t, time.Time{}, got)
+
+	// Test BC date
+	input = []byte("0000-01-01 00:00:00")
+	want = time.Date(0, 1, 1, 0, 0, 0, 0, time.UTC)
+	got, err = convertBitsToTime(input)
+	require.NoError(t, err)
+	require.Equal(t, want, got)
+
+	input = []byte("0000-01-01T00:00:00Z")
+	want = time.Date(0, 1, 1, 0, 0, 0, 0, time.UTC)
+	got, err = convertBitsToTime(input)
+	require.NoError(t, err)
+	require.Equal(t, want, got)
+}
+func TestConvertTimeForPostgres(t *testing.T) {
+	// Test normal date
+	input := []byte("2023-01-01T00:00:00Z")
+	got, err := convertTimeForPostgres(input, time.DateTime)
+	require.NoError(t, err)
+	require.Equal(t, "2023-01-01 00:00:00", got)
+
+	// Test BC date (year 0)
+	input = []byte("0000-01-01T00:00:00Z")
+	got, err = convertTimeForPostgres(input, time.DateTime)
+	require.NoError(t, err)
+	require.Equal(t, "0001-01-01 00:00:00 BC", got)
+
+	// Test BC date (negative year)
+	input = []byte("-0001-01-01T00:00:00Z")
+	got, err = convertTimeForPostgres(input, time.DateTime)
+	require.NoError(t, err)
+	require.Equal(t, "0002-01-01 00:00:00 BC", got)
+
+	// Test BC date (negative year)
+	input = []byte("-0002-01-01T00:00:00Z")
+	got, err = convertTimeForPostgres(input, time.DateTime)
+	require.NoError(t, err)
+	require.Equal(t, "0003-01-01 00:00:00 BC", got)
+
+	// Test with DateOnly layout
+	input = []byte("2023-01-01T00:00:00Z")
+	got, err = convertTimeForPostgres(input, time.DateOnly)
+	require.NoError(t, err)
+	require.Equal(t, "2023-01-01", got)
+
+	// Test BC date with DateOnly layout
+	input = []byte("-0001-01-01T00:00:00Z")
+	got, err = convertTimeForPostgres(input, time.DateOnly)
+	require.NoError(t, err)
+	require.Equal(t, "0002-01-01 BC", got)
+}
+
+func Test_convertTimestampWithTimezoneForPostgres(t *testing.T) {
+	t.Run("normal timestamp", func(t *testing.T) {
+		input := []byte("2023-01-01T00:00:00Z")
+		got := convertTimestampWithTimezoneForPostgres(input)
+		require.Equal(t, "2023-01-01T00:00:00Z", got)
+	})
+
+	t.Run("year 0000 becomes 0001 BC", func(t *testing.T) {
+		input := []byte("0000-01-01T00:00:00+00")
+		got := convertTimestampWithTimezoneForPostgres(input)
+		require.Equal(t, "0001-01-01 00:00:00+00 BC", got)
+	})
+
+	t.Run("negative year removes minus and adds BC", func(t *testing.T) {
+		input := []byte("-0001-01-01T00:00:00+00")
+		got := convertTimestampWithTimezoneForPostgres(input)
+		require.Equal(t, "0002-01-01 00:00:00+00 BC", got)
+	})
+
+	t.Run("handles timezone offset", func(t *testing.T) {
+		input := []byte("-0002-01-01T00:00:00+07:00")
+		got := convertTimestampWithTimezoneForPostgres(input)
+		require.Equal(t, "0003-01-01 00:00:00+07:00 BC", got)
+	})
+
+	t.Run("handles negative timezone offset", func(t *testing.T) {
+		input := []byte("-0003-01-01T00:00:00-08:00")
+		got := convertTimestampWithTimezoneForPostgres(input)
+		require.Equal(t, "0004-01-01 00:00:00-08:00 BC", got)
+	})
+}
+
+func Test_convertDateForPostgres(t *testing.T) {
+	t.Run("normal date", func(t *testing.T) {
+		input := []byte("2023-01-01T00:00:00Z")
+		got, err := convertDateForPostgres(input)
+		require.NoError(t, err)
+		require.Equal(t, "2023-01-01", got)
+	})
+
+	t.Run("handles BC date", func(t *testing.T) {
+		input := []byte("-0001-01-01T00:00:00Z")
+		got, err := convertDateForPostgres(input)
+		require.NoError(t, err)
+		require.Equal(t, "0002-01-01 BC", got)
+	})
+
+	t.Run("handles year 0", func(t *testing.T) {
+		input := []byte("0000-01-01T00:00:00Z")
+		got, err := convertDateForPostgres(input)
+		require.NoError(t, err)
+		require.Equal(t, "0001-01-01 BC", got)
+	})
+
+	t.Run("handles negative years", func(t *testing.T) {
+		input := []byte("-0002-01-01T00:00:00Z")
+		got, err := convertDateForPostgres(input)
+		require.NoError(t, err)
+		require.Equal(t, "0003-01-01 BC", got)
+	})
+
+	t.Run("handles DateTime format", func(t *testing.T) {
+		input := []byte("2023-01-01 00:00:00")
+		got, err := convertDateForPostgres(input)
+		require.NoError(t, err)
+		require.Equal(t, "2023-01-01", got)
+	})
+}
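Aside on the parsing trick in this patch: in the layout string "-2006-01-02T15:04:05Z" the leading minus is literal text, so an upstream value like "-0001-01-01T00:00:00Z" parses to the positive year 1, and adding 1 produces the Postgres BC year. A standalone round-trip check, not part of the patch:

package main

import (
	"fmt"
	"time"
)

func main() {
	// "-0001" matches the literal '-' plus the "2006" year verb, so the
	// parsed year comes back as +1; Go year -1 is 2 BC, hence the +1.
	t, err := time.Parse("-2006-01-02T15:04:05Z", "-0001-01-01T00:00:00Z")
	if err != nil {
		panic(err)
	}
	fmt.Println(t.Year())                     // 1
	fmt.Printf("%04d-01-01 BC\n", t.Year()+1) // 0002-01-01 BC
}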
From 6007a41e9059ca5fae82415f0b7c1cd5c6a8cc62 Mon Sep 17 00:00:00 2001
From: Alisha
Date: Fri, 13 Dec 2024 14:27:07 -0800
Subject: [PATCH 6/9] stash

---
 .../postgres/alltypes/create-tables.sql       | 208 +++++++++++-------
 worker/pkg/benthos/sql/output_sql_insert.go   |   7 +
 .../pkg/benthos/sql/processor_neosync_pgx.go  |   8 +
 3 files changed, 147 insertions(+), 76 deletions(-)

diff --git a/internal/testutil/testdata/postgres/alltypes/create-tables.sql b/internal/testutil/testdata/postgres/alltypes/create-tables.sql
index 803a21a864..de84c8c2c1 100644
--- a/internal/testutil/testdata/postgres/alltypes/create-tables.sql
+++ b/internal/testutil/testdata/postgres/alltypes/create-tables.sql
@@ -232,97 +232,153 @@ VALUES (
 
 CREATE TABLE IF NOT EXISTS array_types (
     "id" BIGINT NOT NULL PRIMARY KEY,
-    -- "int_array" _int4,
-    -- "smallint_array" _int2,
-    -- "bigint_array" _int8,
-    -- "real_array" _float4,
-    -- "double_array" _float8,
-    -- "text_array" _text,
-    -- "varchar_array" _varchar,
-    -- "char_array" _bpchar,
-    -- "boolean_array" _bool,
-    -- "date_array" _date,
-    -- "time_array" _time,
-    -- "timestamp_array" _timestamp,
-    -- "timestamptz_array" _timestamptz,
+    "int_array" _int4,
+    "smallint_array" _int2,
+    "bigint_array" _int8,
+    "real_array" _float4,
+    "double_array" _float8,
+    "text_array" _text,
+    "varchar_array" _varchar,
+    "char_array" _bpchar,
+    "boolean_array" _bool,
+    "date_array" _date,
+    "time_array" _time,
+    "timestamp_array" _timestamp,
+    "timestamptz_array" _timestamptz,
     "interval_array" _interval,
-    -- "inet_array" _inet, // broken
-    -- "cidr_array" _cidr,
-    -- "point_array" _point,
-    -- "line_array" _line,
-    -- "lseg_array" _lseg,
+    "inet_array" _inet,
+    "cidr_array" _cidr,
+    "point_array" _point,
+    "line_array" _line,
+    "lseg_array" _lseg,
     -- "box_array" _box, // broken
-    -- "path_array" _path,
-    -- "polygon_array" _polygon,
-    -- "circle_array" _circle,
-    "uuid_array" _uuid
-    -- "json_array" _json,
-    -- "jsonb_array" _jsonb,
-    -- "bit_array" _bit,
-    -- "varbit_array" _varbit,
-    -- "numeric_array" _numeric,
-    -- "money_array" _money,
-    -- "xml_array" _xml
-    -- "int_double_array" _int4
+    "path_array" _path,
+    "polygon_array" _polygon,
+    "circle_array" _circle,
+    "uuid_array" _uuid,
+    "json_array" _json,
+    "jsonb_array" _jsonb,
+    "bit_array" _bit,
+    "varbit_array" _varbit,
+    "numeric_array" _numeric,
+    "money_array" _money,
+    "xml_array" _xml,
+    "int_double_array" _int4
 );
 
 INSERT INTO array_types (
     id,
-    -- int_array, smallint_array, bigint_array,
-    -- real_array,
-    -- double_array,
-    -- text_array, varchar_array, char_array, boolean_array,
-    -- date_array,
-    -- time_array, timestamp_array, timestamptz_array,
+    int_array, smallint_array, bigint_array,
+    real_array,
+    double_array,
+    text_array, varchar_array, char_array, boolean_array,
+    date_array,
+    time_array, timestamp_array, timestamptz_array,
     interval_array,
-    -- inet_array, cidr_array,
-    -- point_array, line_array, lseg_array,
+    inet_array, cidr_array,
+    point_array, line_array, lseg_array,
     -- box_array,
-    -- path_array, polygon_array, circle_array,
-    uuid_array
-    -- json_array, jsonb_array,
-    -- bit_array, varbit_array, numeric_array,
-    -- money_array,
-    -- xml_array
-    -- int_double_array
+    path_array, polygon_array, circle_array,
+    uuid_array,
+    json_array, jsonb_array,
+    bit_array, varbit_array, numeric_array,
+    money_array,
+    xml_array,
+    int_double_array
 ) VALUES (
     1,
-    -- ARRAY[1, 2, 3],
-    -- ARRAY[10::smallint, 20::smallint],
-    -- ARRAY[100::bigint, 200::bigint],
-    -- ARRAY[1.1::real, 2.2::real],
-    -- ARRAY[1.11::double precision, 2.22::double precision],
-    -- ARRAY['text1', 'text2'],
-    -- ARRAY['varchar1'::varchar, 'varchar2'::varchar],
-    -- ARRAY['a'::char, 'b'::char],
-    -- ARRAY[true, false],
-    -- ARRAY['2023-01-01'::date, '2023-01-02'::date],
-    -- ARRAY['12:00:00'::time, '13:00:00'::time],
-    -- ARRAY['2023-01-01 12:00:00'::timestamp, '2023-01-02 13:00:00'::timestamp],
-    -- ARRAY['2023-01-01 12:00:00+00'::timestamptz, '2023-01-02 13:00:00+00'::timestamptz],
+    ARRAY[1, 2, 3],
+    ARRAY[10::smallint, 20::smallint],
+    ARRAY[100::bigint, 200::bigint],
+    ARRAY[1.1::real, 2.2::real],
+    ARRAY[1.11::double precision, 2.22::double precision],
+    ARRAY['text1', 'text2'],
+    ARRAY['varchar1'::varchar, 'varchar2'::varchar],
+    ARRAY['a'::char, 'b'::char],
+    ARRAY[true, false],
+    ARRAY['2023-01-01'::date, '2023-01-02'::date],
+    NULL,
+    ARRAY['2023-01-01 12:00:00'::timestamp, '2023-01-02 13:00:00'::timestamp],
+    ARRAY['2023-01-01 12:00:00+00'::timestamptz, '2023-01-02 13:00:00+00'::timestamptz],
     ARRAY['1 day'::interval, '2 hours'::interval],
-    -- ARRAY['192.168.0.1'::inet, '10.0.0.1'::inet],
-    -- ARRAY['192.168.0.0/24'::cidr, '10.0.0.0/8'::cidr],
-    -- ARRAY['(1,1)'::point, '(2,2)'::point],
-    -- ARRAY['{1,2,2}'::line, '{3,4,4}'::line],
-    -- ARRAY['(1,1,2,2)'::lseg, '(3,3,4,4)'::lseg],
+    NULL,
+    NULL,
+    NULL,
+    NULL,
+    NULL,
     -- ARRAY['(1,1,2,2)'::box, '(3,3,4,4)'::box],
-    -- ARRAY['((1,1),(2,2),(3,3))'::path, '((4,4),(5,5),(6,6))'::path],
-    -- ARRAY['((1,1),(2,2),(3,3))'::polygon, '((4,4),(5,5),(6,6))'::polygon],
-    -- ARRAY['<(1,1),1>'::circle, '<(2,2),2>'::circle],
-    ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid, 'b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid]
-    -- ARRAY['{"key": "value1"}'::json, '{"key": "value2"}'::json],
-    -- ARRAY['{"key": "value1"}'::jsonb, '{"key": "value2"}'::jsonb],
-    -- ARRAY['101'::bit(3), '110'::bit(3)],
-    -- ARRAY['10101'::bit varying(5), '01010'::bit varying(5)],
-    -- ARRAY[1.23::numeric, 4.56::numeric],
-    -- ARRAY[10.00::money, 20.00::money],
-    -- ARRAY['value1'::xml, 'value2'::xml]
-    -- ARRAY[[1, 2], [3, 4]]
+    NULL,
+    NULL,
+    NULL,
+    ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid, 'b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid],
+    ARRAY['{"key": "value1"}'::json, '{"key": "value2"}'::json],
+    ARRAY['{"key": "value1"}'::jsonb, '{"key": "value2"}'::jsonb],
+    NULL,
+    NULL,
+    ARRAY[1.23::numeric, 4.56::numeric],
+    ARRAY[10.00::money, 20.00::money],
+    ARRAY['value1'::xml, 'value2'::xml],
+    NULL
 );
 
+
+-- INSERT INTO array_types (
+--     id,
+--     int_array, smallint_array, bigint_array,
+--     real_array,
+--     double_array,
+--     text_array, varchar_array, char_array, boolean_array,
+--     date_array,
+--     time_array, timestamp_array, timestamptz_array,
+--     interval_array,
+--     inet_array, cidr_array,
+--     point_array, line_array, lseg_array,
+--     -- box_array,
+--     path_array, polygon_array, circle_array,
+--     uuid_array,
+--     json_array, jsonb_array,
+--     bit_array, varbit_array, numeric_array,
+--     money_array,
+--     xml_array,
+--     int_double_array
+-- ) VALUES (
+--     1,
+--     ARRAY[1, 2, 3],
+--     ARRAY[10::smallint, 20::smallint],
+--     ARRAY[100::bigint, 200::bigint],
+--     ARRAY[1.1::real, 2.2::real],
+--     ARRAY[1.11::double precision, 2.22::double precision],
+--     ARRAY['text1', 'text2'],
+--     ARRAY['varchar1'::varchar, 'varchar2'::varchar],
+--     ARRAY['a'::char, 'b'::char],
+--     ARRAY[true, false],
+--     ARRAY['2023-01-01'::date, '2023-01-02'::date],
+--     ARRAY['12:00:00'::time, '13:00:00'::time],
+--     ARRAY['2023-01-01 12:00:00'::timestamp, '2023-01-02 13:00:00'::timestamp],
+--     ARRAY['2023-01-01 12:00:00+00'::timestamptz, '2023-01-02 13:00:00+00'::timestamptz],
+--     ARRAY['1 day'::interval, '2 hours'::interval],
+--     ARRAY['192.168.0.1'::inet, '10.0.0.1'::inet],
+--     ARRAY['192.168.0.0/24'::cidr, '10.0.0.0/8'::cidr],
+--     ARRAY['(1,1)'::point, '(2,2)'::point],
+--     ARRAY['{1,2,2}'::line, '{3,4,4}'::line],
+--     ARRAY['(1,1,2,2)'::lseg, '(3,3,4,4)'::lseg],
+--     -- ARRAY['(1,1,2,2)'::box, '(3,3,4,4)'::box],
+--     ARRAY['((1,1),(2,2),(3,3))'::path, '((4,4),(5,5),(6,6))'::path],
+--     ARRAY['((1,1),(2,2),(3,3))'::polygon, '((4,4),(5,5),(6,6))'::polygon],
+--     ARRAY['<(1,1),1>'::circle, '<(2,2),2>'::circle],
+--     ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid, 'b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid],
+--     ARRAY['{"key": "value1"}'::json, '{"key": "value2"}'::json],
+--     ARRAY['{"key": "value1"}'::jsonb, '{"key": "value2"}'::jsonb],
+--     ARRAY['101'::bit(3), '110'::bit(3)],
+--     ARRAY['10101'::bit varying(5), '01010'::bit varying(5)],
+--     ARRAY[1.23::numeric, 4.56::numeric],
+--     ARRAY[10.00::money, 20.00::money],
+--     ARRAY['value1'::xml, 'value2'::xml],
+--     ARRAY[[1, 2], [3, 4]]
+-- );
+
+
 CREATE TABLE json_data (
     id SERIAL PRIMARY KEY,
     data JSONB
diff --git a/worker/pkg/benthos/sql/output_sql_insert.go b/worker/pkg/benthos/sql/output_sql_insert.go
index 0fa36ffdba..bb9ba833c8 100644
--- a/worker/pkg/benthos/sql/output_sql_insert.go
+++ b/worker/pkg/benthos/sql/output_sql_insert.go
@@ -276,6 +276,13 @@ func (s *pooledInsertOutput) WriteBatch(ctx context.Context, batch service.Messa
 		return err
 	}
 
+	if s.table == "array_types" {
+		fmt.Println()
+		fmt.Println("insertQuery", insertQuery)
+		fmt.Println("args", args)
+		fmt.Println()
+	}
+
 	if _, err := s.db.ExecContext(ctx, insertQuery, args...); err != nil {
 		shouldRetry := isDeadlockError(err) || (s.skipForeignKeyViolations && neosync_benthos.IsForeignKeyViolationError(err.Error()))
 		if !shouldRetry {
diff --git a/worker/pkg/benthos/sql/processor_neosync_pgx.go b/worker/pkg/benthos/sql/processor_neosync_pgx.go
index c7647f2e83..9e774549dd 100644
--- a/worker/pkg/benthos/sql/processor_neosync_pgx.go
+++ b/worker/pkg/benthos/sql/processor_neosync_pgx.go
@@ -82,10 +82,18 @@ func (p *neosyncToPgxProcessor) ProcessBatch(ctx context.Context, batch service.
 		if err != nil {
 			return nil, err
 		}
+
+		jsonF, _ := json.MarshalIndent(root, "", " ")
+		fmt.Printf("\n\n root: %s \n\n", string(jsonF))
+
 		newRoot, err := transformNeosyncToPgx(p.logger, root, p.columns, p.columnDataTypes, p.columnDefaultProperties)
 		if err != nil {
 			return nil, err
 		}
+
+		jsonF, _ = json.MarshalIndent(newRoot, "", " ")
+		fmt.Printf("\n\n newRoot: %s \n\n", string(jsonF))
+
 		newMsg := msg.Copy()
 		newMsg.SetStructured(newRoot)
 		newBatch = append(newBatch, newMsg)
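Aside: both stash commits lean on the same throwaway technique — pretty-print the structured payload before and after the pgx transformation to eyeball which array values survive. A minimal standalone version of that debug pattern, with an invented payload for illustration:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Stand-in for the Benthos message root the processor receives.
	root := map[string]any{"id": 1, "interval_array": []string{"1 day", "2 hours"}}
	jsonF, _ := json.MarshalIndent(root, "", "  ")
	fmt.Printf("root: %s\n", jsonF)
}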
From c9fd12c61dd0b9192f5882f2986fc850c7f08299 Mon Sep 17 00:00:00 2001
From: Alisha
Date: Fri, 13 Dec 2024 15:09:57 -0800
Subject: [PATCH 7/9] stash

---
 .../postgres/alltypes/create-tables.sql       | 137 ++++++------------
 1 file changed, 45 insertions(+), 92 deletions(-)

diff --git a/internal/testutil/testdata/postgres/alltypes/create-tables.sql b/internal/testutil/testdata/postgres/alltypes/create-tables.sql
index de84c8c2c1..2f8d7fd44e 100644
--- a/internal/testutil/testdata/postgres/alltypes/create-tables.sql
+++ b/internal/testutil/testdata/postgres/alltypes/create-tables.sql
@@ -242,49 +242,59 @@ CREATE TABLE IF NOT EXISTS array_types (
     "char_array" _bpchar,
     "boolean_array" _bool,
     "date_array" _date,
-    "time_array" _time,
+    -- "time_array" _time,
     "timestamp_array" _timestamp,
     "timestamptz_array" _timestamptz,
     "interval_array" _interval,
-    "inet_array" _inet,
-    "cidr_array" _cidr,
-    "point_array" _point,
-    "line_array" _line,
-    "lseg_array" _lseg,
+    -- "inet_array" _inet,
+    -- "cidr_array" _cidr,
+    -- "point_array" _point,
+    -- "line_array" _line,
+    -- "lseg_array" _lseg,
     -- "box_array" _box, // broken
-    "path_array" _path,
-    "polygon_array" _polygon,
-    "circle_array" _circle,
+    -- "path_array" _path,
+    -- "polygon_array" _polygon,
+    -- "circle_array" _circle,
     "uuid_array" _uuid,
     "json_array" _json,
     "jsonb_array" _jsonb,
-    "bit_array" _bit,
-    "varbit_array" _varbit,
+    -- "bit_array" _bit,
+    -- "varbit_array" _varbit,
     "numeric_array" _numeric,
     "money_array" _money,
-    "xml_array" _xml,
-    "int_double_array" _int4
+    "xml_array" _xml
+    -- "int_double_array" _int4
 );
 
+
+
 INSERT INTO array_types (
     id,
-    int_array, smallint_array, bigint_array,
+    int_array,
+    smallint_array,
+    bigint_array,
     real_array,
     double_array,
-    text_array, varchar_array, char_array, boolean_array,
+    text_array,
+    varchar_array,
+    char_array,
+    boolean_array,
     date_array,
-    time_array, timestamp_array, timestamptz_array,
+    -- time_array,
+    timestamp_array,
+    timestamptz_array,
     interval_array,
-    inet_array, cidr_array,
-    point_array, line_array, lseg_array,
+    -- inet_array, cidr_array,
+    -- point_array, line_array, lseg_array,
     -- box_array,
-    path_array, polygon_array, circle_array,
+    -- path_array, polygon_array, circle_array,
     uuid_array,
     json_array, jsonb_array,
-    bit_array, varbit_array, numeric_array,
+    -- bit_array, varbit_array,
+    numeric_array,
     money_array,
-    xml_array,
-    int_double_array
+    xml_array
+    -- int_double_array
 ) VALUES (
     1,
     ARRAY[1, 2, 3],
@@ -297,94 +307,37 @@ INSERT INTO array_types (
     ARRAY['a'::char, 'b'::char],
     ARRAY[true, false],
     ARRAY['2023-01-01'::date, '2023-01-02'::date],
-    NULL,
+    -- ARRAY['12:00:00'::time, '13:00:00'::time],
     ARRAY['2023-01-01 12:00:00'::timestamp, '2023-01-02 13:00:00'::timestamp],
     ARRAY['2023-01-01 12:00:00+00'::timestamptz, '2023-01-02 13:00:00+00'::timestamptz],
     ARRAY['1 day'::interval, '2 hours'::interval],
-    NULL,
-    NULL,
-    NULL,
-    NULL,
-    NULL,
+    -- ARRAY['192.168.0.1'::inet, '10.0.0.1'::inet],
+    -- ARRAY['192.168.0.0/24'::cidr, '10.0.0.0/8'::cidr],
+    -- ARRAY['(1,1)'::point, '(2,2)'::point],
+    -- ARRAY['{1,2,2}'::line, '{3,4,4}'::line],
+    -- ARRAY['(1,1,2,2)'::lseg, '(3,3,4,4)'::lseg],
     -- ARRAY['(1,1,2,2)'::box, '(3,3,4,4)'::box],
-    NULL,
-    NULL,
-    NULL,
+    -- ARRAY['((1,1),(2,2),(3,3))'::path, '((4,4),(5,5),(6,6))'::path],
+    -- ARRAY['((1,1),(2,2),(3,3))'::polygon, '((4,4),(5,5),(6,6))'::polygon],
+    -- ARRAY['<(1,1),1>'::circle, '<(2,2),2>'::circle],
     ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid, 'b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid],
     ARRAY['{"key": "value1"}'::json, '{"key": "value2"}'::json],
     ARRAY['{"key": "value1"}'::jsonb, '{"key": "value2"}'::jsonb],
-    NULL,
-    NULL,
+    -- ARRAY['101'::bit(3), '110'::bit(3)],
+    -- ARRAY['10101'::bit varying(5), '01010'::bit varying(5)],
     ARRAY[1.23::numeric, 4.56::numeric],
     ARRAY[10.00::money, 20.00::money],
-    ARRAY['value1'::xml, 'value2'::xml],
-    NULL
+    ARRAY['value1'::xml, 'value2'::xml]
+    -- ARRAY[[1, 2], [3, 4]]
 );
 
--- INSERT INTO array_types (
--     id,
---     int_array, smallint_array, bigint_array,
---     real_array,
---     double_array,
---     text_array, varchar_array, char_array, boolean_array,
---     date_array,
---     time_array, timestamp_array, timestamptz_array,
---     interval_array,
---     inet_array, cidr_array,
---     point_array, line_array, lseg_array,
---     -- box_array,
---     path_array, polygon_array, circle_array,
---     uuid_array,
---     json_array, jsonb_array,
---     bit_array, varbit_array, numeric_array,
---     money_array,
---     xml_array,
---     int_double_array
--- ) VALUES (
---     1,
---     ARRAY[1, 2, 3],
---     ARRAY[10::smallint, 20::smallint],
---     ARRAY[100::bigint, 200::bigint],
---     ARRAY[1.1::real, 2.2::real],
---     ARRAY[1.11::double precision, 2.22::double precision],
---     ARRAY['text1', 'text2'],
---     ARRAY['varchar1'::varchar, 'varchar2'::varchar],
---     ARRAY['a'::char, 'b'::char],
---     ARRAY[true, false],
---     ARRAY['2023-01-01'::date, '2023-01-02'::date],
---     ARRAY['12:00:00'::time, '13:00:00'::time],
---     ARRAY['2023-01-01 12:00:00'::timestamp, '2023-01-02 13:00:00'::timestamp],
---     ARRAY['2023-01-01 12:00:00+00'::timestamptz, '2023-01-02 13:00:00+00'::timestamptz],
---     ARRAY['1 day'::interval, '2 hours'::interval],
---     ARRAY['192.168.0.1'::inet, '10.0.0.1'::inet],
---     ARRAY['192.168.0.0/24'::cidr, '10.0.0.0/8'::cidr],
---     ARRAY['(1,1)'::point, '(2,2)'::point],
---     ARRAY['{1,2,2}'::line, '{3,4,4}'::line],
---     ARRAY['(1,1,2,2)'::lseg, '(3,3,4,4)'::lseg],
---     -- ARRAY['(1,1,2,2)'::box, '(3,3,4,4)'::box],
---     ARRAY['((1,1),(2,2),(3,3))'::path, '((4,4),(5,5),(6,6))'::path],
---     ARRAY['((1,1),(2,2),(3,3))'::polygon, '((4,4),(5,5),(6,6))'::polygon],
---     ARRAY['<(1,1),1>'::circle, '<(2,2),2>'::circle],
---     ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid, 'b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid],
---     ARRAY['{"key": "value1"}'::json, '{"key": "value2"}'::json],
---     ARRAY['{"key": "value1"}'::jsonb, '{"key": "value2"}'::jsonb],
---     ARRAY['101'::bit(3), '110'::bit(3)],
---     ARRAY['10101'::bit varying(5), '01010'::bit varying(5)],
---     ARRAY[1.23::numeric, 4.56::numeric],
---     ARRAY[10.00::money, 20.00::money],
---     ARRAY['value1'::xml, 'value2'::xml],
---     ARRAY[[1, 2], [3, 4]]
--- );
-
-
 CREATE TABLE json_data (
     id SERIAL PRIMARY KEY,
     data JSONB
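Aside: the fix that follows threads the schema name through the verify helper and interpolates it with %q. For simple identifiers, Go's %q quoting happens to match Postgres's double-quoted identifier syntax; names needing SQL-specific escaping would not be covered. A standalone illustration with a made-up schema name:

package main

import "fmt"

func main() {
	schema := "alltypes-s3" // hypothetical schema name that needs quoting
	query := fmt.Sprintf("select timestamp_col::text, date_col::text from %q.time_time;", schema)
	fmt.Println(query) // select timestamp_col::text, date_col::text from "alltypes-s3".time_time;
}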
From a509b584137a23e887a7e3434f49e82baadfca8a Mon Sep 17 00:00:00 2001
From: Alisha
Date: Fri, 13 Dec 2024 15:17:04 -0800
Subject: [PATCH 8/9] fix test

---
 cli/internal/cmds/neosync/sync/sync_integration_test.go | 8 ++++----
 worker/pkg/benthos/sql/output_sql_insert.go             | 7 -------
 worker/pkg/benthos/sql/processor_neosync_pgx.go         | 6 ------
 3 files changed, 4 insertions(+), 17 deletions(-)

diff --git a/cli/internal/cmds/neosync/sync/sync_integration_test.go b/cli/internal/cmds/neosync/sync/sync_integration_test.go
index cfdd822dff..31b04ce797 100644
--- a/cli/internal/cmds/neosync/sync/sync_integration_test.go
+++ b/cli/internal/cmds/neosync/sync/sync_integration_test.go
@@ -135,7 +135,7 @@ func Test_Sync(t *testing.T) {
 		require.Equal(t, `{"name": "John", "age": 30}`, jsonVal)
 		require.Equal(t, `{"age": 30, "name": "John"}`, jsonbVal) // Note: JSONB reorders keys
 
-		err = verifyPostgresTimeTimeTableValues(t, ctx, postgres.Target.DB)
+		err = verifyPostgresTimeTimeTableValues(t, ctx, postgres.Target.DB, "alltypes")
 		require.NoError(t, err)
 	})
 
@@ -248,7 +248,7 @@ func Test_Sync(t *testing.T) {
 		require.Equal(t, `{"age":30,"name":"John"}`, jsonVal)
 		require.Equal(t, `{"age": 30, "name": "John"}`, jsonbVal) // Note: JSONB reorders keys
 
-		err = verifyPostgresTimeTimeTableValues(t, ctx, postgres.Target.DB)
+		err = verifyPostgresTimeTimeTableValues(t, ctx, postgres.Target.DB, alltypesSchema)
 		require.NoError(t, err)
 	})
 
@@ -440,8 +440,8 @@ func Test_Sync(t *testing.T) {
 	})
 }
 
-func verifyPostgresTimeTimeTableValues(t *testing.T, ctx context.Context, db *pgxpool.Pool) error {
-	rows, err := db.Query(ctx, "select timestamp_col::text, date_col::text from alltypes.time_time;")
+func verifyPostgresTimeTimeTableValues(t *testing.T, ctx context.Context, db *pgxpool.Pool, schema string) error {
+	rows, err := db.Query(ctx, fmt.Sprintf("select timestamp_col::text, date_col::text from %q.time_time;", schema))
 	if err != nil {
 		return err
 	}
diff --git a/worker/pkg/benthos/sql/output_sql_insert.go b/worker/pkg/benthos/sql/output_sql_insert.go
index bb9ba833c8..0fa36ffdba 100644
--- a/worker/pkg/benthos/sql/output_sql_insert.go
+++ b/worker/pkg/benthos/sql/output_sql_insert.go
@@ -276,13 +276,6 @@ func (s *pooledInsertOutput) WriteBatch(ctx context.Context, batch service.Messa
 		return err
 	}
 
-	if s.table == "array_types" {
-		fmt.Println()
-		fmt.Println("insertQuery", insertQuery)
-		fmt.Println("args", args)
-		fmt.Println()
-	}
-
 	if _, err := s.db.ExecContext(ctx, insertQuery, args...); err != nil {
 		shouldRetry := isDeadlockError(err) || (s.skipForeignKeyViolations && neosync_benthos.IsForeignKeyViolationError(err.Error()))
 		if !shouldRetry {
diff --git a/worker/pkg/benthos/sql/processor_neosync_pgx.go b/worker/pkg/benthos/sql/processor_neosync_pgx.go
index 9e774549dd..06e418da21 100644
--- a/worker/pkg/benthos/sql/processor_neosync_pgx.go
+++ b/worker/pkg/benthos/sql/processor_neosync_pgx.go
@@ -83,17 +83,11 @@ func (p *neosyncToPgxProcessor) ProcessBatch(ctx context.Context, batch service.
 			return nil, err
 		}
 
-		jsonF, _ := json.MarshalIndent(root, "", " ")
-		fmt.Printf("\n\n root: %s \n\n", string(jsonF))
-
 		newRoot, err := transformNeosyncToPgx(p.logger, root, p.columns, p.columnDataTypes, p.columnDefaultProperties)
 		if err != nil {
 			return nil, err
 		}
 
-		jsonF, _ = json.MarshalIndent(newRoot, "", " ")
-		fmt.Printf("\n\n newRoot: %s \n\n", string(jsonF))
-
 		newMsg := msg.Copy()
 		newMsg.SetStructured(newRoot)
 		newBatch = append(newBatch, newMsg)

From 4630faa24f43d68c760eb5d5ee8cfc198ba0a21e Mon Sep 17 00:00:00 2001
From: Alisha
Date: Fri, 13 Dec 2024 15:18:24 -0800
Subject: [PATCH 9/9] clean up

---
 worker/pkg/benthos/sql/processor_neosync_pgx.go | 2 --
 1 file changed, 2 deletions(-)

diff --git a/worker/pkg/benthos/sql/processor_neosync_pgx.go b/worker/pkg/benthos/sql/processor_neosync_pgx.go
index 06e418da21..c7647f2e83 100644
--- a/worker/pkg/benthos/sql/processor_neosync_pgx.go
+++ b/worker/pkg/benthos/sql/processor_neosync_pgx.go
@@ -82,12 +82,10 @@ func (p *neosyncToPgxProcessor) ProcessBatch(ctx context.Context, batch service.
 		if err != nil {
 			return nil, err
 		}
-
 		newRoot, err := transformNeosyncToPgx(p.logger, root, p.columns, p.columnDataTypes, p.columnDefaultProperties)
 		if err != nil {
 			return nil, err
 		}
-
 		newMsg := msg.Copy()
 		newMsg.SetStructured(newRoot)
 		newBatch = append(newBatch, newMsg)
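Closing aside: the integration tests settle on order-insensitive comparison — scanned rows are collected into slices and matched with require.ElementsMatch rather than require.Equal, since the sync does not guarantee row order. The same pattern in isolation, with values copied from the seeded time_time rows:

package sync_test

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// TestElementsMatchPattern shows the order-insensitive assertion style the
// integration test uses for timestamp_col and date_col values.
func TestElementsMatchPattern(t *testing.T) {
	expected := []string{"2024-03-18", "0001-01-01 BC", "0002-01-01 BC"}
	actual := []string{"0002-01-01 BC", "2024-03-18", "0001-01-01 BC"}
	require.ElementsMatch(t, expected, actual)
}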