Skip to content

Commit

Permalink
Fixes postgres and mysql dates / timestamps (#3042)
Browse files Browse the repository at this point in the history
  • Loading branch information
alishakawaguchi authored Dec 13, 2024
1 parent 606354f commit 241feb0
Show file tree
Hide file tree
Showing 8 changed files with 395 additions and 68 deletions.
104 changes: 80 additions & 24 deletions cli/internal/cmds/neosync/sync/sync_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"

"connectrpc.com/connect"
"github.com/jackc/pgx/v5/pgxpool"
mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
tcneosyncapi "github.com/nucleuscloud/neosync/backend/pkg/integration-test"
"github.com/nucleuscloud/neosync/cli/internal/output"
Expand Down Expand Up @@ -110,21 +111,32 @@ func Test_Sync(t *testing.T) {
err := sync.configureAndRunSync()
require.NoError(t, err)

rows := postgres.Target.DB.QueryRow(ctx, "select count(*) from humanresources.employees;")
var rowCount int
err = rows.Scan(&rowCount)
rowCount, err := postgres.Target.GetTableRowCount(ctx, "humanresources", "employees")
require.NoError(t, err)
require.Greater(t, rowCount, 1)

rows = postgres.Target.DB.QueryRow(ctx, "select count(*) from humanresources.generated_table;")
err = rows.Scan(&rowCount)
rowCount, err = postgres.Target.GetTableRowCount(ctx, "humanresources", "generated_table")
require.NoError(t, err)
require.Greater(t, rowCount, 1)

rows = postgres.Target.DB.QueryRow(ctx, "select count(*) from alltypes.all_data_types;")
err = rows.Scan(&rowCount)
rowCount, err = postgres.Target.GetTableRowCount(ctx, "alltypes", "all_data_types")
require.NoError(t, err)
require.Greater(t, rowCount, 1)

rowCount, err = postgres.Target.GetTableRowCount(ctx, "alltypes", "time_time")
require.NoError(t, err)
require.Greater(t, rowCount, 0)

var tsvectorVal, jsonVal, jsonbVal string
row := postgres.Target.DB.QueryRow(ctx, "select tsvector_col::text, json_col::text, jsonb_col::text from alltypes.all_data_types where tsvector_col is not null and json_col is not null;")
err = row.Scan(&tsvectorVal, &jsonVal, &jsonbVal)
require.NoError(t, err)
require.Equal(t, "'example' 'tsvector'", tsvectorVal)
require.Equal(t, `{"name": "John", "age": 30}`, jsonVal)
require.Equal(t, `{"age": 30, "name": "John"}`, jsonbVal) // Note: JSONB reorders keys

err = verifyPostgresTimeTimeTableValues(t, ctx, postgres.Target.DB)
require.NoError(t, err)
})

t.Run("S3_end_to_end", func(t *testing.T) {
Expand Down Expand Up @@ -216,16 +228,28 @@ func Test_Sync(t *testing.T) {
require.NoError(t, err)
})

var rowCount int
rows := postgres.Target.DB.QueryRow(ctx, fmt.Sprintf("select count(*) from %s.all_data_types;", alltypesSchema))
err = rows.Scan(&rowCount)
rowCount, err := postgres.Target.GetTableRowCount(ctx, alltypesSchema, "all_data_types")
require.NoError(t, err)
require.Greater(t, rowCount, 1)

rows = postgres.Target.DB.QueryRow(ctx, fmt.Sprintf("select count(*) from %s.json_data;", alltypesSchema))
err = rows.Scan(&rowCount)
rowCount, err = postgres.Target.GetTableRowCount(ctx, alltypesSchema, "json_data")
require.NoError(t, err)
require.Greater(t, rowCount, 1)

rowCount, err = postgres.Target.GetTableRowCount(ctx, alltypesSchema, "time_time")
require.NoError(t, err)
require.Greater(t, rowCount, 0)

var tsvectorVal, jsonVal, jsonbVal string
row := postgres.Target.DB.QueryRow(ctx, fmt.Sprintf("select tsvector_col::text, json_col::text, jsonb_col::text from %s.all_data_types where tsvector_col is not null and json_col is not null;", alltypesSchema))
err = row.Scan(&tsvectorVal, &jsonVal, &jsonbVal)
require.NoError(t, err)
require.Equal(t, "'example' 'tsvector'", tsvectorVal)
require.Equal(t, `{"age":30,"name":"John"}`, jsonVal)
require.Equal(t, `{"age": 30, "name": "John"}`, jsonbVal) // Note: JSONB reorders keys

err = verifyPostgresTimeTimeTableValues(t, ctx, postgres.Target.DB)
require.NoError(t, err)
})

