From cfa0462d3dce354f3a62f768bfc955736f128943 Mon Sep 17 00:00:00 2001
From: Nick Z <2420177+nickzelei@users.noreply.github.com>
Date: Fri, 3 Nov 2023 21:09:39 +0000
Subject: [PATCH] migrates mysql to use auto-gen code

---
 .mockery.yml                                  |   5 +
 worker/gen/go/db/mysql/mock_DBTX.go           | 280 ++++++++++++++++++
 worker/gen/go/db/mysql/mock_Querier.go        | 259 ++++++++++++++++
 worker/internal/dbschemas/mysql/mock_DBTX.go  | 159 ----------
 worker/internal/dbschemas/mysql/mysql.go      | 270 +----------------
 worker/internal/dbschemas/mysql/mysql_test.go |  15 +-
 .../datasync/activities/activities.go         |   8 +-
 .../datasync/activities/benthos-builder.go    |  33 ++-
 .../activities/benthos-builder_test.go        |  24 +-
 9 files changed, 590 insertions(+), 463 deletions(-)
 create mode 100644 worker/gen/go/db/mysql/mock_DBTX.go
 create mode 100644 worker/gen/go/db/mysql/mock_Querier.go
 delete mode 100644 worker/internal/dbschemas/mysql/mock_DBTX.go

diff --git a/.mockery.yml b/.mockery.yml
index f63bb63900..4826f17193 100644
--- a/.mockery.yml
+++ b/.mockery.yml
@@ -33,3 +33,8 @@ packages:
     interfaces:
       Querier:
       DBTX:
