diff --git a/app/server/datasource/rdbms/clickhouse/get_query_and_args.go b/app/server/datasource/rdbms/clickhouse/get_query_and_args.go new file mode 100644 index 00000000..ab854cc0 --- /dev/null +++ b/app/server/datasource/rdbms/clickhouse/get_query_and_args.go @@ -0,0 +1,12 @@ +package clickhouse + +import ( + api_service_protos "github.com/ydb-platform/fq-connector-go/libgo/service/protos" +) + +func GetQueryAndArgs(request *api_service_protos.TDescribeTableRequest) (string, []any) { + query := "SELECT name, type FROM system.columns WHERE table = ? and database = ?" + args := []any{request.Table, request.DataSourceInstance.Database} + + return query, args +} \ No newline at end of file diff --git a/app/server/datasource/rdbms/clickhouse/sql_formatter.go b/app/server/datasource/rdbms/clickhouse/sql_formatter.go index d57882aa..4e1439db 100644 --- a/app/server/datasource/rdbms/clickhouse/sql_formatter.go +++ b/app/server/datasource/rdbms/clickhouse/sql_formatter.go @@ -68,13 +68,6 @@ func (f sqlFormatter) SupportsPushdownExpression(expression *api_service_protos. } } -func (f sqlFormatter) GetDescribeTableQuery(request *api_service_protos.TDescribeTableRequest) (string, []any) { - query := "SELECT name, type FROM system.columns WHERE table = ? and database = ?" - args := []any{request.Table, request.DataSourceInstance.Database} - - return query, args -} - func (f sqlFormatter) GetPlaceholder(_ int) string { return "?" } diff --git a/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go b/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go index 1b7cc690..928eb1ba 100644 --- a/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go +++ b/app/server/datasource/rdbms/clickhouse/sql_formatter_test.go @@ -5,23 +5,12 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/ydb-platform/fq-connector-go/api/common" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/utils" api "github.com/ydb-platform/fq-connector-go/libgo/service/protos" ydb "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" ) -func TestMakeDescribeTableQuery(t *testing.T) { - logger := utils.NewTestLogger(t) - formatter := NewSQLFormatter() - request := &api.TDescribeTableRequest{Table: "table", DataSourceInstance: &common.TDataSourceInstance{Database: "db"}} - - output, args := rdbms_utils.MakeDescribeTableQuery(logger, formatter, request) - require.Equal(t, "SELECT name, type FROM system.columns WHERE table = ? and database = ?", output) - require.Equal(t, args, []any{"table", "db"}) -} - func TestMakeSQLFormatterQuery(t *testing.T) { type testCase struct { testName string diff --git a/app/server/datasource/rdbms/data_source.go b/app/server/datasource/rdbms/data_source.go index dee86871..b1b43d83 100644 --- a/app/server/datasource/rdbms/data_source.go +++ b/app/server/datasource/rdbms/data_source.go @@ -16,6 +16,7 @@ type Preset struct { SQLFormatter rdbms_utils.SQLFormatter ConnectionManager rdbms_utils.ConnectionManager TypeMapper utils.TypeMapper + SchemaProvider rdbms_utils.SchemaProvider } var _ datasource.DataSource[any] = (*dataSourceImpl)(nil) @@ -24,6 +25,7 @@ type dataSourceImpl struct { typeMapper utils.TypeMapper sqlFormatter rdbms_utils.SQLFormatter connectionManager rdbms_utils.ConnectionManager + schemaProvider rdbms_utils.SchemaProvider logger log.Logger } @@ -32,46 +34,15 @@ func (ds *dataSourceImpl) DescribeTable( logger log.Logger, request *api_service_protos.TDescribeTableRequest, ) (*api_service_protos.TDescribeTableResponse, error) { - query, args := rdbms_utils.MakeDescribeTableQuery(logger, ds.sqlFormatter, request) - conn, err := ds.connectionManager.Make(ctx, logger, request.DataSourceInstance) if err != nil { return nil, fmt.Errorf("make connection: %w", err) } defer ds.connectionManager.Release(logger, conn) - - rows, err := conn.Query(ctx, query, args...) - if err != nil { - return nil, fmt.Errorf("query builder error: %w", err) - } - - defer func() { utils.LogCloserError(logger, rows, "close rows") }() - - var ( - columnName string - typeName string - ) - - sb := &schemaBuilder{typeMapper: ds.typeMapper, typeMappingSettings: request.TypeMappingSettings} - - for rows.Next() { - if err := rows.Scan(&columnName, &typeName); err != nil { - return nil, fmt.Errorf("rows scan: %w", err) - } - - if err := sb.addColumn(columnName, typeName); err != nil { - return nil, fmt.Errorf("add column to schema builder: %w", err) - } - } - - if err := rows.Err(); err != nil { - return nil, fmt.Errorf("rows iteration: %w", err) - } - - schema, err := sb.build(logger) + schema, err := ds.schemaProvider.GetSchema(ctx, logger, conn, request) if err != nil { - return nil, fmt.Errorf("build schema: %w", err) + return nil, fmt.Errorf("get schema: %w", err) } return &api_service_protos.TDescribeTableResponse{Schema: schema}, nil @@ -154,5 +125,7 @@ func NewDataSource( sqlFormatter: preset.SQLFormatter, connectionManager: preset.ConnectionManager, typeMapper: preset.TypeMapper, + schemaProvider: preset.SchemaProvider, } } + diff --git a/app/server/datasource/rdbms/data_source_factory.go b/app/server/datasource/rdbms/data_source_factory.go index 61099bb1..243f0c8f 100644 --- a/app/server/datasource/rdbms/data_source_factory.go +++ b/app/server/datasource/rdbms/data_source_factory.go @@ -38,16 +38,21 @@ func NewDataSourceFactory(qlf utils.QueryLoggerFactory) datasource.DataSourceFac QueryLoggerFactory: qlf, } + postgresqlTypeMapper := postgresql.NewTypeMapper() + clickhouseTypeMapper := clickhouse.NewTypeMapper() + return &dataSourceFactory{ clickhouse: Preset{ SQLFormatter: clickhouse.NewSQLFormatter(), ConnectionManager: clickhouse.NewConnectionManager(connManagerCfg), - TypeMapper: clickhouse.NewTypeMapper(), + TypeMapper: clickhouseTypeMapper, + SchemaProvider: rdbms_utils.NewDefaultSchemaProvider(clickhouseTypeMapper, clickhouse.GetQueryAndArgs), }, postgresql: Preset{ SQLFormatter: postgresql.NewSQLFormatter(), ConnectionManager: postgresql.NewConnectionManager(connManagerCfg), - TypeMapper: postgresql.NewTypeMapper(), + TypeMapper: postgresqlTypeMapper, + SchemaProvider: rdbms_utils.NewDefaultSchemaProvider(postgresqlTypeMapper, postgresql.GetQueryAndArgs), }, } } diff --git a/app/server/datasource/rdbms/postgresql/get_query_and_args.go b/app/server/datasource/rdbms/postgresql/get_query_and_args.go new file mode 100644 index 00000000..76dcb0d0 --- /dev/null +++ b/app/server/datasource/rdbms/postgresql/get_query_and_args.go @@ -0,0 +1,13 @@ +package postgresql + +import ( + api_service_protos "github.com/ydb-platform/fq-connector-go/libgo/service/protos" +) + +func GetQueryAndArgs(request *api_service_protos.TDescribeTableRequest) (string, []any) { + opts := request.GetDataSourceInstance().GetPgOptions().GetSchema() + query := "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1 AND table_schema = $2" + args := []any{request.Table, opts} + + return query, args +} \ No newline at end of file diff --git a/app/server/datasource/rdbms/postgresql/sql_formatter.go b/app/server/datasource/rdbms/postgresql/sql_formatter.go index 4b4a8534..9a877c3b 100644 --- a/app/server/datasource/rdbms/postgresql/sql_formatter.go +++ b/app/server/datasource/rdbms/postgresql/sql_formatter.go @@ -69,14 +69,6 @@ func (f sqlFormatter) SupportsPushdownExpression(expression *api_service_protos. } } -func (f sqlFormatter) GetDescribeTableQuery(request *api_service_protos.TDescribeTableRequest) (string, []any) { - schema := request.GetDataSourceInstance().GetPgOptions().GetSchema() - query := "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1 AND table_schema = $2" - args := []any{request.Table, schema} - - return query, args -} - func (f sqlFormatter) GetPlaceholder(n int) string { return fmt.Sprintf("$%d", n+1) } diff --git a/app/server/datasource/rdbms/postgresql/sql_formatter_test.go b/app/server/datasource/rdbms/postgresql/sql_formatter_test.go index 8ad2c147..c6fc1499 100644 --- a/app/server/datasource/rdbms/postgresql/sql_formatter_test.go +++ b/app/server/datasource/rdbms/postgresql/sql_formatter_test.go @@ -5,24 +5,12 @@ import ( "testing" "github.com/stretchr/testify/require" - "github.com/ydb-platform/fq-connector-go/api/common" rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/utils" api "github.com/ydb-platform/fq-connector-go/libgo/service/protos" ydb "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" ) -func TestMakeDescribeTableQuery(t *testing.T) { - logger := utils.NewTestLogger(t) - formatter := NewSQLFormatter() - dsi := &common.TDataSourceInstance{Options: &common.TDataSourceInstance_PgOptions{PgOptions: &common.TPostgreSQLDataSourceOptions{Schema: "schema"}}} - request := &api.TDescribeTableRequest{Table: "table", DataSourceInstance: dsi} - - output, args := rdbms_utils.MakeDescribeTableQuery(logger, formatter, request) - require.Equal(t, "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1 AND table_schema = $2", output) - require.Equal(t, args, []any{"table", "schema"}) -} - func TestMakeReadSplitQuery(t *testing.T) { type testCase struct { testName string diff --git a/app/server/datasource/rdbms/schema_builder_test.go b/app/server/datasource/rdbms/schema_builder_test.go index 30166f1d..f3a82185 100644 --- a/app/server/datasource/rdbms/schema_builder_test.go +++ b/app/server/datasource/rdbms/schema_builder_test.go @@ -1,98 +1,118 @@ package rdbms import ( + "fmt" "testing" "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" + "google.golang.org/protobuf/proto" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/clickhouse" "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/postgresql" + rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils" "github.com/ydb-platform/fq-connector-go/app/server/utils" - "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" - "google.golang.org/protobuf/proto" + api_service_protos "github.com/ydb-platform/fq-connector-go/libgo/service/protos" ) func TestSchemaBuilder(t *testing.T) { - t.Run("ClickHouse", func(t *testing.T) { - sb := &schemaBuilder{ - typeMapper: clickhouse.NewTypeMapper(), - } - - require.NoError(t, sb.addColumn("col1", "Int32")) // supported - require.NoError(t, sb.addColumn("col2", "String")) // supported - require.NoError(t, sb.addColumn("col3", "UUID")) // yet unsupported - - logger := utils.NewTestLogger(t) - schema, err := sb.build(logger) - require.NoError(t, err) - require.NotNil(t, schema) - - require.Len(t, schema.Columns, 2) - - require.Equal(t, schema.Columns[0].Name, "col1") - require.True( - t, - proto.Equal(schema.Columns[0].Type, &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_INT32}}), - schema.Columns[0].Type) - - require.Equal(t, schema.Columns[1].Name, "col2") - require.True( - t, - proto.Equal(schema.Columns[1].Type, &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_STRING}}), - schema.Columns[1].Type) - }) - - t.Run("PostgreSQL", func(t *testing.T) { - sb := &schemaBuilder{ + type nameToType struct { + name string + ydbType *Ydb.Type + } + + type testCase struct { + name string + typeMapper utils.TypeMapper + supportedTypesMatch []nameToType + unsupportedTypes []nameToType + } + + testCases := []testCase{ + { + name: "PostgreSQL", typeMapper: postgresql.NewTypeMapper(), - } - - require.NoError(t, sb.addColumn("col1", "bigint")) // supported - require.NoError(t, sb.addColumn("col2", "text")) // supported - require.NoError(t, sb.addColumn("col3", "time")) // yet unsupported - - logger := utils.NewTestLogger(t) - schema, err := sb.build(logger) - require.NoError(t, err) - require.NotNil(t, schema) - - require.Len(t, schema.Columns, 2) - - require.Equal(t, schema.Columns[0].Name, "col1") - require.True( - t, - proto.Equal( - schema.Columns[0].Type, - &Ydb.Type{Type: &Ydb.Type_OptionalType{OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_INT64}}}}}, - ), - schema.Columns[0].Type) - - require.Equal(t, schema.Columns[1].Name, "col2") - require.True( - t, - proto.Equal( - schema.Columns[1].Type, - &Ydb.Type{Type: &Ydb.Type_OptionalType{OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_UTF8}}}}}, - ), - schema.Columns[1].Type) - }) + supportedTypesMatch: []nameToType{ + {"bigint", &Ydb.Type{Type: &Ydb.Type_OptionalType{OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_INT64}}}}}}, + {"text", &Ydb.Type{Type: &Ydb.Type_OptionalType{OptionalType: &Ydb.OptionalType{Item: &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_UTF8}}}}}}, + }, + unsupportedTypes: []nameToType{ + {"time", nil}, // yet unsupported + }, + }, + { + name: "ClickHouse", + typeMapper: clickhouse.NewTypeMapper(), + supportedTypesMatch: []nameToType{ + {"Int32", &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_INT32}}}, + {"String", &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_STRING}}}, + }, + unsupportedTypes: []nameToType{ + {"UUID", nil}, // yet unsupported + }, + }, + } + + for _, tc := range testCases { + t.Run(fmt.Sprintf("Positive_%s", tc.name), func(t *testing.T) { + tc := tc + sb := rdbms_utils.NewSchemaBuilder(tc.typeMapper, &api_service_protos.TTypeMappingSettings{}) + + for num, supportedType := range tc.supportedTypesMatch { + require.NoError( + t, + sb.AddColumn(fmt.Sprintf("suppTypeCol%d", num), + supportedType.name)) // supported + } + + for num, unsuppType := range tc.unsupportedTypes { + require.NoError( + t, + sb.AddColumn(fmt.Sprintf("unsuppTypeCol%d", num), + unsuppType.name)) // yet unsupported + } + + logger := utils.NewTestLogger(t) + schema, err := sb.Build(logger) + require.NoError(t, err) + require.NotNil(t, schema) + + require.Len(t, schema.Columns, len(tc.supportedTypesMatch)) + + for num, supportedType := range tc.supportedTypesMatch { + require.Equal(t, schema.Columns[num].Name, fmt.Sprintf("suppTypeCol%d", num)) + require.True( + t, + proto.Equal( + schema.Columns[num].Type, + supportedType.ydbType, + ), + schema.Columns[num].Type) + } + }) + + t.Run(fmt.Sprintf("EmptyTable_%s", tc.name), func(t *testing.T) { + tc := tc + sb := rdbms_utils.NewSchemaBuilder(tc.typeMapper, &api_service_protos.TTypeMappingSettings{}) + + for num, unsuppType := range tc.unsupportedTypes { + require.NoError( + t, + sb.AddColumn( + fmt.Sprintf("unsuppTypeCol%d", num), + unsuppType.name)) // yet unsupported + } + + schema, err := sb.Build(utils.NewTestLogger(t)) + require.NoError(t, err) + require.NotNil(t, schema) + require.Len(t, schema.Columns, 0) + }) + } t.Run("NonExistingTable", func(t *testing.T) { - sb := &schemaBuilder{} - schema, err := sb.build(utils.NewTestLogger(t)) + sb := &rdbms_utils.SchemaBuilder{} + schema, err := sb.Build(utils.NewTestLogger(t)) require.ErrorIs(t, err, utils.ErrTableDoesNotExist) require.Nil(t, schema) }) - - t.Run("EmptyTable", func(t *testing.T) { - sb := &schemaBuilder{ - typeMapper: clickhouse.NewTypeMapper(), - } - - require.NoError(t, sb.addColumn("col1", "UUID")) // yet unsupported - - schema, err := sb.build(utils.NewTestLogger(t)) - require.NoError(t, err) - require.NotNil(t, schema) - require.Len(t, schema.Columns, 0) - }) -} +} \ No newline at end of file diff --git a/app/server/datasource/rdbms/utils/default_schema_provider.go b/app/server/datasource/rdbms/utils/default_schema_provider.go new file mode 100644 index 00000000..8307af8e --- /dev/null +++ b/app/server/datasource/rdbms/utils/default_schema_provider.go @@ -0,0 +1,71 @@ +package utils + +import ( + "context" + "fmt" + + my_log "github.com/ydb-platform/fq-connector-go/library/go/core/log" + "github.com/ydb-platform/fq-connector-go/app/server/utils" + api_service_protos "github.com/ydb-platform/fq-connector-go/libgo/service/protos" +) + +type DefaultSchemaProvider struct { + typeMapper utils.TypeMapper + getArgsAndQuery func(request *api_service_protos.TDescribeTableRequest) (string, []any) +} + +var _ SchemaProvider = (*DefaultSchemaProvider)(nil) + +func (f *DefaultSchemaProvider) GetSchema( + ctx context.Context, + logger my_log.Logger, + conn Connection, + request *api_service_protos.TDescribeTableRequest, +) (*api_service_protos.TSchema, error) { + query, args := f.getArgsAndQuery(request) + + rows, err := conn.Query(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("query builder error: %w", err) + } + + defer func() { utils.LogCloserError(logger, rows, "close rows") }() + + var ( + columnName string + typeName string + ) + + sb := NewSchemaBuilder(f.typeMapper, request.TypeMappingSettings) + + for rows.Next() { + if err := rows.Scan(&columnName, &typeName); err != nil { + return nil, fmt.Errorf("rows scan: %w", err) + } + + if err := sb.AddColumn(columnName, typeName); err != nil { + return nil, fmt.Errorf("add column to schema builder: %w", err) + } + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("rows iteration: %w", err) + } + + schema, err := sb.Build(logger) + if err != nil { + return nil, fmt.Errorf("build schema: %w", err) + } + + return schema, nil +} + +func NewDefaultSchemaProvider( + typeMapper utils.TypeMapper, + getArgsAndQueryFunc func(request *api_service_protos.TDescribeTableRequest) (string, []any), +) SchemaProvider { + return &DefaultSchemaProvider{ + typeMapper: typeMapper, + getArgsAndQuery: getArgsAndQueryFunc, + } +} \ No newline at end of file diff --git a/app/server/datasource/rdbms/utils/query_builder.go b/app/server/datasource/rdbms/utils/query_builder.go index 7b740c67..714e568e 100644 --- a/app/server/datasource/rdbms/utils/query_builder.go +++ b/app/server/datasource/rdbms/utils/query_builder.go @@ -8,12 +8,6 @@ import ( "github.com/ydb-platform/fq-connector-go/library/go/core/log" ) -func MakeDescribeTableQuery(logger log.Logger, formatter SQLFormatter, request *api_service_protos.TDescribeTableRequest) (string, []any) { - query, args := formatter.GetDescribeTableQuery(request) - - return query, args -} - func MakeReadSplitQuery(logger log.Logger, formatter SQLFormatter, request *api_service_protos.TSelect) (string, []any, error) { var ( sb strings.Builder diff --git a/app/server/datasource/rdbms/schema_builder.go b/app/server/datasource/rdbms/utils/schema_builder.go similarity index 78% rename from app/server/datasource/rdbms/schema_builder.go rename to app/server/datasource/rdbms/utils/schema_builder.go index 42cb2b46..0e746a00 100644 --- a/app/server/datasource/rdbms/schema_builder.go +++ b/app/server/datasource/rdbms/utils/schema_builder.go @@ -1,4 +1,4 @@ -package rdbms +package utils import ( "errors" @@ -16,13 +16,13 @@ type schemaItem struct { ydbColumn *Ydb.Column } -type schemaBuilder struct { +type SchemaBuilder struct { typeMapper utils.TypeMapper typeMappingSettings *api_service_protos.TTypeMappingSettings items []*schemaItem } -func (sb *schemaBuilder) addColumn(columnName, columnType string) error { +func (sb *SchemaBuilder) AddColumn(columnName, columnType string) error { item := &schemaItem{ columnName: columnName, columnType: columnType, @@ -40,7 +40,7 @@ func (sb *schemaBuilder) addColumn(columnName, columnType string) error { return nil } -func (sb *schemaBuilder) build(logger log.Logger) (*api_service_protos.TSchema, error) { +func (sb *SchemaBuilder) Build(logger log.Logger) (*api_service_protos.TSchema, error) { if len(sb.items) == 0 { return nil, utils.ErrTableDoesNotExist } @@ -67,3 +67,13 @@ func (sb *schemaBuilder) build(logger log.Logger) (*api_service_protos.TSchema, return &schema, nil } + +func NewSchemaBuilder( + typeMapper utils.TypeMapper, + typeMappingSettings *api_service_protos.TTypeMappingSettings, +) *SchemaBuilder { + return &SchemaBuilder{ + typeMapper: typeMapper, + typeMappingSettings: typeMappingSettings, + } +} \ No newline at end of file diff --git a/app/server/datasource/rdbms/utils/sql.go b/app/server/datasource/rdbms/utils/sql.go index 6bb208df..a530d73e 100644 --- a/app/server/datasource/rdbms/utils/sql.go +++ b/app/server/datasource/rdbms/utils/sql.go @@ -33,8 +33,6 @@ type ConnectionManagerBase struct { } type SQLFormatter interface { - GetDescribeTableQuery(request *api_service_protos.TDescribeTableRequest) (string, []any) - // Get placeholder for n'th argument (starting from 0) for prepared statement GetPlaceholder(n int) string @@ -44,3 +42,12 @@ type SQLFormatter interface { // Support for high level expression (without subexpressions, they are checked separately) SupportsPushdownExpression(expression *api_service_protos.TExpression) bool } + +type SchemaProvider interface { + GetSchema( + ctx context.Context, + logger log.Logger, + conn Connection, + request *api_service_protos.TDescribeTableRequest, + ) (*api_service_protos.TSchema, error) +} \ No newline at end of file