Skip to content

Commit

Permalink
hackathon demo commit
Browse files Browse the repository at this point in the history
  • Loading branch information
savme committed Jun 10, 2024
1 parent fb3d9c2 commit 86a698a
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 171 deletions.
10 changes: 8 additions & 2 deletions cli/cmd/sync_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,16 +271,22 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
syncResponseMsg := r.GetMessage()
switch m := syncResponseMsg.(type) {
case *plugin.Sync_Response_Insert:
record, err := plugin.NewRecordFromBytes(m.Insert.Record)
if err != nil {
return fmt.Errorf("failed to get record from bytes: %w", err)
}

table, _ := record.Schema().Metadata().GetValue("cq:table_name")
for _, mod := range dt.Modules() {
out, err := dt.ExecuteModule(ctx, mod, m.Insert.Record)
out, err := dt.ExecuteModule(ctx, mod, table, m.Insert.Record)
if err != nil {
return err
}

m.Insert.Record = out
}

record, err := plugin.NewRecordFromBytes(m.Insert.Record)
record, err = plugin.NewRecordFromBytes(m.Insert.Record)
if err != nil {
return fmt.Errorf("failed to get record from bytes: %w", err)
}
Expand Down
1 change: 0 additions & 1 deletion cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ require (
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/wasmerio/wasmer-go v1.0.4
github.com/yosssi/ace v0.0.5 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions cli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,6 @@ github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IU
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/wasmerio/wasmer-go v1.0.4 h1:MnqHoOGfiQ8MMq2RF6wyCeebKOe84G88h5yv+vmxJgs=
github.com/wasmerio/wasmer-go v1.0.4/go.mod h1:0gzVdSfg6pysA6QVp6iVRPTagC6Wq9pOE8J86WKb2Fk=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c=
Expand Down
10 changes: 5 additions & 5 deletions cli/internal/data_transform/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
type DataTransformer interface {
Modules() []string
InitializeModule(context.Context, string) error
ExecuteModule(context.Context, string, []byte) ([]byte, error)
ExecuteModule(context.Context, string, string, []byte) ([]byte, error)

Close() error
}
Expand All @@ -20,9 +20,9 @@ func NewNoopDataTransformer() (*NoopDataTransformer, error) {
return &NoopDataTransformer{}, nil
}

func (n *NoopDataTransformer) Close() error { return nil }
func (n *NoopDataTransformer) ExecuteModule(context.Context, string, []byte) ([]byte, error) {
func (*NoopDataTransformer) Close() error { return nil }
func (*NoopDataTransformer) ExecuteModule(context.Context, string, string, []byte) ([]byte, error) {
return nil, nil
}
func (n *NoopDataTransformer) InitializeModule(context.Context, string) error { return nil }
func (n *NoopDataTransformer) Modules() []string { return []string{} }
func (*NoopDataTransformer) InitializeModule(context.Context, string) error { return nil }
func (*NoopDataTransformer) Modules() []string { return []string{} }
145 changes: 0 additions & 145 deletions cli/internal/data_transform/wasmer.go

This file was deleted.

70 changes: 54 additions & 16 deletions cli/internal/data_transform/wazero.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package datatransform
import (
"context"
"fmt"
"log"
"os"
"strings"

"github.com/cloudquery/plugin-sdk/v4/glob"
"github.com/rs/zerolog/log"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/api"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
Expand Down Expand Up @@ -51,12 +53,11 @@ func (w *Wazero) InitializeModule(ctx context.Context, path string) error {
builder.WithFunc(func(ctx context.Context, m api.Module, offset, byteCount uint32) {
buf, ok := m.Memory().Read(offset, byteCount)
if !ok {
log.Panicf("Memory.Read(%d, %d) out of range", offset, byteCount)
log.Panic().Msgf("Memory.Read(%d, %d) out of range", offset, byteCount)
}
fmt.Println("[rust] " + string(buf))
}).Export("log")

host.Instantiate(ctx)
log.Info().Str("wasm_transformer", m.Name()).Msg(string(buf))
}).Export("log")

cm, err := w.InstantiateModule(ctx, compiled, wazero.NewModuleConfig().WithArgs("wasm-transform").WithStderr(os.Stderr).WithStdout(os.Stdout))
if err != nil {
Expand All @@ -67,22 +68,61 @@ func (w *Wazero) InitializeModule(ctx context.Context, path string) error {
return nil
}

func (w *Wazero) ExecuteModule(ctx context.Context, path string, rec []byte) ([]byte, error) {
const cqFunctionPrefix = "_cqtransform_"

func (w *Wazero) ExecuteModule(ctx context.Context, path string, table string, rec []byte) ([]byte, error) {
mod, ok := w.modules[path]
if !ok {
return nil, fmt.Errorf("requested unknown module %s", path)
}

fmt.Printf("%s exported memory table:\n", path)
for _, mdef := range mod.ExportedMemoryDefinitions() {
fmt.Println(mdef.ExportNames()[0])
matchingFunctionsWithFilter := map[string]string{}

for _, wf := range mod.ExportedFunctionDefinitions() {
if !strings.HasPrefix(wf.Name(), cqFunctionPrefix) {
continue
}

table := wf.Name()[len(cqFunctionPrefix):]
stop := 0
for idx, ch := range table {
if ch == '@' && len(table) > idx+1 && table[idx+1] == '@' {
stop = idx
break
}
}

if stop == 0 {
continue
}
filterEnd := table[:stop]

matchingFunctionsWithFilter[wf.Name()] = strings.ReplaceAll(filterEnd, "\"", "")
}
fmt.Printf("\n%s exported functions table:\n", path)
for _, fdef := range mod.ExportedFunctionDefinitions() {
fmt.Println(fdef.Name(), fdef.ParamTypes(), fdef.ResultTypes())

if len(matchingFunctionsWithFilter) == 0 {
return rec, nil
}
fmt.Println()

for name, filter := range matchingFunctionsWithFilter {
if !glob.Glob(filter, table) {
continue
}

out, err := doTransform(ctx, mod, name, rec)
if err := checkedExitZero(err); err != nil {
return nil, err
}

if out != nil {
rec = out
}
}

return rec, nil
}

func doTransform(ctx context.Context, mod *WazeroModule, function string, rec []byte) ([]byte, error) {
rv, err := mod.ExportedFunction("allocate").Call(ctx, uint64(len(rec)))
if err != nil {
return nil, err
Expand All @@ -91,13 +131,11 @@ func (w *Wazero) ExecuteModule(ctx context.Context, path string, rec []byte) ([]
if !mod.Memory().Write(uint32(rv[0]), rec) {
return nil, fmt.Errorf("couldn't write to memory offset: %d", rv[0])
}
ret, err := mod.ExportedFunction("cloudquery_transform").Call(ctx,
ret, err := mod.ExportedFunction(function).Call(ctx,
rv[0],
uint64(len(rec)),
)

fmt.Println(ret)

if err := checkedExitZero(err); err != nil {
return nil, err
}
Expand Down

0 comments on commit 86a698a

Please sign in to comment.