Skip to content

Commit

Permalink
YQ-2560: add ydb connection_manager files
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Amelin authored and Woodey777 committed Dec 18, 2023
1 parent d88c33b commit b2274bf
Show file tree
Hide file tree
Showing 7 changed files with 453 additions and 0 deletions.
120 changes: 120 additions & 0 deletions app/server/datasource/rdbms/ydb/connection_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package ydb

import (
"context"
"database/sql"
"fmt"
"time"
"strings"

ydb_sdk "github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/sugar"
"github.com/ydb-platform/fq-connector-go/library/go/core/log"
api_common "github.com/ydb-platform/fq-connector-go/api/common"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
)

var _ rdbms_utils.Connection = (*Connection)(nil)

type Connection struct {
*sql.DB
logger utils.QueryLogger
}

type rows struct {
*sql.Rows
}

func (r rows) MakeTransformer(ydbTypes []*Ydb.Type) (utils.RowTransformer[any], error) {
columns, err := r.ColumnTypes()
if err != nil {
return nil, fmt.Errorf("column types: %w", err)
}

typeNames := make([]string, 0, len(columns))
for _, column := range columns {
typeNames = append(typeNames, column.DatabaseTypeName())
}

transformer, err := transformerFromSQLTypes(typeNames, ydbTypes)
if err != nil {
return nil, fmt.Errorf("transformer from sql types: %w", err)
}

return transformer, nil
}

func (c Connection) Query(ctx context.Context, query string, args ...any) (rdbms_utils.Rows, error) {
c.logger.Dump(query, args...)

out, err := c.DB.QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("query context: %w", err)
}

if err := out.Err(); err != nil {
defer func() {
if err := out.Close(); err != nil {
c.logger.Error("close rows", log.Error(err))
}
}()

return nil, fmt.Errorf("rows err: %w", err)
}

return rows{Rows: out}, nil
}

var _ rdbms_utils.ConnectionManager = (*connectionManager)(nil)

type connectionManager struct {
rdbms_utils.ConnectionManagerBase
// TODO: cache of connections, remove unused connections with TTL
}

func (c *connectionManager) Make(
ctx context.Context,
logger log.Logger,
dsi *api_common.TDataSourceInstance,
) (rdbms_utils.Connection, error) {

// TODO: add credentials (iam and basic) support

endpoint := strings.Join([]string{dsi.Endpoint.Host, fmt.Sprintf("%v", dsi.Endpoint.Port)}, ":")
dsn := sugar.DSN(endpoint, dsi.Database, dsi.UseTls)

ydb_driver, err := ydb_sdk.Open(ctx, dsn)
if err != nil {
return nil, fmt.Errorf("open driver error: %w", err)
}
ydb_conn, err := ydb_sdk.Connector(ydb_driver)
if err != nil {
return nil, fmt.Errorf("connector error: %w", err)
}

conn := sql.OpenDB(ydb_conn)

const (
maxIdleConns = 5
maxOpenConns = 10
connMaxLifetime = time.Hour
)

conn.SetMaxIdleConns(maxIdleConns)
conn.SetMaxOpenConns(maxOpenConns)
conn.SetConnMaxLifetime(connMaxLifetime)

queryLogger := c.QueryLoggerFactory.Make(logger)

return &Connection{DB: conn, logger: queryLogger}, nil
}

func (c *connectionManager) Release(logger log.Logger, conn rdbms_utils.Connection) {
utils.LogCloserError(logger, conn, "close clickhouse connection")
}

func NewConnectionManager(cfg rdbms_utils.ConnectionManagerBase) rdbms_utils.ConnectionManager {
return &connectionManager{ConnectionManagerBase: cfg}
}
2 changes: 2 additions & 0 deletions app/server/datasource/rdbms/ydb/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package ydb contains code specific for YDB database.
package ydb
73 changes: 73 additions & 0 deletions app/server/datasource/rdbms/ydb/schema_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package ydb

import (
"context"
"fmt"
"path"

"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/ydb-platform/fq-connector-go/app/server/utils"
my_log "github.com/ydb-platform/fq-connector-go/library/go/core/log"
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
api_service_protos "github.com/ydb-platform/fq-connector-go/libgo/service/protos"
)

type schemaProvider struct {
typeMapper utils.TypeMapper
}
var _ rdbms_utils.SchemaProvider = (*schemaProvider)(nil)

func (f *schemaProvider) GetSchema(
ctx context.Context,
logger my_log.Logger,
conn rdbms_utils.Connection,
request *api_service_protos.TDescribeTableRequest,
) (*api_service_protos.TSchema, error) {
ydb_conn := conn.(*Connection)
db, err := ydb.Unwrap(ydb_conn.DB)
if err != nil {
return nil, fmt.Errorf("unwrap connection: %w", err)
}

desc := options.Description{}
prefix := path.Join(db.Name(), request.Table)
cl := db.Table()

err = cl.Do(
ctx,
func(ctx context.Context, s table.Session) error {
desc, err = s.DescribeTable(ctx, prefix)
if err != nil {
return err
}
return nil
},
)
if err != nil {
return nil, fmt.Errorf("get table description: %w", err)
}

sb := rdbms_utils.NewSchemaBuilder(f.typeMapper, request.TypeMappingSettings)
for _, column := range desc.Columns {
if err := sb.AddColumn(column.Name, column.Type.String()); err != nil {
return nil, fmt.Errorf("add column to schema builder: %w", err)
}
}

schema, err := sb.Build(logger)
if err != nil {
return nil, fmt.Errorf("build schema: %w", err)
}

return schema, nil
}

func NewSchemaProvider(
typeMapper utils.TypeMapper,
) rdbms_utils.SchemaProvider {
return &schemaProvider{
typeMapper: typeMapper,
}
}
73 changes: 73 additions & 0 deletions app/server/datasource/rdbms/ydb/sql_formatter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package ydb

import (

rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
api_service_protos "github.com/ydb-platform/fq-connector-go/libgo/service/protos"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
)

var _ rdbms_utils.SQLFormatter = (*sqlFormatter)(nil)

type sqlFormatter struct {
}

// for pushdouwn feature
func (f *sqlFormatter) supportsType(typeID Ydb.Type_PrimitiveTypeId) bool {
switch typeID {
case Ydb.Type_BOOL:
return true
case Ydb.Type_INT8:
return true
case Ydb.Type_UINT8:
return true
case Ydb.Type_INT16:
return true
case Ydb.Type_UINT16:
return true
case Ydb.Type_INT32:
return true
case Ydb.Type_UINT32:
return true
case Ydb.Type_INT64:
return true
case Ydb.Type_UINT64:
return true
case Ydb.Type_FLOAT:
return true
case Ydb.Type_DOUBLE:
return true
case Ydb.Type_STRING:
return true
default:
return false
}
}

func (f *sqlFormatter) supportsConstantValueExpression(t *Ydb.Type) bool {
switch v := t.Type.(type) {
case *Ydb.Type_TypeId:
return f.supportsType(v.TypeId)
case *Ydb.Type_OptionalType:
return f.supportsConstantValueExpression(v.OptionalType.Item)
default:
return false
}
}

func (f sqlFormatter) SupportsPushdownExpression(expression *api_service_protos.TExpression) bool {
return false // TODO: pushdown support
}

func (f sqlFormatter) GetPlaceholder(_ int) string {
return "?"
}

// TODO: add identifiers processing
func (f sqlFormatter) SanitiseIdentifier(ident string) string {
return ident
}

func NewSQLFormatter() rdbms_utils.SQLFormatter {
return sqlFormatter{}
}
Loading

0 comments on commit b2274bf

Please sign in to comment.