Skip to content

Commit

Permalink
YQ-2686 split sql_formatter interfase
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Amelin committed Dec 15, 2023
1 parent 8c49a0e commit 198d6ae
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 165 deletions.
12 changes: 12 additions & 0 deletions app/server/datasource/rdbms/clickhouse/get_query_and_args.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package clickhouse

import (
api_service_protos "github.com/ydb-platform/fq-connector-go/libgo/service/protos"
)

func GetQueryAndArgs(request *api_service_protos.TDescribeTableRequest) (string, []any) {
query := "SELECT name, type FROM system.columns WHERE table = ? and database = ?"
args := []any{request.Table, request.DataSourceInstance.Database}

return query, args
}
7 changes: 0 additions & 7 deletions app/server/datasource/rdbms/clickhouse/sql_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,6 @@ func (f sqlFormatter) SupportsPushdownExpression(expression *api_service_protos.
}
}

func (f sqlFormatter) GetDescribeTableQuery(request *api_service_protos.TDescribeTableRequest) (string, []any) {
query := "SELECT name, type FROM system.columns WHERE table = ? and database = ?"
args := []any{request.Table, request.DataSourceInstance.Database}

return query, args
}

func (f sqlFormatter) GetPlaceholder(_ int) string {
return "?"
}
Expand Down
11 changes: 0 additions & 11 deletions app/server/datasource/rdbms/clickhouse/sql_formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,12 @@ import (
"testing"

"github.com/stretchr/testify/require"
"github.com/ydb-platform/fq-connector-go/api/common"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
api "github.com/ydb-platform/fq-connector-go/libgo/service/protos"
ydb "github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
)

func TestMakeDescribeTableQuery(t *testing.T) {
logger := utils.NewTestLogger(t)
formatter := NewSQLFormatter()
request := &api.TDescribeTableRequest{Table: "table", DataSourceInstance: &common.TDataSourceInstance{Database: "db"}}

output, args := rdbms_utils.MakeDescribeTableQuery(logger, formatter, request)
require.Equal(t, "SELECT name, type FROM system.columns WHERE table = ? and database = ?", output)
require.Equal(t, args, []any{"table", "db"})
}

func TestMakeSQLFormatterQuery(t *testing.T) {
type testCase struct {
testName string
Expand Down
38 changes: 5 additions & 33 deletions app/server/datasource/rdbms/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Preset struct {
SQLFormatter rdbms_utils.SQLFormatter
ConnectionManager rdbms_utils.ConnectionManager
TypeMapper utils.TypeMapper
SchemaProvider rdbms_utils.SchemaProvider
}

var _ datasource.DataSource[any] = (*dataSourceImpl)(nil)
Expand All @@ -24,6 +25,7 @@ type dataSourceImpl struct {
typeMapper utils.TypeMapper
sqlFormatter rdbms_utils.SQLFormatter
connectionManager rdbms_utils.ConnectionManager
schemaProvider rdbms_utils.SchemaProvider
logger log.Logger
}

Expand All @@ -32,46 +34,15 @@ func (ds *dataSourceImpl) DescribeTable(
logger log.Logger,
request *api_service_protos.TDescribeTableRequest,
) (*api_service_protos.TDescribeTableResponse, error) {
query, args := rdbms_utils.MakeDescribeTableQuery(logger, ds.sqlFormatter, request)

conn, err := ds.connectionManager.Make(ctx, logger, request.DataSourceInstance)
if err != nil {
return nil, fmt.Errorf("make connection: %w", err)
}

defer ds.connectionManager.Release(logger, conn)

rows, err := conn.Query(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("query builder error: %w", err)
}

defer func() { utils.LogCloserError(logger, rows, "close rows") }()

var (
columnName string
typeName string
)

sb := &schemaBuilder{typeMapper: ds.typeMapper, typeMappingSettings: request.TypeMappingSettings}

for rows.Next() {
if err := rows.Scan(&columnName, &typeName); err != nil {
return nil, fmt.Errorf("rows scan: %w", err)
}

if err := sb.addColumn(columnName, typeName); err != nil {
return nil, fmt.Errorf("add column to schema builder: %w", err)
}
}

if err := rows.Err(); err != nil {
return nil, fmt.Errorf("rows iteration: %w", err)
}

schema, err := sb.build(logger)
schema, err := ds.schemaProvider.GetSchema(ctx, logger, conn, request)
if err != nil {
return nil, fmt.Errorf("build schema: %w", err)
return nil, fmt.Errorf("get schema: %w", err)
}

return &api_service_protos.TDescribeTableResponse{Schema: schema}, nil
Expand Down Expand Up @@ -154,5 +125,6 @@ func NewDataSource(
sqlFormatter: preset.SQLFormatter,
connectionManager: preset.ConnectionManager,
typeMapper: preset.TypeMapper,
schemaProvider: preset.SchemaProvider,
}
}
9 changes: 7 additions & 2 deletions app/server/datasource/rdbms/data_source_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,21 @@ func NewDataSourceFactory(qlf utils.QueryLoggerFactory) datasource.DataSourceFac
QueryLoggerFactory: qlf,
}

postgresqlTypeMapper := postgresql.NewTypeMapper()
clickhouseTypeMapper := clickhouse.NewTypeMapper()

return &dataSourceFactory{
clickhouse: Preset{
SQLFormatter: clickhouse.NewSQLFormatter(),
ConnectionManager: clickhouse.NewConnectionManager(connManagerCfg),
TypeMapper: clickhouse.NewTypeMapper(),
TypeMapper: clickhouseTypeMapper,
SchemaProvider: rdbms_utils.NewDefaultSchemaProvider(clickhouseTypeMapper, clickhouse.GetQueryAndArgs),
},
postgresql: Preset{
SQLFormatter: postgresql.NewSQLFormatter(),
ConnectionManager: postgresql.NewConnectionManager(connManagerCfg),
TypeMapper: postgresql.NewTypeMapper(),
TypeMapper: postgresqlTypeMapper,
SchemaProvider: rdbms_utils.NewDefaultSchemaProvider(postgresqlTypeMapper, postgresql.GetQueryAndArgs),
},
}
}
13 changes: 13 additions & 0 deletions app/server/datasource/rdbms/postgresql/get_query_and_args.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package postgresql

