From 86a698a51567d02144e100a955d8111d8f0a4a91 Mon Sep 17 00:00:00 2001 From: Alex Savanovich Date: Mon, 10 Jun 2024 10:41:41 +0200 Subject: [PATCH] hackathon demo commit --- cli/cmd/sync_v3.go | 10 +- cli/go.mod | 1 - cli/go.sum | 2 - cli/internal/data_transform/transform.go | 10 +- cli/internal/data_transform/wasmer.go | 145 ----------------------- cli/internal/data_transform/wazero.go | 70 ++++++++--- 6 files changed, 67 insertions(+), 171 deletions(-) delete mode 100644 cli/internal/data_transform/wasmer.go diff --git a/cli/cmd/sync_v3.go b/cli/cmd/sync_v3.go index 1a9a08742a73f6..e3bddf92e4620f 100644 --- a/cli/cmd/sync_v3.go +++ b/cli/cmd/sync_v3.go @@ -271,8 +271,14 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des syncResponseMsg := r.GetMessage() switch m := syncResponseMsg.(type) { case *plugin.Sync_Response_Insert: + record, err := plugin.NewRecordFromBytes(m.Insert.Record) + if err != nil { + return fmt.Errorf("failed to get record from bytes: %w", err) + } + + table, _ := record.Schema().Metadata().GetValue("cq:table_name") for _, mod := range dt.Modules() { - out, err := dt.ExecuteModule(ctx, mod, m.Insert.Record) + out, err := dt.ExecuteModule(ctx, mod, table, m.Insert.Record) if err != nil { return err } @@ -280,7 +286,7 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des m.Insert.Record = out } - record, err := plugin.NewRecordFromBytes(m.Insert.Record) + record, err = plugin.NewRecordFromBytes(m.Insert.Record) if err != nil { return fmt.Errorf("failed to get record from bytes: %w", err) } diff --git a/cli/go.mod b/cli/go.mod index 7497a26341e0f2..941cc8fdc840f9 100644 --- a/cli/go.mod +++ b/cli/go.mod @@ -134,7 +134,6 @@ require ( github.com/valyala/fasttemplate v1.2.2 // indirect github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect - github.com/wasmerio/wasmer-go v1.0.4 github.com/yosssi/ace v0.0.5 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect diff --git a/cli/go.sum b/cli/go.sum index ec12cf8403c3b1..83166a9a8fae73 100644 --- a/cli/go.sum +++ b/cli/go.sum @@ -385,8 +385,6 @@ github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IU github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= -github.com/wasmerio/wasmer-go v1.0.4 h1:MnqHoOGfiQ8MMq2RF6wyCeebKOe84G88h5yv+vmxJgs= -github.com/wasmerio/wasmer-go v1.0.4/go.mod h1:0gzVdSfg6pysA6QVp6iVRPTagC6Wq9pOE8J86WKb2Fk= github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc= github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c= diff --git a/cli/internal/data_transform/transform.go b/cli/internal/data_transform/transform.go index 0d4005a4e3a9c6..db57babb1f7b85 100644 --- a/cli/internal/data_transform/transform.go +++ b/cli/internal/data_transform/transform.go @@ -7,7 +7,7 @@ import ( type DataTransformer interface { Modules() []string InitializeModule(context.Context, string) error - ExecuteModule(context.Context, string, []byte) ([]byte, error) + ExecuteModule(context.Context, string, string, []byte) ([]byte, error) Close() error } @@ -20,9 +20,9 @@ func NewNoopDataTransformer() (*NoopDataTransformer, error) { return &NoopDataTransformer{}, nil } -func (n *NoopDataTransformer) Close() error { return nil } -func (n *NoopDataTransformer) ExecuteModule(context.Context, string, []byte) ([]byte, error) { +func (*NoopDataTransformer) Close() error { return nil } +func (*NoopDataTransformer) ExecuteModule(context.Context, string, string, []byte) ([]byte, error) { return nil, nil } -func (n *NoopDataTransformer) InitializeModule(context.Context, string) error { return nil } -func (n *NoopDataTransformer) Modules() []string { return []string{} } +func (*NoopDataTransformer) InitializeModule(context.Context, string) error { return nil } +func (*NoopDataTransformer) Modules() []string { return []string{} } diff --git a/cli/internal/data_transform/wasmer.go b/cli/internal/data_transform/wasmer.go deleted file mode 100644 index 98b3fe20322ff9..00000000000000 --- a/cli/internal/data_transform/wasmer.go +++ /dev/null @@ -1,145 +0,0 @@ -package datatransform - -import ( - "context" - "errors" - "fmt" - "os" - "runtime" - - "github.com/wasmerio/wasmer-go/wasmer" - "golang.org/x/exp/maps" -) - -type Wasmer struct { - store *wasmer.Store - modules map[string]*WasiModule -} - -type WasiModule struct { - *wasmer.Instance - *wasmer.WasiEnvironment -} - -var _ DataTransformer = (*Wasmer)(nil) - -func getRuntimeEngineKind() (wasmer.EngineKind, error) { - if wasmer.IsEngineAvailable(wasmer.UNIVERSAL) { - return wasmer.UNIVERSAL, nil - } - - if wasmer.IsEngineAvailable(wasmer.DYLIB) { - return wasmer.DYLIB, nil - } - - return wasmer.EngineKind(0), errors.New("no available wasm engines") -} - -func getRuntimeCompilerKind() (wasmer.CompilerKind, error) { - compilersByPriority := []wasmer.CompilerKind{ - wasmer.CRANELIFT, - wasmer.LLVM, - wasmer.SINGLEPASS, - } - - for _, kind := range compilersByPriority { - if !wasmer.IsCompilerAvailable(kind) { - continue - } - - return kind, nil - } - - return wasmer.CompilerKind(0), errors.New("no available wasm compilers") -} - -// Deprecated: use wazero -func NewWasmerDataTransformer() (*Wasmer, error) { - engineKind, err := getRuntimeEngineKind() - if err != nil { - return nil, err - } - compilerKind, err := getRuntimeCompilerKind() - if err != nil { - return nil, err - } - - engineConfig := wasmer.NewConfig() - switch engineKind { - case wasmer.DYLIB: - engineConfig.UseDylibEngine() - default: - engineConfig.UseJITEngine() - } - - switch compilerKind { - case wasmer.CRANELIFT: - engineConfig.UseCraneliftCompiler() - case wasmer.LLVM: - engineConfig.UseLLVMCompiler() - default: - engineConfig.UseSinglepassCompiler() - } - - engine := wasmer.NewEngineWithConfig(engineConfig) - store := wasmer.NewStore(engine) - return &Wasmer{store: store, modules: map[string]*WasiModule{}}, nil -} - -func (w *Wasmer) Close() error { - w.store.Close() - return nil -} - -func (w *Wasmer) Modules() []string { return maps.Keys(w.modules) } - -func (w *Wasmer) InitializeModule(_ context.Context, path string) error { - content, err := os.ReadFile(path) - if err != nil { - return fmt.Errorf("unable to load %s: %w", path, err) - } - - mod, err := wasmer.NewModule(w.store, content) - if err != nil { - return err - } - - env, err := wasmer.NewWasiStateBuilder(path).CaptureStderr().CaptureStdout().Finalize() - if err != nil { - return err - } - imports, err := env.GenerateImportObject(w.store, mod) - if err != nil { - return err - } - - instance, err := wasmer.NewInstance(mod, imports) - if err != nil { - return err - } - - w.modules[path] = &WasiModule{Instance: instance, WasiEnvironment: env} - return nil -} - -func (w *Wasmer) ExecuteModule(_ context.Context, path string, _ []byte) ([]byte, error) { - mod, ok := w.modules[path] - if !ok { - return nil, fmt.Errorf("requested unknown module %s", path) - } - runtime.LockOSThread() - defer runtime.UnlockOSThread() - - start, err := mod.Exports.GetWasiStartFunction() - if err != nil { - return nil, err - } - rv, err := start() - fmt.Println(rv, string(mod.ReadStdout()), string(mod.ReadStderr())) - - if _, ok := err.(*wasmer.TrapError); ok { - mod.Close() - return nil, nil - } - return nil, err -} diff --git a/cli/internal/data_transform/wazero.go b/cli/internal/data_transform/wazero.go index 5769cb92a03aee..b16bf7385e91fd 100644 --- a/cli/internal/data_transform/wazero.go +++ b/cli/internal/data_transform/wazero.go @@ -3,9 +3,11 @@ package datatransform import ( "context" "fmt" - "log" "os" + "strings" + "github.com/cloudquery/plugin-sdk/v4/glob" + "github.com/rs/zerolog/log" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/api" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" @@ -51,12 +53,11 @@ func (w *Wazero) InitializeModule(ctx context.Context, path string) error { builder.WithFunc(func(ctx context.Context, m api.Module, offset, byteCount uint32) { buf, ok := m.Memory().Read(offset, byteCount) if !ok { - log.Panicf("Memory.Read(%d, %d) out of range", offset, byteCount) + log.Panic().Msgf("Memory.Read(%d, %d) out of range", offset, byteCount) } - fmt.Println("[rust] " + string(buf)) - }).Export("log") - host.Instantiate(ctx) + log.Info().Str("wasm_transformer", m.Name()).Msg(string(buf)) + }).Export("log") cm, err := w.InstantiateModule(ctx, compiled, wazero.NewModuleConfig().WithArgs("wasm-transform").WithStderr(os.Stderr).WithStdout(os.Stdout)) if err != nil { @@ -67,22 +68,61 @@ func (w *Wazero) InitializeModule(ctx context.Context, path string) error { return nil } -func (w *Wazero) ExecuteModule(ctx context.Context, path string, rec []byte) ([]byte, error) { +const cqFunctionPrefix = "_cqtransform_" + +func (w *Wazero) ExecuteModule(ctx context.Context, path string, table string, rec []byte) ([]byte, error) { mod, ok := w.modules[path] if !ok { return nil, fmt.Errorf("requested unknown module %s", path) } - fmt.Printf("%s exported memory table:\n", path) - for _, mdef := range mod.ExportedMemoryDefinitions() { - fmt.Println(mdef.ExportNames()[0]) + matchingFunctionsWithFilter := map[string]string{} + + for _, wf := range mod.ExportedFunctionDefinitions() { + if !strings.HasPrefix(wf.Name(), cqFunctionPrefix) { + continue + } + + table := wf.Name()[len(cqFunctionPrefix):] + stop := 0 + for idx, ch := range table { + if ch == '@' && len(table) > idx+1 && table[idx+1] == '@' { + stop = idx + break + } + } + + if stop == 0 { + continue + } + filterEnd := table[:stop] + + matchingFunctionsWithFilter[wf.Name()] = strings.ReplaceAll(filterEnd, "\"", "") } - fmt.Printf("\n%s exported functions table:\n", path) - for _, fdef := range mod.ExportedFunctionDefinitions() { - fmt.Println(fdef.Name(), fdef.ParamTypes(), fdef.ResultTypes()) + + if len(matchingFunctionsWithFilter) == 0 { + return rec, nil } - fmt.Println() + for name, filter := range matchingFunctionsWithFilter { + if !glob.Glob(filter, table) { + continue + } + + out, err := doTransform(ctx, mod, name, rec) + if err := checkedExitZero(err); err != nil { + return nil, err + } + + if out != nil { + rec = out + } + } + + return rec, nil +} + +func doTransform(ctx context.Context, mod *WazeroModule, function string, rec []byte) ([]byte, error) { rv, err := mod.ExportedFunction("allocate").Call(ctx, uint64(len(rec))) if err != nil { return nil, err @@ -91,13 +131,11 @@ func (w *Wazero) ExecuteModule(ctx context.Context, path string, rec []byte) ([] if !mod.Memory().Write(uint32(rv[0]), rec) { return nil, fmt.Errorf("couldn't write to memory offset: %d", rv[0]) } - ret, err := mod.ExportedFunction("cloudquery_transform").Call(ctx, + ret, err := mod.ExportedFunction(function).Call(ctx, rv[0], uint64(len(rec)), ) - fmt.Println(ret) - if err := checkedExitZero(err); err != nil { return nil, err }