From 91cb36981377292de7ef1d81b8a6526b9193b4e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lovro=20Ma=C5=BEgon?= Date: Thu, 15 Feb 2024 21:21:00 +0100 Subject: [PATCH] add utilities for running processor examples and collecting specifications --- .../builtin/examples_exporter_test.go | 88 +++++++ pkg/plugin/processor/builtin/examples_test.go | 115 ++++++++ .../processor/builtin/internal/diff/README.md | 2 +- .../builtin/internal/diff/myers/diff.go | 246 ------------------ .../builtin/internal/diff/myers/diff_test.go | 16 -- pkg/plugin/processor/builtin/processors.json | 1 + 6 files changed, 205 insertions(+), 263 deletions(-) create mode 100644 pkg/plugin/processor/builtin/examples_exporter_test.go create mode 100644 pkg/plugin/processor/builtin/examples_test.go delete mode 100644 pkg/plugin/processor/builtin/internal/diff/myers/diff.go delete mode 100644 pkg/plugin/processor/builtin/internal/diff/myers/diff_test.go create mode 100644 pkg/plugin/processor/builtin/processors.json diff --git a/pkg/plugin/processor/builtin/examples_exporter_test.go b/pkg/plugin/processor/builtin/examples_exporter_test.go new file mode 100644 index 000000000..9307e6957 --- /dev/null +++ b/pkg/plugin/processor/builtin/examples_exporter_test.go @@ -0,0 +1,88 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build export_processors + +package builtin + +import ( + "io" + "log" + "os" + "sort" + "strings" + "testing" + + "github.com/goccy/go-json" +) + +func TestMain(m *testing.M) { + code := m.Run() + if code > 0 { + os.Exit(code) + } + + // tests passed, export the processors + const outputFile = "processors.json" + + f, err := os.OpenFile(outputFile, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600) + if err != nil { + log.Fatalf("failed to open %s: %v", outputFile, err) + } + defer f.Close() + + exportProcessors(f) +} + +func exportProcessors(output io.Writer) { + sorted := sortProcessors(processors) + + bytes, err := json.MarshalIndent(sorted, "", " ") + if err != nil { + log.Fatalf("failed to marshal processors to JSON: %v", err) + } + + _, err = output.Write(bytes) + if err != nil { + log.Fatalf("failed to write processors to output: %v", err) + } +} + +func sortProcessors(processors map[string]*procInfo) []*procInfo { + names := make([]string, 0, len(processors)) + for k, _ := range processors { + names = append(names, k) + } + sort.Strings(names) + + sorted := make([]*procInfo, len(names)) + for i, name := range names { + // also sort examples for each processor + proc := processors[name] + proc.Examples = sortExamples(proc.Examples) + sorted[i] = proc + } + + return sorted +} + +func sortExamples(examples []example) []example { + sort.Slice(examples, func(i, j int) bool { + if examples[i].Order != examples[j].Order { + return examples[i].Order < examples[j].Order + } + return strings.Compare(examples[i].Description, examples[j].Description) < 0 + }) + return examples +} diff --git a/pkg/plugin/processor/builtin/examples_test.go b/pkg/plugin/processor/builtin/examples_test.go new file mode 100644 index 000000000..e0d20dd32 --- /dev/null +++ b/pkg/plugin/processor/builtin/examples_test.go @@ -0,0 +1,115 @@ +// Copyright © 2024 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:generate go test -count=1 -tags export_processors . + +package builtin + +import ( + "context" + "encoding/json" + "fmt" + "log" + + "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-processor-sdk" + "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal/diff" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +// -- HELPERS ------------------------------------------------------------------ + +var processors = map[string]*procInfo{} + +type procInfo struct { + Specification sdk.Specification `json:"specification"` + Examples []example `json:"examples"` +} + +type example struct { + // Order is an optional field that is used to order examples in the + // documentation. If omitted, the example will be ordered by description. + Order int `json:"-"` + Description string `json:"description"` + Config map[string]string `json:"config"` + Have opencdc.Record `json:"have"` + Want sdk.ProcessedRecord `json:"want"` +} + +// RunExample runs the given example with the given processor and logs the +// result. It is intended to be used in example functions. Additionally, it +// stores the processor specification and example in a global map so it can be +// used to generate documentation. +func RunExample(p sdk.Processor, e example) { + spec, err := p.Specification() + if err != nil { + log.Fatalf("failed to fetch specification: %v", err) + } + + pi, ok := processors[spec.Name] + if !ok { + pi = &procInfo{Specification: spec} + processors[spec.Name] = pi + } + + ctx := context.Background() + err = p.Configure(ctx, e.Config) + if err != nil { + log.Fatalf("failed to configure processor: %v", err) + } + + err = p.Open(ctx) + if err != nil { + log.Fatalf("failed to open processor: %v", err) + } + + got := p.Process(ctx, []opencdc.Record{e.Have.Clone()}) + if len(got) != 1 { + log.Fatalf("expected 1 record to be returned, got %d", len(got)) + } + + if d := cmp.Diff(e.Want, got[0], cmpopts.IgnoreUnexported(sdk.SingleRecord{})); d != "" { + log.Fatalf("processed record did not match expectation:\n%v", d) + } + + switch rec := got[0].(type) { + case sdk.SingleRecord: + // produce JSON diff + havePrettyJson, err := json.MarshalIndent(e.Have, "", " ") + if err != nil { + log.Fatalf("failed to marshal test record to JSON: %v", err) + } + + gotPrettyJson, err := json.MarshalIndent(rec, "", " ") + if err != nil { + log.Fatalf("failed to marshal processed record to JSON: %v", err) + } + + edits := diff.Strings(string(havePrettyJson), string(gotPrettyJson)) + unified, err := diff.ToUnified("before", "after", string(havePrettyJson)+"\n", edits, 100) + if err != nil { + log.Fatalf("failed to produce unified diff: %v", err) + } + + fmt.Printf("processor transformed record:\n%s\n", unified) + case sdk.FilterRecord: + fmt.Println("processor filtered record out") + case sdk.ErrorRecord: + fmt.Printf("processor returned error: %s\n", rec.Error) + } + + // append example to processor + pi.Examples = append(pi.Examples, e) +} diff --git a/pkg/plugin/processor/builtin/internal/diff/README.md b/pkg/plugin/processor/builtin/internal/diff/README.md index 09985b6c8..3a57a36db 100644 --- a/pkg/plugin/processor/builtin/internal/diff/README.md +++ b/pkg/plugin/processor/builtin/internal/diff/README.md @@ -3,7 +3,7 @@ This package contains code taken from https://github.com/golang/tools/tree/master/internal/diff on February 15th, 2024. We need the code to create a unified diff between two strings. -The code is left as-is, except two changes: +The code is left as-is, except 3 changes: - The imports were changed to reference the Conduit module path. This was done using the following command: diff --git a/pkg/plugin/processor/builtin/internal/diff/myers/diff.go b/pkg/plugin/processor/builtin/internal/diff/myers/diff.go deleted file mode 100644 index d2b8d1ee6..000000000 --- a/pkg/plugin/processor/builtin/internal/diff/myers/diff.go +++ /dev/null @@ -1,246 +0,0 @@ -// Copyright 2019 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package myers implements the Myers diff algorithm. -package myers - -import ( - "strings" - - "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal/diff" -) - -// Sources: -// https://blog.jcoglan.com/2017/02/17/the-myers-diff-algorithm-part-3/ -// https://www.codeproject.com/Articles/42279/%2FArticles%2F42279%2FInvestigating-Myers-diff-algorithm-Part-1-of-2 - -// ComputeEdits returns the diffs of two strings using a simple -// line-based implementation, like [diff.Strings]. -// -// Deprecated: this implementation is moribund. However, when diffs -// appear in marker test expectations, they are the particular diffs -// produced by this implementation. The marker test framework -// asserts diff(orig, got)==wantDiff, but ideally it would compute -// got==apply(orig, wantDiff) so that the notation of the diff -// is immaterial. -func ComputeEdits(before, after string) []diff.Edit { - beforeLines := splitLines(before) - ops := operations(beforeLines, splitLines(after)) - - // Build a table mapping line number to offset. - lineOffsets := make([]int, 0, len(beforeLines)+1) - total := 0 - for i := range beforeLines { - lineOffsets = append(lineOffsets, total) - total += len(beforeLines[i]) - } - lineOffsets = append(lineOffsets, total) // EOF - - edits := make([]diff.Edit, 0, len(ops)) - for _, op := range ops { - start, end := lineOffsets[op.I1], lineOffsets[op.I2] - switch op.Kind { - case opDelete: - // Delete: before[I1:I2] is deleted. - edits = append(edits, diff.Edit{Start: start, End: end}) - case opInsert: - // Insert: after[J1:J2] is inserted at before[I1:I1]. - if content := strings.Join(op.Content, ""); content != "" { - edits = append(edits, diff.Edit{Start: start, End: end, New: content}) - } - } - } - return edits -} - -// opKind is used to denote the type of operation a line represents. -type opKind int - -const ( - opDelete opKind = iota // line deleted from input (-) - opInsert // line inserted into output (+) - opEqual // line present in input and output -) - -func (kind opKind) String() string { - switch kind { - case opDelete: - return "delete" - case opInsert: - return "insert" - case opEqual: - return "equal" - default: - panic("unknown opKind") - } -} - -type operation struct { - Kind opKind - Content []string // content from b - I1, I2 int // indices of the line in a - J1 int // indices of the line in b, J2 implied by len(Content) -} - -// operations returns the list of operations to convert a into b, consolidating -// operations for multiple lines and not including equal lines. -func operations(a, b []string) []*operation { - if len(a) == 0 && len(b) == 0 { - return nil - } - - trace, offset := shortestEditSequence(a, b) - snakes := backtrack(trace, len(a), len(b), offset) - - M, N := len(a), len(b) - - var i int - solution := make([]*operation, len(a)+len(b)) - - add := func(op *operation, i2, j2 int) { - if op == nil { - return - } - op.I2 = i2 - if op.Kind == opInsert { - op.Content = b[op.J1:j2] - } - solution[i] = op - i++ - } - x, y := 0, 0 - for _, snake := range snakes { - if len(snake) < 2 { - continue - } - var op *operation - // delete (horizontal) - for snake[0]-snake[1] > x-y { - if op == nil { - op = &operation{ - Kind: opDelete, - I1: x, - J1: y, - } - } - x++ - if x == M { - break - } - } - add(op, x, y) - op = nil - // insert (vertical) - for snake[0]-snake[1] < x-y { - if op == nil { - op = &operation{ - Kind: opInsert, - I1: x, - J1: y, - } - } - y++ - } - add(op, x, y) - op = nil - // equal (diagonal) - for x < snake[0] { - x++ - y++ - } - if x >= M && y >= N { - break - } - } - return solution[:i] -} - -// backtrack uses the trace for the edit sequence computation and returns the -// "snakes" that make up the solution. A "snake" is a single deletion or -// insertion followed by zero or diagonals. -func backtrack(trace [][]int, x, y, offset int) [][]int { - snakes := make([][]int, len(trace)) - d := len(trace) - 1 - for ; x > 0 && y > 0 && d > 0; d-- { - V := trace[d] - if len(V) == 0 { - continue - } - snakes[d] = []int{x, y} - - k := x - y - - var kPrev int - if k == -d || (k != d && V[k-1+offset] < V[k+1+offset]) { - kPrev = k + 1 - } else { - kPrev = k - 1 - } - - x = V[kPrev+offset] - y = x - kPrev - } - if x < 0 || y < 0 { - return snakes - } - snakes[d] = []int{x, y} - return snakes -} - -// shortestEditSequence returns the shortest edit sequence that converts a into b. -func shortestEditSequence(a, b []string) ([][]int, int) { - M, N := len(a), len(b) - V := make([]int, 2*(N+M)+1) - offset := N + M - trace := make([][]int, N+M+1) - - // Iterate through the maximum possible length of the SES (N+M). - for d := 0; d <= N+M; d++ { - copyV := make([]int, len(V)) - // k lines are represented by the equation y = x - k. We move in - // increments of 2 because end points for even d are on even k lines. - for k := -d; k <= d; k += 2 { - // At each point, we either go down or to the right. We go down if - // k == -d, and we go to the right if k == d. We also prioritize - // the maximum x value, because we prefer deletions to insertions. - var x int - if k == -d || (k != d && V[k-1+offset] < V[k+1+offset]) { - x = V[k+1+offset] // down - } else { - x = V[k-1+offset] + 1 // right - } - - y := x - k - - // Diagonal moves while we have equal contents. - for x < M && y < N && a[x] == b[y] { - x++ - y++ - } - - V[k+offset] = x - - // Return if we've exceeded the maximum values. - if x == M && y == N { - // Makes sure to save the state of the array before returning. - copy(copyV, V) - trace[d] = copyV - return trace, offset - } - } - - // Save the state of the array. - copy(copyV, V) - trace[d] = copyV - } - return nil, 0 -} - -func splitLines(text string) []string { - lines := strings.SplitAfter(text, "\n") - if lines[len(lines)-1] == "" { - lines = lines[:len(lines)-1] - } - return lines -} diff --git a/pkg/plugin/processor/builtin/internal/diff/myers/diff_test.go b/pkg/plugin/processor/builtin/internal/diff/myers/diff_test.go deleted file mode 100644 index 98fb250c9..000000000 --- a/pkg/plugin/processor/builtin/internal/diff/myers/diff_test.go +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2019 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package myers_test - -import ( - "testing" - - "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal/diff/difftest" - "github.com/conduitio/conduit/pkg/plugin/processor/builtin/internal/diff/myers" -) - -func TestDiff(t *testing.T) { - difftest.DiffTest(t, myers.ComputeEdits) -} diff --git a/pkg/plugin/processor/builtin/processors.json b/pkg/plugin/processor/builtin/processors.json new file mode 100644 index 000000000..0637a088a --- /dev/null +++ b/pkg/plugin/processor/builtin/processors.json @@ -0,0 +1 @@ +[] \ No newline at end of file