+  github.com/nucleuscloud/neosync/worker/gen/go/db/mysql:
+    # config:
+    interfaces:
+      Querier:
+      DBTX:
diff --git a/worker/gen/go/db/mysql/mock_DBTX.go b/worker/gen/go/db/mysql/mock_DBTX.go
new file mode 100644
index 0000000000..3c961c2c12
--- /dev/null
+++ b/worker/gen/go/db/mysql/mock_DBTX.go
@@ -0,0 +1,280 @@
+// Code generated by mockery. DO NOT EDIT.
+
+package mysql_queries
+
+import (
+	context "context"
+	sql "database/sql"
+
+	mock "github.com/stretchr/testify/mock"
+)
+
+// MockDBTX is an autogenerated mock type for the DBTX type
+type MockDBTX struct {
+	mock.Mock
+}
+
+type MockDBTX_Expecter struct {
+	mock *mock.Mock
+}
+
+func (_m *MockDBTX) EXPECT() *MockDBTX_Expecter {
+	return &MockDBTX_Expecter{mock: &_m.Mock}
+}
+
+// ExecContext provides a mock function with given fields: _a0, _a1, _a2
+func (_m *MockDBTX) ExecContext(_a0 context.Context, _a1 string, _a2 ...interface{}) (sql.Result, error) {
+	var _ca []interface{}
+	_ca = append(_ca, _a0, _a1)
+	_ca = append(_ca, _a2...)
+	ret := _m.Called(_ca...)
+
+	var r0 sql.Result
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) (sql.Result, error)); ok {
+		return rf(_a0, _a1, _a2...)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) sql.Result); ok {
+		r0 = rf(_a0, _a1, _a2...)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(sql.Result)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, string, ...interface{}) error); ok {
+		r1 = rf(_a0, _a1, _a2...)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockDBTX_ExecContext_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ExecContext'
+type MockDBTX_ExecContext_Call struct {
+	*mock.Call
+}
+
+// ExecContext is a helper method to define mock.On call
+//   - _a0 context.Context
+//   - _a1 string
+//   - _a2 ...interface{}
+func (_e *MockDBTX_Expecter) ExecContext(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *MockDBTX_ExecContext_Call {
+	return &MockDBTX_ExecContext_Call{Call: _e.mock.On("ExecContext",
+		append([]interface{}{_a0, _a1}, _a2...)...)}
+}
+
+func (_c *MockDBTX_ExecContext_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *MockDBTX_ExecContext_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		variadicArgs := make([]interface{}, len(args)-2)
+		for i, a := range args[2:] {
+			if a != nil {
+				variadicArgs[i] = a.(interface{})
+			}
+		}
+		run(args[0].(context.Context), args[1].(string), variadicArgs...)
+	})
+	return _c
+}
+
+func (_c *MockDBTX_ExecContext_Call) Return(_a0 sql.Result, _a1 error) *MockDBTX_ExecContext_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *MockDBTX_ExecContext_Call) RunAndReturn(run func(context.Context, string, ...interface{}) (sql.Result, error)) *MockDBTX_ExecContext_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// PrepareContext provides a mock function with given fields: _a0, _a1
+func (_m *MockDBTX) PrepareContext(_a0 context.Context, _a1 string) (*sql.Stmt, error) {
+	ret := _m.Called(_a0, _a1)
+
+	var r0 *sql.Stmt
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, string) (*sql.Stmt, error)); ok {
+		return rf(_a0, _a1)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, string) *sql.Stmt); ok {
+		r0 = rf(_a0, _a1)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*sql.Stmt)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, string) error); ok {
+		r1 = rf(_a0, _a1)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockDBTX_PrepareContext_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'PrepareContext'
+type MockDBTX_PrepareContext_Call struct {
+	*mock.Call
+}
+
+// PrepareContext is a helper method to define mock.On call
+//   - _a0 context.Context
+//   - _a1 string
+func (_e *MockDBTX_Expecter) PrepareContext(_a0 interface{}, _a1 interface{}) *MockDBTX_PrepareContext_Call {
+	return &MockDBTX_PrepareContext_Call{Call: _e.mock.On("PrepareContext", _a0, _a1)}
+}
+
+func (_c *MockDBTX_PrepareContext_Call) Run(run func(_a0 context.Context, _a1 string)) *MockDBTX_PrepareContext_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(string))
+	})
+	return _c
+}
+
+func (_c *MockDBTX_PrepareContext_Call) Return(_a0 *sql.Stmt, _a1 error) *MockDBTX_PrepareContext_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *MockDBTX_PrepareContext_Call) RunAndReturn(run func(context.Context, string) (*sql.Stmt, error)) *MockDBTX_PrepareContext_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// QueryContext provides a mock function with given fields: _a0, _a1, _a2
+func (_m *MockDBTX) QueryContext(_a0 context.Context, _a1 string, _a2 ...interface{}) (*sql.Rows, error) {
+	var _ca []interface{}
+	_ca = append(_ca, _a0, _a1)
+	_ca = append(_ca, _a2...)
+	ret := _m.Called(_ca...)
+
+	var r0 *sql.Rows
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) (*sql.Rows, error)); ok {
+		return rf(_a0, _a1, _a2...)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) *sql.Rows); ok {
+		r0 = rf(_a0, _a1, _a2...)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*sql.Rows)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, string, ...interface{}) error); ok {
+		r1 = rf(_a0, _a1, _a2...)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockDBTX_QueryContext_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryContext'
+type MockDBTX_QueryContext_Call struct {
+	*mock.Call
+}
+
+// QueryContext is a helper method to define mock.On call
+//   - _a0 context.Context
+//   - _a1 string
+//   - _a2 ...interface{}
+func (_e *MockDBTX_Expecter) QueryContext(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *MockDBTX_QueryContext_Call {
+	return &MockDBTX_QueryContext_Call{Call: _e.mock.On("QueryContext",
+		append([]interface{}{_a0, _a1}, _a2...)...)}
+}
+
+func (_c *MockDBTX_QueryContext_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *MockDBTX_QueryContext_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		variadicArgs := make([]interface{}, len(args)-2)
+		for i, a := range args[2:] {
+			if a != nil {
+				variadicArgs[i] = a.(interface{})
+			}
+		}
+		run(args[0].(context.Context), args[1].(string), variadicArgs...)
+	})
+	return _c
+}
+
+func (_c *MockDBTX_QueryContext_Call) Return(_a0 *sql.Rows, _a1 error) *MockDBTX_QueryContext_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *MockDBTX_QueryContext_Call) RunAndReturn(run func(context.Context, string, ...interface{}) (*sql.Rows, error)) *MockDBTX_QueryContext_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// QueryRowContext provides a mock function with given fields: _a0, _a1, _a2
+func (_m *MockDBTX) QueryRowContext(_a0 context.Context, _a1 string, _a2 ...interface{}) *sql.Row {
+	var _ca []interface{}
+	_ca = append(_ca, _a0, _a1)
+	_ca = append(_ca, _a2...)
+	ret := _m.Called(_ca...)
+
+	var r0 *sql.Row
+	if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) *sql.Row); ok {
+		r0 = rf(_a0, _a1, _a2...)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*sql.Row)
+		}
+	}
+
+	return r0
+}
+
+// MockDBTX_QueryRowContext_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryRowContext'
+type MockDBTX_QueryRowContext_Call struct {
+	*mock.Call
+}
+
+// QueryRowContext is a helper method to define mock.On call
+//   - _a0 context.Context
+//   - _a1 string
+//   - _a2 ...interface{}
+func (_e *MockDBTX_Expecter) QueryRowContext(_a0 interface{}, _a1 interface{}, _a2 ...interface{}) *MockDBTX_QueryRowContext_Call {
+	return &MockDBTX_QueryRowContext_Call{Call: _e.mock.On("QueryRowContext",
+		append([]interface{}{_a0, _a1}, _a2...)...)}
+}
+
+func (_c *MockDBTX_QueryRowContext_Call) Run(run func(_a0 context.Context, _a1 string, _a2 ...interface{})) *MockDBTX_QueryRowContext_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		variadicArgs := make([]interface{}, len(args)-2)
+		for i, a := range args[2:] {
+			if a != nil {
+				variadicArgs[i] = a.(interface{})
+			}
+		}
+		run(args[0].(context.Context), args[1].(string), variadicArgs...)
+	})
+	return _c
+}
+
+func (_c *MockDBTX_QueryRowContext_Call) Return(_a0 *sql.Row) *MockDBTX_QueryRowContext_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *MockDBTX_QueryRowContext_Call) RunAndReturn(run func(context.Context, string, ...interface{}) *sql.Row) *MockDBTX_QueryRowContext_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// NewMockDBTX creates a new instance of MockDBTX. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewMockDBTX(t interface {
+	mock.TestingT
+	Cleanup(func())
+}) *MockDBTX {
+	mock := &MockDBTX{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
diff --git a/worker/gen/go/db/mysql/mock_Querier.go b/worker/gen/go/db/mysql/mock_Querier.go
new file mode 100644
index 0000000000..410c59151e
--- /dev/null
+++ b/worker/gen/go/db/mysql/mock_Querier.go
@@ -0,0 +1,259 @@
+// Code generated by mockery. DO NOT EDIT.
+
+package mysql_queries
+
+import (
+	context "context"
+
+	mock "github.com/stretchr/testify/mock"
+)
+
+// MockQuerier is an autogenerated mock type for the Querier type
+type MockQuerier struct {
+	mock.Mock
+}
+
+type MockQuerier_Expecter struct {
+	mock *mock.Mock
+}
+
+func (_m *MockQuerier) EXPECT() *MockQuerier_Expecter {
+	return &MockQuerier_Expecter{mock: &_m.Mock}
+}
+
+// GetDatabaseSchema provides a mock function with given fields: ctx, db
+func (_m *MockQuerier) GetDatabaseSchema(ctx context.Context, db DBTX) ([]*GetDatabaseSchemaRow, error) {
+	ret := _m.Called(ctx, db)
+
+	var r0 []*GetDatabaseSchemaRow
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, DBTX) ([]*GetDatabaseSchemaRow, error)); ok {
+		return rf(ctx, db)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, DBTX) []*GetDatabaseSchemaRow); ok {
+		r0 = rf(ctx, db)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]*GetDatabaseSchemaRow)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, DBTX) error); ok {
+		r1 = rf(ctx, db)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockQuerier_GetDatabaseSchema_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDatabaseSchema'
+type MockQuerier_GetDatabaseSchema_Call struct {
+	*mock.Call
+}
+
+// GetDatabaseSchema is a helper method to define mock.On call
+//   - ctx context.Context
+//   - db DBTX
+func (_e *MockQuerier_Expecter) GetDatabaseSchema(ctx interface{}, db interface{}) *MockQuerier_GetDatabaseSchema_Call {
+	return &MockQuerier_GetDatabaseSchema_Call{Call: _e.mock.On("GetDatabaseSchema", ctx, db)}
+}
+
+func (_c *MockQuerier_GetDatabaseSchema_Call) Run(run func(ctx context.Context, db DBTX)) *MockQuerier_GetDatabaseSchema_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(DBTX))
+	})
+	return _c
+}
+
+func (_c *MockQuerier_GetDatabaseSchema_Call) Return(_a0 []*GetDatabaseSchemaRow, _a1 error) *MockQuerier_GetDatabaseSchema_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *MockQuerier_GetDatabaseSchema_Call) RunAndReturn(run func(context.Context, DBTX) ([]*GetDatabaseSchemaRow, error)) *MockQuerier_GetDatabaseSchema_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// GetDatabaseTableSchema provides a mock function with given fields: ctx, db, arg
+func (_m *MockQuerier) GetDatabaseTableSchema(ctx context.Context, db DBTX, arg *GetDatabaseTableSchemaParams) ([]*GetDatabaseTableSchemaRow, error) {
+	ret := _m.Called(ctx, db, arg)
+
+	var r0 []*GetDatabaseTableSchemaRow
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, DBTX, *GetDatabaseTableSchemaParams) ([]*GetDatabaseTableSchemaRow, error)); ok {
+		return rf(ctx, db, arg)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, DBTX, *GetDatabaseTableSchemaParams) []*GetDatabaseTableSchemaRow); ok {
+		r0 = rf(ctx, db, arg)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]*GetDatabaseTableSchemaRow)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, DBTX, *GetDatabaseTableSchemaParams) error); ok {
+		r1 = rf(ctx, db, arg)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockQuerier_GetDatabaseTableSchema_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDatabaseTableSchema'
+type MockQuerier_GetDatabaseTableSchema_Call struct {
+	*mock.Call
+}
+
+// GetDatabaseTableSchema is a helper method to define mock.On call
+//   - ctx context.Context
+//   - db DBTX
+//   - arg *GetDatabaseTableSchemaParams
+func (_e *MockQuerier_Expecter) GetDatabaseTableSchema(ctx interface{}, db interface{}, arg interface{}) *MockQuerier_GetDatabaseTableSchema_Call {
+	return &MockQuerier_GetDatabaseTableSchema_Call{Call: _e.mock.On("GetDatabaseTableSchema", ctx, db, arg)}
+}
+
+func (_c *MockQuerier_GetDatabaseTableSchema_Call) Run(run func(ctx context.Context, db DBTX, arg *GetDatabaseTableSchemaParams)) *MockQuerier_GetDatabaseTableSchema_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(DBTX), args[2].(*GetDatabaseTableSchemaParams))
+	})
+	return _c
+}
+
+func (_c *MockQuerier_GetDatabaseTableSchema_Call) Return(_a0 []*GetDatabaseTableSchemaRow, _a1 error) *MockQuerier_GetDatabaseTableSchema_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *MockQuerier_GetDatabaseTableSchema_Call) RunAndReturn(run func(context.Context, DBTX, *GetDatabaseTableSchemaParams) ([]*GetDatabaseTableSchemaRow, error)) *MockQuerier_GetDatabaseTableSchema_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// GetForeignKeyConstraints provides a mock function with given fields: ctx, db, tableSchema
+func (_m *MockQuerier) GetForeignKeyConstraints(ctx context.Context, db DBTX, tableSchema string) ([]*GetForeignKeyConstraintsRow, error) {
+	ret := _m.Called(ctx, db, tableSchema)
+
+	var r0 []*GetForeignKeyConstraintsRow
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, DBTX, string) ([]*GetForeignKeyConstraintsRow, error)); ok {
+		return rf(ctx, db, tableSchema)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, DBTX, string) []*GetForeignKeyConstraintsRow); ok {
+		r0 = rf(ctx, db, tableSchema)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]*GetForeignKeyConstraintsRow)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, DBTX, string) error); ok {
+		r1 = rf(ctx, db, tableSchema)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockQuerier_GetForeignKeyConstraints_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetForeignKeyConstraints'
+type MockQuerier_GetForeignKeyConstraints_Call struct {
+	*mock.Call
+}
+
+// GetForeignKeyConstraints is a helper method to define mock.On call
+//   - ctx context.Context
+//   - db DBTX
+//   - tableSchema string
+func (_e *MockQuerier_Expecter) GetForeignKeyConstraints(ctx interface{}, db interface{}, tableSchema interface{}) *MockQuerier_GetForeignKeyConstraints_Call {
+	return &MockQuerier_GetForeignKeyConstraints_Call{Call: _e.mock.On("GetForeignKeyConstraints", ctx, db, tableSchema)}
+}
+
+func (_c *MockQuerier_GetForeignKeyConstraints_Call) Run(run func(ctx context.Context, db DBTX, tableSchema string)) *MockQuerier_GetForeignKeyConstraints_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(DBTX), args[2].(string))
+	})
+	return _c
+}
+
+func (_c *MockQuerier_GetForeignKeyConstraints_Call) Return(_a0 []*GetForeignKeyConstraintsRow, _a1 error) *MockQuerier_GetForeignKeyConstraints_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *MockQuerier_GetForeignKeyConstraints_Call) RunAndReturn(run func(context.Context, DBTX, string) ([]*GetForeignKeyConstraintsRow, error)) *MockQuerier_GetForeignKeyConstraints_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// GetTableConstraints provides a mock function with given fields: ctx, db, arg
+func (_m *MockQuerier) GetTableConstraints(ctx context.Context, db DBTX, arg *GetTableConstraintsParams) ([]*GetTableConstraintsRow, error) {
+	ret := _m.Called(ctx, db, arg)
+
+	var r0 []*GetTableConstraintsRow
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, DBTX, *GetTableConstraintsParams) ([]*GetTableConstraintsRow, error)); ok {
+		return rf(ctx, db, arg)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, DBTX, *GetTableConstraintsParams) []*GetTableConstraintsRow); ok {
+		r0 = rf(ctx, db, arg)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]*GetTableConstraintsRow)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, DBTX, *GetTableConstraintsParams) error); ok {
+		r1 = rf(ctx, db, arg)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// MockQuerier_GetTableConstraints_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetTableConstraints'
+type MockQuerier_GetTableConstraints_Call struct {
+	*mock.Call
+}
+
+// GetTableConstraints is a helper method to define mock.On call
+//   - ctx context.Context
+//   - db DBTX
+//   - arg *GetTableConstraintsParams
+func (_e *MockQuerier_Expecter) GetTableConstraints(ctx interface{}, db interface{}, arg interface{}) *MockQuerier_GetTableConstraints_Call {
+	return &MockQuerier_GetTableConstraints_Call{Call: _e.mock.On("GetTableConstraints", ctx, db, arg)}
+}
+
+func (_c *MockQuerier_GetTableConstraints_Call) Run(run func(ctx context.Context, db DBTX, arg *GetTableConstraintsParams)) *MockQuerier_GetTableConstraints_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(DBTX), args[2].(*GetTableConstraintsParams))
+	})
+	return _c
+}
+
+func (_c *MockQuerier_GetTableConstraints_Call) Return(_a0 []*GetTableConstraintsRow, _a1 error) *MockQuerier_GetTableConstraints_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *MockQuerier_GetTableConstraints_Call) RunAndReturn(run func(context.Context, DBTX, *GetTableConstraintsParams) ([]*GetTableConstraintsRow, error)) *MockQuerier_GetTableConstraints_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
+// NewMockQuerier creates a new instance of MockQuerier. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
+// The first argument is typically a *testing.T value.
+func NewMockQuerier(t interface {
+	mock.TestingT
+	Cleanup(func())
+}) *MockQuerier {
+	mock := &MockQuerier{}
+	mock.Mock.Test(t)
+
+	t.Cleanup(func() { mock.AssertExpectations(t) })
+
+	return mock
+}
diff --git a/worker/internal/dbschemas/mysql/mock_DBTX.go b/worker/internal/dbschemas/mysql/mock_DBTX.go
deleted file mode 100644
index 1199112c43..0000000000
--- a/worker/internal/dbschemas/mysql/mock_DBTX.go
+++ /dev/null
@@ -1,159 +0,0 @@
-// Code generated by mockery. DO NOT EDIT.
-
-package dbschemas_mysql
-
-import (
-	context "context"
-	sql "database/sql"
-
-	mock "github.com/stretchr/testify/mock"
-)
-
-// MockDBTX is an autogenerated mock type for the DBTX type
-type MockDBTX struct {
-	mock.Mock
-}
-
-type MockDBTX_Expecter struct {
-	mock *mock.Mock
-}
-
-func (_m *MockDBTX) EXPECT() *MockDBTX_Expecter {
-	return &MockDBTX_Expecter{mock: &_m.Mock}
-}
-
-// QueryContext provides a mock function with given fields: ctx, query, args
-func (_m *MockDBTX) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
-	var _ca []interface{}
-	_ca = append(_ca, ctx, query)
-	_ca = append(_ca, args...)
-	ret := _m.Called(_ca...)
-
-	var r0 *sql.Rows
-	var r1 error
-	if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) (*sql.Rows, error)); ok {
-		return rf(ctx, query, args...)
-	}
-	if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) *sql.Rows); ok {
-		r0 = rf(ctx, query, args...)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*sql.Rows)
-		}
-	}
-
-	if rf, ok := ret.Get(1).(func(context.Context, string, ...interface{}) error); ok {
-		r1 = rf(ctx, query, args...)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// MockDBTX_QueryContext_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryContext'
-type MockDBTX_QueryContext_Call struct {
-	*mock.Call
-}
-
-// QueryContext is a helper method to define mock.On call
-//   - ctx context.Context
-//   - query string
-//   - args ...interface{}
-func (_e *MockDBTX_Expecter) QueryContext(ctx interface{}, query interface{}, args ...interface{}) *MockDBTX_QueryContext_Call {
-	return &MockDBTX_QueryContext_Call{Call: _e.mock.On("QueryContext",
-		append([]interface{}{ctx, query}, args...)...)}
-}
-
-func (_c *MockDBTX_QueryContext_Call) Run(run func(ctx context.Context, query string, args ...interface{})) *MockDBTX_QueryContext_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		variadicArgs := make([]interface{}, len(args)-2)
-		for i, a := range args[2:] {
-			if a != nil {
-				variadicArgs[i] = a.(interface{})
-			}
-		}
-		run(args[0].(context.Context), args[1].(string), variadicArgs...)
-	})
-	return _c
-}
-
-func (_c *MockDBTX_QueryContext_Call) Return(_a0 *sql.Rows, _a1 error) *MockDBTX_QueryContext_Call {
-	_c.Call.Return(_a0, _a1)
-	return _c
-}
-
-func (_c *MockDBTX_QueryContext_Call) RunAndReturn(run func(context.Context, string, ...interface{}) (*sql.Rows, error)) *MockDBTX_QueryContext_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
-// QueryRowContext provides a mock function with given fields: ctx, query, args
-func (_m *MockDBTX) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
-	var _ca []interface{}
-	_ca = append(_ca, ctx, query)
-	_ca = append(_ca, args...)
-	ret := _m.Called(_ca...)
-
-	var r0 *sql.Row
-	if rf, ok := ret.Get(0).(func(context.Context, string, ...interface{}) *sql.Row); ok {
-		r0 = rf(ctx, query, args...)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*sql.Row)
-		}
-	}
-
-	return r0
-}
-
-// MockDBTX_QueryRowContext_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryRowContext'
-type MockDBTX_QueryRowContext_Call struct {
-	*mock.Call
-}
-
-// QueryRowContext is a helper method to define mock.On call
-//   - ctx context.Context
-//   - query string
-//   - args ...interface{}
-func (_e *MockDBTX_Expecter) QueryRowContext(ctx interface{}, query interface{}, args ...interface{}) *MockDBTX_QueryRowContext_Call {
-	return &MockDBTX_QueryRowContext_Call{Call: _e.mock.On("QueryRowContext",
-		append([]interface{}{ctx, query}, args...)...)}
-}
-
-func (_c *MockDBTX_QueryRowContext_Call) Run(run func(ctx context.Context, query string, args ...interface{})) *MockDBTX_QueryRowContext_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		variadicArgs := make([]interface{}, len(args)-2)
-		for i, a := range args[2:] {
-			if a != nil {
-				variadicArgs[i] = a.(interface{})
-			}
-		}
-		run(args[0].(context.Context), args[1].(string), variadicArgs...)
-	})
-	return _c
-}
-
-func (_c *MockDBTX_QueryRowContext_Call) Return(_a0 *sql.Row) *MockDBTX_QueryRowContext_Call {
-	_c.Call.Return(_a0)
-	return _c
-}
-
-func (_c *MockDBTX_QueryRowContext_Call) RunAndReturn(run func(context.Context, string, ...interface{}) *sql.Row) *MockDBTX_QueryRowContext_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
-// NewMockDBTX creates a new instance of MockDBTX. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
-// The first argument is typically a *testing.T value.
-func NewMockDBTX(t interface {
-	mock.TestingT
-	Cleanup(func())
-}) *MockDBTX {
-	mock := &MockDBTX{}
-	mock.Mock.Test(t)
-
-	t.Cleanup(func() { mock.AssertExpectations(t) })
-
-	return mock
-}
diff --git a/worker/internal/dbschemas/mysql/mysql.go b/worker/internal/dbschemas/mysql/mysql.go
index c3bb38afe0..0ccf243f92 100644
--- a/worker/internal/dbschemas/mysql/mysql.go
+++ b/worker/internal/dbschemas/mysql/mysql.go
@@ -2,174 +2,27 @@ package dbschemas_mysql
 
 import (
 	"context"
-	"database/sql"
 	"fmt"
 
+	mysql_queries "github.com/nucleuscloud/neosync/worker/gen/go/db/mysql"
 	neosync_benthos "github.com/nucleuscloud/neosync/worker/internal/benthos"
 )
 
