Skip to content

Commit

Permalink
wasm support
Browse files Browse the repository at this point in the history
  • Loading branch information
savme committed Jun 5, 2024
1 parent d280509 commit a9f0c26
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 2 deletions.
23 changes: 22 additions & 1 deletion cli/cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

apiAuth "github.com/cloudquery/cloudquery-api-go/auth"
"github.com/cloudquery/cloudquery/cli/internal/auth"
datatransform "github.com/cloudquery/cloudquery/cli/internal/data_transform"
"github.com/cloudquery/cloudquery/cli/internal/specs/v0"
"github.com/cloudquery/plugin-pb-go/managedplugin"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -106,6 +107,8 @@ func sync(cmd *cobra.Command, args []string) error {

sources := specReader.Sources
destinations := specReader.Destinations
transforms := specReader.Transforms

sourcePluginClients := make(managedplugin.Clients, 0)
defer func() {
if err := sourcePluginClients.Terminate(); err != nil {
Expand All @@ -124,6 +127,24 @@ func sync(cmd *cobra.Command, args []string) error {
// in a cloud sync environment, we pass only the relevant environment variables to the plugin
osEnviron := os.Environ()

var dt datatransform.DataTransformer
dt, _ = datatransform.NewNoopDataTransformer()
if len(transforms) > 0 {
wasm, err := datatransform.NewWasmerDataTransformer()
if err != nil {
return fmt.Errorf("failed to initialize a wasm engine: %w", err)
}

for _, spec := range transforms {
if err := wasm.InitializeModule(spec.Path); err != nil {
return err
}
}

dt = wasm
}
defer dt.Close()

for _, source := range sources {
opts := []managedplugin.Option{
managedplugin.WithLogger(log.Logger),
Expand Down Expand Up @@ -265,7 +286,7 @@ func sync(cmd *cobra.Command, args []string) error {
return err
}

if err := syncConnectionV3(ctx, src, dests, backend, invocationUUID.String(), noMigrate, summaryLocation); err != nil {
if err := syncConnectionV3(ctx, src, dests, backend, dt, invocationUUID.String(), noMigrate, summaryLocation); err != nil {
return fmt.Errorf("failed to sync v3 source %s: %w", cl.Name(), err)
}

Expand Down
12 changes: 11 additions & 1 deletion cli/cmd/sync_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/apache/arrow/go/v16/arrow"
"github.com/cloudquery/cloudquery-api-go/auth"
"github.com/cloudquery/cloudquery/cli/internal/api"
datatransform "github.com/cloudquery/cloudquery/cli/internal/data_transform"
"github.com/cloudquery/cloudquery/cli/internal/specs/v0"
"github.com/cloudquery/cloudquery/cli/internal/transformer"
"github.com/cloudquery/plugin-pb-go/managedplugin"
Expand Down Expand Up @@ -52,7 +53,7 @@ func getProgressAPIClient() (*cloudquery_api.ClientWithResponses, error) {
}

// nolint:dupl
func syncConnectionV3(ctx context.Context, source v3source, destinations []v3destination, backend *v3destination, uid string, noMigrate bool, summaryLocation string) error {
func syncConnectionV3(ctx context.Context, source v3source, destinations []v3destination, backend *v3destination, dt datatransform.DataTransformer, uid string, noMigrate bool, summaryLocation string) error {
var mt metrics.Metrics
var exitReason = ExitReasonStopped
tablesForDeleteStale := make(map[string]bool, 0)
Expand Down Expand Up @@ -275,6 +276,15 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
return fmt.Errorf("failed to get record from bytes: %w", err)
}

fmt.Printf("%T\n", dt)
fmt.Println(dt.Modules())
for _, mod := range dt.Modules() {
fmt.Printf("mod=%s dt=%T\n", mod, dt)
if err := dt.ExecuteModule(mod); err != nil {
return err
}
}

atomic.AddInt64(&newResources, record.NumRows())
atomic.AddInt64(&totalResources, record.NumRows())
if remoteProgressReporter != nil {
Expand Down
1 change: 1 addition & 0 deletions cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ require (
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/wasmerio/wasmer-go v1.0.4
github.com/yosssi/ace v0.0.5 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions cli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IU
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/wasmerio/wasmer-go v1.0.4 h1:MnqHoOGfiQ8MMq2RF6wyCeebKOe84G88h5yv+vmxJgs=
github.com/wasmerio/wasmer-go v1.0.4/go.mod h1:0gzVdSfg6pysA6QVp6iVRPTagC6Wq9pOE8J86WKb2Fk=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c=
Expand Down
22 changes: 22 additions & 0 deletions cli/internal/data_transform/transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package datatransform

// DataTransformer manages a set of transformation modules and runs them on
// demand.
type DataTransformer interface {
	// InitializeModule prepares the module identified by path so that it can
	// later be run with ExecuteModule.
	InitializeModule(path string) error
	// ExecuteModule runs the module identified by path.
	ExecuteModule(path string) error
	// Modules lists the identifiers of the modules known to the transformer.
	Modules() []string

	// Close releases whatever the transformer holds on to.
	Close() error
}

// NoopDataTransformer is a DataTransformer that holds no modules; all of its
// methods do nothing and report success.
type NoopDataTransformer struct{}

// Compile-time check that *NoopDataTransformer implements DataTransformer.
var _ DataTransformer = (*NoopDataTransformer)(nil)

// NewNoopDataTransformer returns a ready-to-use NoopDataTransformer.
// The returned error is always nil.
func NewNoopDataTransformer() (*NoopDataTransformer, error) {
	noop := new(NoopDataTransformer)
	return noop, nil
}

// Close does nothing and always returns nil.
func (*NoopDataTransformer) Close() error { return nil }

Check warning on line 19 in cli/internal/data_transform/transform.go

View workflow job for this annotation

GitHub Actions / cli (ubuntu-latest)

unused-receiver: method receiver 'n' is not referenced in method's body, consider removing or renaming it as _ (revive)
// ExecuteModule ignores its argument, does nothing and always returns nil.
func (*NoopDataTransformer) ExecuteModule(string) error { return nil }

Check warning on line 20 in cli/internal/data_transform/transform.go

View workflow job for this annotation

GitHub Actions / cli (ubuntu-latest)

unused-receiver: method receiver 'n' is not referenced in method's body, consider removing or renaming it as _ (revive)
// InitializeModule ignores its argument, does nothing and always returns nil.
func (*NoopDataTransformer) InitializeModule(string) error { return nil }

Check warning on line 21 in cli/internal/data_transform/transform.go

View workflow job for this annotation

GitHub Actions / cli (ubuntu-latest)

unused-receiver: method receiver 'n' is not referenced in method's body, consider removing or renaming it as _ (revive)
// Modules always returns an empty, non-nil slice: a NoopDataTransformer has
// no modules.
func (*NoopDataTransformer) Modules() []string { return []string{} }
139 changes: 139 additions & 0 deletions cli/internal/data_transform/wasmer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package datatransform

import (
	"errors"
	"fmt"
	"os"
	"sort"

	"github.com/wasmerio/wasmer-go/wasmer"
	"golang.org/x/exp/maps"
)

// Wasmer is a DataTransformer backed by the wasmer-go runtime.
type Wasmer struct {
// store is the wasmer store that modules are compiled against and that
// WASI import objects are generated for.
store *wasmer.Store
// modules holds the instantiated modules, keyed by the path that was
// passed to InitializeModule.
modules map[string]*WasiModule
}

// WasiModule pairs an instantiated wasm module with the WASI environment its
// import object was generated from. Both are embedded, so their methods and
// fields are reachable directly on a WasiModule.
type WasiModule struct {
*wasmer.Instance
*wasmer.WasiEnvironment
}

var _ DataTransformer = (*Wasmer)(nil)

// getRuntimeEngineKind picks the engine to run modules with, preferring
// UNIVERSAL over DYLIB. It returns an error when neither is available.
func getRuntimeEngineKind() (wasmer.EngineKind, error) {
	switch {
	case wasmer.IsEngineAvailable(wasmer.UNIVERSAL):
		return wasmer.UNIVERSAL, nil
	case wasmer.IsEngineAvailable(wasmer.DYLIB):
		return wasmer.DYLIB, nil
	default:
		return wasmer.EngineKind(0), errors.New("no available wasm engines")
	}
}

// getRuntimeCompilerKind returns the first available compiler, trying
// CRANELIFT, then LLVM, then SINGLEPASS. It returns an error when none of
// them is available.
func getRuntimeCompilerKind() (wasmer.CompilerKind, error) {
	// Listed from most to least preferred.
	candidates := [...]wasmer.CompilerKind{wasmer.CRANELIFT, wasmer.LLVM, wasmer.SINGLEPASS}

	for _, candidate := range candidates {
		if wasmer.IsCompilerAvailable(candidate) {
			return candidate, nil
		}
	}

	return wasmer.CompilerKind(0), errors.New("no available wasm compilers")
}

// NewWasmerDataTransformer builds a Wasmer transformer with an empty module
// set. The engine and compiler are whichever getRuntimeEngineKind and
// getRuntimeCompilerKind report; an error from either is returned unchanged.
func NewWasmerDataTransformer() (*Wasmer, error) {
	engine, err := getRuntimeEngineKind()
	if err != nil {
		return nil, err
	}
	compiler, err := getRuntimeCompilerKind()
	if err != nil {
		return nil, err
	}

	cfg := wasmer.NewConfig()

	// Any engine other than DYLIB falls back to the JIT engine.
	if engine == wasmer.DYLIB {
		cfg.UseDylibEngine()
	} else {
		cfg.UseJITEngine()
	}

	// Any compiler other than CRANELIFT or LLVM falls back to singlepass.
	if compiler == wasmer.CRANELIFT {
		cfg.UseCraneliftCompiler()
	} else if compiler == wasmer.LLVM {
		cfg.UseLLVMCompiler()
	} else {
		cfg.UseSinglepassCompiler()
	}

	transformer := &Wasmer{
		store:   wasmer.NewStore(wasmer.NewEngineWithConfig(cfg)),
		modules: map[string]*WasiModule{},
	}
	return transformer, nil
}

// Close closes the underlying wasmer store. It always returns nil.
//
// NOTE(review): the instances and WASI environments kept in w.modules are not
// closed explicitly here — confirm that closing the store is sufficient.
func (w *Wasmer) Close() error {
w.store.Close()
return nil
}

// Modules returns the paths of all initialized modules in lexicographic
// order.
//
// maps.Keys yields keys in an indeterminate order, so without sorting the
// caller would run the modules in a different order on every call. Sorting
// makes the order stable; note that it is not necessarily the order in which
// the modules were initialized.
func (w *Wasmer) Modules() []string {
	paths := maps.Keys(w.modules)
	sort.Strings(paths)
	return paths
}

// InitializeModule reads the wasm binary at path, compiles it against the
// transformer's store, builds a WASI environment that captures the module's
// stdout and stderr, and instantiates the module. On success the instance is
// registered under path, replacing any module previously registered under the
// same path.
//
// Every failure is wrapped with the step that failed and the module path, so
// the caller can tell which module is at fault; the underlying error stays
// reachable through errors.Is / errors.As.
func (w *Wasmer) InitializeModule(path string) error {
	content, err := os.ReadFile(path)
	if err != nil {
		return fmt.Errorf("unable to load %s: %w", path, err)
	}

	mod, err := wasmer.NewModule(w.store, content)
	if err != nil {
		return fmt.Errorf("compiling wasm module %s: %w", path, err)
	}

	// The path doubles as the program name handed to the WASI state builder.
	env, err := wasmer.NewWasiStateBuilder(path).CaptureStderr().CaptureStdout().Finalize()
	if err != nil {
		return fmt.Errorf("building wasi environment for %s: %w", path, err)
	}
	imports, err := env.GenerateImportObject(w.store, mod)
	if err != nil {
		return fmt.Errorf("generating wasi imports for %s: %w", path, err)
	}

	instance, err := wasmer.NewInstance(mod, imports)
	if err != nil {
		return fmt.Errorf("instantiating wasm module %s: %w", path, err)
	}

	w.modules[path] = &WasiModule{Instance: instance, WasiEnvironment: env}
	return nil
}

// ExecuteModule runs the WASI start function of the module registered under
// path. It fails if no such module was initialized or if the module does not
// export a WASI start function.
//
// The start function's return value and the module's captured stdout and
// stderr are printed to the process's standard output. Whatever error the
// start function reports is returned unchanged.
func (w *Wasmer) ExecuteModule(path string) error {
	module, known := w.modules[path]
	if !known {
		return fmt.Errorf("requested unknown module %s", path)
	}

	entrypoint, err := module.Exports.GetWasiStartFunction()
	if err != nil {
		return err
	}

	result, runErr := entrypoint()

	// Stdout is drained before stderr, matching the order they are printed in.
	capturedOut := string(module.ReadStdout())
	capturedErr := string(module.ReadStderr())
	fmt.Println(result, capturedOut, capturedErr)

	return runErr
}
49 changes: 49 additions & 0 deletions cli/internal/data_transform/wazero.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package datatransform

// type Wazero struct {
// wazero.Runtime
// modules map[string]wazero.CompiledModule
// }

// var _ DataTransformer = (*Wazero)(nil)

// func NewWazeroDataTransform(ctx context.Context) (*Wazero, error) {
// cfg := wazero.NewRuntimeConfig().WithCloseOnContextDone(true)
// rt := wazero.NewRuntimeWithConfig(ctx, cfg)

// _, err := wasi_snapshot_preview1.Instantiate(ctx, rt)
// if err != nil {
// return nil, err
// }

// return &Wazero{Runtime: rt, modules: map[string]wazero.CompiledModule{}}, nil
// }

// func (w *Wazero) Close() error { return w.Close() }
// func (w *Wazero) Modules() []string { return maps.Keys(w.modules) }

// func (w *Wazero) InitializeModule(ctx context.Context, path string) error {
// content, err := os.ReadFile(path)
// if err != nil {
// return fmt.Errorf("unable to load %s: %w", path, err)
// }
// mod, err := w.CompileModule(ctx, content)
// if err != nil {
// return err
// }
// w.modules[path] = mod
// return nil
// }

// func (w *Wazero) ExecuteModule(path string) error {
// mod, ok := w.modules[path]
// if !ok {
// return fmt.Errorf("requested unknown module %s", path)
// }

// fn, ok := mod.ExportedFunctions()["main"]
// if !ok {
// return fmt.Errorf("requested unknown function %s in module %s", "main", path)
// }

// }
3 changes: 3 additions & 0 deletions cli/internal/specs/v0/schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions cli/internal/specs/v0/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "errors"

// Transform is the spec of a transform: a named reference to a wasm module.
type Transform struct {
// Name identifies the transform. The JSON schema marks it as required and
// non-empty.
Name string `json:"name" jsonschema:"required,minLength=1"`
// Path is the location of the wasm module.
// NOTE(review): Validate rejects an empty Path, but unlike Name this field
// carries no jsonschema "required" tag — confirm whether that is intended.
Path string `json:"path"`
}

func (*Transform) GetWarnings() Warnings {
Expand All @@ -18,5 +19,9 @@ func (t *Transform) Validate() error {
return errors.New("name is required")
}

if t.Path == "" {
return errors.New("path to a wasm module is required")
}

return nil
}
26 changes: 26 additions & 0 deletions cli/test-transform-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
kind: source
spec:
name: "aws"
path: cloudquery/aws
registry: cloudquery
version: v26.10.0
destinations: ["postgresql"]
tables: ["*s3_bucket*"]
spec:
accounts:
- id: "615713231484"
---
kind: destination
spec:
name: "postgresql"
path: cloudquery/postgresql
registry: cloudquery
version: v8.0.5
write_mode: "overwrite-delete-stale" # overwrite-delete-stale, overwrite, append
spec:
connection_string: "postgresql://postgres:12345@localhost:5432/postgres?sslmode=disable"
---
kind: transform
spec:
name: test
path: /Users/alex/Developer/scratch/transform/main.wasm

0 comments on commit a9f0c26

Please sign in to comment.