t.Cleanup(func() {
Expand Down Expand Up @@ -290,19 +314,15 @@ func Test_Sync(t *testing.T) {
err := sync.configureAndRunSync()
require.NoError(t, err)

rows := mysql.Target.DB.QueryRowContext(ctx, "select count(*) from humanresources.locations;")
var rowCount int
err = rows.Scan(&rowCount)
rowCount, err := mysql.Target.GetTableRowCount(ctx, "humanresources", "locations")
require.NoError(t, err)
require.Greater(t, rowCount, 1)

rows = mysql.Target.DB.QueryRowContext(ctx, "select count(*) from humanresources.generated_table;")
err = rows.Scan(&rowCount)
rowCount, err = mysql.Target.GetTableRowCount(ctx, "humanresources", "generated_table")
require.NoError(t, err)
require.Greater(t, rowCount, 1)

rows = mysql.Target.DB.QueryRowContext(ctx, "select count(*) from alltypes.all_data_types;")
err = rows.Scan(&rowCount)
rowCount, err = mysql.Target.GetTableRowCount(ctx, "alltypes", "all_data_types")
require.NoError(t, err)
require.Greater(t, rowCount, 1)
})
Expand Down Expand Up @@ -395,14 +415,11 @@ func Test_Sync(t *testing.T) {
require.NoError(t, err)
})

var rowCount int
rows := mysql.Target.DB.QueryRowContext(ctx, fmt.Sprintf("select count(*) from %s.all_data_types;", alltypesSchema))
err = rows.Scan(&rowCount)
rowCount, err := mysql.Target.GetTableRowCount(ctx, alltypesSchema, "all_data_types")
require.NoError(t, err)
require.Greater(t, rowCount, 1)

rows = mysql.Target.DB.QueryRowContext(ctx, fmt.Sprintf("select count(*) from %s.json_data;", alltypesSchema))
err = rows.Scan(&rowCount)
rowCount, err = mysql.Target.GetTableRowCount(ctx, alltypesSchema, "json_data")
require.NoError(t, err)
require.Greater(t, rowCount, 1)
})
Expand All @@ -422,3 +439,42 @@ func Test_Sync(t *testing.T) {
}
})
}

func verifyPostgresTimeTimeTableValues(t *testing.T, ctx context.Context, db *pgxpool.Pool) error {
rows, err := db.Query(ctx, "select timestamp_col::text, date_col::text from alltypes.time_time;")
if err != nil {
return err
}
defer rows.Close()

expectedTimestamps := [][]byte{
[]byte("2024-03-18 10:30:00"),
[]byte("0001-01-01 00:00:00 BC"),
[]byte("0002-01-01 00:00:00 BC"),
}
expectedDates := [][]byte{
[]byte("2024-03-18"),
[]byte("0001-01-01 BC"),
[]byte("0002-01-01 BC"),
}
var actualTimestamps [][]byte
var actualDates [][]byte

for rows.Next() {
var timestampCol, dateCol []byte
err = rows.Scan(&timestampCol, &dateCol)
if err != nil {
return err
}
actualTimestamps = append(actualTimestamps, timestampCol)
actualDates = append(actualDates, dateCol)
}

if err = rows.Err(); err != nil {
return err
}

require.ElementsMatch(t, expectedTimestamps, actualTimestamps, "Expected timestamp_col values to match")
require.ElementsMatch(t, expectedDates, actualDates, "Expected date_col values to match")
return nil
}
10 changes: 10 additions & 0 deletions internal/testutil/testcontainers/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,13 @@ func (m *MysqlTestContainer) CreateDatabases(ctx context.Context, schemas []stri
}
return nil
}