-type DatabaseSchema struct {
-	TableSchema     string  `db:"table_schema"`
-	TableName       string  `db:"table_name"`
-	ColumnName      string  `db:"column_name"`
-	OrdinalPosition int     `db:"ordinal_position"`
-	ColumnDefault   *string `db:"column_default,omitempty"`
-	IsNullable      string  `db:"is_nullable"`
-	DataType        string  `db:"data_type"`
-}
-
-func (d *DatabaseSchema) GetTableKey() string {
-	return fmt.Sprintf("%s.%s", d.TableSchema, d.TableName)
-}
-
-const (
-	getDatabaseSchemaSql = `-- name: GetDatabaseSchema
-SELECT
-	c.table_schema,
-	c.table_name,
-	c.column_name,
-	c.ordinal_position,
-	c.column_default,
-	c.is_nullable,
-	c.data_type
-FROM
-	information_schema.columns AS c
-	JOIN information_schema.tables AS t ON c.table_schema = t.table_schema
-		AND c.table_name = t.table_name
-WHERE
-	c.table_schema NOT IN('sys', 'performance_schema', 'mysql')
-	AND t.table_type = 'BASE TABLE';
-	`
-	getDatabaseTableSchemaSql = `-- name: GetDatabaseTableSchema
-SELECT
-	c.table_schema,
-	c.table_name,
-	c.column_name,
-	c.ordinal_position,
-	c.column_default,
-	c.is_nullable,
-	c.data_type
-FROM
-	information_schema.columns AS c
-	JOIN information_schema.tables AS t ON c.table_schema = t.table_schema
-		AND c.table_name = t.table_name
-WHERE
-	c.table_schema = ? AND t.table_name = ?
-	AND t.table_type = 'BASE TABLE'
-	ORDER BY c.ordinal_position ASC;
-	`
-)
-
-type DBTX interface {
-	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
-	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
-}
-
-func GetDatabaseSchemas(
-	ctx context.Context,
-	conn DBTX,
-) ([]*DatabaseSchema, error) {
-	rows, err := conn.QueryContext(ctx, getDatabaseSchemaSql)
-	if err != nil && !isNoRows(err) {
-		return nil, err
-	} else if err != nil && isNoRows(err) {
-		return []*DatabaseSchema{}, nil
-	}
-
-	output := []*DatabaseSchema{}
-	for rows.Next() {
-		var o DatabaseSchema
-		err := rows.Scan(
-			&o.TableSchema,
-			&o.TableName,
-			&o.ColumnName,
-			&o.OrdinalPosition,
-			&o.ColumnDefault,
-			&o.IsNullable,
-			&o.DataType,
-		)
-		if err != nil {
-			return nil, err
-		}
-		output = append(output, &o)
-	}
-	return output, nil
-}
-
-type DatabaseTableConstraint struct {
-	Schema            string `db:"db_schema"`
-	Table             string `db:"table_name"`
-	ConstraintName    string `db:"constraint_name"`
-	ColumnName        string `db:"column_name"`
-	ForeignSchemaName string `db:"foreign_schema_name"`
-	ForeignTableName  string `db:"foreign_table_name"`
-	ForeignColumnName string `db:"foreign_column_name"`
-	UpdateRule        string `db:"update_rule"`
-	DeleteRule        string `db:"delete_rule"`
+type GetTableCreateStatementRequest struct {
+	Schema string
+	Table  string
 }
 
-const (
-	getTableConstraintsSql = `-- name: GetTableConstraints
-	SELECT
-	kcu.constraint_name
-	,
-	kcu.table_schema AS db_schema
-	,
-	kcu.table_name as table_name
-	,
-	kcu.column_name as column_name
-	,
-	kcu.referenced_table_schema AS foreign_schema_name
-	,
-	kcu.referenced_table_name AS foreign_table_name
-	,
-	kcu.referenced_column_name AS foreign_column_name
-	,
-	rc.update_rule
-	,
-	rc.delete_rule
-FROM information_schema.key_column_usage kcu
-LEFT JOIN information_schema.referential_constraints rc
-	ON
-	kcu.constraint_name = rc.constraint_name
-WHERE
-	kcu.table_schema = ? AND kcu.table_name = ?;
-`
-)
-
-func GetTableConstraints(
+func GetTableCreateStatement(
 	ctx context.Context,
-	conn DBTX,
-	schema string,
-	table string,
-) ([]*DatabaseTableConstraint, error) {
-	rows, err := conn.QueryContext(ctx, getTableConstraintsSql, schema, table)
-	if err != nil && !isNoRows(err) {
-		return nil, err
-	} else if err != nil && isNoRows(err) {
-		return []*DatabaseTableConstraint{}, nil
-	}
-
-	output := []*DatabaseTableConstraint{}
-	for rows.Next() {
-		var o DatabaseTableConstraint
-		err := rows.Scan(
-			&o.Schema,
-			&o.Table,
-			&o.ConstraintName,
-			&o.ColumnName,
-			&o.ForeignSchemaName,
-			&o.ForeignTableName,
-			&o.ForeignColumnName,
-			&o.UpdateRule,
-			&o.DeleteRule,
-		)
-		if err != nil {
-			return nil, err
-		}
-		output = append(output, &o)
+	conn mysql_queries.DBTX,
+	req *GetTableCreateStatementRequest,
+) (string, error) {
+	result, err := getShowTableCreate(ctx, conn, req.Schema, req.Table)
+	if err != nil {
+		return "", fmt.Errorf("unable to get table create statement: %w", err)
 	}
-	return output, nil
+	return result.CreateTable, nil
 }
 
 type DatabaseTableShowCreate struct {
@@ -179,7 +32,7 @@ type DatabaseTableShowCreate struct {
 
 func getShowTableCreate(
 	ctx context.Context,
-	conn DBTX,
+	conn mysql_queries.DBTX,
 	schema string,
 	table string,
 ) (*DatabaseTableShowCreate, error) {
@@ -197,104 +50,11 @@ func getShowTableCreate(
 	return &output, nil
 }
 
-func isNoRows(err error) bool {
-	return err != nil && err == sql.ErrNoRows
-}
-
-type GetTableCreateStatementRequest struct {
-	Schema string
-	Table  string
-}
-
-func GetTableCreateStatement(
-	ctx context.Context,
-	conn DBTX,
-	req *GetTableCreateStatementRequest,
-) (string, error) {
-	result, err := getShowTableCreate(ctx, conn, req.Schema, req.Table)
-	if err != nil {
-		return "", fmt.Errorf("unable to get table create statement: %w", err)
-	}
-	return result.CreateTable, nil
-}
-
-const (
-	fkConstraintSql = `
-	SELECT
-	rc.constraint_name
-	,
-	kcu.table_schema AS schema_name
-	,
-	kcu.table_name as table_name
-	,
-	kcu.column_name as column_name
-	,
-	kcu.referenced_table_schema AS foreign_schema_name
-	,
-	kcu.referenced_table_name AS foreign_table_name
-	,
-	kcu.referenced_column_name AS foreign_column_name
-FROM
-	information_schema.referential_constraints rc
-JOIN information_schema.key_column_usage kcu
-	ON
-	kcu.constraint_name = rc.constraint_name
-WHERE
-	kcu.table_schema = ?
-ORDER BY
-	rc.constraint_name,
-	kcu.ordinal_position;
-	`
-)
-
-type ForeignKeyConstraint struct {
-	ConstraintName    string `db:"constraint_name"`
-	SchemaName        string `db:"schema_name"`
-	TableName         string `db:"table_name"`
-	ColumnName        string `db:"column_name"`
-	ForeignSchemaName string `db:"foreign_schema_name"`
-	ForeignTableName  string `db:"foreign_table_name"`
-	ForeignColumnName string `db:"foreign_column_name"`
-}
-
-func GetForeignKeyConstraints(
-	ctx context.Context,
-	conn DBTX,
-	tableSchema string,
-) ([]*ForeignKeyConstraint, error) {
-
-	rows, err := conn.QueryContext(ctx, fkConstraintSql, tableSchema)
-	if err != nil && !isNoRows(err) {
-		return nil, err
-	} else if err != nil && isNoRows(err) {
-		return []*ForeignKeyConstraint{}, nil
-	}
-
-	output := []*ForeignKeyConstraint{}
-	for rows.Next() {
-		var o ForeignKeyConstraint
-		err := rows.Scan(
-			&o.ConstraintName,
-			&o.SchemaName,
-			&o.TableName,
-			&o.ColumnName,
-			&o.ForeignSchemaName,
-			&o.ForeignTableName,
-			&o.ForeignColumnName,
-		)
-		if err != nil {
-			return nil, err
-		}
-		output = append(output, &o)
-	}
-	return output, nil
-}
-
 type TableDependency = map[string][]string
 
 // Key is schema.table value is list of tables that key depends on
 func GetMysqlTableDependencies(
-	constraints []*ForeignKeyConstraint,
+	constraints []*mysql_queries.GetForeignKeyConstraintsRow,
 ) TableDependency {
 	tdmap := map[string][]string{}
 	for _, constraint := range constraints {
@@ -337,7 +97,7 @@ func buildTableKey(
 }
 
 func GetUniqueSchemaColMappings(
-	dbschemas []*DatabaseSchema,
+	dbschemas []*mysql_queries.GetDatabaseSchemaRow,
 ) map[string]map[string]struct{} {
 	groupedSchemas := map[string]map[string]struct{}{} // ex: {public.users: { id: struct{}{}, created_at: struct{}{}}}
 	for _, record := range dbschemas {
diff --git a/worker/internal/dbschemas/mysql/mysql_test.go b/worker/internal/dbschemas/mysql/mysql_test.go
index 1a2b28cc83..db4763bbb1 100644
--- a/worker/internal/dbschemas/mysql/mysql_test.go
+++ b/worker/internal/dbschemas/mysql/mysql_test.go
@@ -3,19 +3,12 @@ package dbschemas_mysql
 import (
 	"testing"
 
+	mysql_queries "github.com/nucleuscloud/neosync/worker/gen/go/db/mysql"
 	"github.com/stretchr/testify/assert"
 )
 
-func Test_DatabaseSchema_GetTableKey(t *testing.T) {
-	schema := &DatabaseSchema{
-		TableSchema: "public",
-		TableName:   "users",
-	}
-	assert.Equal(t, schema.GetTableKey(), "public.users")
-}
-
 func TestGetMysqlTableDependencies(t *testing.T) {
-	constraints := []*ForeignKeyConstraint{
+	constraints := []*mysql_queries.GetForeignKeyConstraintsRow{
 		{ConstraintName: "fk_account_user_associations_account_id", SchemaName: "neosync_api", TableName: "account_user_associations", ColumnName: "account_id", ForeignSchemaName: "neosync_api", ForeignTableName: "accounts", ForeignColumnName: "id"},               //nolint
 		{ConstraintName: "fk_account_user_associations_user_id", SchemaName: "neosync_api", TableName: "account_user_associations", ColumnName: "user_id", ForeignSchemaName: "neosync_api", ForeignTableName: "users", ForeignColumnName: "id"},                        //nolint
 		{ConstraintName: "fk_connections_accounts_id", SchemaName: "neosync_api", TableName: "connections", ColumnName: "account_id", ForeignSchemaName: "neosync_api", ForeignTableName: "accounts", ForeignColumnName: "id"},                                          //nolint
@@ -41,7 +34,7 @@ func TestGetMysqlTableDependencies(t *testing.T) {
 }
 
 func TestGetMysqlTableDependenciesExtraEdgeCases(t *testing.T) {
-	constraints := []*ForeignKeyConstraint{
+	constraints := []*mysql_queries.GetForeignKeyConstraintsRow{
 		{ConstraintName: "t1_b_c_fkey", SchemaName: "neosync_api", TableName: "t1", ColumnName: "b", ForeignSchemaName: "neosync_api", ForeignTableName: "account_user_associations", ForeignColumnName: "account_id"}, //nolint
 		{ConstraintName: "t1_b_c_fkey", SchemaName: "neosync_api", TableName: "t1", ColumnName: "c", ForeignSchemaName: "neosync_api", ForeignTableName: "account_user_associations", ForeignColumnName: "user_id"},    //nolint
 		{ConstraintName: "t2_b_fkey", SchemaName: "neosync_api", TableName: "t2", ColumnName: "b", ForeignSchemaName: "neosync_api", ForeignTableName: "t2", ForeignColumnName: "a"},                                   //nolint
@@ -60,7 +53,7 @@ func TestGetMysqlTableDependenciesExtraEdgeCases(t *testing.T) {
 
 func TestGetUniqueSchemaColMappings(t *testing.T) {
 	mappings := GetUniqueSchemaColMappings(
-		[]*DatabaseSchema{
+		[]*mysql_queries.GetDatabaseSchemaRow{
 			{TableSchema: "public", TableName: "users", ColumnName: "id"},
 			{TableSchema: "public", TableName: "users", ColumnName: "created_by"},
 			{TableSchema: "public", TableName: "users", ColumnName: "updated_by"},
diff --git a/worker/pkg/workflows/datasync/activities/activities.go b/worker/pkg/workflows/datasync/activities/activities.go
index b3d1115240..015df11043 100644
--- a/worker/pkg/workflows/datasync/activities/activities.go
+++ b/worker/pkg/workflows/datasync/activities/activities.go
@@ -21,10 +21,10 @@ import (
 
 	mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
 	"github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect"
+	mysql_queries "github.com/nucleuscloud/neosync/worker/gen/go/db/mysql"
 	pg_queries "github.com/nucleuscloud/neosync/worker/gen/go/db/postgresql"
 	neosync_benthos "github.com/nucleuscloud/neosync/worker/internal/benthos"
 	_ "github.com/nucleuscloud/neosync/worker/internal/benthos/transformers"
-	dbschemas_mysql "github.com/nucleuscloud/neosync/worker/internal/dbschemas/mysql"
 )
 
 const nullString = "null"
@@ -65,7 +65,8 @@ func (a *Activities) GenerateBenthosConfigs(
 
 	pgpoolmap := map[string]pg_queries.DBTX{}
 	pgquerier := pg_queries.New()
-	mysqlPoolMap := map[string]dbschemas_mysql.DBTX{}
+	mysqlpoolmap := map[string]mysql_queries.DBTX{}
+	mysqlquerier := mysql_queries.New()
 
 	jobclient := mgmtv1alpha1connect.NewJobServiceClient(
 		http.DefaultClient,
@@ -79,7 +80,8 @@ func (a *Activities) GenerateBenthosConfigs(
 	bbuilder := newBenthosBuilder(
 		pgpoolmap,
 		pgquerier,
-		mysqlPoolMap,
+		mysqlpoolmap,
+		mysqlquerier,
 		jobclient,
 		connclient,
 	)
diff --git a/worker/pkg/workflows/datasync/activities/benthos-builder.go b/worker/pkg/workflows/datasync/activities/benthos-builder.go
index 2c354239fb..6be64e5ed4 100644
--- a/worker/pkg/workflows/datasync/activities/benthos-builder.go
+++ b/worker/pkg/workflows/datasync/activities/benthos-builder.go
@@ -11,6 +11,7 @@ import (
 	"github.com/jackc/pgx/v5/pgxpool"
 	mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
 	"github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect"
+	mysql_queries "github.com/nucleuscloud/neosync/worker/gen/go/db/mysql"
 	pg_queries "github.com/nucleuscloud/neosync/worker/gen/go/db/postgresql"
 	neosync_benthos "github.com/nucleuscloud/neosync/worker/internal/benthos"
 	dbschemas_mysql "github.com/nucleuscloud/neosync/worker/internal/dbschemas/mysql"
@@ -24,7 +25,8 @@ type benthosBuilder struct {
 	pgpool    map[string]pg_queries.DBTX
 	pgquerier pg_queries.Querier
 
-	mysqlpool map[string]dbschemas_mysql.DBTX
+	mysqlpool    map[string]mysql_queries.DBTX
+	mysqlquerier mysql_queries.Querier
 
 	jobclient  mgmtv1alpha1connect.JobServiceClient
 	connclient mgmtv1alpha1connect.ConnectionServiceClient
@@ -34,17 +36,19 @@ func newBenthosBuilder(
 	pgpool map[string]pg_queries.DBTX,
 	pgquerier pg_queries.Querier,
 
-	mysqlpool map[string]dbschemas_mysql.DBTX,
+	mysqlpool map[string]mysql_queries.DBTX,
+	mysqlquerier mysql_queries.Querier,
 
 	jobclient mgmtv1alpha1connect.JobServiceClient,
 	connclient mgmtv1alpha1connect.ConnectionServiceClient,
 ) *benthosBuilder {
 	return &benthosBuilder{
-		pgpool:     pgpool,
-		pgquerier:  pgquerier,
-		mysqlpool:  mysqlpool,
-		jobclient:  jobclient,
-		connclient: connclient,
+		pgpool:       pgpool,
+		pgquerier:    pgquerier,
+		mysqlpool:    mysqlpool,
+		mysqlquerier: mysqlquerier,
+		jobclient:    jobclient,
+		connclient:   connclient,
 	}
 }
 
@@ -152,7 +156,7 @@ func (b *benthosBuilder) GenerateBenthosConfigs(
 		pool := b.mysqlpool[dsn]
 
 		// validate job mappings align with sql connections
-		dbschemas, err := dbschemas_mysql.GetDatabaseSchemas(ctx, pool)
+		dbschemas, err := b.mysqlquerier.GetDatabaseSchema(ctx, pool)
 		if err != nil {
 			return nil, err
 		}
@@ -406,17 +410,17 @@ func (b *benthosBuilder) getAllPostgresFkConstraintsFromMappings(
 
 func (b *benthosBuilder) getAllMysqlFkConstraintsFromMappings(
 	ctx context.Context,
-	conn dbschemas_mysql.DBTX,
+	conn mysql_queries.DBTX,
 	mappings []*mgmtv1alpha1.JobMapping,
-) ([]*dbschemas_mysql.ForeignKeyConstraint, error) {
+) ([]*mysql_queries.GetForeignKeyConstraintsRow, error) {
 	uniqueSchemas := getUniqueSchemasFromMappings(mappings)
-	holder := make([][]*dbschemas_mysql.ForeignKeyConstraint, len(uniqueSchemas))
+	holder := make([][]*mysql_queries.GetForeignKeyConstraintsRow, len(uniqueSchemas))
 	errgrp, errctx := errgroup.WithContext(ctx)
 	for idx := range uniqueSchemas {
 		idx := idx
 		schema := uniqueSchemas[idx]
 		errgrp.Go(func() error {
-			constraints, err := dbschemas_mysql.GetForeignKeyConstraints(errctx, conn, schema)
+			constraints, err := b.mysqlquerier.GetForeignKeyConstraints(errctx, conn, schema)
 			if err != nil {
 				return err
 			}
@@ -429,7 +433,7 @@ func (b *benthosBuilder) getAllMysqlFkConstraintsFromMappings(
 		return nil, err
 	}
 
-	output := []*dbschemas_mysql.ForeignKeyConstraint{}
+	output := []*mysql_queries.GetForeignKeyConstraintsRow{}
 	for _, schemas := range holder {
 		output = append(output, schemas...)
 	}
@@ -470,12 +474,11 @@ func (b *benthosBuilder) getInitStatementFromPostgres(
 
 func (b *benthosBuilder) getInitStatementFromMysql(
 	ctx context.Context,
-	conn dbschemas_mysql.DBTX,
+	conn mysql_queries.DBTX,
 	schema string,
 	table string,
 	opts *initStatementOpts,
 ) (string, error) {
-
 	statements := []string{}
 	if opts != nil && opts.InitSchema {
 		stmt, err := dbschemas_mysql.GetTableCreateStatement(ctx, conn, &dbschemas_mysql.GetTableCreateStatementRequest{
diff --git a/worker/pkg/workflows/datasync/activities/benthos-builder_test.go b/worker/pkg/workflows/datasync/activities/benthos-builder_test.go
index c4be45e09a..3b7047958a 100644
--- a/worker/pkg/workflows/datasync/activities/benthos-builder_test.go
+++ b/worker/pkg/workflows/datasync/activities/benthos-builder_test.go
@@ -9,8 +9,8 @@ import (
 	"connectrpc.com/connect"
 	mgmtv1alpha1 "github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1"
 	"github.com/nucleuscloud/neosync/backend/gen/go/protos/mgmt/v1alpha1/mgmtv1alpha1connect"
+	mysql_queries "github.com/nucleuscloud/neosync/worker/gen/go/db/mysql"
 	pg_queries "github.com/nucleuscloud/neosync/worker/gen/go/db/postgresql"
-	dbschemas_mysql "github.com/nucleuscloud/neosync/worker/internal/dbschemas/mysql"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 	"go.temporal.io/sdk/log"
@@ -26,7 +26,8 @@ func Test_BenthosBuilder_GenerateBenthosConfigs_Basic_Pg_Pg(t *testing.T) {
 		"fake-stage-url": pg_queries.NewMockDBTX(t),
 	}
 	pgquerier := pg_queries.NewMockQuerier(t)
-	mysqlcache := map[string]dbschemas_mysql.DBTX{}
+	mysqlcache := map[string]mysql_queries.DBTX{}
+	mysqlquerier := mysql_queries.NewMockQuerier(t)
 
 	mockJobClient.On("GetJob", mock.Anything, mock.Anything).
 		Return(connect.NewResponse(&mgmtv1alpha1.GetJobResponse{
@@ -122,24 +123,7 @@ func Test_BenthosBuilder_GenerateBenthosConfigs_Basic_Pg_Pg(t *testing.T) {
 		}, nil)
 	pgquerier.On("GetForeignKeyConstraints", mock.Anything, mock.Anything, mock.Anything).
 		Return([]*pg_queries.GetForeignKeyConstraintsRow{}, nil)
-	// pgquerier.On("GetDatabaseTableSchema", mock.Anything, mock.Anything, mock.Anything).
-	// 	Return([]*pg_queries.GetDatabaseTableSchemaRow{
-	// 		{
-	// 			TableSchema: "public",
-	// 			TableName:   "users",
-	// 			ColumnName:  "id",
-	// 		},
-	// 		{
-	// 			TableSchema: "public",
-	// 			TableName:   "users",
-	// 			ColumnName:  "name",
-	// 		},
-	// 	}, nil)
-	// pgquerier.On("GetTableConstraints", mock.Anything, mock.Anything, mock.Anything).
-	// 	Return([]*pg_queries.GetTableConstraintsRow{}, nil).
-	// 	Return([]*pg_queries.GetTableConstraintsRow{}, nil)
-
-	bbuilder := newBenthosBuilder(pgcache, pgquerier, mysqlcache, mockJobClient, mockConnectionClient)
+	bbuilder := newBenthosBuilder(pgcache, pgquerier, mysqlcache, mysqlquerier, mockJobClient, mockConnectionClient)
 
 	resp, err := bbuilder.GenerateBenthosConfigs(
 		context.Background(),