From e3717e26db38767bbbbee16de583317d25d63f66 Mon Sep 17 00:00:00 2001 From: earayu Date: Tue, 19 Nov 2024 23:19:36 +0800 Subject: [PATCH 01/14] feat: update wasm plugin --- .../tabletserver/wasm_plugin_controller.go | 31 +-- .../wasm_plugin_host_functions.go | 3 +- .../tabletserver/wasm_plugin_wasmer.go | 191 ------------------ .../tabletserver/wasm_plugin_wazero.go | 3 +- 4 files changed, 21 insertions(+), 207 deletions(-) delete mode 100644 go/vt/vttablet/tabletserver/wasm_plugin_wasmer.go diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_controller.go b/go/vt/vttablet/tabletserver/wasm_plugin_controller.go index 5f853b25f7..60567b63d8 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_controller.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_controller.go @@ -62,28 +62,33 @@ func (wpc *WasmPluginController) GetWasmBytesByBinaryName(ctx context.Context, w return nil, fmt.Errorf("get wasm binary by name %s failed : qr len is %v instead of 1", wasmBinaryName, len(qr.Named().Rows)) } + bytes, err := qr.Named().Rows[0].ToBytes("data") + if err != nil { + return nil, fmt.Errorf("get wasm binary by name %s failed : uncompress data error: %v", wasmBinaryName, err) + } + compressAlgorithm := qr.Named().Rows[0].AsString("compress_algorithm", "") - if compressAlgorithm != wasmBinaryCompressAlgorithm { + if compressAlgorithm != "" && compressAlgorithm != wasmBinaryCompressAlgorithm { return nil, fmt.Errorf("get wasm binary by name %s failed : compress algorithm is %v instead of %v", wasmBinaryName, compressAlgorithm, wasmBinaryCompressAlgorithm) } - compressedBytes, err := qr.Named().Rows[0].ToBytes("data") - if err != nil { - return nil, fmt.Errorf("get wasm binary by name %s failed : uncompress data error: %v", wasmBinaryName, err) + if compressAlgorithm == wasmBinaryCompressAlgorithm { + bytes, err = UnCompressByBZip2(bytes) + if err != nil { + return nil, fmt.Errorf("get wasm binary by name %s failed : uncompress data error: %v", wasmBinaryName, err) + } } - originalBytes, err := UnCompressByBZip2(compressedBytes) - if err != nil { - return nil, fmt.Errorf("get wasm binary by name %s failed : uncompress data error: %v", wasmBinaryName, err) - } - - hash := CalcMd5String32(originalBytes) hashInTable := qr.Named().Rows[0].AsString("hash_before_compress", "") - if hash != hashInTable { - return nil, fmt.Errorf("get wasm binary by name %s failed : hash is not equal", wasmBinaryName) + if hashInTable != "" { + hash := CalcMd5String32(bytes) + if hash != hashInTable { + return nil, fmt.Errorf("get wasm binary by name %s failed : hash is not equal", wasmBinaryName) + } } - return originalBytes, nil + + return bytes, nil } func initWasmVM() WasmVM { diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_host_functions.go b/go/vt/vttablet/tabletserver/wasm_plugin_host_functions.go index 6a91be4479..907d8368e6 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_host_functions.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_host_functions.go @@ -187,8 +187,7 @@ func moduleUnlockOnHost(hostModulePtr uint64) { module.moduleMu.Unlock() } -func GetQueryOnHost(ctx context.Context, mod api.Module, hostInstancePtr uint64, returnValueData, - returnValueSize uint32) uint32 { +func GetQueryOnHost(ctx context.Context, mod api.Module, hostInstancePtr uint64, returnValueData, returnValueSize uint32) uint32 { w := (*WazeroInstance)(unsafe.Pointer(uintptr(hostInstancePtr))) return uint32(copyHostStringIntoGuest(ctx, mod, w.qre.query, returnValueData, returnValueSize)) } diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_wasmer.go b/go/vt/vttablet/tabletserver/wasm_plugin_wasmer.go deleted file mode 100644 index b8b4491284..0000000000 --- a/go/vt/vttablet/tabletserver/wasm_plugin_wasmer.go +++ /dev/null @@ -1,191 +0,0 @@ -//go:build ignore -// +build ignore - -package tabletserver - -import ( - "encoding/binary" - "encoding/json" - "sync" - - "vitess.io/vitess/go/vt/log" - - "github.com/wasmerio/wasmer-go/wasmer" -) - -func initWasmerRuntime(qe *QueryEngine) *WasmerRuntime { - engine := wasmer.NewEngine() - store := wasmer.NewStore(engine) - return &WasmerRuntime{store: store, instances: make(map[string]WasmInstance), qe: qe} -} - -type WasmerRuntime struct { - mu sync.Mutex - store *wasmer.Store - instances map[string]WasmInstance - qe *QueryEngine -} - -func (*WasmerRuntime) GetRuntimeType() string { - return WASMER -} - -func (w *WasmerRuntime) GetModule(key string) (bool, WasmModule) { - log.Error("WasmerRuntime.GetWasmModule is not implemented") - return false, nil -} - -func (w *WasmerRuntime) InitOrGetWasmModule(key string, wasmBytes []byte) (WasmInstance, error) { - w.mu.Lock() - defer w.mu.Unlock() - instance, exist := w.instances[key] - if exist { - return instance, nil - } - instance, err := w.initWasmInstance(wasmBytes) - if err != nil { - return nil, err - } - w.instances[key] = instance - return instance, nil -} - -func (w *WasmerRuntime) ClearWasmInstance() { - w.mu.Lock() - defer w.mu.Unlock() - w.instances = make(map[string]WasmInstance) -} - -func (w *WasmerRuntime) initWasmInstance(wasmBytes []byte) (WasmInstance, error) { - // Compiles the modules - module, err := wasmer.NewModule(w.store, wasmBytes) - if err != nil { - return nil, err - } - - // todo,let user decide which config to use? i think it's not necessary - // Instantiates the modules - wasiEnv, err := wasmer.NewWasiStateBuilder("wasi-program"). - // Choose according to your actual situation - // Argument("--foo"). - // Environment("ABC", "DEF"). - // MapDirectory("./", "."). - Finalize() - if err != nil { - return nil, err - } - importObject, err := wasiEnv.GenerateImportObject(w.store, module) - if err != nil { - return nil, err - } - - instance, err := wasmer.NewInstance(module, importObject) - if err != nil { - return nil, err - } - return &WasmerInstance{instance: instance}, nil -} - -type WasmerInstance struct { - instance *wasmer.Instance -} - -func (ins *WasmerInstance) RunWASMPlugin() error { - // todo fix wasmer - - //writeBuf, _ := ins.instance.Exports.GetFunction("writeBuf") - //readBuf, _ := ins.instance.Exports.GetFunction("readBuf") - //clearBuf, _ := ins.instance.Exports.GetFunction("clearBuf") - //getBufLen, _ := ins.instance.Exports.GetFunction("getBufLen") - //wasmerGuestFunc, _ := ins.instance.Exports.GetFunction("wasmerGuestFunc") - // - //err := sendStructToWASI(args, clearBuf, writeBuf) - //if err != nil { - // return nil, err - //} - //_, err = wasmerGuestFunc() - //if err != nil { - // return nil, err - //} - - return nil -} - -func (ins *WasmerInstance) RunWASMPluginAfter(args *WasmPluginExchangeAfter) (*WasmPluginExchangeAfter, error) { - // todo - return nil, nil -} - -func (ins *WasmerInstance) Close() error { - ins.instance.Close() - return nil -} - -func sendStructToWASI(args *WasmPluginExchange, clearBuf, writeBuf wasmer.NativeFunction) error { - _, err := clearBuf() - if err != nil { - return err - } - bytes, err := json.Marshal(args) - if err != nil { - return err - } - - bytesCount := len(bytes) - overBytesCount := bytesCount % 8 - bytesCount = bytesCount - overBytesCount - for idx := 0; idx < bytesCount; idx += 8 { - data := binary.LittleEndian.Uint64(bytes[idx : idx+8]) - _, err = writeBuf(int64(data), int64(8)) - if err != nil { - return err - } - } - - last8Bytes := make([]byte, 8) - for idx := 0; idx < overBytesCount; idx++ { - last8Bytes[idx] = bytes[bytesCount+idx] - } - - data := binary.LittleEndian.Uint64(last8Bytes) - _, err = writeBuf(int64(data), int64(overBytesCount)) - if err != nil { - return err - } - return nil -} - -func getStructFromWASI(getBufLen, readBuf wasmer.NativeFunction) (*WasmPluginExchange, error) { - bytesCountI, err := getBufLen() - if err != nil { - return nil, err - } - bytesCount := bytesCountI.(int64) - - overBytesCount := bytesCount % 8 - bytesCount = bytesCount - overBytesCount - bytes := make([]byte, bytesCount) - for idx := int64(0); idx < bytesCount; idx += 8 { - data, err := readBuf(idx) - if err != nil { - return nil, err - } - binary.LittleEndian.PutUint64(bytes[idx:], uint64(data.(int64))) - } - - data, err := readBuf(bytesCount) - if err != nil { - return nil, err - } - lastBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(lastBytes, uint64(data.(int64))) - lastBytes = lastBytes[:overBytesCount] - bytes = append(bytes, lastBytes...) - - rst := WasmPluginExchange{} - err = json.Unmarshal(bytes, &rst) - if err != nil { - return nil, err - } - return &rst, nil -} diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go index fcb837ed1e..1f0118ab95 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go @@ -322,12 +322,13 @@ func copyHostBytesIntoGuest(ctx context.Context, mod api.Module, bytes []byte, w return StatusBadArgument } - //todo wasm: why not use 'malloc' here? + // call 'proxy_on_memory_allocate' to allocate memory in guest, 'proxy_on_memory_allocate' is a exported function in guest alloc := mod.ExportedFunction("proxy_on_memory_allocate") res, err := alloc.Call(ctx, uint64(size)) if err != nil { return StatusInternalFailure } + // res[0] is the pointer to the allocated memory in guest buf, ok := mod.Memory().Read(uint32(res[0]), uint32(size)) if !ok { return StatusInternalFailure From 617d7d9cbeba2ce957452db4ce8f9b2acfc9ff09 Mon Sep 17 00:00:00 2001 From: earayu Date: Wed, 20 Nov 2024 11:27:43 +0800 Subject: [PATCH 02/14] feat: update wasm plugin --- go/vt/vttablet/tabletserver/wasm_plugin_wazero.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go index 1f0118ab95..f591dba973 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go @@ -266,6 +266,9 @@ func (ins *WazeroInstance) RunWASMPlugin() error { ctx := context.Background() wazeroGuestFunc := ins.instance.ExportedFunction("RunBeforeExecutionOnGuest") + if wazeroGuestFunc == nil { + return fmt.Errorf("Wasm Plugin ABI version is not compatible, missing RunBeforeExecutionOnGuest function in wasm module") + } instancePtr := uint64(uintptr(unsafe.Pointer(ins))) modulePtr := uint64(uintptr(unsafe.Pointer(ins.module))) From 39d947588e7d6bd14a41a3eb6cedb9f4dda66c4d Mon Sep 17 00:00:00 2001 From: earayu Date: Wed, 20 Nov 2024 14:06:35 +0800 Subject: [PATCH 03/14] feat: update wasm plugin --- .../tabletserver/wasm_plugin_common.go | 14 +- .../tabletserver/wasm_plugin_controller.go | 4 +- .../wasm_plugin_host_functions.go | 144 ------------------ .../tabletserver/wasm_plugin_types.go | 2 - .../tabletserver/wasm_plugin_wazero.go | 62 +------- 5 files changed, 5 insertions(+), 221 deletions(-) diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_common.go b/go/vt/vttablet/tabletserver/wasm_plugin_common.go index 4be6b19e25..1d0468151a 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_common.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_common.go @@ -1,19 +1,7 @@ package tabletserver -import "unsafe" +const AbiVersion = "v1alpha2" -const AbiVersion = "v1alpha1" - -const WASMER = "wamser-go" const WAZERO = "wazero" const WasmBinaryTableName = "mysql.wasm_binary" - -func ptrToString(ptr uint32, size uint32) string { - return unsafe.String((*byte)(unsafe.Pointer(uintptr(ptr))), size) -} - -func stringToPtr(s string) (uint32, uint32) { - ptr := unsafe.Pointer(unsafe.StringData(s)) - return uint32(uintptr(ptr)), uint32(len(s)) -} diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_controller.go b/go/vt/vttablet/tabletserver/wasm_plugin_controller.go index 60567b63d8..dedf11c731 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_controller.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_controller.go @@ -19,7 +19,7 @@ var ( ) func registerWasmFlags(fs *pflag.FlagSet) { - fs.StringVar(&RuntimeType, "wasm_runtime_type", WAZERO, "the runtime for wasm plugin. default is wazero.") + fs.StringVar(&RuntimeType, "wasm_runtime_type", WAZERO, "the runtime for wasm plugin. default is wazero. options: [wazero]") } func init() { @@ -93,8 +93,6 @@ func (wpc *WasmPluginController) GetWasmBytesByBinaryName(ctx context.Context, w func initWasmVM() WasmVM { switch RuntimeType { - //case WASMER: - // return initWasmerRuntime(qe) case WAZERO: return initWazeroVM() default: diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_host_functions.go b/go/vt/vttablet/tabletserver/wasm_plugin_host_functions.go index 907d8368e6..441df68f1f 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_host_functions.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_host_functions.go @@ -43,150 +43,6 @@ func ErrorLogOnHost(ctx context.Context, mod api.Module, ptr, size uint32) uint3 return uint32(StatusOK) } -const ( - SharedScopeMODULE = uint32(0) - SharedScopeTABLET = uint32(1) -) - -func SetValueByKeyOnHost(ctx context.Context, mod api.Module, wazeroRuntime *WazeroVM, scope uint32, hostModulePtr uint64, keyPtr, keySize, valuePtr, valueSize uint32) uint32 { - switch scope { - case SharedScopeMODULE: - return setModuleValueByKeyOnHost(ctx, mod, hostModulePtr, keyPtr, keySize, valuePtr, valueSize) - case SharedScopeTABLET: - return setTabletValueByKeyOnHost(ctx, mod, wazeroRuntime, keyPtr, keySize, valuePtr, valueSize) - default: - log.Errorf("unknown scope %d", scope) - return uint32(StatusBadArgument) - } -} - -func GetValueByKeyOnHost(ctx context.Context, mod api.Module, wazeroRuntime *WazeroVM, scope uint32, hostModulePtr uint64, keyPtr, keySize, returnValuePtr, returnValueSize uint32) uint32 { - switch scope { - case SharedScopeMODULE: - return getModuleValueByKeyOnHost(ctx, mod, hostModulePtr, keyPtr, keySize, returnValuePtr, returnValueSize) - case SharedScopeTABLET: - return getTabletValueByKeyOnHost(ctx, mod, wazeroRuntime, keyPtr, keySize, returnValuePtr, returnValueSize) - default: - log.Errorf("unknown scope %d", scope) - return uint32(StatusBadArgument) - } -} - -func setTabletValueByKeyOnHost(ctx context.Context, mod api.Module, wazeroRuntime *WazeroVM, keyPtr, keySize, valuePtr, valueSize uint32) uint32 { - wazeroRuntime.mu.Lock() - defer wazeroRuntime.mu.Unlock() - - keyBytes, ok := mod.Memory().Read(keyPtr, keySize) - if !ok { - return uint32(StatusInternalFailure) - } - key := string(keyBytes) - - valueBytes, ok := mod.Memory().Read(valuePtr, valueSize) - if !ok { - return uint32(StatusInternalFailure) - } - - wazeroRuntime.hostSharedVariables[key] = valueBytes - return uint32(StatusOK) -} - -func getTabletValueByKeyOnHost(ctx context.Context, mod api.Module, wazeroRuntime *WazeroVM, keyPtr, keySize, returnValuePtr, returnValueSize uint32) uint32 { - wazeroRuntime.mu.Lock() - defer wazeroRuntime.mu.Unlock() - - keyBytes, ok := mod.Memory().Read(keyPtr, keySize) - if !ok { - return uint32(StatusInternalFailure) - } - key := string(keyBytes) - - _, exist := wazeroRuntime.hostSharedVariables[key] - if !exist { - return uint32(StatusNotFound) - } - return uint32(copyHostBytesIntoGuest(ctx, mod, wazeroRuntime.hostSharedVariables[key], returnValuePtr, returnValueSize)) -} - -func setModuleValueByKeyOnHost(ctx context.Context, mod api.Module, hostModulePtr uint64, keyPtr, keySize, valuePtr, valueSize uint32) uint32 { - module := (*WazeroModule)(unsafe.Pointer(uintptr(hostModulePtr))) - module.mu.Lock() - defer module.mu.Unlock() - - keyBytes, ok := mod.Memory().Read(keyPtr, keySize) - if !ok { - return uint32(StatusInternalFailure) - } - key := string(keyBytes) - - valueBytes, ok := mod.Memory().Read(valuePtr, valueSize) - if !ok { - return uint32(StatusInternalFailure) - } - - module.moduleSharingVariables[key] = valueBytes - return uint32(StatusOK) -} - -func getModuleValueByKeyOnHost(ctx context.Context, mod api.Module, hostModulePtr uint64, keyPtr, keySize, returnValuePtr, returnValueSize uint32) uint32 { - module := (*WazeroModule)(unsafe.Pointer(uintptr(hostModulePtr))) - module.mu.Lock() - defer module.mu.Unlock() - - keyBytes, ok := mod.Memory().Read(keyPtr, keySize) - if !ok { - return uint32(StatusInternalFailure) - } - key := string(keyBytes) - - _, exist := module.moduleSharingVariables[key] - if !exist { - return uint32(StatusNotFound) - } - - return uint32(copyHostBytesIntoGuest(ctx, mod, module.moduleSharingVariables[key], returnValuePtr, returnValueSize)) -} - -func LockOnHost(wazeroRuntime *WazeroVM, scope uint32, hostModulePtr uint64) { - switch scope { - case SharedScopeMODULE: - moduleLockOnHost(hostModulePtr) - case SharedScopeTABLET: - tabletLockOnHost(wazeroRuntime) - default: - log.Errorf("unknown scope %d", scope) - } -} - -func UnlockOnHost(wazeroRuntime *WazeroVM, scope uint32, hostModulePtr uint64) { - switch scope { - case SharedScopeMODULE: - moduleUnlockOnHost(hostModulePtr) - case SharedScopeTABLET: - tabletUnLockOnHost(wazeroRuntime) - default: - log.Errorf("unknown scope %d", scope) - } -} - -func tabletLockOnHost(wazeroRuntime *WazeroVM) { - wazeroRuntime.globalMu.Lock() -} - -func tabletUnLockOnHost(wazeroRuntime *WazeroVM) { - wazeroRuntime.globalMu.Unlock() -} - -func moduleLockOnHost(hostModulePtr uint64) { - module := (*WazeroModule)(unsafe.Pointer(uintptr(hostModulePtr))) - module.moduleMu.Lock() -} - -func moduleUnlockOnHost(hostModulePtr uint64) { - module := (*WazeroModule)(unsafe.Pointer(uintptr(hostModulePtr))) - module.moduleMu.Unlock() -} - func GetQueryOnHost(ctx context.Context, mod api.Module, hostInstancePtr uint64, returnValueData, returnValueSize uint32) uint32 { w := (*WazeroInstance)(unsafe.Pointer(uintptr(hostInstancePtr))) return uint32(copyHostStringIntoGuest(ctx, mod, w.qre.query, returnValueData, returnValueSize)) diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_types.go b/go/vt/vttablet/tabletserver/wasm_plugin_types.go index fcabbe08c6..0d2eea4822 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_types.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_types.go @@ -16,8 +16,6 @@ type WasmVM interface { type WasmModule interface { NewInstance(qre *QueryExecutor) (WasmInstance, error) Close() error - GetModuleSharingVariables() map[string][]byte - SetModuleSharingVariables(map[string][]byte) } type WasmInstance interface { diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go index f591dba973..62206d2f60 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go @@ -18,19 +18,13 @@ type WazeroVM struct { ctx context.Context runtime wazero.Runtime modules map[string]WasmModule - - globalMu sync.Mutex - - hostSharedVariables map[string][]byte - hostSharedMu sync.Mutex } func initWazeroVM() *WazeroVM { ctx := context.Background() w := &WazeroVM{ - ctx: ctx, - modules: make(map[string]WasmModule), - hostSharedVariables: make(map[string][]byte), + ctx: ctx, + modules: make(map[string]WasmModule), } w.InitRuntime() return w @@ -67,20 +61,6 @@ func exportHostABIV1(ctx context.Context, wazeroRuntime *WazeroVM) error { }). Export("ErrorLogOnHost"). NewFunctionBuilder(). - WithParameterNames("scope", "hostModulePtr", "keyPtr", "keySize", "valuePtr", "valueSize"). - WithResultNames("callStatus"). - WithFunc(func(ctx context.Context, mod api.Module, scope uint32, hostModulePtr uint64, keyPtr, keySize, valuePtr, valueSize uint32) uint32 { - return SetValueByKeyOnHost(ctx, mod, wazeroRuntime, scope, hostModulePtr, keyPtr, keySize, valuePtr, valueSize) - }). - Export("SetValueByKeyOnHost"). - NewFunctionBuilder(). - WithParameterNames("scope", "hostModulePtr", "keyPtr", "keySize", "returnValuePtr", "returnValueSize"). - WithResultNames("callStatus"). - WithFunc(func(ctx context.Context, mod api.Module, scope uint32, hostModulePtr uint64, keyPtr, keySize, returnValuePtr, returnValueSize uint32) uint32 { - return GetValueByKeyOnHost(ctx, mod, wazeroRuntime, scope, hostModulePtr, keyPtr, keySize, returnValuePtr, returnValueSize) - }). - Export("GetValueByKeyOnHost"). - NewFunctionBuilder(). WithParameterNames("hostInstancePtr", "returnQueryValueData", "returnQueryValueSize"). WithResultNames("callStatus"). @@ -97,18 +77,6 @@ func exportHostABIV1(ctx context.Context, wazeroRuntime *WazeroVM) error { }). Export("SetQueryOnHost"). NewFunctionBuilder(). - WithParameterNames("scope", "hostModulePtr"). - WithFunc(func(ctx context.Context, mod api.Module, scope uint32, hostModulePtr uint64) { - LockOnHost(wazeroRuntime, scope, hostModulePtr) - }). - Export("LockOnHost"). - NewFunctionBuilder(). - WithParameterNames("scope", "hostModulePtr"). - WithFunc(func(ctx context.Context, mod api.Module, scope uint32, hostModulePtr uint64) { - UnlockOnHost(wazeroRuntime, scope, hostModulePtr) - }). - Export("UnlockOnHost"). - NewFunctionBuilder(). WithParameterNames("hostInstancePtr", "errMessagePtr", "errMessageSize"). WithResultNames("callStatus"). WithFunc(func(ctx context.Context, mod api.Module, hostInstancePtr uint64, errMessagePtr, errMessageSize uint32) uint32 { @@ -172,12 +140,6 @@ func (w *WazeroVM) GetWasmModule(key string) (bool, WasmModule) { func (w *WazeroVM) SetWasmModule(key string, wasmModule WasmModule) { w.mu.Lock() defer w.mu.Unlock() - // the goal is to change the compiled module bytes, and keep module variables map - oldWasmModule, exist := w.modules[key] - if exist { - oldMap := oldWasmModule.GetModuleSharingVariables() - wasmModule.SetModuleSharingVariables(oldMap) - } w.modules[key] = wasmModule } @@ -202,7 +164,7 @@ func (w *WazeroVM) CompileWasmModule(wasmBytes []byte) (WasmModule, error) { return nil, err } - return &WazeroModule{compliedModule: module, wazeroRuntime: w, moduleSharingVariables: make(map[string][]byte)}, nil + return &WazeroModule{compliedModule: module, wazeroRuntime: w}, nil } func (w *WazeroVM) Close() error { @@ -213,24 +175,6 @@ func (w *WazeroVM) Close() error { type WazeroModule struct { wazeroRuntime *WazeroVM compliedModule wazero.CompiledModule - - mu sync.Mutex - moduleSharingVariables map[string][]byte - - moduleMu sync.Mutex - tmp int -} - -func (mod *WazeroModule) GetModuleSharingVariables() map[string][]byte { - mod.mu.Lock() - defer mod.mu.Unlock() - return mod.moduleSharingVariables -} - -func (mod *WazeroModule) SetModuleSharingVariables(m map[string][]byte) { - mod.mu.Lock() - defer mod.mu.Unlock() - mod.moduleSharingVariables = m } func (mod *WazeroModule) NewInstance(qre *QueryExecutor) (WasmInstance, error) { From 254fea41e017d21238a71297fe0f05f77340c4d6 Mon Sep 17 00:00:00 2001 From: earayu Date: Wed, 20 Nov 2024 16:29:28 +0800 Subject: [PATCH 04/14] feat: update wasm plugin --- go/vt/vttablet/tabletserver/wasm_plugin_common.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_common.go b/go/vt/vttablet/tabletserver/wasm_plugin_common.go index 1d0468151a..31e6a83046 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_common.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_common.go @@ -1,6 +1,6 @@ package tabletserver -const AbiVersion = "v1alpha2" +const AbiVersion = "v1alpha1" const WAZERO = "wazero" From 3702ab4ea22c799a4a3418aab266022f20fc23b1 Mon Sep 17 00:00:00 2001 From: earayu Date: Wed, 20 Nov 2024 16:52:18 +0800 Subject: [PATCH 05/14] feat: update wasm plugin --- go/vt/vttablet/tabletserver/wasm_plugin_controller.go | 2 +- go/vt/vttablet/tabletserver/wasm_plugin_tools.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_controller.go b/go/vt/vttablet/tabletserver/wasm_plugin_controller.go index dedf11c731..bdf5ac803c 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_controller.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_controller.go @@ -53,7 +53,7 @@ func CalcMd5String32(data []byte) string { } func (wpc *WasmPluginController) GetWasmBytesByBinaryName(ctx context.Context, wasmBinaryName string) ([]byte, error) { - query := getQueryByName(wasmBinaryName) + query := generateWasmQueryByName(wasmBinaryName) qr, err := wpc.qe.ExecuteQuery(ctx, query) if err != nil { return nil, fmt.Errorf("get wasm binary by name %s failed : %v", wasmBinaryName, err) diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_tools.go b/go/vt/vttablet/tabletserver/wasm_plugin_tools.go index a20c5f2170..c30afc31a6 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_tools.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_tools.go @@ -5,7 +5,7 @@ import ( "fmt" ) -func getQueryByName(wasmBinaryName string) string { +func generateWasmQueryByName(wasmBinaryName string) string { return fmt.Sprintf("select * from %s where name = '%s'", WasmBinaryTableName, wasmBinaryName) } From 8dbb61a686ebee7c68fe37d1c2010ad1a08cf12c Mon Sep 17 00:00:00 2001 From: earayu Date: Wed, 20 Nov 2024 17:53:06 +0800 Subject: [PATCH 06/14] feat: update wasm plugin --- go/vt/vttablet/tabletserver/wasm_plugin_wazero.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go index 62206d2f60..072eb5bd55 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go @@ -275,13 +275,10 @@ func copyHostBytesIntoGuest(ctx context.Context, mod api.Module, bytes []byte, w if err != nil { return StatusInternalFailure } - // res[0] is the pointer to the allocated memory in guest - buf, ok := mod.Memory().Read(uint32(res[0]), uint32(size)) + ok := mod.Memory().Write(uint32(res[0]), bytes) if !ok { return StatusInternalFailure } - - copy(buf, bytes) ok = mod.Memory().WriteUint32Le(wasmPtrPtr, uint32(res[0])) if !ok { return StatusInternalFailure From 79129884aa9f9f903e27eea5cc8fd33cd19451be Mon Sep 17 00:00:00 2001 From: earayu Date: Wed, 20 Nov 2024 19:13:05 +0800 Subject: [PATCH 07/14] feat: update wasm plugin --- go/vt/vttablet/tabletserver/action_builtin.go | 1 + go/vt/vttablet/tabletserver/wasm_plugin_wazero.go | 10 ++++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/action_builtin.go b/go/vt/vttablet/tabletserver/action_builtin.go index 7819d12d09..96faebe73e 100644 --- a/go/vt/vttablet/tabletserver/action_builtin.go +++ b/go/vt/vttablet/tabletserver/action_builtin.go @@ -351,6 +351,7 @@ func (p *WasmPluginAction) BeforeExecution(qre *QueryExecutor) *ActionExecutionR instance, err := module.NewInstance(qre) if err != nil { + //todo wasm: if instance is nil, we will not be able to get the it in AfterExecution. We need to handle this case return &ActionExecutionResponse{Err: err} } diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go index 072eb5bd55..34e1996118 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go @@ -115,7 +115,12 @@ func (*WazeroVM) GetRuntimeType() string { } func (w *WazeroVM) InitRuntime() error { - w.runtime = wazero.NewRuntimeWithConfig(w.ctx, wazero.NewRuntimeConfig().WithCompilationCache(wazero.NewCompilationCache())) + runtimeConfig := wazero.NewRuntimeConfig(). + WithCompilationCache(wazero.NewCompilationCache()). + WithCloseOnContextDone(true). + WithMemoryLimitPages(16 * 10) //64KB each page, 10 * 16pages = 10MB + + w.runtime = wazero.NewRuntimeWithConfig(w.ctx, runtimeConfig) wasi_snapshot_preview1.MustInstantiate(w.ctx, w.runtime) return exportHostABIV1(w.ctx, w) } @@ -184,7 +189,8 @@ func (mod *WazeroModule) NewInstance(qre *QueryExecutor) (WasmInstance, error) { if mod.compliedModule == nil { return nil, fmt.Errorf("compliedModule is nil in NewInstance") } - instance, err := mod.wazeroRuntime.runtime.InstantiateModule(mod.wazeroRuntime.ctx, mod.compliedModule, wazero.NewModuleConfig().WithName("")) + config := wazero.NewModuleConfig().WithName("") + instance, err := mod.wazeroRuntime.runtime.InstantiateModule(mod.wazeroRuntime.ctx, mod.compliedModule, config) if err != nil { return nil, err } From ae0b8c4cd5771c99a4806173cd179744e3710420 Mon Sep 17 00:00:00 2001 From: earayu Date: Wed, 20 Nov 2024 22:07:56 +0800 Subject: [PATCH 08/14] feat: update wasm plugin --- go/vt/vttablet/tabletserver/wasm_plugin_wazero.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go index 34e1996118..25c4cc3ce0 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go @@ -275,8 +275,8 @@ func copyHostBytesIntoGuest(ctx context.Context, mod api.Module, bytes []byte, w return StatusBadArgument } - // call 'proxy_on_memory_allocate' to allocate memory in guest, 'proxy_on_memory_allocate' is a exported function in guest - alloc := mod.ExportedFunction("proxy_on_memory_allocate") + // call 'malloc' to allocate memory in guest, guest need to free it + alloc := mod.ExportedFunction("malloc") res, err := alloc.Call(ctx, uint64(size)) if err != nil { return StatusInternalFailure From a1c5330508bcbfff18825241fd64849f16762e85 Mon Sep 17 00:00:00 2001 From: earayu Date: Wed, 20 Nov 2024 23:24:50 +0800 Subject: [PATCH 09/14] feat: update wasm plugin --- go/vt/vttablet/tabletserver/action_builtin.go | 4 +- go/vt/vttablet/tabletserver/action_stats.go | 26 ++++++++++++ go/vt/vttablet/tabletserver/query_engine.go | 4 ++ go/vt/vttablet/tabletserver/query_executor.go | 12 ++++++ .../tabletserver/wasm_plugin_types.go | 8 ++-- .../tabletserver/wasm_plugin_wazero.go | 40 ++++++++----------- .../wescale_filter_request_handler.go | 11 +++-- 7 files changed, 68 insertions(+), 37 deletions(-) create mode 100644 go/vt/vttablet/tabletserver/action_stats.go diff --git a/go/vt/vttablet/tabletserver/action_builtin.go b/go/vt/vttablet/tabletserver/action_builtin.go index 96faebe73e..553b489b55 100644 --- a/go/vt/vttablet/tabletserver/action_builtin.go +++ b/go/vt/vttablet/tabletserver/action_builtin.go @@ -337,13 +337,13 @@ func (args *WasmPluginActionArgs) Parse(stringParams string) (ActionArgs, error) func (p *WasmPluginAction) BeforeExecution(qre *QueryExecutor) *ActionExecutionResponse { controller := qre.tsv.qe.wasmPluginController - ok, module := controller.VM.GetWasmModule(p.Args.WasmBinaryName) + ok, module := controller.VM.GetWasmModule(p.GetRule().Name) if !ok { wasmBytes, err := controller.GetWasmBytesByBinaryName(qre.ctx, p.Args.WasmBinaryName) if err != nil { return &ActionExecutionResponse{Err: err} } - module, err = controller.VM.InitWasmModule(p.Args.WasmBinaryName, wasmBytes) + module, err = controller.VM.InitWasmModule(p.GetRule().Name, wasmBytes) if err != nil { return &ActionExecutionResponse{Err: err} } diff --git a/go/vt/vttablet/tabletserver/action_stats.go b/go/vt/vttablet/tabletserver/action_stats.go new file mode 100644 index 0000000000..99b6bfc3ce --- /dev/null +++ b/go/vt/vttablet/tabletserver/action_stats.go @@ -0,0 +1,26 @@ +package tabletserver + +import ( + "time" + "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/servenv" +) + +type ActionStats struct { + FilterBeforeExecutionTiming *servenv.TimingsWrapper + FilterAfterExecutionTiming *servenv.TimingsWrapper + FilterErrorCounts *stats.CountersWithSingleLabel + QPSRates *stats.Rates + WasmMemorySize *stats.CountersWithMultiLabels +} + +func NewActionStats(exporter *servenv.Exporter) *ActionStats { + stats := &ActionStats{ + FilterBeforeExecutionTiming: exporter.NewTimings("FilterBeforeExecution", "Filter before execution timings", "Name"), + FilterAfterExecutionTiming: exporter.NewTimings("FilterAfterExecution", "Filter before execution timings", "Name"), + FilterErrorCounts: exporter.NewCountersWithSingleLabel("FilterErrorCounts", "filter error counts", "Name"), + WasmMemorySize: exporter.NewCountersWithMultiLabels("WasmMemorySize", "Wasm memory size", []string{"Name", "BeforeOrAfter"}), + } + stats.QPSRates = exporter.NewRates("FilterQps", stats.FilterBeforeExecutionTiming, 15*60/5, 5*time.Second) + return stats +} diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index 2011dcb1d7..086e21c59a 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -187,6 +187,8 @@ type QueryEngine struct { // stats queryCounts, queryTimes, queryErrorCounts, queryRowsAffected, queryRowsReturned *stats.CountersWithMultiLabels + // actionStats for filters + actionStats *ActionStats // Loggers accessCheckerLogger *logutil.ThrottledLogger @@ -281,6 +283,8 @@ func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine { qe.queryRowsReturned = env.Exporter().NewCountersWithMultiLabels("QueryRowsReturned", "query rows returned", []string{"Table", "Plan"}) qe.queryErrorCounts = env.Exporter().NewCountersWithMultiLabels("QueryErrorCounts", "query error counts", []string{"Table", "Plan"}) + qe.actionStats = NewActionStats(env.Exporter()) + env.Exporter().HandleFunc("/debug/ccl", qe.concurrencyController.ServeHTTP) env.Exporter().HandleFunc("/debug/hotrows", qe.txSerializer.ServeHTTP) env.Exporter().HandleFunc("/debug/tablet_plans", qe.handleHTTPQueryPlans) diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 757ea97150..ce9d1e10e6 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -612,8 +612,14 @@ func (qre *QueryExecutor) runActionListBeforeExecution() (*sqltypes.Result, erro } for _, a := range qre.matchedActionList { if !a.GetSkipFlag() { + startTime := time.Now() + // execute the filter action resp := a.BeforeExecution(qre) + qre.tsv.qe.actionStats.FilterBeforeExecutionTiming.Add(a.GetRule().Name, time.Since(startTime)) qre.calledActionList = append(qre.calledActionList, a) + if resp.Err != nil { + qre.tsv.qe.actionStats.FilterErrorCounts.Add(a.GetRule().Name, 1) + } if resp.Reply != nil || resp.Err != nil { return resp.Reply, resp.Err } @@ -631,7 +637,13 @@ func (qre *QueryExecutor) runActionListAfterExecution(reply *sqltypes.Result, er for i := len(qre.calledActionList) - 1; i >= 0; i-- { a := qre.calledActionList[i] + startTime := time.Now() + // execute the filter action resp := a.AfterExecution(qre, newReply, newErr) + qre.tsv.qe.actionStats.FilterAfterExecutionTiming.Add(a.GetRule().Name, time.Since(startTime)) + if resp.Err != nil { + qre.tsv.qe.actionStats.FilterErrorCounts.Add(a.GetRule().Name, 1) + } newReply, newErr = resp.Reply, resp.Err } return newReply, newErr diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_types.go b/go/vt/vttablet/tabletserver/wasm_plugin_types.go index 0d2eea4822..ac88e8b101 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_types.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_types.go @@ -5,11 +5,9 @@ import "vitess.io/vitess/go/sqltypes" type WasmVM interface { GetRuntimeType() string InitRuntime() error - GetWasmModule(key string) (bool, WasmModule) - SetWasmModule(key string, wasmModule WasmModule) - InitWasmModule(key string, wasmBytes []byte) (WasmModule, error) - CompileWasmModule(wasmBytes []byte) (WasmModule, error) - ClearWasmModule(key string) + GetWasmModule(filterName string) (bool, WasmModule) + InitWasmModule(filterName string, wasmBytes []byte) (WasmModule, error) + ClearWasmModule(filterName string) Close() error } diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go index 25c4cc3ce0..3955408607 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go @@ -125,51 +125,41 @@ func (w *WazeroVM) InitRuntime() error { return exportHostABIV1(w.ctx, w) } -func (w *WazeroVM) ClearWasmModule(key string) { +func (w *WazeroVM) ClearWasmModule(filterName string) { w.mu.Lock() defer w.mu.Unlock() - if mod, exist := w.modules[key]; exist { + if mod, exist := w.modules[filterName]; exist { defer mod.Close() } - delete(w.modules, key) + delete(w.modules, filterName) } -func (w *WazeroVM) GetWasmModule(key string) (bool, WasmModule) { +func (w *WazeroVM) GetWasmModule(filterName string) (bool, WasmModule) { w.mu.Lock() defer w.mu.Unlock() - module, exist := w.modules[key] + module, exist := w.modules[filterName] return exist, module } -func (w *WazeroVM) SetWasmModule(key string, wasmModule WasmModule) { +func (w *WazeroVM) InitWasmModule(filterName string, wasmBytes []byte) (WasmModule, error) { w.mu.Lock() defer w.mu.Unlock() - w.modules[key] = wasmModule -} - -func (w *WazeroVM) InitWasmModule(key string, wasmBytes []byte) (WasmModule, error) { - w.mu.Lock() - defer w.mu.Unlock() - module, exist := w.modules[key] + module, exist := w.modules[filterName] if exist { return module, nil } - module, err := w.CompileWasmModule(wasmBytes) + compiled, err := w.runtime.CompileModule(w.ctx, wasmBytes) if err != nil { return nil, err } - w.modules[key] = module - return module, nil -} - -func (w *WazeroVM) CompileWasmModule(wasmBytes []byte) (WasmModule, error) { - module, err := w.runtime.CompileModule(w.ctx, wasmBytes) - if err != nil { - return nil, err + module = &WazeroModule{ + filterName: filterName, + compliedModule: compiled, + wazeroRuntime: w, } - - return &WazeroModule{compliedModule: module, wazeroRuntime: w}, nil + w.modules[filterName] = module + return module, nil } func (w *WazeroVM) Close() error { @@ -178,6 +168,7 @@ func (w *WazeroVM) Close() error { } type WazeroModule struct { + filterName string wazeroRuntime *WazeroVM compliedModule wazero.CompiledModule } @@ -214,6 +205,7 @@ type WazeroInstance struct { func (ins *WazeroInstance) RunWASMPlugin() error { ctx := context.Background() + defer ins.qre.tsv.qe.actionStats.WasmMemorySize.Add([]string{"", ""}, int64(ins.instance.Memory().Size())) wazeroGuestFunc := ins.instance.ExportedFunction("RunBeforeExecutionOnGuest") if wazeroGuestFunc == nil { diff --git a/go/vt/vttablet/tabletserver/wescale_filter_request_handler.go b/go/vt/vttablet/tabletserver/wescale_filter_request_handler.go index 38d479571e..4f793b391e 100644 --- a/go/vt/vttablet/tabletserver/wescale_filter_request_handler.go +++ b/go/vt/vttablet/tabletserver/wescale_filter_request_handler.go @@ -200,7 +200,7 @@ func (qe *QueryEngine) HandleDropFilter(stmt *sqlparser.DropWescaleFilter) (*sql return rst, nil } -func (qe *QueryEngine) removeFilterMemoryData(name, actionArgs string, action rules.Action) { +func (qe *QueryEngine) removeFilterMemoryData(filterName, actionArgs string, action rules.Action) { switch action { case rules.QRWasmPlugin: query, err := customrule.GetSelectByActionArgsSQL(actionArgs) @@ -216,11 +216,11 @@ func (qe *QueryEngine) removeFilterMemoryData(name, actionArgs string, action ru a := &WasmPluginActionArgs{} tmp, _ := a.Parse(actionArgs) a = tmp.(*WasmPluginActionArgs) - qe.wasmPluginController.VM.ClearWasmModule(a.WasmBinaryName) + qe.wasmPluginController.VM.ClearWasmModule(filterName) } case rules.QRConcurrencyControl: - qe.concurrencyController.DeleteQueue(name) + qe.concurrencyController.DeleteQueue(filterName) } } @@ -316,12 +316,11 @@ func CheckAndFormatActionArgs(qe *QueryEngine, filerName, actionType, actionArgs if err != nil { return "", err } - wasmModule, err := qe.wasmPluginController.VM.CompileWasmModule(bytes) + qe.wasmPluginController.VM.ClearWasmModule(filerName) + _, err = qe.wasmPluginController.VM.InitWasmModule(filerName, bytes) if err != nil { return "", fmt.Errorf("err when compiling wasm moulde %v", err) } - // whether create or alter, just set - qe.wasmPluginController.VM.SetWasmModule(wasmPluginArgs.WasmBinaryName, wasmModule) return FormatUserInputStr(actionArgs), nil case rules.QRSkipFilter: From 8359d80e178f29e8cc126516b28f9893a9a93017 Mon Sep 17 00:00:00 2001 From: earayu Date: Wed, 20 Nov 2024 23:27:11 +0800 Subject: [PATCH 10/14] feat: update wasm plugin --- go/vt/vttablet/tabletserver/wasm_plugin_wazero.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go index 3955408607..775d7db8fc 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go @@ -205,7 +205,7 @@ type WazeroInstance struct { func (ins *WazeroInstance) RunWASMPlugin() error { ctx := context.Background() - defer ins.qre.tsv.qe.actionStats.WasmMemorySize.Add([]string{"", ""}, int64(ins.instance.Memory().Size())) + defer ins.qre.tsv.qe.actionStats.WasmMemorySize.Add([]string{ins.module.filterName, "Before"}, int64(ins.instance.Memory().Size())) wazeroGuestFunc := ins.instance.ExportedFunction("RunBeforeExecutionOnGuest") if wazeroGuestFunc == nil { @@ -226,6 +226,7 @@ func (ins *WazeroInstance) RunWASMPlugin() error { func (ins *WazeroInstance) RunWASMPluginAfter() error { ctx := context.Background() + defer ins.qre.tsv.qe.actionStats.WasmMemorySize.Add([]string{ins.module.filterName, "After"}, int64(ins.instance.Memory().Size())) wazeroGuestFunc := ins.instance.ExportedFunction("RunAfterExecutionOnGuest") From 44dce7f5e69f15c85e2c4edd5deefb888216c83d Mon Sep 17 00:00:00 2001 From: earayu Date: Wed, 20 Nov 2024 23:32:58 +0800 Subject: [PATCH 11/14] feat: update wasm plugin --- go/vt/vttablet/tabletserver/action_stats.go | 8 ++++---- go/vt/vttablet/tabletserver/wasm_plugin_wazero.go | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go/vt/vttablet/tabletserver/action_stats.go b/go/vt/vttablet/tabletserver/action_stats.go index 99b6bfc3ce..3dd1faf8ac 100644 --- a/go/vt/vttablet/tabletserver/action_stats.go +++ b/go/vt/vttablet/tabletserver/action_stats.go @@ -10,8 +10,8 @@ type ActionStats struct { FilterBeforeExecutionTiming *servenv.TimingsWrapper FilterAfterExecutionTiming *servenv.TimingsWrapper FilterErrorCounts *stats.CountersWithSingleLabel - QPSRates *stats.Rates - WasmMemorySize *stats.CountersWithMultiLabels + FilterQPSRates *stats.Rates + FilterWasmMemorySize *stats.CountersWithMultiLabels } func NewActionStats(exporter *servenv.Exporter) *ActionStats { @@ -19,8 +19,8 @@ func NewActionStats(exporter *servenv.Exporter) *ActionStats { FilterBeforeExecutionTiming: exporter.NewTimings("FilterBeforeExecution", "Filter before execution timings", "Name"), FilterAfterExecutionTiming: exporter.NewTimings("FilterAfterExecution", "Filter before execution timings", "Name"), FilterErrorCounts: exporter.NewCountersWithSingleLabel("FilterErrorCounts", "filter error counts", "Name"), - WasmMemorySize: exporter.NewCountersWithMultiLabels("WasmMemorySize", "Wasm memory size", []string{"Name", "BeforeOrAfter"}), + FilterWasmMemorySize: exporter.NewCountersWithMultiLabels("FilterWasmMemorySize", "Wasm memory size", []string{"Name", "BeforeOrAfter"}), } - stats.QPSRates = exporter.NewRates("FilterQps", stats.FilterBeforeExecutionTiming, 15*60/5, 5*time.Second) + stats.FilterQPSRates = exporter.NewRates("FilterQps", stats.FilterBeforeExecutionTiming, 15*60/5, 5*time.Second) return stats } diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go index 775d7db8fc..61674531db 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go @@ -205,7 +205,7 @@ type WazeroInstance struct { func (ins *WazeroInstance) RunWASMPlugin() error { ctx := context.Background() - defer ins.qre.tsv.qe.actionStats.WasmMemorySize.Add([]string{ins.module.filterName, "Before"}, int64(ins.instance.Memory().Size())) + defer ins.qre.tsv.qe.actionStats.FilterWasmMemorySize.Add([]string{ins.module.filterName, "Before"}, int64(ins.instance.Memory().Size())) wazeroGuestFunc := ins.instance.ExportedFunction("RunBeforeExecutionOnGuest") if wazeroGuestFunc == nil { @@ -226,7 +226,7 @@ func (ins *WazeroInstance) RunWASMPlugin() error { func (ins *WazeroInstance) RunWASMPluginAfter() error { ctx := context.Background() - defer ins.qre.tsv.qe.actionStats.WasmMemorySize.Add([]string{ins.module.filterName, "After"}, int64(ins.instance.Memory().Size())) + defer ins.qre.tsv.qe.actionStats.FilterWasmMemorySize.Add([]string{ins.module.filterName, "After"}, int64(ins.instance.Memory().Size())) wazeroGuestFunc := ins.instance.ExportedFunction("RunAfterExecutionOnGuest") From 5a194a598bb03aaf9e0ce868e704e3ba401369ac Mon Sep 17 00:00:00 2001 From: earayu Date: Thu, 21 Nov 2024 11:35:29 +0800 Subject: [PATCH 12/14] feat: remove skip_filter --- go/vt/vttablet/tabletserver/action.go | 4 - go/vt/vttablet/tabletserver/action_builtin.go | 141 ------------------ go/vt/vttablet/tabletserver/action_factory.go | 2 - go/vt/vttablet/tabletserver/query_executor.go | 22 ++- go/vt/vttablet/tabletserver/rules/rules.go | 5 - .../vttablet/tabletserver/skip_filter_test.go | 78 ---------- .../wescale_filter_request_handler.go | 7 - 7 files changed, 10 insertions(+), 249 deletions(-) delete mode 100644 go/vt/vttablet/tabletserver/skip_filter_test.go diff --git a/go/vt/vttablet/tabletserver/action.go b/go/vt/vttablet/tabletserver/action.go index fdc82c5f45..27de0b31ca 100644 --- a/go/vt/vttablet/tabletserver/action.go +++ b/go/vt/vttablet/tabletserver/action.go @@ -20,10 +20,6 @@ type ActionInterface interface { SetParams(args ActionArgs) error GetRule() *rules.Rule - - GetSkipFlag() bool - - SetSkipFlag(skip bool) } type ActionArgs interface { diff --git a/go/vt/vttablet/tabletserver/action_builtin.go b/go/vt/vttablet/tabletserver/action_builtin.go index 553b489b55..297941d40d 100644 --- a/go/vt/vttablet/tabletserver/action_builtin.go +++ b/go/vt/vttablet/tabletserver/action_builtin.go @@ -3,7 +3,6 @@ package tabletserver import ( "context" "fmt" - "regexp" "time" "github.com/BurntSushi/toml" @@ -20,8 +19,6 @@ type ContinueAction struct { // Action is the action to take if the rule matches Action rules.Action - - skipFlag bool } func (p *ContinueAction) BeforeExecution(_ *QueryExecutor) *ActionExecutionResponse { @@ -55,16 +52,6 @@ type FailAction struct { // Action is the action to take if the rule matches Action rules.Action - - skipFlag bool -} - -func (p *ContinueAction) GetSkipFlag() bool { - return p.skipFlag -} - -func (p *ContinueAction) SetSkipFlag(skip bool) { - p.skipFlag = skip } func (p *FailAction) BeforeExecution(_ *QueryExecutor) *ActionExecutionResponse { @@ -93,21 +80,11 @@ func (p *FailAction) GetRule() *rules.Rule { return p.Rule } -func (p *FailAction) GetSkipFlag() bool { - return p.skipFlag -} - -func (p *FailAction) SetSkipFlag(skip bool) { - p.skipFlag = skip -} - type FailRetryAction struct { Rule *rules.Rule // Action is the action to take if the rule matches Action rules.Action - - skipFlag bool } func (p *FailRetryAction) BeforeExecution(_ *QueryExecutor) *ActionExecutionResponse { @@ -136,21 +113,11 @@ func (p *FailRetryAction) GetRule() *rules.Rule { return p.Rule } -func (p *FailRetryAction) GetSkipFlag() bool { - return p.skipFlag -} - -func (p *FailRetryAction) SetSkipFlag(skip bool) { - p.skipFlag = skip -} - type BufferAction struct { Rule *rules.Rule // Action is the action to take if the rule matches Action rules.Action - - skipFlag bool } func (p *BufferAction) BeforeExecution(qre *QueryExecutor) *ActionExecutionResponse { @@ -198,14 +165,6 @@ func (p *BufferAction) GetRule() *rules.Rule { return p.Rule } -func (p *BufferAction) GetSkipFlag() bool { - return p.skipFlag -} - -func (p *BufferAction) SetSkipFlag(skip bool) { - p.skipFlag = skip -} - type ConcurrencyControlAction struct { Rule *rules.Rule @@ -213,8 +172,6 @@ type ConcurrencyControlAction struct { Action rules.Action Args *ConcurrencyControlActionArgs - - skipFlag bool } type ConcurrencyControlActionArgs struct { @@ -292,14 +249,6 @@ func (p *ConcurrencyControlAction) GetRule() *rules.Rule { return p.Rule } -func (p *ConcurrencyControlAction) GetSkipFlag() bool { - return p.skipFlag -} - -func (p *ConcurrencyControlAction) SetSkipFlag(skip bool) { - p.skipFlag = skip -} - type WasmPluginAction struct { Rule *rules.Rule @@ -307,8 +256,6 @@ type WasmPluginAction struct { Action rules.Action Args *WasmPluginActionArgs - - skipFlag bool } type WasmPluginActionArgs struct { @@ -400,91 +347,3 @@ func (p *WasmPluginAction) SetParams(args ActionArgs) error { func (p *WasmPluginAction) GetRule() *rules.Rule { return p.Rule } - -func (p *WasmPluginAction) GetSkipFlag() bool { - return p.skipFlag -} - -func (p *WasmPluginAction) SetSkipFlag(skip bool) { - p.skipFlag = skip -} - -type SkipFilterAction struct { - Rule *rules.Rule - - // Action is the action to take if the rule matches - Action rules.Action - - Args *SkipFilterActionArgs - - skipFlag bool -} - -type SkipFilterActionArgs struct { - AllowRegexString string `toml:"skip_filter_regex"` - AllowRegex *regexp.Regexp -} - -func (args *SkipFilterActionArgs) Parse(stringParams string) (ActionArgs, error) { - s := &SkipFilterActionArgs{} - if stringParams == "" { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "stringParams is empty when parsing skip filter action args") - } - - userInputTOML := ConvertUserInputToTOML(stringParams) - err := toml.Unmarshal([]byte(userInputTOML), s) - if err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "error when parsing skip filter action args: %v", err) - } - s.AllowRegex, err = regexp.Compile(fmt.Sprintf("^%s$", s.AllowRegexString)) - if err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "error when compiling skip filter action args: %v", err) - } - - return s, nil -} - -func (p *SkipFilterAction) BeforeExecution(qre *QueryExecutor) *ActionExecutionResponse { - findSelf := false - for _, a := range qre.matchedActionList { - if a.GetRule().Name == p.GetRule().Name { - findSelf = true - continue - } - if findSelf { - if p.Args.AllowRegex.MatchString(a.GetRule().Name) { - a.SetSkipFlag(true) - } - } - } - return &ActionExecutionResponse{Err: nil} -} - -func (p *SkipFilterAction) AfterExecution(qre *QueryExecutor, reply *sqltypes.Result, err error) *ActionExecutionResponse { - return &ActionExecutionResponse{Reply: reply, Err: err} -} - -func (p *SkipFilterAction) ParseParams(argsStr string) (ActionArgs, error) { - return p.Args.Parse(argsStr) -} - -func (p *SkipFilterAction) SetParams(args ActionArgs) error { - skipFilterArgs, ok := args.(*SkipFilterActionArgs) - if !ok { - return fmt.Errorf("args :%v is not a valid SkipFilterActionArgs)", args) - } - p.Args = skipFilterArgs - return nil -} - -func (p *SkipFilterAction) GetRule() *rules.Rule { - return p.Rule -} - -func (p *SkipFilterAction) GetSkipFlag() bool { - return p.skipFlag -} - -func (p *SkipFilterAction) SetSkipFlag(skip bool) { - p.skipFlag = skip -} diff --git a/go/vt/vttablet/tabletserver/action_factory.go b/go/vt/vttablet/tabletserver/action_factory.go index 190a85da1f..e8baa28b28 100644 --- a/go/vt/vttablet/tabletserver/action_factory.go +++ b/go/vt/vttablet/tabletserver/action_factory.go @@ -60,8 +60,6 @@ func CreateActionInstance(action rules.Action, rule *rules.Rule) (ActionInterfac actInst, err = &ConcurrencyControlAction{Rule: rule, Action: action}, nil case rules.QRWasmPlugin: actInst, err = &WasmPluginAction{Rule: rule, Action: action}, nil - case rules.QRSkipFilter: - actInst, err = &SkipFilterAction{Rule: rule, Action: action}, nil default: log.Errorf("unknown action: %v", action) actInst, err = nil, fmt.Errorf("unknown action: %v", action) diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index ce9d1e10e6..94763d6632 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -611,18 +611,16 @@ func (qre *QueryExecutor) runActionListBeforeExecution() (*sqltypes.Result, erro return nil, nil } for _, a := range qre.matchedActionList { - if !a.GetSkipFlag() { - startTime := time.Now() - // execute the filter action - resp := a.BeforeExecution(qre) - qre.tsv.qe.actionStats.FilterBeforeExecutionTiming.Add(a.GetRule().Name, time.Since(startTime)) - qre.calledActionList = append(qre.calledActionList, a) - if resp.Err != nil { - qre.tsv.qe.actionStats.FilterErrorCounts.Add(a.GetRule().Name, 1) - } - if resp.Reply != nil || resp.Err != nil { - return resp.Reply, resp.Err - } + startTime := time.Now() + // execute the filter action + resp := a.BeforeExecution(qre) + qre.tsv.qe.actionStats.FilterBeforeExecutionTiming.Add(a.GetRule().Name, time.Since(startTime)) + qre.calledActionList = append(qre.calledActionList, a) + if resp.Err != nil { + qre.tsv.qe.actionStats.FilterErrorCounts.Add(a.GetRule().Name, 1) + } + if resp.Reply != nil || resp.Err != nil { + return resp.Reply, resp.Err } } return nil, nil diff --git a/go/vt/vttablet/tabletserver/rules/rules.go b/go/vt/vttablet/tabletserver/rules/rules.go index 543bb5d8b7..14269734a4 100644 --- a/go/vt/vttablet/tabletserver/rules/rules.go +++ b/go/vt/vttablet/tabletserver/rules/rules.go @@ -772,7 +772,6 @@ const ( QRBuffer QRConcurrencyControl QRWasmPlugin - QRSkipFilter ) func ParseStringToAction(s string) (Action, error) { @@ -789,8 +788,6 @@ func ParseStringToAction(s string) (Action, error) { return QRConcurrencyControl, nil case "WASM_PLUGIN": return QRWasmPlugin, nil - case "SKIP_FILTER": - return QRSkipFilter, nil default: return QRContinue, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid Action %s", s) } @@ -810,8 +807,6 @@ func (act Action) ToString() string { return "CONCURRENCY_CONTROL" case QRWasmPlugin: return "WASM_PLUGIN" - case QRSkipFilter: - return "SKIP_FILTER" default: return "INVALID" } diff --git a/go/vt/vttablet/tabletserver/skip_filter_test.go b/go/vt/vttablet/tabletserver/skip_filter_test.go deleted file mode 100644 index 961bf0ec3e..0000000000 --- a/go/vt/vttablet/tabletserver/skip_filter_test.go +++ /dev/null @@ -1,78 +0,0 @@ -package tabletserver - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" -) - -func TestSkipFilter(t *testing.T) { - - skipRule := rules.NewActiveQueryRule("skip all filters", "s1", rules.QRSkipFilter) - skipActionArgs := &SkipFilterActionArgs{} - - args, _ := skipActionArgs.Parse(`skip_filter_regex=".*"`) - skipActionArgs = args.(*SkipFilterActionArgs) - skipFilter := &SkipFilterAction{Rule: skipRule, Action: rules.QRSkipFilter, Args: skipActionArgs} - - argsTestRegex, _ := skipActionArgs.Parse(`skip_filter_regex="c.*|d1"`) - skipActionArgsTestRegex := argsTestRegex.(*SkipFilterActionArgs) - skipFilterTestRegex := &SkipFilterAction{Rule: skipRule, Action: rules.QRSkipFilter, Args: skipActionArgsTestRegex} - - tests := []struct { - name string - actionList []ActionInterface - expectedCalledNameList []string - }{ - - { - name: "skip all", - actionList: []ActionInterface{ - skipFilter, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c1", rules.QRContinue), Action: rules.QRContinue}, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c2", rules.QRContinue), Action: rules.QRContinue}}, - expectedCalledNameList: []string{"s1"}, - }, - { - name: "none to skip", - actionList: []ActionInterface{ - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c1", rules.QRContinue), Action: rules.QRContinue}, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c2", rules.QRContinue), Action: rules.QRContinue}, - skipFilter}, - expectedCalledNameList: []string{"c1", "c2", "s1"}, - }, - { - name: "just skip c2", - actionList: []ActionInterface{ - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c1", rules.QRContinue), Action: rules.QRContinue}, - skipFilter, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c2", rules.QRContinue), Action: rules.QRContinue}}, - expectedCalledNameList: []string{"c1", "s1"}, - }, - { - name: "test regex", - actionList: []ActionInterface{ - skipFilterTestRegex, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c1", rules.QRContinue), Action: rules.QRContinue}, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "c2", rules.QRContinue), Action: rules.QRContinue}, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "d1", rules.QRContinue), Action: rules.QRContinue}, - &ContinueAction{Rule: rules.NewActiveQueryRule("", "d2", rules.QRContinue), Action: rules.QRContinue}}, - expectedCalledNameList: []string{"s1", "d2"}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - - qre := &QueryExecutor{} - qre.matchedActionList = tt.actionList - _, err := qre.runActionListBeforeExecution() - assert.Nil(t, err) - assert.Equal(t, len(tt.expectedCalledNameList), len(qre.calledActionList)) - for i, name := range tt.expectedCalledNameList { - assert.Equal(t, name, qre.calledActionList[i].GetRule().Name) - } - }) - } -} diff --git a/go/vt/vttablet/tabletserver/wescale_filter_request_handler.go b/go/vt/vttablet/tabletserver/wescale_filter_request_handler.go index 4f793b391e..3ba25f0e6e 100644 --- a/go/vt/vttablet/tabletserver/wescale_filter_request_handler.go +++ b/go/vt/vttablet/tabletserver/wescale_filter_request_handler.go @@ -322,13 +322,6 @@ func CheckAndFormatActionArgs(qe *QueryEngine, filerName, actionType, actionArgs return "", fmt.Errorf("err when compiling wasm moulde %v", err) } - return FormatUserInputStr(actionArgs), nil - case rules.QRSkipFilter: - skipFilterArgs := &SkipFilterActionArgs{} - _, err := skipFilterArgs.Parse(actionArgs) - if err != nil { - return "", err - } return FormatUserInputStr(actionArgs), nil default: if actionArgs != "" { From ea3bfaae1b271b96d5b2fd32d19fb6ff05200a43 Mon Sep 17 00:00:00 2001 From: earayu Date: Mon, 25 Nov 2024 01:23:13 +0800 Subject: [PATCH 13/14] feat: fix testcase --- .../tabletserver/query_executor_filter_test.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletserver/query_executor_filter_test.go b/go/vt/vttablet/tabletserver/query_executor_filter_test.go index 20b348960b..22b906df58 100644 --- a/go/vt/vttablet/tabletserver/query_executor_filter_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_filter_test.go @@ -66,7 +66,10 @@ func TestQueryExecutor_runActionListBeforeExecution(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - qre := &QueryExecutor{ctx: ctx} + db := setUpQueryExecutorTest(t) + defer db.Close() + tsv := newTestTabletServer(ctx, noFlags, db) + qre := newTestQueryExecutor(ctx, tsv, "select 1", 0) qre.matchedActionList = tt.actionList _, err := qre.runActionListBeforeExecution() tt.wantErr(t, err, "runActionListBeforeExecution()") @@ -129,7 +132,10 @@ func TestQueryExecutor_runActionListAfterExecution(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - qre := &QueryExecutor{ctx: ctx} + db := setUpQueryExecutorTest(t) + defer db.Close() + tsv := newTestTabletServer(ctx, noFlags, db) + qre := newTestQueryExecutor(ctx, tsv, "select 1", 0) qre.matchedActionList = tt.actionList qr := &sqltypes.Result{} var err error @@ -162,7 +168,10 @@ func TestQueryExecutor_actions_can_be_skipped(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - qre := &QueryExecutor{ctx: ctx} + db := setUpQueryExecutorTest(t) + defer db.Close() + tsv := newTestTabletServer(ctx, noFlags, db) + qre := newTestQueryExecutor(ctx, tsv, "select 1", 0) qre.matchedActionList = tt.actionList qr, err := qre.runActionListBeforeExecution() tt.wantErr(t, err, "runActionListBeforeExecution()") From 4d26fa0b956932226ed6a32fee5ddf82280726e9 Mon Sep 17 00:00:00 2001 From: earayu Date: Mon, 25 Nov 2024 23:02:24 +0800 Subject: [PATCH 14/14] feat: use gauge instead of counter for wasm memory size --- go/vt/vttablet/tabletserver/action_stats.go | 4 ++-- go/vt/vttablet/tabletserver/wasm_plugin_wazero.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletserver/action_stats.go b/go/vt/vttablet/tabletserver/action_stats.go index 3dd1faf8ac..07db32a14e 100644 --- a/go/vt/vttablet/tabletserver/action_stats.go +++ b/go/vt/vttablet/tabletserver/action_stats.go @@ -11,7 +11,7 @@ type ActionStats struct { FilterAfterExecutionTiming *servenv.TimingsWrapper FilterErrorCounts *stats.CountersWithSingleLabel FilterQPSRates *stats.Rates - FilterWasmMemorySize *stats.CountersWithMultiLabels + FilterWasmMemorySize *stats.GaugesWithMultiLabels } func NewActionStats(exporter *servenv.Exporter) *ActionStats { @@ -19,7 +19,7 @@ func NewActionStats(exporter *servenv.Exporter) *ActionStats { FilterBeforeExecutionTiming: exporter.NewTimings("FilterBeforeExecution", "Filter before execution timings", "Name"), FilterAfterExecutionTiming: exporter.NewTimings("FilterAfterExecution", "Filter before execution timings", "Name"), FilterErrorCounts: exporter.NewCountersWithSingleLabel("FilterErrorCounts", "filter error counts", "Name"), - FilterWasmMemorySize: exporter.NewCountersWithMultiLabels("FilterWasmMemorySize", "Wasm memory size", []string{"Name", "BeforeOrAfter"}), + FilterWasmMemorySize: exporter.NewGaugesWithMultiLabels("FilterWasmMemorySize", "Wasm memory size", []string{"Name", "BeforeOrAfter"}), } stats.FilterQPSRates = exporter.NewRates("FilterQps", stats.FilterBeforeExecutionTiming, 15*60/5, 5*time.Second) return stats diff --git a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go index 61674531db..0131f61db7 100644 --- a/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go +++ b/go/vt/vttablet/tabletserver/wasm_plugin_wazero.go @@ -205,7 +205,7 @@ type WazeroInstance struct { func (ins *WazeroInstance) RunWASMPlugin() error { ctx := context.Background() - defer ins.qre.tsv.qe.actionStats.FilterWasmMemorySize.Add([]string{ins.module.filterName, "Before"}, int64(ins.instance.Memory().Size())) + defer ins.qre.tsv.qe.actionStats.FilterWasmMemorySize.Set([]string{ins.module.filterName, "Before"}, int64(ins.instance.Memory().Size())) wazeroGuestFunc := ins.instance.ExportedFunction("RunBeforeExecutionOnGuest") if wazeroGuestFunc == nil { @@ -226,7 +226,7 @@ func (ins *WazeroInstance) RunWASMPlugin() error { func (ins *WazeroInstance) RunWASMPluginAfter() error { ctx := context.Background() - defer ins.qre.tsv.qe.actionStats.FilterWasmMemorySize.Add([]string{ins.module.filterName, "After"}, int64(ins.instance.Memory().Size())) + defer ins.qre.tsv.qe.actionStats.FilterWasmMemorySize.Set([]string{ins.module.filterName, "After"}, int64(ins.instance.Memory().Size())) wazeroGuestFunc := ins.instance.ExportedFunction("RunAfterExecutionOnGuest")