func (m *MysqlTestContainer) GetTableRowCount(ctx context.Context, schema, table string) (int, error) {
rows := m.DB.QueryRowContext(ctx, fmt.Sprintf("SELECT COUNT(*) FROM `%s`.`%s`;", schema, table))
var count int
err := rows.Scan(&count)
if err != nil {
return 0, err
}
return count, nil
}
10 changes: 10 additions & 0 deletions internal/testutil/testcontainers/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,13 @@ func (p *PostgresTestContainer) CreateSchemas(ctx context.Context, schemas []str
}
return nil
}

func (p *PostgresTestContainer) GetTableRowCount(ctx context.Context, schema, table string) (int, error) {
rows := p.DB.QueryRow(ctx, fmt.Sprintf("SELECT COUNT(*) FROM %q.%q;", schema, table))
var count int
err := rows.Scan(&count)
if err != nil {
return 0, err
}
return count, nil
}
92 changes: 52 additions & 40 deletions internal/testutil/testdata/postgres/alltypes/create-tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ CREATE TABLE IF NOT EXISTS all_data_types (
timestamp_col TIMESTAMP,
timestamptz_col TIMESTAMPTZ,
date_col DATE,
-- time_col TIME,
-- timetz_col TIMETZ,
time_col TIME,
timetz_col TIMETZ,
interval_col INTERVAL,

-- Boolean Type
Expand Down Expand Up @@ -100,8 +100,8 @@ INSERT INTO all_data_types (
timestamp_col,
timestamptz_col,
date_col,
-- time_col,
-- timetz_col,
time_col,
timetz_col,
interval_col,
boolean_col,
uuid_col,
Expand Down Expand Up @@ -149,8 +149,8 @@ INSERT INTO all_data_types (
'2024-01-01 12:34:56', -- timestamp_col
'2024-01-01 12:34:56+00', -- timestamptz_col
'2024-01-01', -- date_col
-- '12:34:56', -- time_col
-- '12:34:56+00', -- timetz_col
'12:34:56', -- time_col
'12:34:56+00', -- timetz_col
'1 day', -- interval_col
TRUE, -- boolean_col
'123e4567-e89b-12d3-a456-426614174000', -- uuid_col
Expand Down Expand Up @@ -188,34 +188,46 @@ INSERT INTO all_data_types (
);


-- CREATE TABLE IF NOT EXISTS time_time (
-- id SERIAL PRIMARY KEY,
-- timestamp_col TIMESTAMP,
-- timestamptz_col TIMESTAMPTZ,
-- date_col DATE
-- );

-- INSERT INTO time_time (
-- timestamp_col,
-- timestamptz_col,
-- date_col
-- )
-- VALUES (
-- '2024-03-18 10:30:00',
-- '2024-03-18 10:30:00+00',
-- '2024-03-18'
-- );

-- INSERT INTO time_time (
-- timestamp_col,
-- timestamptz_col,
-- date_col
-- )
-- VALUES (
-- '0001-01-01 00:00:00 BC',
-- '0001-01-01 00:00:00+00 BC',
-- '0001-01-01 BC'
-- );
CREATE TABLE IF NOT EXISTS time_time (
id SERIAL PRIMARY KEY,
timestamp_col TIMESTAMP,
timestamptz_col TIMESTAMPTZ,
date_col DATE
);

INSERT INTO time_time (
timestamp_col,
timestamptz_col,
date_col
)
VALUES (
'2024-03-18 10:30:00',
'2024-03-18 10:30:00+00',
'2024-03-18'
);

INSERT INTO time_time (
timestamp_col,
timestamptz_col,
date_col
)
VALUES (
'0001-01-01 00:00:00 BC',
'0001-01-01 00:00:00+00 BC',
'0001-01-01 BC'
);

INSERT INTO time_time (
timestamp_col,
timestamptz_col,
date_col
)
VALUES (
'0002-01-01 00:00:00 BC',
'0002-01-01 00:00:00+00 BC',
'0002-01-01 BC'
);



CREATE TABLE IF NOT EXISTS array_types (
Expand All @@ -233,7 +245,7 @@ CREATE TABLE IF NOT EXISTS array_types (
-- "time_array" _time,
-- "timestamp_array" _timestamp,
-- "timestamptz_array" _timestamptz,
"interval_array" _interval
"interval_array" _interval,
-- "inet_array" _inet, // broken
-- "cidr_array" _cidr,
-- "point_array" _point,
Expand All @@ -243,7 +255,7 @@ CREATE TABLE IF NOT EXISTS array_types (
-- "path_array" _path,
-- "polygon_array" _polygon,
-- "circle_array" _circle,
-- "uuid_array" _uuid,
"uuid_array" _uuid
-- "json_array" _json,
-- "jsonb_array" _jsonb,
-- "bit_array" _bit,
Expand All @@ -263,12 +275,12 @@ INSERT INTO array_types (
-- text_array, varchar_array, char_array, boolean_array,
-- date_array,
-- time_array, timestamp_array, timestamptz_array,
interval_array
interval_array,
-- inet_array, cidr_array,
-- point_array, line_array, lseg_array,
-- box_array,
-- path_array, polygon_array, circle_array,
-- uuid_array,
uuid_array
-- json_array, jsonb_array,
-- bit_array, varbit_array, numeric_array,
-- money_array,
Expand All @@ -289,7 +301,7 @@ INSERT INTO array_types (
-- ARRAY['12:00:00'::time, '13:00:00'::time],
-- ARRAY['2023-01-01 12:00:00'::timestamp, '2023-01-02 13:00:00'::timestamp],
-- ARRAY['2023-01-01 12:00:00+00'::timestamptz, '2023-01-02 13:00:00+00'::timestamptz],
ARRAY['1 day'::interval, '2 hours'::interval]
ARRAY['1 day'::interval, '2 hours'::interval],
-- ARRAY['192.168.0.1'::inet, '10.0.0.1'::inet],
-- ARRAY['192.168.0.0/24'::cidr, '10.0.0.0/8'::cidr],
-- ARRAY['(1,1)'::point, '(2,2)'::point],
Expand All @@ -299,7 +311,7 @@ INSERT INTO array_types (
-- ARRAY['((1,1),(2,2),(3,3))'::path, '((4,4),(5,5),(6,6))'::path],
-- ARRAY['((1,1),(2,2),(3,3))'::polygon, '((4,4),(5,5),(6,6))'::polygon],
-- ARRAY['<(1,1),1>'::circle, '<(2,2),2>'::circle],
-- ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid, 'b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid],
ARRAY['a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid, 'b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid]
-- ARRAY['{"key": "value1"}'::json, '{"key": "value2"}'::json],
-- ARRAY['{"key": "value1"}'::jsonb, '{"key": "value2"}'::jsonb],
-- ARRAY['101'::bit(3), '110'::bit(3)],
Expand Down
2 changes: 1 addition & 1 deletion worker/pkg/benthos/json/processor_neosync_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func transform(root any) any {
}
return newSlice
case time.Time:
return v.Format(time.DateTime)
return v.Format(time.RFC3339)
case []uint8:
return string(v)
// TODO this should be neosync bit type
Expand Down
Loading

0 comments on commit 241feb0

Please sign in to comment.