import (
api_service_protos "github.com/ydb-platform/fq-connector-go/libgo/service/protos"
)

func GetQueryAndArgs(request *api_service_protos.TDescribeTableRequest) (string, []any) {
opts := request.GetDataSourceInstance().GetPgOptions().GetSchema()
query := "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1 AND table_schema = $2"
args := []any{request.Table, opts}

return query, args
}
8 changes: 0 additions & 8 deletions app/server/datasource/rdbms/postgresql/sql_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,6 @@ func (f sqlFormatter) SupportsPushdownExpression(expression *api_service_protos.
}
}

func (f sqlFormatter) GetDescribeTableQuery(request *api_service_protos.TDescribeTableRequest) (string, []any) {
schema := request.GetDataSourceInstance().GetPgOptions().GetSchema()
query := "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1 AND table_schema = $2"
args := []any{request.Table, schema}

return query, args
}

func (f sqlFormatter) GetPlaceholder(n int) string {
return fmt.Sprintf("$%d", n+1)
}
Expand Down
12 changes: 0 additions & 12 deletions app/server/datasource/rdbms/postgresql/sql_formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,12 @@ import (
"testing"

"github.com/stretchr/testify/require"
"github.com/ydb-platform/fq-connector-go/api/common"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
api "github.com/ydb-platform/fq-connector-go/libgo/service/protos"
ydb "github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
)

func TestMakeDescribeTableQuery(t *testing.T) {
logger := utils.NewTestLogger(t)
formatter := NewSQLFormatter()
dsi := &common.TDataSourceInstance{Options: &common.TDataSourceInstance_PgOptions{PgOptions: &common.TPostgreSQLDataSourceOptions{Schema: "schema"}}}
request := &api.TDescribeTableRequest{Table: "table", DataSourceInstance: dsi}

output, args := rdbms_utils.MakeDescribeTableQuery(logger, formatter, request)
require.Equal(t, "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1 AND table_schema = $2", output)
require.Equal(t, args, []any{"table", "schema"})
}

func TestMakeReadSplitQuery(t *testing.T) {
type testCase struct {
testName string
Expand Down
Loading

0 comments on commit 198d6ae

Please sign in to comment.