Skip to content

Commit

Permalink
YDB: implement connection via Query Service (#202)
Browse files Browse the repository at this point in the history
* YDB: first steps with Query Service

* YDB Connector mode in config proto

* Parametrize YQL connector mode

* Reached first reading

* one more appender

* Supporting new way of making acceptors

* Supporting new way of making acceptors pt. 2

* Basic tests passed with new connector

* Debugging query logic

* Linter complaints

* Pushdown works basically

* Troubles with database_missing test pt. 3

* Change signature of connection.Query

* Not operational

* Use QueryArgs everywhere

* Most of tests are passed with QueryService

* Linter complaints

* Unit test fix

* Linter fix

* Linter fix

* Review fix
  • Loading branch information
vitalyisaev2 authored Oct 14, 2024
1 parent ec3f6b7 commit f76af43
Show file tree
Hide file tree
Showing 44 changed files with 1,042 additions and 391 deletions.
4 changes: 2 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,9 @@ linters:
- misspell
- nakedret
- noctx
- nolintlint
# - nolintlint
- revive
# - staticcheck
# - staticcheck
- stylecheck
- typecheck
- unconvert
Expand Down
4 changes: 4 additions & 0 deletions app/client/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ func readSplits(
return fmt.Errorf("read splits: %w", err)
}

if err = common.ExtractErrorFromReadResponses(readSplitsResponses); err != nil {
return fmt.Errorf("extract error from read responses: %w", err)
}

logger.Debug("Obtained read splits responses", zap.Int("count", len(readSplitsResponses)))

records, err := common.ReadResponsesToArrowRecords(readSplitsResponses)
Expand Down
277 changes: 177 additions & 100 deletions app/config/server.pb.go

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions app/config/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,21 @@ message TYdbConfig {
// Flag forcing the usage of underlay networks for dedicated YDB databases
bool use_underlay_network_for_dedicated_databases = 3;

// Mode enumerates the ways the YDB connector can interact with YDB servers:
// which YDB service is used and through which client interface.
enum Mode {
// No mode has been chosen explicitly (zero value of the enum).
MODE_UNSPECIFIED = 0;
// In MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES the YDB connector uses YDB's Table Service
// via Go's standard library database/sql interface.
// All the requests are marked as scan queries.
MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES = 1;
// In MODE_QUERY_SERVICE_NATIVE the YDB connector uses YDB's Query Service
// via native YDB interface.
MODE_QUERY_SERVICE_NATIVE = 2;
}

// Mode parametrizes the way YDB connector interacts with YDB servers.
// MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES is the default mode.
Mode mode = 4;

TExponentialBackoffConfig exponential_backoff = 10;
}

Expand Down
1 change: 1 addition & 0 deletions app/server/config/config.debug.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,4 @@ datasources:
ydb:
<<: *data_source_default_var
use_underlay_network_for_dedicated_databases: false
mode: MODE_QUERY_SERVICE_NATIVE
23 changes: 18 additions & 5 deletions app/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func fillServerConfigDefaults(c *config.TServerConfig) {
c.Datasources.Oracle.ExponentialBackoff = makeDefaultExponentialBackoffConfig()
}

// Postgresql
// PostgreSQL

if c.Datasources.Postgresql == nil {
c.Datasources.Postgresql = &config.TPostgreSQLConfig{
Expand All @@ -136,10 +136,19 @@ func fillServerConfigDefaults(c *config.TServerConfig) {
// YDB

if c.Datasources.Ydb == nil {
c.Datasources.Ydb = &config.TYdbConfig{
OpenConnectionTimeout: "5s",
PingConnectionTimeout: "5s",
}
c.Datasources.Ydb = &config.TYdbConfig{}
}

if c.Datasources.Ydb.OpenConnectionTimeout == "" {
c.Datasources.Ydb.OpenConnectionTimeout = "5s"
}

if c.Datasources.Ydb.PingConnectionTimeout == "" {
c.Datasources.Ydb.PingConnectionTimeout = "5s"
}

if c.Datasources.Ydb.Mode == config.TYdbConfig_MODE_UNSPECIFIED {
c.Datasources.Ydb.Mode = config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES
}

if c.Datasources.Ydb.ExponentialBackoff == nil {
Expand Down Expand Up @@ -307,6 +316,10 @@ func validateYdbConfig(c *config.TYdbConfig) error {
return fmt.Errorf("validate `ping_connection_timeout`: %v", err)
}

if c.Mode == config.TYdbConfig_MODE_UNSPECIFIED {
return fmt.Errorf("invalid `mode` value: %v", c.Mode)
}

if err := validateExponentialBackoff(c.ExponentialBackoff); err != nil {
return fmt.Errorf("validate `exponential_backoff`: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions app/server/datasource/rdbms/clickhouse/connection_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ func (r *rows) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collection) (
return transformer, nil
}

func (c *connectionHTTP) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) {
c.logger.Dump(query, args...)
func (c *connectionHTTP) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) {
c.logger.Dump(params.QueryText, params.QueryArgs.Values()...)

out, err := c.DB.QueryContext(ctx, query, args...)
out, err := c.DB.QueryContext(params.Ctx, params.QueryText, params.QueryArgs.Values()...)
if err != nil {
return nil, fmt.Errorf("query context: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions app/server/datasource/rdbms/clickhouse/connection_native.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ func (r *rowsNative) MakeTransformer(ydbTypes []*Ydb.Type, cc conversion.Collect
return transformer, nil
}

func (c *connectionNative) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) {
c.logger.Dump(query, args...)
func (c *connectionNative) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) {
c.logger.Dump(params.QueryText, params.QueryArgs.Values()...)

out, err := c.Conn.Query(ctx, query, args...)
out, err := c.Conn.Query(params.Ctx, params.QueryText, params.QueryArgs.Values()...)
if err != nil {
return nil, fmt.Errorf("query context: %w", err)
}
Expand Down
10 changes: 7 additions & 3 deletions app/server/datasource/rdbms/clickhouse/sql_formatter_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clickhouse

import (
"context"
"errors"
"testing"

Expand Down Expand Up @@ -451,15 +452,18 @@ func TestMakeSQLFormatterQuery(t *testing.T) {

t.Run(tc.testName, func(t *testing.T) {
readSplitsQuery, err := rdbms_utils.MakeReadSplitsQuery(
logger, formatter, tc.selectReq, api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL)
context.Background(),
logger, formatter,
tc.selectReq,
api_service_protos.TReadSplitsRequest_FILTERING_OPTIONAL)
if tc.err != nil {
require.True(t, errors.Is(err, tc.err))
return
}

require.NoError(t, err)
require.Equal(t, tc.outputQuery, readSplitsQuery.Query)
require.Equal(t, tc.outputArgs, readSplitsQuery.Args)
require.Equal(t, tc.outputQuery, readSplitsQuery.QueryText)
require.Equal(t, tc.outputArgs, readSplitsQuery.QueryArgs.Values())
require.Equal(t, tc.outputSelectWhat, readSplitsQuery.What)
})
}
Expand Down
11 changes: 8 additions & 3 deletions app/server/datasource/rdbms/clickhouse/table_metadata_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ package clickhouse

import (
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
)

func TableMetadataQuery(request *api_service_protos.TDescribeTableRequest) (string, []any) {
func TableMetadataQuery(request *api_service_protos.TDescribeTableRequest) (string, *rdbms_utils.QueryArgs) {
query := "SELECT name, type FROM system.columns WHERE table = ? and database = ?"
args := []any{request.Table, request.DataSourceInstance.Database}

return query, args
var args rdbms_utils.QueryArgs

args.AddUntyped(request.Table)
args.AddUntyped(request.DataSourceInstance.Database)

return query, &args
}
10 changes: 6 additions & 4 deletions app/server/datasource/rdbms/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,16 @@ func (ds *dataSourceImpl) doReadSplit(
split *api_service_protos.TSplit,
sink paging.Sink[any],
) error {
readSplitsQuery, err := rdbms_utils.MakeReadSplitsQuery(logger, ds.sqlFormatter, split.Select, request.Filtering)
readSplitsQuery, err := rdbms_utils.MakeReadSplitsQuery(ctx, logger, ds.sqlFormatter, split.Select, request.Filtering)
if err != nil {
return fmt.Errorf("make read split query: %w", err)
}

var conn rdbms_utils.Connection

err = ds.retrierSet.MakeConnection.Run(ctx, logger,
err = ds.retrierSet.MakeConnection.Run(
ctx,
logger,
func() error {
var makeConnErr error

Expand All @@ -110,8 +112,8 @@ func (ds *dataSourceImpl) doReadSplit(
func() error {
var queryErr error

if rows, queryErr = conn.Query(ctx, logger, readSplitsQuery.Query, readSplitsQuery.Args...); queryErr != nil {
return fmt.Errorf("query '%s' error: %w", readSplitsQuery.Query, queryErr)
if rows, queryErr = conn.Query(&readSplitsQuery.QueryParams); queryErr != nil {
return fmt.Errorf("query '%s' error: %w", readSplitsQuery.QueryText, queryErr)
}

return nil
Expand Down
6 changes: 3 additions & 3 deletions app/server/datasource/rdbms/data_source_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func NewDataSourceFactory(
TypeMapper: postgresqlTypeMapper,
SchemaProvider: rdbms_utils.NewDefaultSchemaProvider(
postgresqlTypeMapper,
func(request *api_service_protos.TDescribeTableRequest) (string, []any) {
func(request *api_service_protos.TDescribeTableRequest) (string, *rdbms_utils.QueryArgs) {
return postgresql.TableMetadataQuery(
request,
schemaGetters[api_common.EDataSourceKind_POSTGRESQL](request.DataSourceInstance))
Expand All @@ -108,7 +108,7 @@ func NewDataSourceFactory(
},
},
ydb: Preset{
SQLFormatter: ydb.NewSQLFormatter(),
SQLFormatter: ydb.NewSQLFormatter(cfg.Ydb.Mode),
ConnectionManager: ydb.NewConnectionManager(cfg.Ydb, connManagerBase),
TypeMapper: ydbTypeMapper,
SchemaProvider: ydb.NewSchemaProvider(ydbTypeMapper),
Expand Down Expand Up @@ -144,7 +144,7 @@ func NewDataSourceFactory(
TypeMapper: postgresqlTypeMapper,
SchemaProvider: rdbms_utils.NewDefaultSchemaProvider(
postgresqlTypeMapper,
func(request *api_service_protos.TDescribeTableRequest) (string, []any) {
func(request *api_service_protos.TDescribeTableRequest) (string, *rdbms_utils.QueryArgs) {
return postgresql.TableMetadataQuery(
request,
schemaGetters[api_common.EDataSourceKind_GREENPLUM](request.DataSourceInstance))
Expand Down
8 changes: 3 additions & 5 deletions app/server/datasource/rdbms/ms_sql_server/connection.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package ms_sql_server

import (
"context"
"database/sql"

_ "github.com/denisenkom/go-mssqldb"
"go.uber.org/zap"

rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/fq-connector-go/common"
Expand All @@ -22,10 +20,10 @@ func (c Connection) Close() error {
return c.db.Close()
}

func (c Connection) Query(ctx context.Context, _ *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) {
c.logger.Dump(query, args...)
func (c Connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) {
c.logger.Dump(params.QueryText, params.QueryArgs.Values()...)

out, err := c.db.QueryContext(ctx, query, args...)
out, err := c.db.QueryContext(params.Ctx, params.QueryText, params.QueryArgs.Values()...)

return rows{out}, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ import (
_ "github.com/denisenkom/go-mssqldb"

api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
)

func TableMetadataQuery(request *api_service_protos.TDescribeTableRequest) (string, []any) {
func TableMetadataQuery(request *api_service_protos.TDescribeTableRequest) (string, *rdbms_utils.QueryArgs) {
// opts := request.GetDataSourceInstance().GetPgOptions().GetSchema()
query := "SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = @p1;"
args := []any{request.Table} // , opts}

return query, args
var args rdbms_utils.QueryArgs

args.AddUntyped(request.Table)

return query, &args
}
18 changes: 8 additions & 10 deletions app/server/datasource/rdbms/mysql/connection.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package mysql

import (
"context"
"fmt"
"sync/atomic"

"github.com/go-mysql-org/go-mysql/client"
"github.com/go-mysql-org/go-mysql/mysql"
"go.uber.org/zap"

"github.com/ydb-platform/fq-connector-go/app/config"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
Expand All @@ -26,16 +24,16 @@ func (c *Connection) Close() error {
return c.conn.Close()
}

func (c *Connection) Query(ctx context.Context, logger *zap.Logger, query string, args ...any) (rdbms_utils.Rows, error) {
c.logger.Dump(query, args...)
func (c *Connection) Query(params *rdbms_utils.QueryParams) (rdbms_utils.Rows, error) {
c.logger.Dump(params.QueryText, params.QueryArgs.Values()...)

results := make(chan rowData, c.cfg.ResultChanCapacity)
result := &mysql.Result{}

r := &rows{
ctx: ctx,
ctx: params.Ctx,
cfg: c.cfg,
logger: logger,
logger: params.Logger,
rowChan: results,
errChan: make(chan error, 1),
lastRow: nil,
Expand All @@ -44,7 +42,7 @@ func (c *Connection) Query(ctx context.Context, logger *zap.Logger, query string
inputFinished: false,
}

stmt, err := c.conn.Prepare(query)
stmt, err := c.conn.Prepare(params.QueryText)
if err != nil {
return r, fmt.Errorf("mysql: failed to prepare query: %w", err)
}
Expand Down Expand Up @@ -78,14 +76,14 @@ func (c *Connection) Query(ctx context.Context, logger *zap.Logger, query string

select {
case r.rowChan <- rowData{newRow, result.Fields}:
case <-ctx.Done():
return ctx.Err()
case <-params.Ctx.Done():
return params.Ctx.Err()
}

return nil
},
nil,
args...,
params.QueryArgs.Values()...,
)
}()

Expand Down
10 changes: 7 additions & 3 deletions app/server/datasource/rdbms/mysql/table_metadata_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@ package mysql

import (
api_service_protos "github.com/ydb-platform/fq-connector-go/api/service/protos"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
)

func TableMetadataQuery(request *api_service_protos.TDescribeTableRequest) (string, []any) {
func TableMetadataQuery(request *api_service_protos.TDescribeTableRequest) (string, *rdbms_utils.QueryArgs) {
// TODO: do not add 'unsigned' modifiers to column type and use the driver-provided fields instead.
// In MySQL schema and database are basically the same thing. So we can safely pass dbname as
// `schema_name` when querying `information_schema`.
query := `SELECT column_name, column_type FROM information_schema.columns
WHERE table_name = ? AND table_schema = ?`

args := []any{request.Table, request.GetDataSourceInstance().Database}
var args rdbms_utils.QueryArgs

return query, args
args.AddUntyped(request.Table)
args.AddUntyped(request.GetDataSourceInstance().Database)

return query, &args
}
Loading

0 comments on commit f76af43

Please sign in to comment.