From efb1b83427541842319b651b1d05373b59d308a7 Mon Sep 17 00:00:00 2001 From: Jiyong Huang Date: Sun, 28 Apr 2024 15:16:54 +0800 Subject: [PATCH] refactor(converter): provide schema when instantiating Signed-off-by: Jiyong Huang --- internal/converter/converter.go | 27 ++++++++++++--------------- internal/converter/ext_converter.go | 3 ++- internal/converter/ext_mock.go | 3 ++- pkg/message/artifacts.go | 2 +- 4 files changed, 17 insertions(+), 18 deletions(-) diff --git a/internal/converter/converter.go b/internal/converter/converter.go index 02eccb4d3e..b7eff52422 100644 --- a/internal/converter/converter.go +++ b/internal/converter/converter.go @@ -28,13 +28,16 @@ import ( ) func init() { - modules.RegisterConverter(message.FormatJson, func(_ string, _ string, _ string) (message.Converter, error) { - return json.GetConverter() + modules.RegisterConverter(message.FormatJson, func(_ string, _ string, _ string, schema map[string]*ast.JsonStreamField) (message.Converter, error) { + if schema == nil { + return json.GetConverter() + } + return json.NewFastJsonConverter(schema), nil }) - modules.RegisterConverter(message.FormatBinary, func(_ string, _ string, _ string) (message.Converter, error) { + modules.RegisterConverter(message.FormatBinary, func(_ string, _ string, _ string, _ map[string]*ast.JsonStreamField) (message.Converter, error) { return binary.GetConverter() }) - modules.RegisterConverter(message.FormatDelimited, func(_ string, _ string, delimiter string) (message.Converter, error) { + modules.RegisterConverter(message.FormatDelimited, func(_ string, _ string, delimiter string, _ map[string]*ast.JsonStreamField) (message.Converter, error) { return delimited.NewConverter(delimiter) }) } @@ -50,16 +53,6 @@ func GetOrCreateConverter(options *ast.Options) (c message.Converter, err error) if t == "" { t = message.FormatJson } - if t == message.FormatJson { - // it's unit test - if options.RuleID == "" || options.StreamName == "" { - return json.GetConverter() - } - if options.IsWildCard { - return json.NewFastJsonConverter(nil), nil - } - return json.NewFastJsonConverter(options.Schema), nil - } schemaFile := "" schemaName := options.SCHEMAID @@ -70,8 +63,12 @@ func GetOrCreateConverter(options *ast.Options) (c message.Converter, err error) schemaName = r[1] } } + schema := options.Schema + if options.IsWildCard { + schema = nil + } if c, ok := modules.Converters[t]; ok { - return c(schemaFile, schemaName, options.DELIMITER) + return c(schemaFile, schemaName, options.DELIMITER, schema) } return nil, fmt.Errorf("format type %s not supported", t) } diff --git a/internal/converter/ext_converter.go b/internal/converter/ext_converter.go index 28564c2745..eba3ab0663 100644 --- a/internal/converter/ext_converter.go +++ b/internal/converter/ext_converter.go @@ -20,12 +20,13 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/converter/protobuf" "github.com/lf-edge/ekuiper/v2/internal/pkg/def" "github.com/lf-edge/ekuiper/v2/internal/schema" + "github.com/lf-edge/ekuiper/v2/pkg/ast" "github.com/lf-edge/ekuiper/v2/pkg/message" "github.com/lf-edge/ekuiper/v2/pkg/modules" ) func init() { - modules.RegisterConverter(message.FormatProtobuf, func(schemaFileName string, schemaMessageName string, _ string) (message.Converter, error) { + modules.RegisterConverter(message.FormatProtobuf, func(schemaFileName string, schemaMessageName string, _ string, _ map[string]*ast.JsonStreamField) (message.Converter, error) { ffs, err := schema.GetSchemaFile(def.PROTOBUF, schemaFileName) if err != nil { return nil, err diff --git a/internal/converter/ext_mock.go b/internal/converter/ext_mock.go index dd6fcdc5b0..993dbae143 100644 --- a/internal/converter/ext_mock.go +++ b/internal/converter/ext_mock.go @@ -21,13 +21,14 @@ import ( "time" "github.com/lf-edge/ekuiper/contract/v2/api" + "github.com/lf-edge/ekuiper/v2/pkg/ast" "github.com/lf-edge/ekuiper/v2/pkg/message" "github.com/lf-edge/ekuiper/v2/pkg/modules" "github.com/lf-edge/ekuiper/v2/pkg/timex" ) func init() { - modules.RegisterConverter("mock", func(_ string, _ string, _ string) (message.Converter, error) { + modules.RegisterConverter("mock", func(_ string, _ string, _ string, _ map[string]*ast.JsonStreamField) (message.Converter, error) { return &MockConverter{}, nil }) } diff --git a/pkg/message/artifacts.go b/pkg/message/artifacts.go index bb9a127f64..43c0c26113 100644 --- a/pkg/message/artifacts.go +++ b/pkg/message/artifacts.go @@ -38,7 +38,7 @@ type Converter interface { // ConverterProvider The format, schema information are passed in by stream options // The columns information is defined in the source side, like file source -type ConverterProvider func(schemaFileName string, SchemaMessageName string, delimiter string) (Converter, error) +type ConverterProvider func(schemaFileName string, SchemaMessageName string, delimiter string, logicalSchema map[string]*ast.JsonStreamField) (Converter, error) type SchemaResetAbleConverter interface { ResetSchema(schema map[string]*ast.JsonStreamField)