Skip to content

Commit

Permalink
refactor(converter): provide schema when instantiating
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying committed Apr 28, 2024
1 parent 624e3a9 commit efb1b83
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 18 deletions.
27 changes: 12 additions & 15 deletions internal/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@ import (
)

func init() {
modules.RegisterConverter(message.FormatJson, func(_ string, _ string, _ string) (message.Converter, error) {
return json.GetConverter()
modules.RegisterConverter(message.FormatJson, func(_ string, _ string, _ string, schema map[string]*ast.JsonStreamField) (message.Converter, error) {
if schema == nil {
return json.GetConverter()
}
return json.NewFastJsonConverter(schema), nil
})
modules.RegisterConverter(message.FormatBinary, func(_ string, _ string, _ string) (message.Converter, error) {
modules.RegisterConverter(message.FormatBinary, func(_ string, _ string, _ string, _ map[string]*ast.JsonStreamField) (message.Converter, error) {
return binary.GetConverter()
})
modules.RegisterConverter(message.FormatDelimited, func(_ string, _ string, delimiter string) (message.Converter, error) {
modules.RegisterConverter(message.FormatDelimited, func(_ string, _ string, delimiter string, _ map[string]*ast.JsonStreamField) (message.Converter, error) {
return delimited.NewConverter(delimiter)
})
}
Expand All @@ -50,16 +53,6 @@ func GetOrCreateConverter(options *ast.Options) (c message.Converter, err error)
if t == "" {
t = message.FormatJson
}
if t == message.FormatJson {
// it's unit test
if options.RuleID == "" || options.StreamName == "" {
return json.GetConverter()
}
if options.IsWildCard {
return json.NewFastJsonConverter(nil), nil
}
return json.NewFastJsonConverter(options.Schema), nil
}

schemaFile := ""
schemaName := options.SCHEMAID
Expand All @@ -70,8 +63,12 @@ func GetOrCreateConverter(options *ast.Options) (c message.Converter, err error)
schemaName = r[1]
}
}
schema := options.Schema
if options.IsWildCard {
schema = nil
}
if c, ok := modules.Converters[t]; ok {
return c(schemaFile, schemaName, options.DELIMITER)
return c(schemaFile, schemaName, options.DELIMITER, schema)
}
return nil, fmt.Errorf("format type %s not supported", t)
}
3 changes: 2 additions & 1 deletion internal/converter/ext_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/converter/protobuf"
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
"github.com/lf-edge/ekuiper/v2/internal/schema"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
"github.com/lf-edge/ekuiper/v2/pkg/message"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
)

func init() {
modules.RegisterConverter(message.FormatProtobuf, func(schemaFileName string, schemaMessageName string, _ string) (message.Converter, error) {
modules.RegisterConverter(message.FormatProtobuf, func(schemaFileName string, schemaMessageName string, _ string, _ map[string]*ast.JsonStreamField) (message.Converter, error) {
ffs, err := schema.GetSchemaFile(def.PROTOBUF, schemaFileName)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion internal/converter/ext_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (
"time"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
"github.com/lf-edge/ekuiper/v2/pkg/message"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
)

func init() {
modules.RegisterConverter("mock", func(_ string, _ string, _ string) (message.Converter, error) {
modules.RegisterConverter("mock", func(_ string, _ string, _ string, _ map[string]*ast.JsonStreamField) (message.Converter, error) {
return &MockConverter{}, nil
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/message/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Converter interface {

// ConverterProvider The format, schema information are passed in by stream options
// The columns information is defined in the source side, like file source
type ConverterProvider func(schemaFileName string, SchemaMessageName string, delimiter string) (Converter, error)
type ConverterProvider func(schemaFileName string, SchemaMessageName string, delimiter string, logicalSchema map[string]*ast.JsonStreamField) (Converter, error)

type SchemaResetAbleConverter interface {
ResetSchema(schema map[string]*ast.JsonStreamField)
Expand Down

0 comments on commit efb1b83

Please sign in to comment.