diff --git a/go/vt/vttablet/tabletserver/action.go b/go/vt/vttablet/tabletserver/action.go index fdc82c5f45..27de0b31ca 100644 --- a/go/vt/vttablet/tabletserver/action.go +++ b/go/vt/vttablet/tabletserver/action.go @@ -20,10 +20,6 @@ type ActionInterface interface { SetParams(args ActionArgs) error GetRule() *rules.Rule - - GetSkipFlag() bool - - SetSkipFlag(skip bool) } type ActionArgs interface { diff --git a/go/vt/vttablet/tabletserver/action_builtin.go b/go/vt/vttablet/tabletserver/action_builtin.go index 7819d12d09..297941d40d 100644 --- a/go/vt/vttablet/tabletserver/action_builtin.go +++ b/go/vt/vttablet/tabletserver/action_builtin.go @@ -3,7 +3,6 @@ package tabletserver import ( "context" "fmt" - "regexp" "time" "github.com/BurntSushi/toml" @@ -20,8 +19,6 @@ type ContinueAction struct { // Action is the action to take if the rule matches Action rules.Action - - skipFlag bool } func (p *ContinueAction) BeforeExecution(_ *QueryExecutor) *ActionExecutionResponse { @@ -55,16 +52,6 @@ type FailAction struct { // Action is the action to take if the rule matches Action rules.Action - - skipFlag bool -} - -func (p *ContinueAction) GetSkipFlag() bool { - return p.skipFlag -} - -func (p *ContinueAction) SetSkipFlag(skip bool) { - p.skipFlag = skip } func (p *FailAction) BeforeExecution(_ *QueryExecutor) *ActionExecutionResponse { @@ -93,21 +80,11 @@ func (p *FailAction) GetRule() *rules.Rule { return p.Rule } -func (p *FailAction) GetSkipFlag() bool { - return p.skipFlag -} - -func (p *FailAction) SetSkipFlag(skip bool) { - p.skipFlag = skip -} - type FailRetryAction struct { Rule *rules.Rule // Action is the action to take if the rule matches Action rules.Action - - skipFlag bool } func (p *FailRetryAction) BeforeExecution(_ *QueryExecutor) *ActionExecutionResponse { @@ -136,21 +113,11 @@ func (p *FailRetryAction) GetRule() *rules.Rule { return p.Rule } -func (p *FailRetryAction) GetSkipFlag() bool { - return p.skipFlag -} - -func (p *FailRetryAction) SetSkipFlag(skip bool) { - p.skipFlag = skip -} - type BufferAction struct { Rule *rules.Rule // Action is the action to take if the rule matches Action rules.Action - - skipFlag bool } func (p *BufferAction) BeforeExecution(qre *QueryExecutor) *ActionExecutionResponse { @@ -198,14 +165,6 @@ func (p *BufferAction) GetRule() *rules.Rule { return p.Rule } -func (p *BufferAction) GetSkipFlag() bool { - return p.skipFlag -} - -func (p *BufferAction) SetSkipFlag(skip bool) { - p.skipFlag = skip -} - type ConcurrencyControlAction struct { Rule *rules.Rule @@ -213,8 +172,6 @@ type ConcurrencyControlAction struct { Action rules.Action Args *ConcurrencyControlActionArgs - - skipFlag bool } type ConcurrencyControlActionArgs struct { @@ -292,14 +249,6 @@ func (p *ConcurrencyControlAction) GetRule() *rules.Rule { return p.Rule } -func (p *ConcurrencyControlAction) GetSkipFlag() bool { - return p.skipFlag -} - -func (p *ConcurrencyControlAction) SetSkipFlag(skip bool) { - p.skipFlag = skip -} - type WasmPluginAction struct { Rule *rules.Rule @@ -307,8 +256,6 @@ type WasmPluginAction struct { Action rules.Action Args *WasmPluginActionArgs - - skipFlag bool } type WasmPluginActionArgs struct { @@ -337,13 +284,13 @@ func (args *WasmPluginActionArgs) Parse(stringParams string) (ActionArgs, error) func (p *WasmPluginAction) BeforeExecution(qre *QueryExecutor) *ActionExecutionResponse { controller := qre.tsv.qe.wasmPluginController - ok, module := controller.VM.GetWasmModule(p.Args.WasmBinaryName) + ok, module := controller.VM.GetWasmModule(p.GetRule().Name) if !ok { wasmBytes, err := controller.GetWasmBytesByBinaryName(qre.ctx, p.Args.WasmBinaryName) if err != nil { return &ActionExecutionResponse{Err: err} } - module, err = controller.VM.InitWasmModule(p.Args.WasmBinaryName, wasmBytes) + module, err = controller.VM.InitWasmModule(p.GetRule().Name, wasmBytes) if err != nil { return &ActionExecutionResponse{Err: err} } @@ -351,6 +298,7 @@ func (p *WasmPluginAction) BeforeExecution(qre *QueryExecutor) *ActionExecutionR instance, err := module.NewInstance(qre) if err != nil { + //todo wasm: if instance is nil, we will not be able to get the it in AfterExecution. We need to handle this case return &ActionExecutionResponse{Err: err} } @@ -399,91 +347,3 @@ func (p *WasmPluginAction) SetParams(args ActionArgs) error { func (p *WasmPluginAction) GetRule() *rules.Rule { return p.Rule } - -func (p *WasmPluginAction) GetSkipFlag() bool { - return p.skipFlag -} - -func (p *WasmPluginAction) SetSkipFlag(skip bool) { - p.skipFlag = skip -} - -type SkipFilterAction struct { - Rule *rules.Rule - - // Action is the action to take if the rule matches - Action rules.Action - - Args *SkipFilterActionArgs - - skipFlag bool -} - -type SkipFilterActionArgs struct { - AllowRegexString string `toml:"skip_filter_regex"` - AllowRegex *regexp.Regexp -} - -func (args *SkipFilterActionArgs) Parse(stringParams string) (ActionArgs, error) { - s := &SkipFilterActionArgs{} - if stringParams == "" { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "stringParams is empty when parsing skip filter action args") - } - - userInputTOML := ConvertUserInputToTOML(stringParams) - err := toml.Unmarshal([]byte(userInputTOML), s) - if err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "error when parsing skip filter action args: %v", err) - } - s.AllowRegex, err = regexp.Compile(fmt.Sprintf("^%s$", s.AllowRegexString)) - if err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "error when compiling skip filter action args: %v", err) - } - - return s, nil -} - -func (p *SkipFilterAction) BeforeExecution(qre *QueryExecutor) *ActionExecutionResponse { - findSelf := false - for _, a := range qre.matchedActionList { - if a.GetRule().Name == p.GetRule().Name { - findSelf = true - continue - } - if findSelf { - if p.Args.AllowRegex.MatchString(a.GetRule().Name) { - a.SetSkipFlag(true) - } - } - } - return &ActionExecutionResponse{Err: nil} -} - -func (p *SkipFilterAction) AfterExecution(qre *QueryExecutor, reply *sqltypes.Result, err error) *ActionExecutionResponse { - return &ActionExecutionResponse{Reply: reply, Err: err} -} - -func (p *SkipFilterAction) ParseParams(argsStr string) (ActionArgs, error) { - return p.Args.Parse(argsStr) -} - -func (p *SkipFilterAction) SetParams(args ActionArgs) error { - skipFilterArgs, ok := args.(*SkipFilterActionArgs) - if !ok { - return fmt.Errorf("args :%v is not a valid SkipFilterActionArgs)", args) - } - p.Args = skipFilterArgs - return nil -} - -func (p *SkipFilterAction) GetRule() *rules.Rule { - return p.Rule -} - -func (p *SkipFilterAction) GetSkipFlag() bool { - return p.skipFlag -} - -func (p *SkipFilterAction) SetSkipFlag(skip bool) { - p.skipFlag = skip -} diff --git a/go/vt/vttablet/tabletserver/action_factory.go b/go/vt/vttablet/tabletserver/action_factory.go index 190a85da1f..e8baa28b28 100644 --- a/go/vt/vttablet/tabletserver/action_factory.go +++ b/go/vt/vttablet/tabletserver/action_factory.go @@ -60,8 +60,6 @@ func CreateActionInstance(action rules.Action, rule *rules.Rule) (ActionInterfac actInst, err = &ConcurrencyControlAction{Rule: rule, Action: action}, nil case rules.QRWasmPlugin: actInst, err = &WasmPluginAction{Rule: rule, Action: action}, nil - case rules.QRSkipFilter: - actInst, err = &SkipFilterAction{Rule: rule, Action: action}, nil default: log.Errorf("unknown action: %v", action) actInst, err = nil, fmt.Errorf("unknown action: %v", action) diff --git a/go/vt/vttablet/tabletserver/action_stats.go b/go/vt/vttablet/tabletserver/action_stats.go new file mode 100644 index 0000000000..07db32a14e --- /dev/null +++ b/go/vt/vttablet/tabletserver/action_stats.go @@ -0,0 +1,26 @@ +package tabletserver + +import ( + "time" + "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/servenv" +) + +type ActionStats struct { + FilterBeforeExecutionTiming *servenv.TimingsWrapper + FilterAfterExecutionTiming *servenv.TimingsWrapper + FilterErrorCounts *stats.CountersWithSingleLabel + FilterQPSRates *stats.Rates + FilterWasmMemorySize *stats.GaugesWithMultiLabels +} + +func NewActionStats(exporter *servenv.Exporter) *ActionStats { + stats := &ActionStats{ + FilterBeforeExecutionTiming: exporter.NewTimings("FilterBeforeExecution", "Filter before execution timings", "Name"), + FilterAfterExecutionTiming: exporter.NewTimings("FilterAfterExecution", "Filter before execution timings", "Name"), + FilterErrorCounts: exporter.NewCountersWithSingleLabel("FilterErrorCounts", "filter error counts", "Name"), + FilterWasmMemorySize: exporter.NewGaugesWithMultiLabels("FilterWasmMemorySize", "Wasm memory size", []string{"Name", "BeforeOrAfter"}), + } + stats.FilterQPSRates = exporter.NewRates("FilterQps", stats.FilterBeforeExecutionTiming, 15*60/5, 5*time.Second) + return stats +} diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index 2011dcb1d7..086e21c59a 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -187,6 +187,8 @@ type QueryEngine struct { // stats queryCounts, queryTimes, queryErrorCounts, queryRowsAffected, queryRowsReturned *stats.CountersWithMultiLabels + // actionStats for filters + actionStats *ActionStats // Loggers accessCheckerLogger *logutil.ThrottledLogger @@ -281,6 +283,8 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine { qe.queryRowsReturned = env.Exporter().NewCountersWithMultiLabels("QueryRowsReturned", "query rows returned", []string{"Table", "Plan"}) qe.queryErrorCounts = env.Exporter().NewCountersWithMultiLabels("QueryErrorCounts", "query error counts", []string{"Table", "Plan"}) + qe.actionStats = NewActionStats(env.Exporter()) + env.Exporter().HandleFunc("/debug/ccl", qe.concurrencyController.ServeHTTP) env.Exporter().HandleFunc("/debug/hotrows", qe.txSerializer.ServeHTTP) env.Exporter().HandleFunc("/debug/tablet_plans", qe.handleHTTPQueryPlans) diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 757ea97150..94763d6632 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -611,12 +611,16 @@ func (qre *QueryExecutor) runActionListBeforeExecution() (*sqltypes.Result, erro return nil, nil } for _, a := range qre.matchedActionList { - if !a.GetSkipFlag() { - resp := a.BeforeExecution(qre) - qre.calledActionList = append(qre.calledActionList, a) - if resp.Reply != nil || resp.Err != nil { - return resp.Reply, resp.Err - } + startTime := time.Now() + // execute the filter action + resp := a.BeforeExecution(qre) + qre.tsv.qe.actionStats.FilterBeforeExecutionTiming.Add(a.GetRule().Name, time.Since(startTime)) + qre.calledActionList = append(qre.calledActionList, a) + if resp.Err != nil { + qre.tsv.qe.actionStats.FilterErrorCounts.Add(a.GetRule().Name, 1) + } + if resp.Reply != nil || resp.Err != nil { + return resp.Reply, resp.Err } } return nil, nil @@ -631,7 +635,13 @@ func (qre *QueryExecutor) runActionListAfterExecution(reply *sqltypes.Result, er for i := len(qre.calledActionList) - 1; i >= 0; i-- { a := qre.calledActionList[i] + startTime := time.Now() + // execute the filter action resp := a.AfterExecution(qre, newReply, newErr) + qre.tsv.qe.actionStats.FilterAfterExecutionTiming.Add(a.GetRule().Name, time.Since(startTime)) + if resp.Err != nil { + qre.tsv.qe.actionStats.FilterErrorCounts.Add(a.GetRule().Name, 1) + } newReply, newErr = resp.Reply, resp.Err } return newReply, newErr diff --git a/go/vt/vttablet/tabletserver/query_executor_filter_test.go b/go/vt/vttablet/tabletserver/query_executor_filter_test.go index 20b348960b..22b906df58 100644 --- a/go/vt/vttablet/tabletserver/query_executor_filter_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_filter_test.go @@ -66,7 +66,10 @@ func TestQueryExecutor_runActionListBeforeExecution(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - qre := &QueryExecutor{ctx: ctx} + db := setUpQueryExecutorTest(t) + defer db.Close() + tsv := newTestTabletServer(ctx, noFlags, db) + qre := newTestQueryExecutor(ctx, tsv, "select 1", 0) qre.matchedActionList = tt.actionList _, err := qre.runActionListBeforeExecution() tt.wantErr(t, err, "runActionListBeforeExecution()") @@ -129,7 +132,10 @@ func TestQueryExecutor_runActionListAfterExecution(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - qre := &QueryExecutor{ctx: ctx} + db := setUpQueryExecutorTest(t) + defer db.Close() + tsv := newTestTabletServer(ctx, noFlags, db) + qre := newTestQueryExecutor(ctx, tsv, "select 1", 0) qre.matchedActionList = tt.actionList qr := &sqltypes.Result{} var err error @@ -162,7 +168,10 @@ func TestQueryExecutor_actions_can_be_skipped(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - qre := &QueryExecutor{ctx: ctx} + db := setUpQueryExecutorTest(t) + defer db.Close() + tsv := newTestTabletServer(ctx, noFlags, db) + qre := newTestQueryExecutor(ctx, tsv, "select 1", 0) qre.matchedActionList = tt.actionList qr, err := qre.runActionListBeforeExecution() tt.wantErr(t, err, "runActionListBeforeExecution()") diff --git a/go/vt/vttablet/tabletserver/rules/rules.go b/go/vt/vttablet/tabletserver/rules/rules.go index 543bb5d8b7..14269734a4 100644 --- a/go/vt/vttablet/tabletserver/rules/rules.go +++ b/go/vt/vttablet/tabletserver/rules/rules.go @@ -772,7 +772,6 @@ const ( QRBuffer QRConcurrencyControl QRWasmPlugin - QRSkipFilter ) func ParseStringToAction(s string) (Action, error) { @@ -789,8 +788,6 @@ func ParseStringToAction(s string) (Action, error) { return QRConcurrencyControl, nil case "WASM_PLUGIN": return QRWasmPlugin, nil - case "SKIP_FILTER": - return QRSkipFilter, nil default: return QRContinue, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid Action %s", s) } @@ -810,8 +807,6 @@ func (act Action) ToString() string { return "CONCURRENCY_CONTROL" case QRWasmPlugin: return "WASM_PLUGIN" - case QRSkipFilter: - return "SKIP_FILTER" default: return "INVALID" } diff --git a/go/vt/vttablet/tabletserver/skip_filter_test.go b/go/vt/vttablet/tabletserver/skip_filter_test.go deleted file mode 100644 index 961bf0ec3e..0000000000 --- a/go/vt/vttablet/tabletserver/skip_filter_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package tabletserver - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" -) - -func TestSkipFilter(t *testing.T) { - - skipRule := rules.NewActiveQueryRule("skip all filters", "s1", rules.QRSkipFilter) - skipActionArgs := &SkipFilterActionArgs{} - - args, _ := skipActionArgs.Parse(`skip_filter_regex=".*"`) - skipActionArgs = args.(*SkipFilterActionArgs) - skipFilter := &SkipFilterAction{Rule: skipRule, Action: rules.QRSkipFilter, Args: skipActionArgs} - - argsTestRegex, _ := skipActionArgs.Parse(`skip_filter_regex="c.*|d1"`) - skipActionArgsTestRegex := argsTestRegex.(*SkipFilterActionArgs) - skipFilterTestRegex := &SkipFilterAction{Rule: skipRule, Action: rules.QRSkipFilter, Args: skipActionArgsTestRegex} - - tests := []struct { - name string - actionList []ActionInterface - expectedCalledNameList []string - }{ - - { - name: "skip all", - actionList: []ActionInterface{ - skipFilter, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c1", rules.QRContinue), Action: rules.QRContinue}, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c2", rules.QRContinue), Action: rules.QRContinue}}, - expectedCalledNameList: []string{"s1"}, - }, - { - name: "none to skip", - actionList: []ActionInterface{ - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c1", rules.QRContinue), Action: rules.QRContinue}, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c2", rules.QRContinue), Action: rules.QRContinue}, - skipFilter}, - expectedCalledNameList: []string{"c1", "c2", "s1"}, - }, - { - name: "just skip c2", - actionList: []ActionInterface{ - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c1", rules.QRContinue), Action: rules.QRContinue}, - skipFilter, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c2", rules.QRContinue), Action: rules.QRContinue}}, - expectedCalledNameList: []string{"c1", "s1"}, - }, - { - name: "test regex", - actionList: []ActionInterface{ - skipFilterTestRegex, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c1", rules.QRContinue), Action: rules.QRContinue}, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c2", rules.QRContinue), Action: rules.QRContinue}, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "d1", rules.QRContinue), Action: rules.QRContinue}, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "d2", rules.QRContinue), Action: rules.QRContinue}}, - expectedCalledNameList: []string{"s1", "d2"}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - - qre := &QueryExecutor{} - qre.matchedActionList = tt.actionList - _, err := qre.runActionListBeforeExecution() - assert.Nil(t, err) - assert.Equal(t, len(tt.expectedCalledNameList), len(qre.calledActionList)) - for i, name := range tt.expectedCalledNameList { - assert.Equal(t, name, qre.calledActionList[i].GetRule().Name) - } - }) - } -} diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_common.go b/go/vt/vttablet/tabletserver/wasm_plugin_common.go index 4be6b19e25..31e6a83046 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_common.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_common.go @@ -1,19 +1,7 @@ package tabletserver -import "unsafe" - const AbiVersion = "v1alpha1" -const WASMER = "wamser-go" const WAZERO = "wazero" const WasmBinaryTableName = "mysql.wasm_binary" - -func ptrToString(ptr uint32, size uint32) string { - return unsafe.String((*byte)(unsafe.Pointer(uintptr(ptr))), size) -} - -func stringToPtr(s string) (uint32, uint32) { - ptr := unsafe.Pointer(unsafe.StringData(s)) - return uint32(uintptr(ptr)), uint32(len(s)) -} diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_controller.go b/go/vt/vttablet/tabletserver/wasm_plugin_controller.go index 5f853b25f7..bdf5ac803c 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_controller.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_controller.go @@ -19,7 +19,7 @@ var ( ) func registerWasmFlags(fs *pflag.FlagSet) { - fs.StringVar(&RuntimeType, "wasm_runtime_type", WAZERO, "the runtime for wasm plugin. default is wazero.") + fs.StringVar(&RuntimeType, "wasm_runtime_type", WAZERO, "the runtime for wasm plugin. default is wazero. options: [wazero]") } func init() { @@ -53,7 +53,7 @@ func CalcMd5String32(data []byte) string { } func (wpc *WasmPluginController) GetWasmBytesByBinaryName(ctx context.Context, wasmBinaryName string) ([]byte, error) { - query := getQueryByName(wasmBinaryName) + query := generateWasmQueryByName(wasmBinaryName) qr, err := wpc.qe.ExecuteQuery(ctx, query) if err != nil { return nil, fmt.Errorf("get wasm binary by name %s failed : %v", wasmBinaryName, err) @@ -62,34 +62,37 @@ func (wpc *WasmPluginController) GetWasmBytesByBinaryName(ctx context.Context, w return nil, fmt.Errorf("get wasm binary by name %s failed : qr len is %v instead of 1", wasmBinaryName, len(qr.Named().Rows)) } + bytes, err := qr.Named().Rows[0].ToBytes("data") + if err != nil { + return nil, fmt.Errorf("get wasm binary by name %s failed : uncompress data error: %v", wasmBinaryName, err) + } + compressAlgorithm := qr.Named().Rows[0].AsString("compress_algorithm", "") - if compressAlgorithm != wasmBinaryCompressAlgorithm { + if compressAlgorithm != "" && compressAlgorithm != wasmBinaryCompressAlgorithm { return nil, fmt.Errorf("get wasm binary by name %s failed : compress algorithm is %v instead of %v", wasmBinaryName, compressAlgorithm, wasmBinaryCompressAlgorithm) } - compressedBytes, err := qr.Named().Rows[0].ToBytes("data") - if err != nil { - return nil, fmt.Errorf("get wasm binary by name %s failed : uncompress data error: %v", wasmBinaryName, err) + if compressAlgorithm == wasmBinaryCompressAlgorithm { + bytes, err = UnCompressByBZip2(bytes) + if err != nil { + return nil, fmt.Errorf("get wasm binary by name %s failed : uncompress data error: %v", wasmBinaryName, err) + } } - originalBytes, err := UnCompressByBZip2(compressedBytes) - if err != nil { - return nil, fmt.Errorf("get wasm binary by name %s failed : uncompress data error: %v", wasmBinaryName, err) - } - - hash := CalcMd5String32(originalBytes) hashInTable := qr.Named().Rows[0].AsString("hash_before_compress", "") - if hash != hashInTable { - return nil, fmt.Errorf("get wasm binary by name %s failed : hash is not equal", wasmBinaryName) + if hashInTable != "" { + hash := CalcMd5String32(bytes) + if hash != hashInTable { + return nil, fmt.Errorf("get wasm binary by name %s failed : hash is not equal", wasmBinaryName) + } } - return originalBytes, nil + + return bytes, nil } func initWasmVM() WasmVM { switch RuntimeType { - //case WASMER: - // return initWasmerRuntime(qe) case WAZERO: return initWazeroVM() default: diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_host_functions.go b/go/vt/vttablet/tabletserver/wasm_plugin_host_functions.go index 6a91be4479..441df68f1f 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_host_functions.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_host_functions.go @@ -43,152 +43,7 @@ func ErrorLogOnHost(ctx context.Context, mod api.Module, ptr, size uint32) uint3 return uint32(StatusOK) } -const ( - SharedScopeMODULE = uint32(0) - SharedScopeTABLET = uint32(1) -) - -func SetValueByKeyOnHost(ctx context.Context, mod api.Module, wazeroRuntime *WazeroVM, scope uint32, hostModulePtr uint64, keyPtr, keySize, valuePtr, valueSize uint32) uint32 { - switch scope { - case SharedScopeMODULE: - return setModuleValueByKeyOnHost(ctx, mod, hostModulePtr, keyPtr, keySize, valuePtr, valueSize) - case SharedScopeTABLET: - return setTabletValueByKeyOnHost(ctx, mod, wazeroRuntime, keyPtr, keySize, valuePtr, valueSize) - default: - log.Errorf("unknown scope %d", scope) - return uint32(StatusBadArgument) - } -} - -func GetValueByKeyOnHost(ctx context.Context, mod api.Module, wazeroRuntime *WazeroVM, scope uint32, hostModulePtr uint64, keyPtr, keySize, returnValuePtr, returnValueSize uint32) uint32 { - switch scope { - case SharedScopeMODULE: - return getModuleValueByKeyOnHost(ctx, mod, hostModulePtr, keyPtr, keySize, returnValuePtr, returnValueSize) - case SharedScopeTABLET: - return getTabletValueByKeyOnHost(ctx, mod, wazeroRuntime, keyPtr, keySize, returnValuePtr, returnValueSize) - default: - log.Errorf("unknown scope %d", scope) - return uint32(StatusBadArgument) - } -} - -func setTabletValueByKeyOnHost(ctx context.Context, mod api.Module, wazeroRuntime *WazeroVM, keyPtr, keySize, valuePtr, valueSize uint32) uint32 { - wazeroRuntime.mu.Lock() - defer wazeroRuntime.mu.Unlock() - - keyBytes, ok := mod.Memory().Read(keyPtr, keySize) - if !ok { - return uint32(StatusInternalFailure) - } - key := string(keyBytes) - - valueBytes, ok := mod.Memory().Read(valuePtr, valueSize) - if !ok { - return uint32(StatusInternalFailure) - } - - wazeroRuntime.hostSharedVariables[key] = valueBytes - return uint32(StatusOK) -} - -func getTabletValueByKeyOnHost(ctx context.Context, mod api.Module, wazeroRuntime *WazeroVM, keyPtr, keySize, returnValuePtr, returnValueSize uint32) uint32 { - wazeroRuntime.mu.Lock() - defer wazeroRuntime.mu.Unlock() - - keyBytes, ok := mod.Memory().Read(keyPtr, keySize) - if !ok { - return uint32(StatusInternalFailure) - } - key := string(keyBytes) - - _, exist := wazeroRuntime.hostSharedVariables[key] - if !exist { - return uint32(StatusNotFound) - } - return uint32(copyHostBytesIntoGuest(ctx, mod, wazeroRuntime.hostSharedVariables[key], returnValuePtr, returnValueSize)) -} - -func setModuleValueByKeyOnHost(ctx context.Context, mod api.Module, hostModulePtr uint64, keyPtr, keySize, valuePtr, valueSize uint32) uint32 { - module := (*WazeroModule)(unsafe.Pointer(uintptr(hostModulePtr))) - module.mu.Lock() - defer module.mu.Unlock() - - keyBytes, ok := mod.Memory().Read(keyPtr, keySize) - if !ok { - return uint32(StatusInternalFailure) - } - key := string(keyBytes) - - valueBytes, ok := mod.Memory().Read(valuePtr, valueSize) - if !ok { - return uint32(StatusInternalFailure) - } - - module.moduleSharingVariables[key] = valueBytes - return uint32(StatusOK) -} - -func getModuleValueByKeyOnHost(ctx context.Context, mod api.Module, hostModulePtr uint64, keyPtr, keySize, returnValuePtr, returnValueSize uint32) uint32 { - module := (*WazeroModule)(unsafe.Pointer(uintptr(hostModulePtr))) - module.mu.Lock() - defer module.mu.Unlock() - - keyBytes, ok := mod.Memory().Read(keyPtr, keySize) - if !ok { - return uint32(StatusInternalFailure) - } - key := string(keyBytes) - - _, exist := module.moduleSharingVariables[key] - if !exist { - return uint32(StatusNotFound) - } - - return uint32(copyHostBytesIntoGuest(ctx, mod, module.moduleSharingVariables[key], returnValuePtr, returnValueSize)) -} - -func LockOnHost(wazeroRuntime *WazeroVM, scope uint32, hostModulePtr uint64) { - switch scope { - case SharedScopeMODULE: - moduleLockOnHost(hostModulePtr) - case SharedScopeTABLET: - tabletLockOnHost(wazeroRuntime) - default: - log.Errorf("unknown scope %d", scope) - } -} - -func UnlockOnHost(wazeroRuntime *WazeroVM, scope uint32, hostModulePtr uint64) { - switch scope { - case SharedScopeMODULE: - moduleUnlockOnHost(hostModulePtr) - case SharedScopeTABLET: - tabletUnLockOnHost(wazeroRuntime) - default: - log.Errorf("unknown scope %d", scope) - } -} - -func tabletLockOnHost(wazeroRuntime *WazeroVM) { - wazeroRuntime.globalMu.Lock() -} - -func tabletUnLockOnHost(wazeroRuntime *WazeroVM) { - wazeroRuntime.globalMu.Unlock() -} - -func moduleLockOnHost(hostModulePtr uint64) { - module := (*WazeroModule)(unsafe.Pointer(uintptr(hostModulePtr))) - module.moduleMu.Lock() -} - -func moduleUnlockOnHost(hostModulePtr uint64) { - module := (*WazeroModule)(unsafe.Pointer(uintptr(hostModulePtr))) - module.moduleMu.Unlock() -} - -func GetQueryOnHost(ctx context.Context, mod api.Module, hostInstancePtr uint64, returnValueData, - returnValueSize uint32) uint32 { +func GetQueryOnHost(ctx context.Context, mod api.Module, hostInstancePtr uint64, returnValueData, returnValueSize uint32) uint32 { w := (*WazeroInstance)(unsafe.Pointer(uintptr(hostInstancePtr))) return uint32(copyHostStringIntoGuest(ctx, mod, w.qre.query, returnValueData, returnValueSize)) } diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_tools.go b/go/vt/vttablet/tabletserver/wasm_plugin_tools.go index a20c5f2170..c30afc31a6 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_tools.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_tools.go @@ -5,7 +5,7 @@ import ( "fmt" ) -func getQueryByName(wasmBinaryName string) string { +func generateWasmQueryByName(wasmBinaryName string) string { return fmt.Sprintf("select * from %s where name = '%s'", WasmBinaryTableName, wasmBinaryName) } diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_types.go b/go/vt/vttablet/tabletserver/wasm_plugin_types.go index fcabbe08c6..ac88e8b101 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_types.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_types.go @@ -5,19 +5,15 @@ import "vitess.io/vitess/go/sqltypes" type WasmVM interface { GetRuntimeType() string InitRuntime() error - GetWasmModule(key string) (bool, WasmModule) - SetWasmModule(key string, wasmModule WasmModule) - InitWasmModule(key string, wasmBytes []byte) (WasmModule, error) - CompileWasmModule(wasmBytes []byte) (WasmModule, error) - ClearWasmModule(key string) + GetWasmModule(filterName string) (bool, WasmModule) + InitWasmModule(filterName string, wasmBytes []byte) (WasmModule, error) + ClearWasmModule(filterName string) Close() error } type WasmModule interface { NewInstance(qre *QueryExecutor) (WasmInstance, error) Close() error - GetModuleSharingVariables() map[string][]byte - SetModuleSharingVariables(map[string][]byte) } type WasmInstance interface { diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_wasmer.go b/go/vt/vttablet/tabletserver/wasm_plugin_wasmer.go deleted file mode 100644 index b8b4491284..0000000000 --- a/go/vt/vttablet/tabletserver/wasm_plugin_wasmer.go +++ /dev/null @@ -1,191 +0,0 @@ -//go:build ignore -// +build ignore - -package tabletserver - -import ( - "encoding/binary" - "encoding/json" - "sync" - - "vitess.io/vitess/go/vt/log" - - "github.com/wasmerio/wasmer-go/wasmer" -) - -func initWasmerRuntime(qe *QueryEngine) *WasmerRuntime { - engine := wasmer.NewEngine() - store := wasmer.NewStore(engine) - return &WasmerRuntime{store: store, instances: make(map[string]WasmInstance), qe: qe} -} - -type WasmerRuntime struct { - mu sync.Mutex - store *wasmer.Store - instances map[string]WasmInstance - qe *QueryEngine -} - -func (*WasmerRuntime) GetRuntimeType() string { - return WASMER -} - -func (w *WasmerRuntime) GetModule(key string) (bool, WasmModule) { - log.Error("WasmerRuntime.GetWasmModule is not implemented") - return false, nil -} - -func (w *WasmerRuntime) InitOrGetWasmModule(key string, wasmBytes []byte) (WasmInstance, error) { - w.mu.Lock() - defer w.mu.Unlock() - instance, exist := w.instances[key] - if exist { - return instance, nil - } - instance, err := w.initWasmInstance(wasmBytes) - if err != nil { - return nil, err - } - w.instances[key] = instance - return instance, nil -} - -func (w *WasmerRuntime) ClearWasmInstance() { - w.mu.Lock() - defer w.mu.Unlock() - w.instances = make(map[string]WasmInstance) -} - -func (w *WasmerRuntime) initWasmInstance(wasmBytes []byte) (WasmInstance, error) { - // Compiles the modules - module, err := wasmer.NewModule(w.store, wasmBytes) - if err != nil { - return nil, err - } - - // todo,let user decide which config to use? i think it's not necessary - // Instantiates the modules - wasiEnv, err := wasmer.NewWasiStateBuilder("wasi-program"). - // Choose according to your actual situation - // Argument("--foo"). - // Environment("ABC", "DEF"). - // MapDirectory("./", "."). - Finalize() - if err != nil { - return nil, err - } - importObject, err := wasiEnv.GenerateImportObject(w.store, module) - if err != nil { - return nil, err - } - - instance, err := wasmer.NewInstance(module, importObject) - if err != nil { - return nil, err - } - return &WasmerInstance{instance: instance}, nil -} - -type WasmerInstance struct { - instance *wasmer.Instance -} - -func (ins *WasmerInstance) RunWASMPlugin() error { - // todo fix wasmer - - //writeBuf, _ := ins.instance.Exports.GetFunction("writeBuf") - //readBuf, _ := ins.instance.Exports.GetFunction("readBuf") - //clearBuf, _ := ins.instance.Exports.GetFunction("clearBuf") - //getBufLen, _ := ins.instance.Exports.GetFunction("getBufLen") - //wasmerGuestFunc, _ := ins.instance.Exports.GetFunction("wasmerGuestFunc") - // - //err := sendStructToWASI(args, clearBuf, writeBuf) - //if err != nil { - // return nil, err - //} - //_, err = wasmerGuestFunc() - //if err != nil { - // return nil, err - //} - - return nil -} - -func (ins *WasmerInstance) RunWASMPluginAfter(args *WasmPluginExchangeAfter) (*WasmPluginExchangeAfter, error) { - // todo - return nil, nil -} - -func (ins *WasmerInstance) Close() error { - ins.instance.Close() - return nil -} - -func sendStructToWASI(args *WasmPluginExchange, clearBuf, writeBuf wasmer.NativeFunction) error { - _, err := clearBuf() - if err != nil { - return err - } - bytes, err := json.Marshal(args) - if err != nil { - return err - } - - bytesCount := len(bytes) - overBytesCount := bytesCount % 8 - bytesCount = bytesCount - overBytesCount - for idx := 0; idx < bytesCount; idx += 8 { - data := binary.LittleEndian.Uint64(bytes[idx : idx+8]) - _, err = writeBuf(int64(data), int64(8)) - if err != nil { - return err - } - } - - last8Bytes := make([]byte, 8) - for idx := 0; idx < overBytesCount; idx++ { - last8Bytes[idx] = bytes[bytesCount+idx] - } - - data := binary.LittleEndian.Uint64(last8Bytes) - _, err = writeBuf(int64(data), int64(overBytesCount)) - if err != nil { - return err - } - return nil -} - -func getStructFromWASI(getBufLen, readBuf wasmer.NativeFunction) (*WasmPluginExchange, error) { - bytesCountI, err := getBufLen() - if err != nil { - return nil, err - } - bytesCount := bytesCountI.(int64) - - overBytesCount := bytesCount % 8 - bytesCount = bytesCount - overBytesCount - bytes := make([]byte, bytesCount) - for idx := int64(0); idx < bytesCount; idx += 8 { - data, err := readBuf(idx) - if err != nil { - return nil, err - } - binary.LittleEndian.PutUint64(bytes[idx:], uint64(data.(int64))) - } - - data, err := readBuf(bytesCount) - if err != nil { - return nil, err - } - lastBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(lastBytes, uint64(data.(int64))) - lastBytes = lastBytes[:overBytesCount] - bytes = append(bytes, lastBytes...) - - rst := WasmPluginExchange{} - err = json.Unmarshal(bytes, &rst) - if err != nil { - return nil, err - } - return &rst, nil -} diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go index fcb837ed1e..0131f61db7 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go @@ -18,19 +18,13 @@ type WazeroVM struct { ctx context.Context runtime wazero.Runtime modules map[string]WasmModule - - globalMu sync.Mutex - - hostSharedVariables map[string][]byte - hostSharedMu sync.Mutex } func initWazeroVM() *WazeroVM { ctx := context.Background() w := &WazeroVM{ - ctx: ctx, - modules: make(map[string]WasmModule), - hostSharedVariables: make(map[string][]byte), + ctx: ctx, + modules: make(map[string]WasmModule), } w.InitRuntime() return w @@ -67,20 +61,6 @@ func exportHostABIV1(ctx context.Context, wazeroRuntime *WazeroVM) error { }). Export("ErrorLogOnHost"). NewFunctionBuilder(). - WithParameterNames("scope", "hostModulePtr", "keyPtr", "keySize", "valuePtr", "valueSize"). - WithResultNames("callStatus"). - WithFunc(func(ctx context.Context, mod api.Module, scope uint32, hostModulePtr uint64, keyPtr, keySize, valuePtr, valueSize uint32) uint32 { - return SetValueByKeyOnHost(ctx, mod, wazeroRuntime, scope, hostModulePtr, keyPtr, keySize, valuePtr, valueSize) - }). - Export("SetValueByKeyOnHost"). - NewFunctionBuilder(). - WithParameterNames("scope", "hostModulePtr", "keyPtr", "keySize", "returnValuePtr", "returnValueSize"). - WithResultNames("callStatus"). - WithFunc(func(ctx context.Context, mod api.Module, scope uint32, hostModulePtr uint64, keyPtr, keySize, returnValuePtr, returnValueSize uint32) uint32 { - return GetValueByKeyOnHost(ctx, mod, wazeroRuntime, scope, hostModulePtr, keyPtr, keySize, returnValuePtr, returnValueSize) - }). - Export("GetValueByKeyOnHost"). - NewFunctionBuilder(). WithParameterNames("hostInstancePtr", "returnQueryValueData", "returnQueryValueSize"). WithResultNames("callStatus"). @@ -97,18 +77,6 @@ func exportHostABIV1(ctx context.Context, wazeroRuntime *WazeroVM) error { }). Export("SetQueryOnHost"). NewFunctionBuilder(). - WithParameterNames("scope", "hostModulePtr"). - WithFunc(func(ctx context.Context, mod api.Module, scope uint32, hostModulePtr uint64) { - LockOnHost(wazeroRuntime, scope, hostModulePtr) - }). - Export("LockOnHost"). - NewFunctionBuilder(). - WithParameterNames("scope", "hostModulePtr"). - WithFunc(func(ctx context.Context, mod api.Module, scope uint32, hostModulePtr uint64) { - UnlockOnHost(wazeroRuntime, scope, hostModulePtr) - }). - Export("UnlockOnHost"). - NewFunctionBuilder(). WithParameterNames("hostInstancePtr", "errMessagePtr", "errMessageSize"). WithResultNames("callStatus"). WithFunc(func(ctx context.Context, mod api.Module, hostInstancePtr uint64, errMessagePtr, errMessageSize uint32) uint32 { @@ -147,62 +115,51 @@ func (*WazeroVM) GetRuntimeType() string { } func (w *WazeroVM) InitRuntime() error { - w.runtime = wazero.NewRuntimeWithConfig(w.ctx, wazero.NewRuntimeConfig().WithCompilationCache(wazero.NewCompilationCache())) + runtimeConfig := wazero.NewRuntimeConfig(). + WithCompilationCache(wazero.NewCompilationCache()). + WithCloseOnContextDone(true). + WithMemoryLimitPages(16 * 10) //64KB each page, 10 * 16pages = 10MB + + w.runtime = wazero.NewRuntimeWithConfig(w.ctx, runtimeConfig) wasi_snapshot_preview1.MustInstantiate(w.ctx, w.runtime) return exportHostABIV1(w.ctx, w) } -func (w *WazeroVM) ClearWasmModule(key string) { +func (w *WazeroVM) ClearWasmModule(filterName string) { w.mu.Lock() defer w.mu.Unlock() - if mod, exist := w.modules[key]; exist { + if mod, exist := w.modules[filterName]; exist { defer mod.Close() } - delete(w.modules, key) + delete(w.modules, filterName) } -func (w *WazeroVM) GetWasmModule(key string) (bool, WasmModule) { +func (w *WazeroVM) GetWasmModule(filterName string) (bool, WasmModule) { w.mu.Lock() defer w.mu.Unlock() - module, exist := w.modules[key] + module, exist := w.modules[filterName] return exist, module } -func (w *WazeroVM) SetWasmModule(key string, wasmModule WasmModule) { - w.mu.Lock() - defer w.mu.Unlock() - // the goal is to change the compiled module bytes, and keep module variables map - oldWasmModule, exist := w.modules[key] - if exist { - oldMap := oldWasmModule.GetModuleSharingVariables() - wasmModule.SetModuleSharingVariables(oldMap) - } - w.modules[key] = wasmModule -} - -func (w *WazeroVM) InitWasmModule(key string, wasmBytes []byte) (WasmModule, error) { +func (w *WazeroVM) InitWasmModule(filterName string, wasmBytes []byte) (WasmModule, error) { w.mu.Lock() defer w.mu.Unlock() - module, exist := w.modules[key] + module, exist := w.modules[filterName] if exist { return module, nil } - module, err := w.CompileWasmModule(wasmBytes) + compiled, err := w.runtime.CompileModule(w.ctx, wasmBytes) if err != nil { return nil, err } - w.modules[key] = module - return module, nil -} - -func (w *WazeroVM) CompileWasmModule(wasmBytes []byte) (WasmModule, error) { - module, err := w.runtime.CompileModule(w.ctx, wasmBytes) - if err != nil { - return nil, err + module = &WazeroModule{ + filterName: filterName, + compliedModule: compiled, + wazeroRuntime: w, } - - return &WazeroModule{compliedModule: module, wazeroRuntime: w, moduleSharingVariables: make(map[string][]byte)}, nil + w.modules[filterName] = module + return module, nil } func (w *WazeroVM) Close() error { @@ -211,26 +168,9 @@ func (w *WazeroVM) Close() error { } type WazeroModule struct { + filterName string wazeroRuntime *WazeroVM compliedModule wazero.CompiledModule - - mu sync.Mutex - moduleSharingVariables map[string][]byte - - moduleMu sync.Mutex - tmp int -} - -func (mod *WazeroModule) GetModuleSharingVariables() map[string][]byte { - mod.mu.Lock() - defer mod.mu.Unlock() - return mod.moduleSharingVariables -} - -func (mod *WazeroModule) SetModuleSharingVariables(m map[string][]byte) { - mod.mu.Lock() - defer mod.mu.Unlock() - mod.moduleSharingVariables = m } func (mod *WazeroModule) NewInstance(qre *QueryExecutor) (WasmInstance, error) { @@ -240,7 +180,8 @@ func (mod *WazeroModule) NewInstance(qre *QueryExecutor) (WasmInstance, error) { if mod.compliedModule == nil { return nil, fmt.Errorf("compliedModule is nil in NewInstance") } - instance, err := mod.wazeroRuntime.runtime.InstantiateModule(mod.wazeroRuntime.ctx, mod.compliedModule, wazero.NewModuleConfig().WithName("")) + config := wazero.NewModuleConfig().WithName("") + instance, err := mod.wazeroRuntime.runtime.InstantiateModule(mod.wazeroRuntime.ctx, mod.compliedModule, config) if err != nil { return nil, err } @@ -264,8 +205,12 @@ type WazeroInstance struct { func (ins *WazeroInstance) RunWASMPlugin() error { ctx := context.Background() + defer ins.qre.tsv.qe.actionStats.FilterWasmMemorySize.Set([]string{ins.module.filterName, "Before"}, int64(ins.instance.Memory().Size())) wazeroGuestFunc := ins.instance.ExportedFunction("RunBeforeExecutionOnGuest") + if wazeroGuestFunc == nil { + return fmt.Errorf("Wasm Plugin ABI version is not compatible, missing RunBeforeExecutionOnGuest function in wasm module") + } instancePtr := uint64(uintptr(unsafe.Pointer(ins))) modulePtr := uint64(uintptr(unsafe.Pointer(ins.module))) @@ -281,6 +226,7 @@ func (ins *WazeroInstance) RunWASMPlugin() error { func (ins *WazeroInstance) RunWASMPluginAfter() error { ctx := context.Background() + defer ins.qre.tsv.qe.actionStats.FilterWasmMemorySize.Set([]string{ins.module.filterName, "After"}, int64(ins.instance.Memory().Size())) wazeroGuestFunc := ins.instance.ExportedFunction("RunAfterExecutionOnGuest") @@ -322,18 +268,16 @@ func copyHostBytesIntoGuest(ctx context.Context, mod api.Module, bytes []byte, w return StatusBadArgument } - //todo wasm: why not use 'malloc' here? - alloc := mod.ExportedFunction("proxy_on_memory_allocate") + // call 'malloc' to allocate memory in guest, guest need to free it + alloc := mod.ExportedFunction("malloc") res, err := alloc.Call(ctx, uint64(size)) if err != nil { return StatusInternalFailure } - buf, ok := mod.Memory().Read(uint32(res[0]), uint32(size)) + ok := mod.Memory().Write(uint32(res[0]), bytes) if !ok { return StatusInternalFailure } - - copy(buf, bytes) ok = mod.Memory().WriteUint32Le(wasmPtrPtr, uint32(res[0])) if !ok { return StatusInternalFailure diff --git a/go/vt/vttablet/tabletserver/wescale_filter_request_handler.go b/go/vt/vttablet/tabletserver/wescale_filter_request_handler.go index 38d479571e..3ba25f0e6e 100644 --- a/go/vt/vttablet/tabletserver/wescale_filter_request_handler.go +++ b/go/vt/vttablet/tabletserver/wescale_filter_request_handler.go @@ -200,7 +200,7 @@ func (qe *QueryEngine) HandleDropFilter(stmt *sqlparser.DropWescaleFilter) (*sql return rst, nil } -func (qe *QueryEngine) removeFilterMemoryData(name, actionArgs string, action rules.Action) { +func (qe *QueryEngine) removeFilterMemoryData(filterName, actionArgs string, action rules.Action) { switch action { case rules.QRWasmPlugin: query, err := customrule.GetSelectByActionArgsSQL(actionArgs) @@ -216,11 +216,11 @@ func (qe *QueryEngine) removeFilterMemoryData(name, actionArgs string, action ru a := &WasmPluginActionArgs{} tmp, _ := a.Parse(actionArgs) a = tmp.(*WasmPluginActionArgs) - qe.wasmPluginController.VM.ClearWasmModule(a.WasmBinaryName) + qe.wasmPluginController.VM.ClearWasmModule(filterName) } case rules.QRConcurrencyControl: - qe.concurrencyController.DeleteQueue(name) + qe.concurrencyController.DeleteQueue(filterName) } } @@ -316,20 +316,12 @@ func CheckAndFormatActionArgs(qe *QueryEngine, filerName, actionType, actionArgs if err != nil { return "", err } - wasmModule, err := qe.wasmPluginController.VM.CompileWasmModule(bytes) + qe.wasmPluginController.VM.ClearWasmModule(filerName) + _, err = qe.wasmPluginController.VM.InitWasmModule(filerName, bytes) if err != nil { return "", fmt.Errorf("err when compiling wasm moulde %v", err) } - // whether create or alter, just set - qe.wasmPluginController.VM.SetWasmModule(wasmPluginArgs.WasmBinaryName, wasmModule) - return FormatUserInputStr(actionArgs), nil - case rules.QRSkipFilter: - skipFilterArgs := &SkipFilterActionArgs{} - _, err := skipFilterArgs.Parse(actionArgs) - if err != nil { - return "", err - } return FormatUserInputStr(actionArgs), nil default: if actionArgs != "" {