Skip to content

Commit

Permalink
JS filters (#486)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Jun 27, 2022
1 parent 092b637 commit 20cd1e5
Show file tree
Hide file tree
Showing 51 changed files with 2,693 additions and 2,455 deletions.
6 changes: 3 additions & 3 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ import (
"google.golang.org/grpc/stats"
"gopkg.in/tomb.v2"

// NB: anonymous import triggers transform registry creation
_ "github.com/conduitio/conduit/pkg/processor/transform/txfbuiltin"
_ "github.com/conduitio/conduit/pkg/processor/transform/txfjs"
// NB: anonymous import triggers processor registry creation
_ "github.com/conduitio/conduit/pkg/processor/procbuiltin"
_ "github.com/conduitio/conduit/pkg/processor/procjs"
)

const (
Expand Down
4 changes: 2 additions & 2 deletions pkg/foundation/metrics/measure/measure.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ var (
"Number of connectors by type.",
[]string{"type"})
ProcessorsGauge = metrics.NewLabeledGauge("conduit_processors",
"Number of processors by type.",
[]string{"processor", "type"})
"Number of processors.",
[]string{"processor"})

ConnectorBytesHistogram = metrics.NewLabeledHistogram("conduit_connector_bytes",
"Number of bytes a connector processed by pipeline name, plugin and type (source, destination).",
Expand Down
8 changes: 4 additions & 4 deletions pkg/orchestrator/mock/orchestrator.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type ConnectorService interface {
type ProcessorService interface {
List(ctx context.Context) map[string]*processor.Instance
Get(ctx context.Context, id string) (*processor.Instance, error)
Create(ctx context.Context, id string, name string, t processor.Type, parent processor.Parent, cfg processor.Config) (*processor.Instance, error)
Create(ctx context.Context, id string, name string, parent processor.Parent, cfg processor.Config) (*processor.Instance, error)
Update(ctx context.Context, id string, cfg processor.Config) (*processor.Instance, error)
Delete(ctx context.Context, id string) error
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/orchestrator/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type ProcessorOrchestrator base
func (p *ProcessorOrchestrator) Create(
ctx context.Context,
name string,
t processor.Type,
parent processor.Parent,
cfg processor.Config,
) (*processor.Instance, error) {
Expand All @@ -52,7 +51,7 @@ func (p *ProcessorOrchestrator) Create(
}

// create processor and add to pipeline or connector
proc, err := p.processors.Create(ctx, uuid.NewString(), name, t, parent, cfg)
proc, err := p.processors.Create(ctx, uuid.NewString(), name, parent, cfg)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -172,7 +171,7 @@ func (p *ProcessorOrchestrator) Delete(ctx context.Context, id string) error {
return err
}
r.Append(func() error {
_, err = p.processors.Create(ctx, id, proc.Name, processor.TypeTransform, proc.Parent, proc.Config)
_, err = p.processors.Create(ctx, id, proc.Name, proc.Parent, proc.Config)
return err
})

Expand Down
20 changes: 7 additions & 13 deletions pkg/orchestrator/processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func TestProcessorOrchestrator_CreateOnPipeline_Success(t *testing.T) {
gomock.AssignableToTypeOf(ctxType),
gomock.AssignableToTypeOf(""),
want.Name,
processor.TypeTransform,
want.Parent,
want.Config,
).
Expand All @@ -69,7 +68,7 @@ func TestProcessorOrchestrator_CreateOnPipeline_Success(t *testing.T) {
Return(pl, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, want.Name, processor.TypeTransform, want.Parent, want.Config)
got, err := orc.Processors.Create(ctx, want.Name, want.Parent, want.Config)
assert.Ok(t, err)
assert.Equal(t, want, got)
}
Expand All @@ -89,7 +88,7 @@ func TestProcessorOrchestrator_CreateOnPipeline_PipelineNotExist(t *testing.T) {
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, "test-processor", processor.TypeTransform, parent, processor.Config{})
got, err := orc.Processors.Create(ctx, "test-processor", parent, processor.Config{})
assert.Error(t, err)
assert.True(t, cerrors.Is(err, wantErr), "errors did not match")
assert.Nil(t, got)
Expand All @@ -113,7 +112,7 @@ func TestProcessorOrchestrator_CreateOnPipeline_PipelineRunning(t *testing.T) {
Return(pl, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, "test-processor", processor.TypeTransform, parent, processor.Config{})
got, err := orc.Processors.Create(ctx, "test-processor", parent, processor.Config{})
assert.Error(t, err)
assert.Equal(t, pipeline.ErrPipelineRunning, err)
assert.Nil(t, got)
Expand Down Expand Up @@ -142,14 +141,13 @@ func TestProcessorOrchestrator_CreateOnPipeline_CreateProcessorError(t *testing.
gomock.AssignableToTypeOf(ctxType),
gomock.AssignableToTypeOf(""),
"test-processor",
processor.TypeTransform,
parent,
processor.Config{},
).
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, "test-processor", processor.TypeTransform, parent, processor.Config{})
got, err := orc.Processors.Create(ctx, "test-processor", parent, processor.Config{})
assert.Error(t, err)
assert.True(t, cerrors.Is(err, wantErr), "errors did not match")
assert.Nil(t, got)
Expand Down Expand Up @@ -185,7 +183,6 @@ func TestProcessorOrchestrator_CreateOnPipeline_AddProcessorError(t *testing.T)
gomock.AssignableToTypeOf(ctxType),
gomock.AssignableToTypeOf(""),
proc.Name,
processor.TypeTransform,
proc.Parent,
proc.Config,
).
Expand All @@ -199,7 +196,7 @@ func TestProcessorOrchestrator_CreateOnPipeline_AddProcessorError(t *testing.T)
Return(nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, proc.Name, processor.TypeTransform, proc.Parent, proc.Config)
got, err := orc.Processors.Create(ctx, proc.Name, proc.Parent, proc.Config)
assert.Error(t, err)
assert.True(t, cerrors.Is(err, wantErr), "errors did not match")
assert.Nil(t, got)
Expand Down Expand Up @@ -239,7 +236,6 @@ func TestProcessorOrchestrator_CreateOnConnector_Success(t *testing.T) {
gomock.AssignableToTypeOf(ctxType),
gomock.AssignableToTypeOf(""),
want.Name,
processor.TypeTransform,
want.Parent,
want.Config,
).
Expand All @@ -249,7 +245,7 @@ func TestProcessorOrchestrator_CreateOnConnector_Success(t *testing.T) {
Return(conn, nil)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, want.Name, processor.TypeTransform, want.Parent, want.Config)
got, err := orc.Processors.Create(ctx, want.Name, want.Parent, want.Config)
assert.Ok(t, err)
assert.Equal(t, want, got)
}
Expand All @@ -269,7 +265,7 @@ func TestProcessorOrchestrator_CreateOnConnector_ConnectorNotExist(t *testing.T)
Return(nil, wantErr)

orc := NewOrchestrator(db, log.Nop(), plsMock, consMock, procsMock, pluginMock)
got, err := orc.Processors.Create(ctx, "test-processor", processor.TypeTransform, parent, processor.Config{})
got, err := orc.Processors.Create(ctx, "test-processor", parent, processor.Config{})
assert.Error(t, err)
assert.True(t, cerrors.Is(err, wantErr), "errors did not match")
assert.Nil(t, got)
Expand Down Expand Up @@ -677,7 +673,6 @@ func TestProcessorOrchestrator_DeleteOnPipeline_RemoveProcessorFail(t *testing.T
gomock.AssignableToTypeOf(ctxType),
want.ID,
want.Name,
processor.TypeTransform,
want.Parent,
want.Config,
).
Expand Down Expand Up @@ -733,7 +728,6 @@ func TestProcessorOrchestrator_DeleteOnConnector_Fail(t *testing.T) {
gomock.AssignableToTypeOf(ctxType),
want.ID,
want.Name,
processor.TypeTransform,
want.Parent,
want.Config,
).
Expand Down
6 changes: 3 additions & 3 deletions pkg/pipeline/stream/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

type ProcessorNode struct {
Name string
Processor processor.Processor
Processor processor.Interface
ProcessorTimer metrics.Timer

base pubSubNodeBase
Expand All @@ -51,7 +51,7 @@ func (n *ProcessorNode) Run(ctx context.Context) error {
}

executeTime := time.Now()
rec, err := n.Processor.Execute(msg.Ctx, msg.Record)
rec, err := n.Processor.Process(msg.Ctx, msg.Record)
n.ProcessorTimer.Update(time.Since(executeTime))
if err != nil {
// Check for Skipped records
Expand All @@ -66,7 +66,7 @@ func (n *ProcessorNode) Run(ctx context.Context) error {
err = msg.Nack(err)
if err != nil {
msg.Drop()
return cerrors.Errorf("error applying transform: %w", err)
return cerrors.Errorf("failed to execute processor: %w", err)
}
// nack was handled successfully, we recovered
continue
Expand Down
12 changes: 6 additions & 6 deletions pkg/pipeline/stream/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestProcessorNode_Success(t *testing.T) {
processor := mock.NewProcessor(ctrl)
processor.
EXPECT().
Execute(ctx, wantRec).
Process(ctx, wantRec).
DoAndReturn(func(_ context.Context, got record.Record) (record.Record, error) {
got.Position = newPosition
return got, nil
Expand Down Expand Up @@ -97,7 +97,7 @@ func TestProcessorNode_ErrorWithoutNackHandler(t *testing.T) {

wantErr := cerrors.New("something bad happened")
processor := mock.NewProcessor(ctrl)
processor.EXPECT().Execute(ctx, gomock.Any()).Return(record.Record{}, wantErr)
processor.EXPECT().Process(ctx, gomock.Any()).Return(record.Record{}, wantErr)

n := ProcessorNode{
Name: "test",
Expand All @@ -117,7 +117,7 @@ func TestProcessorNode_ErrorWithoutNackHandler(t *testing.T) {
}()

err := n.Run(ctx)
assert.True(t, cerrors.Is(err, wantErr), "expected underlying error to be the transform error")
assert.True(t, cerrors.Is(err, wantErr), "expected underlying error to be the processor error")

// after the node stops the out channel should be closed
_, ok := <-out
Expand All @@ -130,7 +130,7 @@ func TestProcessorNode_ErrorWithNackHandler(t *testing.T) {

wantErr := cerrors.New("something bad happened")
processor := mock.NewProcessor(ctrl)
processor.EXPECT().Execute(ctx, gomock.Any()).Return(record.Record{}, wantErr)
processor.EXPECT().Process(ctx, gomock.Any()).Return(record.Record{}, wantErr)

n := ProcessorNode{
Name: "test",
Expand All @@ -144,7 +144,7 @@ func TestProcessorNode_ErrorWithNackHandler(t *testing.T) {

msg := &Message{Ctx: ctx}
msg.RegisterNackHandler(func(msg *Message, err error, next NackHandler) error {
assert.True(t, cerrors.Is(err, wantErr), "expected underlying error to be the transform error")
assert.True(t, cerrors.Is(err, wantErr), "expected underlying error to be the processor error")
return next(msg, err) // the error should be regarded as handled
})
go func() {
Expand All @@ -168,7 +168,7 @@ func TestProcessorNode_Skip(t *testing.T) {

// create a dummy processor
proc := mock.NewProcessor(ctrl)
proc.EXPECT().Execute(ctx, gomock.Any()).Return(record.Record{}, processor.ErrSkipRecord)
proc.EXPECT().Process(ctx, gomock.Any()).Return(record.Record{}, processor.ErrSkipRecord)

n := ProcessorNode{
Name: "test",
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,9 @@ func printerDestination(ctrl *gomock.Controller, logger log.CtxLogger, nodeID st
return destination
}

func counterProcessor(ctrl *gomock.Controller, count *int) processor.Processor {
func counterProcessor(ctrl *gomock.Controller, count *int) processor.Interface {
proc := procmock.NewProcessor(ctrl)
proc.EXPECT().Execute(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, r record.Record) (record.Record, error) {
proc.EXPECT().Process(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, r record.Record) (record.Record, error) {
*count++
return r, nil
}).AnyTimes()
Expand Down
2 changes: 1 addition & 1 deletion pkg/processor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
var GlobalBuilderRegistry = NewBuilderRegistry()

// Builder parses the config and if valid returns a processor, an error otherwise.
type Builder func(Config) (Processor, error)
type Builder func(Config) (Interface, error)

// BuilderRegistry is a registry for registering or looking up processor
// builders. The Register and Get methods are safe for concurrent use.
Expand Down
2 changes: 1 addition & 1 deletion pkg/processor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package processor
import "github.com/conduitio/conduit/pkg/foundation/cerrors"

var (
// ErrSkipRecord is passed by a Processor when it should Ack and skip a Record.
// ErrSkipRecord is passed by a processor when it should Ack and skip a Record.
// It must be separate from a plain error so that we continue instead of marking
// the Pipeline status as degraded.
ErrSkipRecord = cerrors.New("record skipped")
Expand Down
30 changes: 13 additions & 17 deletions pkg/processor/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate mockgen -destination=mock/processor.go -package=mock -mock_names=Processor=Processor . Processor
//go:generate stringer -type=Type -trimprefix Type
//go:generate mockgen -destination=mock/processor.go -package=mock -mock_names=Interface=Processor . Interface
//go:generate stringer -type=ParentType -trimprefix ParentType

package processor
Expand All @@ -25,29 +24,26 @@ import (
"github.com/conduitio/conduit/pkg/record"
)

const (
TypeTransform Type = iota + 1
TypeFilter
)

const (
ParentTypeConnector ParentType = iota + 1
ParentTypePipeline
)

// Type defines the processor type.
type Type int

// ParentType defines the parent type of a processor.
type ParentType int

// Processor is the interface that represents a single message processor that
// Interface is the interface that represents a single message processor that
// can be executed on one record and manipulate it.
type Processor interface {
// Type returns the processor type.
Type() Type
// Execute runs the processor function on a record.
Execute(ctx context.Context, record record.Record) (record.Record, error)
type Interface interface {
// Process runs the processor function on a record.
Process(ctx context.Context, record record.Record) (record.Record, error)
}

// InterfaceFunc is an adapter allowing use of a function as an Interface.
type InterfaceFunc func(context.Context, record.Record) (record.Record, error)

func (p InterfaceFunc) Process(ctx context.Context, record record.Record) (record.Record, error) {
return p(ctx, record)
}

// Instance represents a processor instance.
Expand All @@ -60,7 +56,7 @@ type Instance struct {
Name string
Parent Parent
Config Config
Processor Processor
Processor Interface
}

// Parent represents the connection to the entity a processor is connected to.
Expand Down
Loading

0 comments on commit 20cd1e5

Please sign in to comment.