diff --git a/cmd/conduit/api/client.go b/cmd/conduit/api/client.go index 2b030a04b..66594e04d 100644 --- a/cmd/conduit/api/client.go +++ b/cmd/conduit/api/client.go @@ -25,10 +25,10 @@ import ( ) type Client struct { - conn *grpc.ClientConn - apiv1.PipelineServiceClient - apiv1.ProcessorServiceClient - apiv1.ConnectorServiceClient + conn *grpc.ClientConn + PipelineServiceClient PipelineService + ConnectorServiceClient ConnectorService + ProcessorServiceClient ProcessorService healthgrpc.HealthClient } diff --git a/cmd/conduit/api/mock/connector_service.go b/cmd/conduit/api/mock/connector_service.go new file mode 100644 index 000000000..32b1201b5 --- /dev/null +++ b/cmd/conduit/api/mock/connector_service.go @@ -0,0 +1,103 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/conduitio/conduit/cmd/conduit/api (interfaces: ConnectorService) +// +// Generated by this command: +// +// mockgen -destination=mock/connector_service.go -package=mock . ConnectorService +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + apiv1 "github.com/conduitio/conduit/proto/api/v1" + gomock "go.uber.org/mock/gomock" + grpc "google.golang.org/grpc" +) + +// MockConnectorService is a mock of ConnectorService interface. +type MockConnectorService struct { + ctrl *gomock.Controller + recorder *MockConnectorServiceMockRecorder + isgomock struct{} +} + +// MockConnectorServiceMockRecorder is the mock recorder for MockConnectorService. +type MockConnectorServiceMockRecorder struct { + mock *MockConnectorService +} + +// NewMockConnectorService creates a new mock instance. +func NewMockConnectorService(ctrl *gomock.Controller) *MockConnectorService { + mock := &MockConnectorService{ctrl: ctrl} + mock.recorder = &MockConnectorServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockConnectorService) EXPECT() *MockConnectorServiceMockRecorder { + return m.recorder +} + +// GetConnector mocks base method. +func (m *MockConnectorService) GetConnector(ctx context.Context, in *apiv1.GetConnectorRequest, opts ...grpc.CallOption) (*apiv1.GetConnectorResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetConnector", varargs...) + ret0, _ := ret[0].(*apiv1.GetConnectorResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetConnector indicates an expected call of GetConnector. +func (mr *MockConnectorServiceMockRecorder) GetConnector(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConnector", reflect.TypeOf((*MockConnectorService)(nil).GetConnector), varargs...) +} + +// ListConnectorPlugins mocks base method. +func (m *MockConnectorService) ListConnectorPlugins(ctx context.Context, in *apiv1.ListConnectorPluginsRequest, opts ...grpc.CallOption) (*apiv1.ListConnectorPluginsResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListConnectorPlugins", varargs...) + ret0, _ := ret[0].(*apiv1.ListConnectorPluginsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListConnectorPlugins indicates an expected call of ListConnectorPlugins. +func (mr *MockConnectorServiceMockRecorder) ListConnectorPlugins(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListConnectorPlugins", reflect.TypeOf((*MockConnectorService)(nil).ListConnectorPlugins), varargs...) +} + +// ListConnectors mocks base method. +func (m *MockConnectorService) ListConnectors(ctx context.Context, in *apiv1.ListConnectorsRequest, opts ...grpc.CallOption) (*apiv1.ListConnectorsResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListConnectors", varargs...) + ret0, _ := ret[0].(*apiv1.ListConnectorsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListConnectors indicates an expected call of ListConnectors. +func (mr *MockConnectorServiceMockRecorder) ListConnectors(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListConnectors", reflect.TypeOf((*MockConnectorService)(nil).ListConnectors), varargs...) +} diff --git a/cmd/conduit/api/mock/pipeline_service.go b/cmd/conduit/api/mock/pipeline_service.go new file mode 100644 index 000000000..f304dcd2c --- /dev/null +++ b/cmd/conduit/api/mock/pipeline_service.go @@ -0,0 +1,103 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/conduitio/conduit/cmd/conduit/api (interfaces: PipelineService) +// +// Generated by this command: +// +// mockgen -destination=mock/pipeline_service.go -package=mock . PipelineService +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + apiv1 "github.com/conduitio/conduit/proto/api/v1" + gomock "go.uber.org/mock/gomock" + grpc "google.golang.org/grpc" +) + +// MockPipelineService is a mock of PipelineService interface. +type MockPipelineService struct { + ctrl *gomock.Controller + recorder *MockPipelineServiceMockRecorder + isgomock struct{} +} + +// MockPipelineServiceMockRecorder is the mock recorder for MockPipelineService. +type MockPipelineServiceMockRecorder struct { + mock *MockPipelineService +} + +// NewMockPipelineService creates a new mock instance. +func NewMockPipelineService(ctrl *gomock.Controller) *MockPipelineService { + mock := &MockPipelineService{ctrl: ctrl} + mock.recorder = &MockPipelineServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPipelineService) EXPECT() *MockPipelineServiceMockRecorder { + return m.recorder +} + +// GetDLQ mocks base method. +func (m *MockPipelineService) GetDLQ(ctx context.Context, in *apiv1.GetDLQRequest, opts ...grpc.CallOption) (*apiv1.GetDLQResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetDLQ", varargs...) + ret0, _ := ret[0].(*apiv1.GetDLQResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetDLQ indicates an expected call of GetDLQ. +func (mr *MockPipelineServiceMockRecorder) GetDLQ(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetDLQ", reflect.TypeOf((*MockPipelineService)(nil).GetDLQ), varargs...) +} + +// GetPipeline mocks base method. +func (m *MockPipelineService) GetPipeline(ctx context.Context, in *apiv1.GetPipelineRequest, opts ...grpc.CallOption) (*apiv1.GetPipelineResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetPipeline", varargs...) + ret0, _ := ret[0].(*apiv1.GetPipelineResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetPipeline indicates an expected call of GetPipeline. +func (mr *MockPipelineServiceMockRecorder) GetPipeline(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPipeline", reflect.TypeOf((*MockPipelineService)(nil).GetPipeline), varargs...) +} + +// ListPipelines mocks base method. +func (m *MockPipelineService) ListPipelines(ctx context.Context, in *apiv1.ListPipelinesRequest, opts ...grpc.CallOption) (*apiv1.ListPipelinesResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListPipelines", varargs...) + ret0, _ := ret[0].(*apiv1.ListPipelinesResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListPipelines indicates an expected call of ListPipelines. +func (mr *MockPipelineServiceMockRecorder) ListPipelines(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPipelines", reflect.TypeOf((*MockPipelineService)(nil).ListPipelines), varargs...) +} diff --git a/cmd/conduit/api/mock/processor_service.go b/cmd/conduit/api/mock/processor_service.go new file mode 100644 index 000000000..8390ea1c1 --- /dev/null +++ b/cmd/conduit/api/mock/processor_service.go @@ -0,0 +1,103 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/conduitio/conduit/cmd/conduit/api (interfaces: ProcessorService) +// +// Generated by this command: +// +// mockgen -destination=mock/processor_service.go -package=mock . ProcessorService +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + apiv1 "github.com/conduitio/conduit/proto/api/v1" + gomock "go.uber.org/mock/gomock" + grpc "google.golang.org/grpc" +) + +// MockProcessorService is a mock of ProcessorService interface. +type MockProcessorService struct { + ctrl *gomock.Controller + recorder *MockProcessorServiceMockRecorder + isgomock struct{} +} + +// MockProcessorServiceMockRecorder is the mock recorder for MockProcessorService. +type MockProcessorServiceMockRecorder struct { + mock *MockProcessorService +} + +// NewMockProcessorService creates a new mock instance. +func NewMockProcessorService(ctrl *gomock.Controller) *MockProcessorService { + mock := &MockProcessorService{ctrl: ctrl} + mock.recorder = &MockProcessorServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockProcessorService) EXPECT() *MockProcessorServiceMockRecorder { + return m.recorder +} + +// GetProcessor mocks base method. +func (m *MockProcessorService) GetProcessor(ctx context.Context, in *apiv1.GetProcessorRequest, opts ...grpc.CallOption) (*apiv1.GetProcessorResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetProcessor", varargs...) + ret0, _ := ret[0].(*apiv1.GetProcessorResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetProcessor indicates an expected call of GetProcessor. +func (mr *MockProcessorServiceMockRecorder) GetProcessor(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetProcessor", reflect.TypeOf((*MockProcessorService)(nil).GetProcessor), varargs...) +} + +// ListProcessorPlugins mocks base method. +func (m *MockProcessorService) ListProcessorPlugins(ctx context.Context, in *apiv1.ListProcessorPluginsRequest, opts ...grpc.CallOption) (*apiv1.ListProcessorPluginsResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListProcessorPlugins", varargs...) + ret0, _ := ret[0].(*apiv1.ListProcessorPluginsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListProcessorPlugins indicates an expected call of ListProcessorPlugins. +func (mr *MockProcessorServiceMockRecorder) ListProcessorPlugins(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListProcessorPlugins", reflect.TypeOf((*MockProcessorService)(nil).ListProcessorPlugins), varargs...) +} + +// ListProcessors mocks base method. +func (m *MockProcessorService) ListProcessors(ctx context.Context, in *apiv1.ListProcessorsRequest, opts ...grpc.CallOption) (*apiv1.ListProcessorsResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListProcessors", varargs...) + ret0, _ := ret[0].(*apiv1.ListProcessorsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListProcessors indicates an expected call of ListProcessors. +func (mr *MockProcessorServiceMockRecorder) ListProcessors(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListProcessors", reflect.TypeOf((*MockProcessorService)(nil).ListProcessors), varargs...) +} diff --git a/cmd/conduit/api/services.go b/cmd/conduit/api/services.go new file mode 100644 index 000000000..76cbe2e40 --- /dev/null +++ b/cmd/conduit/api/services.go @@ -0,0 +1,49 @@ +// Copyright © 2025 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "context" + + apiv1 "github.com/conduitio/conduit/proto/api/v1" + "google.golang.org/grpc" +) + +//go:generate mockgen -destination=mock/connector_service.go -package=mock . ConnectorService + +// ConnectorService defines the methods of the ConnectorServiceClient that are currently used by the CLI. +type ConnectorService interface { + ListConnectors(ctx context.Context, in *apiv1.ListConnectorsRequest, opts ...grpc.CallOption) (*apiv1.ListConnectorsResponse, error) + GetConnector(ctx context.Context, in *apiv1.GetConnectorRequest, opts ...grpc.CallOption) (*apiv1.GetConnectorResponse, error) + ListConnectorPlugins(ctx context.Context, in *apiv1.ListConnectorPluginsRequest, opts ...grpc.CallOption) (*apiv1.ListConnectorPluginsResponse, error) +} + +//go:generate mockgen -destination=mock/pipeline_service.go -package=mock . PipelineService + +// PipelineService defines the methods of the PipelineServiceClient that are currently used by the CLI. +type PipelineService interface { + ListPipelines(ctx context.Context, in *apiv1.ListPipelinesRequest, opts ...grpc.CallOption) (*apiv1.ListPipelinesResponse, error) + GetPipeline(ctx context.Context, in *apiv1.GetPipelineRequest, opts ...grpc.CallOption) (*apiv1.GetPipelineResponse, error) + GetDLQ(ctx context.Context, in *apiv1.GetDLQRequest, opts ...grpc.CallOption) (*apiv1.GetDLQResponse, error) +} + +//go:generate mockgen -destination=mock/processor_service.go -package=mock . ProcessorService + +// ProcessorService defines the methods of the ProcessorServiceClient that are currently used by the CLI. +type ProcessorService interface { + ListProcessors(ctx context.Context, in *apiv1.ListProcessorsRequest, opts ...grpc.CallOption) (*apiv1.ListProcessorsResponse, error) + GetProcessor(ctx context.Context, in *apiv1.GetProcessorRequest, opts ...grpc.CallOption) (*apiv1.GetProcessorResponse, error) + ListProcessorPlugins(ctx context.Context, in *apiv1.ListProcessorPluginsRequest, opts ...grpc.CallOption) (*apiv1.ListProcessorPluginsResponse, error) +} diff --git a/cmd/conduit/internal/print_utils.go b/cmd/conduit/internal/display/utils.go similarity index 82% rename from cmd/conduit/internal/print_utils.go rename to cmd/conduit/internal/display/utils.go index be8214c58..bba65dbac 100644 --- a/cmd/conduit/internal/print_utils.go +++ b/cmd/conduit/internal/display/utils.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal +package display import ( "fmt" @@ -21,6 +21,7 @@ import ( "github.com/alexeyco/simpletable" configv1 "github.com/conduitio/conduit-commons/proto/config/v1" apiv1 "github.com/conduitio/conduit/proto/api/v1" + "github.com/conduitio/ecdysis" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -52,29 +53,35 @@ func IsEmpty(s string) bool { } // DisplayProcessors prints the processors in a human-readable format -func DisplayProcessors(processors []*apiv1.Processor, indent int) { +func DisplayProcessors(out ecdysis.Output, processors []*apiv1.Processor, indent int) { if len(processors) == 0 { return } - fmt.Printf("%sProcessors:\n", Indentation(indent)) + out.Stdout(fmt.Sprintf("%sProcessors:\n", Indentation(indent))) for _, p := range processors { - fmt.Printf("%s- ID: %s\n", Indentation(indent+1), p.Id) - fmt.Printf("%sPlugin: %s\n", Indentation(indent+2), p.Plugin) + out.Stdout(fmt.Sprintf("%s- ID: %s\n", Indentation(indent+1), p.Id)) + + if !IsEmpty(p.Plugin) { + out.Stdout(fmt.Sprintf("%sPlugin: %s\n", Indentation(indent+2), p.Plugin)) + } if !IsEmpty(p.Condition) { - fmt.Printf("%sCondition: %s\n", Indentation(indent+2), p.Condition) + out.Stdout(fmt.Sprintf("%sCondition: %s\n", Indentation(indent+2), p.Condition)) } - fmt.Printf("%sConfig:\n", Indentation(indent+2)) - for name, value := range p.Config.Settings { - fmt.Printf("%s%s: %s\n", Indentation(indent+3), name, value) + if p.Config != nil { + out.Stdout(fmt.Sprintf("%sConfig:\n", Indentation(indent+2))) + for name, value := range p.Config.Settings { + out.Stdout(fmt.Sprintf("%s%s: %s\n", Indentation(indent+3), name, value)) + } } - fmt.Printf("%sWorkers: %d\n", Indentation(indent+3), p.Config.Workers) - fmt.Printf("%sCreated At: %s\n", Indentation(indent+2), PrintTime(p.CreatedAt)) - fmt.Printf("%sUpdated At: %s\n", Indentation(indent+2), PrintTime(p.UpdatedAt)) + out.Stdout(fmt.Sprintf("%sWorkers: %d\n", Indentation(indent+3), p.Config.Workers)) + + out.Stdout(fmt.Sprintf("%sCreated At: %s\n", Indentation(indent+2), PrintTime(p.CreatedAt))) + out.Stdout(fmt.Sprintf("%sUpdated At: %s\n", Indentation(indent+2), PrintTime(p.UpdatedAt))) } } @@ -107,7 +114,7 @@ func FormatLongString(paragraph string, maxLineLength int) string { return result.String() } -func DisplayConfigParams(cfg map[string]*configv1.Parameter) { +func DisplayConfigParams(out ecdysis.Output, cfg map[string]*configv1.Parameter) { table := simpletable.New() table.Header = &simpletable.Header{ @@ -162,8 +169,7 @@ func DisplayConfigParams(cfg map[string]*configv1.Parameter) { table.Body.Cells = append(table.Body.Cells, r) } - table.SetStyle(simpletable.StyleDefault) - fmt.Println(table.String()) + out.Stdout(table.String() + "\n") } func formatType(input string) string { @@ -197,10 +203,10 @@ func formatValidations(v []*configv1.Validation) string { } // DisplayConnectorConfig prints the connector config in a human-readable format -func DisplayConnectorConfig(cfg *apiv1.Connector_Config, indentation int) { - fmt.Printf("%sConfig:\n", Indentation(indentation)) +func DisplayConnectorConfig(out ecdysis.Output, cfg *apiv1.Connector_Config, indentation int) { + out.Stdout(fmt.Sprintf("%sConfig:\n", Indentation(indentation))) for name, value := range cfg.Settings { - fmt.Printf("%s%s: %s\n", Indentation(indentation+1), name, value) + out.Stdout(fmt.Sprintf("%s%s: %s\n", Indentation(indentation+1), name, value)) } } diff --git a/cmd/conduit/internal/print_utils_test.go b/cmd/conduit/internal/display/utils_test.go similarity index 99% rename from cmd/conduit/internal/print_utils_test.go rename to cmd/conduit/internal/display/utils_test.go index f8cc1081b..9e70c30dc 100644 --- a/cmd/conduit/internal/print_utils_test.go +++ b/cmd/conduit/internal/display/utils_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal +package display import ( "testing" diff --git a/cmd/conduit/internal/testutils/datetime_helper.go b/cmd/conduit/internal/testutils/datetime_helper.go new file mode 100644 index 000000000..a36c32ab5 --- /dev/null +++ b/cmd/conduit/internal/testutils/datetime_helper.go @@ -0,0 +1,27 @@ +// Copyright © 2025 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testutils + +import ( + "time" + + "google.golang.org/protobuf/types/known/timestamppb" +) + +// GetDateTime returns a sample timestamppb.Timestamp that can be used by API mocks. +func GetDateTime() *timestamppb.Timestamp { + parsedTime, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:00Z") + return timestamppb.New(parsedTime) +} diff --git a/cmd/conduit/internal/testutils/mock_helpers.go b/cmd/conduit/internal/testutils/mock_helpers.go new file mode 100644 index 000000000..e18bcf691 --- /dev/null +++ b/cmd/conduit/internal/testutils/mock_helpers.go @@ -0,0 +1,136 @@ +// Copyright © 2025 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package testutils + +import ( + "github.com/conduitio/conduit/cmd/conduit/api/mock" + apiv1 "github.com/conduitio/conduit/proto/api/v1" + "go.uber.org/mock/gomock" +) + +// PipelineService -------------------------------------------- + +func MockGetPipeline(mockService *mock.MockPipelineService, pipelineID string, connectorIds, processorIds []string) { + mockService.EXPECT().GetPipeline(gomock.Any(), &apiv1.GetPipelineRequest{ + Id: pipelineID, + }).Return(&apiv1.GetPipelineResponse{ + Pipeline: &apiv1.Pipeline{ + Id: pipelineID, + State: &apiv1.Pipeline_State{Status: apiv1.Pipeline_STATUS_RUNNING}, + Config: &apiv1.Pipeline_Config{ + Name: "Test Pipeline", + Description: "A test pipeline description", + }, + ConnectorIds: connectorIds, + ProcessorIds: processorIds, + CreatedAt: GetDateTime(), + UpdatedAt: GetDateTime(), + }, + }, nil).Times(1) +} + +func MockGetDLQ(mockService *mock.MockPipelineService, pipelineID, plugin string) { + mockService.EXPECT().GetDLQ(gomock.Any(), &apiv1.GetDLQRequest{ + Id: pipelineID, + }).Return(&apiv1.GetDLQResponse{ + Dlq: &apiv1.Pipeline_DLQ{Plugin: plugin}, + }, nil).Times(1) +} + +func MockGetPipelines(mockService *mock.MockPipelineService, pipelines []*apiv1.Pipeline) { + mockService.EXPECT().ListPipelines(gomock.Any(), gomock.Any()).Return(&apiv1.ListPipelinesResponse{ + Pipelines: pipelines, + }, nil).Times(1) +} + +// ProcessorService -------------------------------------------- + +func MockGetProcessor( + mockService *mock.MockProcessorService, + processorID, plugin, condition string, + parent *apiv1.Processor_Parent, + settings map[string]string, +) { + mockService.EXPECT().GetProcessor(gomock.Any(), &apiv1.GetProcessorRequest{ + Id: processorID, + }).Return(&apiv1.GetProcessorResponse{ + Processor: &apiv1.Processor{ + Id: processorID, + Plugin: plugin, + Config: &apiv1.Processor_Config{ + Settings: settings, + }, + Parent: parent, + Condition: condition, + }, + }, nil).Times(1) +} + +func MockGetProcessors(mockService *mock.MockProcessorService, processors []*apiv1.Processor) { + mockService.EXPECT().ListProcessors(gomock.Any(), gomock.Any()).Return(&apiv1.ListProcessorsResponse{ + Processors: processors, + }, nil).Times(1) +} + +func MockGetProcessorPlugins(mockservice *mock.MockProcessorService, name string, plugins []*apiv1.ProcessorPluginSpecifications) { + mockservice.EXPECT().ListProcessorPlugins(gomock.Any(), &apiv1.ListProcessorPluginsRequest{ + Name: name, + }).Return(&apiv1.ListProcessorPluginsResponse{ + Plugins: plugins, + }, nil).Times(1) +} + +// ConnectorService -------------------------------------------- + +func MockGetConnectors(mockService *mock.MockConnectorService, pipelineID string, connectors []*apiv1.Connector) { + mockService.EXPECT().ListConnectors(gomock.Any(), &apiv1.ListConnectorsRequest{ + PipelineId: pipelineID, + }).Return(&apiv1.ListConnectorsResponse{ + Connectors: connectors, + }, nil).Times(1) +} + +func MockGetConnector( + mockService *mock.MockConnectorService, + connectorID, plugin, pipelineID string, + conType apiv1.Connector_Type, + config *apiv1.Connector_Config, + processorIds []string, +) { + mockService.EXPECT().GetConnector(gomock.Any(), &apiv1.GetConnectorRequest{ + Id: connectorID, + }).Return(&apiv1.GetConnectorResponse{ + Connector: &apiv1.Connector{ + Id: connectorID, + Type: conType, + Plugin: plugin, + PipelineId: pipelineID, + Config: config, + ProcessorIds: processorIds, + }, + }, nil).Times(1) +} + +func MockGetConnectorPlugins( + mockService *mock.MockConnectorService, + name string, + plugins []*apiv1.ConnectorPluginSpecifications, +) { + mockService.EXPECT().ListConnectorPlugins(gomock.Any(), &apiv1.ListConnectorPluginsRequest{ + Name: name, + }).Return(&apiv1.ListConnectorPluginsResponse{ + Plugins: plugins, + }, nil).Times(1) +} diff --git a/cmd/conduit/root/connectorplugins/connector_plugins.go b/cmd/conduit/root/connectorplugins/connector_plugins.go index 6e960bb25..c75e4f005 100644 --- a/cmd/conduit/root/connectorplugins/connector_plugins.go +++ b/cmd/conduit/root/connectorplugins/connector_plugins.go @@ -26,7 +26,9 @@ var ( type ConnectorPluginsCommand struct{} -func (c *ConnectorPluginsCommand) Aliases() []string { return []string{"connector-plugin"} } +func (c *ConnectorPluginsCommand) Aliases() []string { + return []string{"connector-plugin", "connectorsplugins", "connectorplugins", "connectorplugin"} +} func (c *ConnectorPluginsCommand) SubCommands() []ecdysis.Command { return []ecdysis.Command{ diff --git a/cmd/conduit/root/connectorplugins/describe.go b/cmd/conduit/root/connectorplugins/describe.go index 0cd0dc8ae..f0b2c1273 100644 --- a/cmd/conduit/root/connectorplugins/describe.go +++ b/cmd/conduit/root/connectorplugins/describe.go @@ -18,9 +18,10 @@ import ( "context" "fmt" + "github.com/conduitio/conduit/cmd/conduit/internal/display" + "github.com/conduitio/conduit/cmd/conduit/api" "github.com/conduitio/conduit/cmd/conduit/cecdysis" - "github.com/conduitio/conduit/cmd/conduit/internal" "github.com/conduitio/conduit/pkg/foundation/cerrors" apiv1 "github.com/conduitio/conduit/proto/api/v1" "github.com/conduitio/ecdysis" @@ -31,14 +32,20 @@ var ( _ ecdysis.CommandWithAliases = (*DescribeCommand)(nil) _ ecdysis.CommandWithDocs = (*DescribeCommand)(nil) _ ecdysis.CommandWithArgs = (*DescribeCommand)(nil) + _ ecdysis.CommandWithOutput = (*DescribeCommand)(nil) ) type DescribeArgs struct { - ConnectorPluginID string + connectorPluginID string } type DescribeCommand struct { - args DescribeArgs + args DescribeArgs + output ecdysis.Output +} + +func (c *DescribeCommand) Output(output ecdysis.Output) { + c.output = output } func (c *DescribeCommand) Usage() string { return "describe" } @@ -64,13 +71,13 @@ func (c *DescribeCommand) Args(args []string) error { return cerrors.Errorf("too many arguments") } - c.args.ConnectorPluginID = args[0] + c.args.connectorPluginID = args[0] return nil } func (c *DescribeCommand) ExecuteWithClient(ctx context.Context, client *api.Client) error { resp, err := client.ConnectorServiceClient.ListConnectorPlugins(ctx, &apiv1.ListConnectorPluginsRequest{ - Name: c.args.ConnectorPluginID, + Name: c.args.connectorPluginID, }) if err != nil { return fmt.Errorf("failed to list connector plguin: %w", err) @@ -80,33 +87,33 @@ func (c *DescribeCommand) ExecuteWithClient(ctx context.Context, client *api.Cli return nil } - displayConnectorPluginsDescription(resp.Plugins[0]) + displayConnectorPluginsDescription(c.output, resp.Plugins[0]) return nil } -func displayConnectorPluginsDescription(c *apiv1.ConnectorPluginSpecifications) { - if !internal.IsEmpty(c.Name) { - fmt.Printf("Name: %s\n", c.Name) +func displayConnectorPluginsDescription(out ecdysis.Output, c *apiv1.ConnectorPluginSpecifications) { + if !display.IsEmpty(c.Name) { + out.Stdout(fmt.Sprintf("Name: %s\n", c.Name)) } - if !internal.IsEmpty(c.Summary) { - fmt.Printf("Summary: %s\n", c.Summary) + if !display.IsEmpty(c.Summary) { + out.Stdout(fmt.Sprintf("Summary: %s\n", c.Summary)) } - if !internal.IsEmpty(c.Description) { - fmt.Printf("Description: %s\n", c.Description) + if !display.IsEmpty(c.Description) { + out.Stdout(fmt.Sprintf("Description: %s\n", c.Description)) } - if !internal.IsEmpty(c.Author) { - fmt.Printf("Author: %s\n", c.Author) + if !display.IsEmpty(c.Author) { + out.Stdout(fmt.Sprintf("Author: %s\n", c.Author)) } - if !internal.IsEmpty(c.Version) { - fmt.Printf("Version: %s\n", c.Version) + if !display.IsEmpty(c.Version) { + out.Stdout(fmt.Sprintf("Version: %s\n", c.Version)) } if len(c.SourceParams) > 0 { - fmt.Printf("\nSource Parameters:\n") - internal.DisplayConfigParams(c.SourceParams) + out.Stdout("\nSource Parameters:\n") + display.DisplayConfigParams(out, c.SourceParams) } if len(c.DestinationParams) > 0 { - fmt.Printf("\nDestination Parameters:\n") - internal.DisplayConfigParams(c.DestinationParams) + out.Stdout("\nDestination Parameters:\n") + display.DisplayConfigParams(out, c.DestinationParams) } } diff --git a/cmd/conduit/root/connectorplugins/describe_test.go b/cmd/conduit/root/connectorplugins/describe_test.go index cc90aeb9a..926839a71 100644 --- a/cmd/conduit/root/connectorplugins/describe_test.go +++ b/cmd/conduit/root/connectorplugins/describe_test.go @@ -15,9 +15,18 @@ package connectorplugins import ( + "bytes" + "context" "testing" + configv1 "github.com/conduitio/conduit-commons/proto/config/v1" + "github.com/conduitio/conduit/cmd/conduit/api" + "github.com/conduitio/conduit/cmd/conduit/api/mock" + "github.com/conduitio/conduit/cmd/conduit/internal/testutils" + apiv1 "github.com/conduitio/conduit/proto/api/v1" + "github.com/conduitio/ecdysis" "github.com/matryer/is" + "go.uber.org/mock/gomock" ) func TestDescribeExecutionNoArgs(t *testing.T) { @@ -52,5 +61,84 @@ func TestDescribeExecutionCorrectArgs(t *testing.T) { err := c.Args([]string{connectorPluginID}) is.NoErr(err) - is.Equal(c.args.ConnectorPluginID, connectorPluginID) + is.Equal(c.args.connectorPluginID, connectorPluginID) +} + +func TestDescribeCommand_ExecuteWithClient(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + cmd := &DescribeCommand{args: DescribeArgs{connectorPluginID: "builtin:kafka@v0.11.1"}} + cmd.Output(out) + + mockConnectorService := mock.NewMockConnectorService(ctrl) + + testutils.MockGetConnectorPlugins( + mockConnectorService, + cmd.args.connectorPluginID, + []*apiv1.ConnectorPluginSpecifications{ + { + Name: cmd.args.connectorPluginID, + Summary: "A Kafka source and destination plugin for Conduit.", + Description: "A Kafka source and destination plugin for Conduit, written in Go.", + Author: "Meroxa, Inc.", + Version: "v0.11.1", + SourceParams: map[string]*configv1.Parameter{ + "sdk.schema.extract.type": { + Type: configv1.Parameter_Type(apiv1.PluginSpecifications_Parameter_TYPE_STRING), + Description: "The type of the payload schema.", + Default: "avro", + Validations: []*configv1.Validation{ + {Type: configv1.Validation_TYPE_INCLUSION}, + }, + }, + }, + DestinationParams: map[string]*configv1.Parameter{ + "sdk.record.format": { + Type: configv1.Parameter_Type(apiv1.PluginSpecifications_Parameter_TYPE_UNSPECIFIED), + Description: "The format of the output record.", + Default: "opencdc/json", + Validations: []*configv1.Validation{ + {Type: configv1.Validation_TYPE_INCLUSION}, + }, + }, + }, + }, + }) + + client := &api.Client{ + ConnectorServiceClient: mockConnectorService, + } + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := buf.String() + + is.Equal(output, ""+ + "Name: builtin:kafka@v0.11.1\n"+ + "Summary: A Kafka source and destination plugin for Conduit.\n"+ + "Description: A Kafka source and destination plugin for Conduit, written in Go.\n"+ + "Author: Meroxa, Inc.\n"+ + "Version: v0.11.1\n"+ + "\n"+ + "Source Parameters:\n"+ + "+-------------------------+--------+---------------------------------+---------+-------------+\n"+ + "| NAME | TYPE | DESCRIPTION | DEFAULT | VALIDATIONS |\n"+ + "+-------------------------+--------+---------------------------------+---------+-------------+\n"+ + "| sdk.schema.extract.type | string | The type of the payload schema. | avro | [inclusion] |\n"+ + "+-------------------------+--------+---------------------------------+---------+-------------+\n"+ + "\n"+ + "Destination Parameters:\n"+ + "+-------------------+-------------+----------------------------------+--------------+-------------+\n"+ + "| NAME | TYPE | DESCRIPTION | DEFAULT | VALIDATIONS |\n"+ + "+-------------------+-------------+----------------------------------+--------------+-------------+\n"+ + "| sdk.record.format | unspecified | The format of the output record. | opencdc/json | [inclusion] |\n"+ + "+-------------------+-------------+----------------------------------+--------------+-------------+\n") } diff --git a/cmd/conduit/root/connectorplugins/list.go b/cmd/conduit/root/connectorplugins/list.go index c6eadd863..b3a714d7e 100644 --- a/cmd/conduit/root/connectorplugins/list.go +++ b/cmd/conduit/root/connectorplugins/list.go @@ -31,6 +31,7 @@ var ( _ ecdysis.CommandWithAliases = (*ListCommand)(nil) _ ecdysis.CommandWithDocs = (*ListCommand)(nil) _ ecdysis.CommandWithFlags = (*ListCommand)(nil) + _ ecdysis.CommandWithOutput = (*ListCommand)(nil) ) type ListFlags struct { @@ -38,7 +39,12 @@ type ListFlags struct { } type ListCommand struct { - flags ListFlags + flags ListFlags + output ecdysis.Output +} + +func (c *ListCommand) Output(output ecdysis.Output) { + c.output = output } func (c *ListCommand) Flags() []ecdysis.Flag { @@ -71,14 +77,14 @@ func (c *ListCommand) ExecuteWithClient(ctx context.Context, client *api.Client) return resp.Plugins[i].Name < resp.Plugins[j].Name }) - displayConnectorPlugins(resp.Plugins) + c.output.Stdout(getConnectorPluginsTable(resp.Plugins) + "\n") return nil } -func displayConnectorPlugins(connectorPlugins []*apiv1.ConnectorPluginSpecifications) { +func getConnectorPluginsTable(connectorPlugins []*apiv1.ConnectorPluginSpecifications) string { if len(connectorPlugins) == 0 { - return + return "" } table := simpletable.New() @@ -98,6 +104,5 @@ func displayConnectorPlugins(connectorPlugins []*apiv1.ConnectorPluginSpecificat table.Body.Cells = append(table.Body.Cells, r) } - table.SetStyle(simpletable.StyleDefault) - fmt.Println(table.String()) + return table.String() } diff --git a/cmd/conduit/root/connectorplugins/list_test.go b/cmd/conduit/root/connectorplugins/list_test.go index 4580d7bd9..fb567bedc 100644 --- a/cmd/conduit/root/connectorplugins/list_test.go +++ b/cmd/conduit/root/connectorplugins/list_test.go @@ -15,11 +15,19 @@ package connectorplugins import ( + "bytes" + "context" + "strings" "testing" + "github.com/conduitio/conduit/cmd/conduit/api" + "github.com/conduitio/conduit/cmd/conduit/api/mock" + "github.com/conduitio/conduit/cmd/conduit/internal/testutils" + apiv1 "github.com/conduitio/conduit/proto/api/v1" "github.com/conduitio/ecdysis" "github.com/matryer/is" "github.com/spf13/pflag" + "go.uber.org/mock/gomock" ) func TestListCommandFlags(t *testing.T) { @@ -54,3 +62,133 @@ func TestListCommandFlags(t *testing.T) { is.Equal(cf.Usage, f.usage) } } + +func TestListCommandExecuteWithClient_WithFlags(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + cmd := &ListCommand{ + flags: ListFlags{ + Name: "builtin", + }, + } + cmd.Output(out) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + mockService := mock.NewMockConnectorService(ctrl) + + testutils.MockGetConnectorPlugins( + mockService, + ".*builtin.*", + []*apiv1.ConnectorPluginSpecifications{ + { + Name: "builtin:file@v0.9.0", + Summary: "A file source and destination plugin for Conduit.", + }, + { + Name: "builtin:kafka@v0.11.1", + Summary: "A Kafka source and destination plugin for Conduit, written in Go.", + }, + }) + + client := &api.Client{ConnectorServiceClient: mockService} + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := buf.String() + + is.Equal(output, ""+ + "+-----------------------+-------------------------------------------------------------------+\n"+ + "| NAME | SUMMARY |\n"+ + "+-----------------------+-------------------------------------------------------------------+\n"+ + "| builtin:file@v0.9.0 | A file source and destination plugin for Conduit. |\n"+ + "| builtin:kafka@v0.11.1 | A Kafka source and destination plugin for Conduit, written in Go. |\n"+ + "+-----------------------+-------------------------------------------------------------------+\n") +} + +func TestListCommandExecuteWithClient_NoFlags(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + cmd := &ListCommand{ + flags: ListFlags{}, + } + cmd.Output(out) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + mockService := mock.NewMockConnectorService(ctrl) + + testutils.MockGetConnectorPlugins( + mockService, + ".*.*", + []*apiv1.ConnectorPluginSpecifications{ + { + Name: "builtin:file@v0.9.0", + Summary: "A file source and destination plugin for Conduit.", + }, + { + Name: "builtin:kafka@v0.11.1", + Summary: "A Kafka source and destination plugin for Conduit, written in Go.", + }, + { + Name: "standalone:chaos@v0.1.1", + Summary: "A chaos destination connector", + }, + }) + + client := &api.Client{ConnectorServiceClient: mockService} + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := buf.String() + + is.Equal(output, ""+ + "+-------------------------+-------------------------------------------------------------------+\n"+ + "| NAME | SUMMARY |\n"+ + "+-------------------------+-------------------------------------------------------------------+\n"+ + "| builtin:file@v0.9.0 | A file source and destination plugin for Conduit. |\n"+ + "| builtin:kafka@v0.11.1 | A Kafka source and destination plugin for Conduit, written in Go. |\n"+ + "| standalone:chaos@v0.1.1 | A chaos destination connector |\n"+ + "+-------------------------+-------------------------------------------------------------------+\n") +} + +func TestListCommandExecuteWithClient_EmptyResponse(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + mockService := mock.NewMockConnectorService(ctrl) + + testutils.MockGetConnectorPlugins( + mockService, + ".*.*", + []*apiv1.ConnectorPluginSpecifications{}) + + client := &api.Client{ConnectorServiceClient: mockService} + + cmd := &ListCommand{} + cmd.Output(out) + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := strings.TrimSpace(buf.String()) + is.True(len(output) == 0) +} diff --git a/cmd/conduit/root/connectors/describe.go b/cmd/conduit/root/connectors/describe.go index 395696b28..0d2be418f 100644 --- a/cmd/conduit/root/connectors/describe.go +++ b/cmd/conduit/root/connectors/describe.go @@ -18,9 +18,10 @@ import ( "context" "fmt" + "github.com/conduitio/conduit/cmd/conduit/internal/display" + "github.com/conduitio/conduit/cmd/conduit/api" "github.com/conduitio/conduit/cmd/conduit/cecdysis" - "github.com/conduitio/conduit/cmd/conduit/internal" "github.com/conduitio/conduit/pkg/foundation/cerrors" apiv1 "github.com/conduitio/conduit/proto/api/v1" "github.com/conduitio/ecdysis" @@ -31,6 +32,7 @@ var ( _ ecdysis.CommandWithAliases = (*DescribeCommand)(nil) _ ecdysis.CommandWithDocs = (*DescribeCommand)(nil) _ ecdysis.CommandWithArgs = (*DescribeCommand)(nil) + _ ecdysis.CommandWithOutput = (*DescribeCommand)(nil) ) type DescribeArgs struct { @@ -38,7 +40,12 @@ type DescribeArgs struct { } type DescribeCommand struct { - args DescribeArgs + args DescribeArgs + output ecdysis.Output +} + +func (c *DescribeCommand) Output(output ecdysis.Output) { + c.output = output } func (c *DescribeCommand) Usage() string { return "describe" } @@ -88,25 +95,25 @@ func (c *DescribeCommand) ExecuteWithClient(ctx context.Context, client *api.Cli processors = append(processors, processor.Processor) } - displayConnector(resp.Connector, processors) + displayConnector(c.output, resp.Connector, processors) return nil } -func displayConnector(connector *apiv1.Connector, processors []*apiv1.Processor) { +func displayConnector(out ecdysis.Output, connector *apiv1.Connector, processors []*apiv1.Processor) { if connector == nil { return } - fmt.Printf("ID: %s\n", connector.Id) + out.Stdout(fmt.Sprintf("ID: %s\n", connector.Id)) - fmt.Printf("Type: %s\n", internal.ConnectorTypeToString(connector.Type)) - fmt.Printf("Plugin: %s\n", connector.Plugin) - fmt.Printf("Pipeline ID: %s\n", connector.PipelineId) + out.Stdout(fmt.Sprintf("Type: %s\n", display.ConnectorTypeToString(connector.Type))) + out.Stdout(fmt.Sprintf("Plugin: %s\n", connector.Plugin)) + out.Stdout(fmt.Sprintf("Pipeline ID: %s\n", connector.PipelineId)) - internal.DisplayConnectorConfig(connector.Config, 0) - fmt.Printf("Created At: %s\n", internal.PrintTime(connector.CreatedAt)) - fmt.Printf("Updated At: %s\n", internal.PrintTime(connector.UpdatedAt)) + display.DisplayConnectorConfig(out, connector.Config, 0) + out.Stdout(fmt.Sprintf("Created At: %s\n", display.PrintTime(connector.CreatedAt))) + out.Stdout(fmt.Sprintf("Updated At: %s\n", display.PrintTime(connector.UpdatedAt))) - internal.DisplayProcessors(processors, 0) + display.DisplayProcessors(out, processors, 0) } diff --git a/cmd/conduit/root/connectors/describe_test.go b/cmd/conduit/root/connectors/describe_test.go index 1bfb8d98d..df5a7b6c6 100644 --- a/cmd/conduit/root/connectors/describe_test.go +++ b/cmd/conduit/root/connectors/describe_test.go @@ -15,9 +15,17 @@ package connectors import ( + "bytes" + "context" "testing" + "github.com/conduitio/conduit/cmd/conduit/api" + "github.com/conduitio/conduit/cmd/conduit/api/mock" + "github.com/conduitio/conduit/cmd/conduit/internal/testutils" + apiv1 "github.com/conduitio/conduit/proto/api/v1" + "github.com/conduitio/ecdysis" "github.com/matryer/is" + "go.uber.org/mock/gomock" ) func TestDescribeExecutionNoArgs(t *testing.T) { @@ -54,3 +62,59 @@ func TestDescribeExecutionCorrectArgs(t *testing.T) { is.NoErr(err) is.Equal(c.args.ConnectorID, connectorID) } + +func TestDescribeCommand_ExecuteWithClient(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + cmd := &DescribeCommand{args: DescribeArgs{ConnectorID: "1"}} + cmd.Output(out) + + mockConnectorService := mock.NewMockConnectorService(ctrl) + testutils.MockGetConnector( + mockConnectorService, cmd.args.ConnectorID, "plugin1", "my-pipeline", apiv1.Connector_TYPE_SOURCE, + &apiv1.Connector_Config{ + Name: "Test Pipeline", + Settings: map[string]string{ + "foo": "bar", + }, + }, []string{"proc3"}) + + mockProcessorService := mock.NewMockProcessorService(ctrl) + testutils.MockGetProcessor( + mockProcessorService, "proc3", "custom.javascript", "", + nil, map[string]string{}) + + client := &api.Client{ + ProcessorServiceClient: mockProcessorService, + ConnectorServiceClient: mockConnectorService, + } + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := buf.String() + + is.Equal(output, ""+ + "ID: 1\n"+ + "Type: source\n"+ + "Plugin: plugin1\n"+ + "Pipeline ID: my-pipeline\n"+ + "Config:\n"+ + " foo: bar\n"+ + "Created At: 1970-01-01T00:00:00Z\n"+ + "Updated At: 1970-01-01T00:00:00Z\n"+ + "Processors:\n"+ + " - ID: proc3\n"+ + " Plugin: custom.javascript\n"+ + " Config:\n"+ + " Workers: 0\n"+ + " Created At: 1970-01-01T00:00:00Z\n"+ + " Updated At: 1970-01-01T00:00:00Z\n") +} diff --git a/cmd/conduit/root/connectors/list.go b/cmd/conduit/root/connectors/list.go index e51ed4cb9..c7f66b2d1 100644 --- a/cmd/conduit/root/connectors/list.go +++ b/cmd/conduit/root/connectors/list.go @@ -22,7 +22,7 @@ import ( "github.com/alexeyco/simpletable" "github.com/conduitio/conduit/cmd/conduit/api" "github.com/conduitio/conduit/cmd/conduit/cecdysis" - "github.com/conduitio/conduit/cmd/conduit/internal" + "github.com/conduitio/conduit/cmd/conduit/internal/display" apiv1 "github.com/conduitio/conduit/proto/api/v1" "github.com/conduitio/ecdysis" ) @@ -32,6 +32,7 @@ var ( _ ecdysis.CommandWithAliases = (*ListCommand)(nil) _ ecdysis.CommandWithDocs = (*ListCommand)(nil) _ ecdysis.CommandWithFlags = (*ListCommand)(nil) + _ ecdysis.CommandWithOutput = (*ListCommand)(nil) ) type ListFlags struct { @@ -39,7 +40,12 @@ type ListFlags struct { } type ListCommand struct { - flags ListFlags + flags ListFlags + output ecdysis.Output +} + +func (c *ListCommand) Output(output ecdysis.Output) { + c.output = output } func (c *ListCommand) Flags() []ecdysis.Flag { @@ -71,14 +77,14 @@ func (c *ListCommand) ExecuteWithClient(ctx context.Context, client *api.Client) return resp.Connectors[i].Id < resp.Connectors[j].Id }) - displayConnectors(resp.Connectors) + c.output.Stdout(getConnectorsTable(resp.Connectors) + "\n") return nil } -func displayConnectors(connectors []*apiv1.Connector) { +func getConnectorsTable(connectors []*apiv1.Connector) string { if len(connectors) == 0 { - return + return "" } table := simpletable.New() @@ -98,14 +104,12 @@ func displayConnectors(connectors []*apiv1.Connector) { r := []*simpletable.Cell{ {Align: simpletable.AlignLeft, Text: c.Id}, {Align: simpletable.AlignLeft, Text: c.Plugin}, - {Align: simpletable.AlignLeft, Text: internal.ConnectorTypeToString(c.Type)}, + {Align: simpletable.AlignLeft, Text: display.ConnectorTypeToString(c.Type)}, {Align: simpletable.AlignLeft, Text: c.PipelineId}, - {Align: simpletable.AlignLeft, Text: internal.PrintTime(c.CreatedAt)}, - {Align: simpletable.AlignLeft, Text: internal.PrintTime(c.UpdatedAt)}, + {Align: simpletable.AlignLeft, Text: display.PrintTime(c.CreatedAt)}, + {Align: simpletable.AlignLeft, Text: display.PrintTime(c.UpdatedAt)}, } - table.Body.Cells = append(table.Body.Cells, r) } - table.SetStyle(simpletable.StyleDefault) - fmt.Println(table.String()) + return table.String() } diff --git a/cmd/conduit/root/connectors/list_test.go b/cmd/conduit/root/connectors/list_test.go index 1659cb710..d25d00830 100644 --- a/cmd/conduit/root/connectors/list_test.go +++ b/cmd/conduit/root/connectors/list_test.go @@ -15,11 +15,19 @@ package connectors import ( + "bytes" + "context" + "strings" "testing" + "github.com/conduitio/conduit/cmd/conduit/api" + "github.com/conduitio/conduit/cmd/conduit/api/mock" + "github.com/conduitio/conduit/cmd/conduit/internal/testutils" + apiv1 "github.com/conduitio/conduit/proto/api/v1" "github.com/conduitio/ecdysis" "github.com/matryer/is" "github.com/spf13/pflag" + "go.uber.org/mock/gomock" ) func TestConnectorsListCommandFlags(t *testing.T) { @@ -54,3 +62,130 @@ func TestConnectorsListCommandFlags(t *testing.T) { is.Equal(cf.Usage, f.usage) } } + +func TestListCommandExecuteWithClient_NoFlags(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + mockService := mock.NewMockConnectorService(ctrl) + + connectors := []*apiv1.Connector{ + { + Id: "conn1", + Type: apiv1.Connector_TYPE_SOURCE, + Plugin: "plugin1", + ProcessorIds: []string{"proc3"}, + PipelineId: "pipeline1", + CreatedAt: testutils.GetDateTime(), + UpdatedAt: testutils.GetDateTime(), + }, + { + Id: "conn2", + Type: apiv1.Connector_TYPE_DESTINATION, + Plugin: "plugin2", + PipelineId: "pipeline2", + CreatedAt: testutils.GetDateTime(), + UpdatedAt: testutils.GetDateTime(), + }, + } + + testutils.MockGetConnectors(mockService, "", connectors) + + client := &api.Client{ + ConnectorServiceClient: mockService, + } + + cmd := &ListCommand{} + cmd.Output(out) + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := buf.String() + is.Equal(output, ""+ + "+-------+---------+-------------+-------------+----------------------+----------------------+\n"+ + "| ID | PLUGIN | TYPE | PIPELINE_ID | CREATED | LAST_UPDATED |\n"+ + "+-------+---------+-------------+-------------+----------------------+----------------------+\n"+ + "| conn1 | plugin1 | source | pipeline1 | 1970-01-01T00:00:00Z | 1970-01-01T00:00:00Z |\n"+ + "| conn2 | plugin2 | destination | pipeline2 | 1970-01-01T00:00:00Z | 1970-01-01T00:00:00Z |\n"+ + "+-------+---------+-------------+-------------+----------------------+----------------------+\n") +} + +func TestListCommandExecuteWithClient_WithFlags(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + mockService := mock.NewMockConnectorService(ctrl) + + connectors := []*apiv1.Connector{ + { + Id: "conn1", + Type: apiv1.Connector_TYPE_SOURCE, + Plugin: "plugin1", + ProcessorIds: []string{"proc3"}, + PipelineId: "pipeline1", + CreatedAt: testutils.GetDateTime(), + UpdatedAt: testutils.GetDateTime(), + }, + } + + testutils.MockGetConnectors(mockService, "pipeline1", connectors) + + client := &api.Client{ + ConnectorServiceClient: mockService, + } + + cmd := &ListCommand{ + flags: ListFlags{PipelineID: "pipeline1"}, + } + cmd.Output(out) + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := buf.String() + + is.Equal(output, ""+ + "+-------+---------+--------+-------------+----------------------+----------------------+\n"+ + "| ID | PLUGIN | TYPE | PIPELINE_ID | CREATED | LAST_UPDATED |\n"+ + "+-------+---------+--------+-------------+----------------------+----------------------+\n"+ + "| conn1 | plugin1 | source | pipeline1 | 1970-01-01T00:00:00Z | 1970-01-01T00:00:00Z |\n"+ + "+-------+---------+--------+-------------+----------------------+----------------------+\n") +} + +func TestListCommandExecuteWithClient_EmptyResponse(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + mockService := mock.NewMockConnectorService(ctrl) + + testutils.MockGetConnectors(mockService, "", []*apiv1.Connector{}) + client := &api.Client{ConnectorServiceClient: mockService} + + cmd := &ListCommand{} + cmd.Output(out) + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := strings.TrimSpace(buf.String()) + is.True(len(output) == 0) +} diff --git a/cmd/conduit/root/pipelines/describe.go b/cmd/conduit/root/pipelines/describe.go index fbbf7eafa..dc6044e94 100644 --- a/cmd/conduit/root/pipelines/describe.go +++ b/cmd/conduit/root/pipelines/describe.go @@ -18,9 +18,10 @@ import ( "context" "fmt" + "github.com/conduitio/conduit/cmd/conduit/internal/display" + "github.com/conduitio/conduit/cmd/conduit/api" "github.com/conduitio/conduit/cmd/conduit/cecdysis" - "github.com/conduitio/conduit/cmd/conduit/internal" "github.com/conduitio/conduit/pkg/foundation/cerrors" apiv1 "github.com/conduitio/conduit/proto/api/v1" "github.com/conduitio/ecdysis" @@ -31,6 +32,7 @@ var ( _ ecdysis.CommandWithAliases = (*DescribeCommand)(nil) _ ecdysis.CommandWithDocs = (*DescribeCommand)(nil) _ ecdysis.CommandWithArgs = (*DescribeCommand)(nil) + _ ecdysis.CommandWithOutput = (*DescribeCommand)(nil) ) type DescribeArgs struct { @@ -38,7 +40,8 @@ type DescribeArgs struct { } type DescribeCommand struct { - args DescribeArgs + args DescribeArgs + output ecdysis.Output } func (c *DescribeCommand) Docs() ecdysis.Docs { @@ -51,6 +54,10 @@ by Conduit. You can list existing pipelines with the 'conduit pipelines list' co } } +func (c *DescribeCommand) Output(output ecdysis.Output) { + c.output = output +} + func (c *DescribeCommand) Aliases() []string { return []string{"desc"} } func (c *DescribeCommand) Usage() string { return "describe" } @@ -111,7 +118,7 @@ func (c *DescribeCommand) ExecuteWithClient(ctx context.Context, client *api.Cli } // store processors for each connector - connectorProcesors := make(map[string][]*apiv1.Processor, len(connectorsResp.Connectors)) + connectorProcessors := make(map[string][]*apiv1.Processor, len(connectorsResp.Connectors)) for _, conn := range connectorsResp.Connectors { var processors []*apiv1.Processor @@ -125,7 +132,7 @@ func (c *DescribeCommand) ExecuteWithClient(ctx context.Context, client *api.Cli } processors = append(processors, processor.Processor) } - connectorProcesors[conn.Id] = processors + connectorProcessors[conn.Id] = processors } dlq, err := client.PipelineServiceClient.GetDLQ(ctx, &apiv1.GetDLQRequest{ @@ -135,7 +142,7 @@ func (c *DescribeCommand) ExecuteWithClient(ctx context.Context, client *api.Cli return fmt.Errorf("failed to fetch DLQ for pipeline %s: %w", c.args.PipelineID, err) } - err = displayPipeline(pipelineResp.Pipeline, pipelineProcessors, connectors, connectorProcesors, dlq.Dlq) + err = displayPipeline(c.output, pipelineResp.Pipeline, pipelineProcessors, connectors, connectorProcessors, dlq.Dlq) if err != nil { return fmt.Errorf("failed to display pipeline %s: %w", c.args.PipelineID, err) } @@ -143,65 +150,65 @@ func (c *DescribeCommand) ExecuteWithClient(ctx context.Context, client *api.Cli return nil } -func displayPipeline(pipeline *apiv1.Pipeline, pipelineProcessors []*apiv1.Processor, connectors []*apiv1.Connector, connectorProcessors map[string][]*apiv1.Processor, dlq *apiv1.Pipeline_DLQ) error { +func displayPipeline(out ecdysis.Output, pipeline *apiv1.Pipeline, pipelineProcessors []*apiv1.Processor, connectors []*apiv1.Connector, connectorProcessors map[string][]*apiv1.Processor, dlq *apiv1.Pipeline_DLQ) error { // ID - fmt.Printf("ID: %s\n", pipeline.Id) + out.Stdout(fmt.Sprintf("ID: %s\n", pipeline.Id)) // State if pipeline.State != nil { - fmt.Printf("Status: %s\n", internal.PrintStatusFromProtoString(pipeline.State.Status.String())) + out.Stdout(fmt.Sprintf("Status: %s\n", display.PrintStatusFromProtoString(pipeline.State.Status.String()))) if pipeline.State.Error != "" { - fmt.Printf("Error: %s\n", pipeline.State.Error) + out.Stdout(fmt.Sprintf("Error: %s\n", pipeline.State.Error)) } } // Config if pipeline.Config != nil { - fmt.Printf("Name: %s\n", pipeline.Config.Name) + out.Stdout(fmt.Sprintf("Name: %s\n", pipeline.Config.Name)) // no new line after description, as it's always added // when parsed from the YAML config file - fmt.Printf("Description: %s\n", pipeline.Config.Description) + out.Stdout(fmt.Sprintf("Description: %s\n", pipeline.Config.Description)) } // Connectors - fmt.Println("Sources:") - displayConnectorsAndProcessors(connectors, connectorProcessors, apiv1.Connector_TYPE_SOURCE) + out.Stdout("Sources:\n") + displayConnectorsAndProcessors(out, connectors, connectorProcessors, apiv1.Connector_TYPE_SOURCE) - internal.DisplayProcessors(pipelineProcessors, 0) + display.DisplayProcessors(out, pipelineProcessors, 0) - fmt.Println("Destinations:") - displayConnectorsAndProcessors(connectors, connectorProcessors, apiv1.Connector_TYPE_DESTINATION) + out.Stdout("Destinations:\n") + displayConnectorsAndProcessors(out, connectors, connectorProcessors, apiv1.Connector_TYPE_DESTINATION) - displayDLQ(dlq) + displayDLQ(out, dlq) // Timestamps if pipeline.CreatedAt != nil { - fmt.Printf("Created At: %s\n", internal.PrintTime(pipeline.CreatedAt)) + out.Stdout(fmt.Sprintf("Created At: %s\n", display.PrintTime(pipeline.CreatedAt))) } if pipeline.UpdatedAt != nil { - fmt.Printf("Updated At: %s\n", internal.PrintTime(pipeline.UpdatedAt)) + out.Stdout(fmt.Sprintf("Updated At: %s\n", display.PrintTime(pipeline.UpdatedAt))) } return nil } -func displayDLQ(dlq *apiv1.Pipeline_DLQ) { - fmt.Println("Dead-letter queue:") - fmt.Printf("%sPlugin: %s\n", internal.Indentation(1), dlq.Plugin) +func displayDLQ(out ecdysis.Output, dlq *apiv1.Pipeline_DLQ) { + out.Stdout("Dead-letter queue:\n") + out.Stdout(fmt.Sprintf("%sPlugin: %s\n", display.Indentation(1), dlq.Plugin)) } -func displayConnectorsAndProcessors(connectors []*apiv1.Connector, connectorProcessors map[string][]*apiv1.Processor, connType apiv1.Connector_Type) { +func displayConnectorsAndProcessors(out ecdysis.Output, connectors []*apiv1.Connector, connectorProcessors map[string][]*apiv1.Processor, connType apiv1.Connector_Type) { for _, conn := range connectors { if conn.Type == connType { - fmt.Printf("%s- ID: %s\n", internal.Indentation(1), conn.Id) - fmt.Printf("%sPlugin: %s\n", internal.Indentation(2), conn.Plugin) - fmt.Printf("%sPipeline ID: %s\n", internal.Indentation(2), conn.PipelineId) - internal.DisplayConnectorConfig(conn.Config, 2) + out.Stdout(fmt.Sprintf("%s- ID: %s\n", display.Indentation(1), conn.Id)) + out.Stdout(fmt.Sprintf("%sPlugin: %s\n", display.Indentation(2), conn.Plugin)) + out.Stdout(fmt.Sprintf("%sPipeline ID: %s\n", display.Indentation(2), conn.PipelineId)) + display.DisplayConnectorConfig(out, conn.Config, 2) if processors, ok := connectorProcessors[conn.Id]; ok { - internal.DisplayProcessors(processors, 2) + display.DisplayProcessors(out, processors, 2) } - fmt.Printf("%sCreated At: %s\n", internal.Indentation(2), internal.PrintTime(conn.CreatedAt)) - fmt.Printf("%sUpdated At: %s\n", internal.Indentation(2), internal.PrintTime(conn.UpdatedAt)) + out.Stdout(fmt.Sprintf("%sCreated At: %s\n", display.Indentation(2), display.PrintTime(conn.CreatedAt))) + out.Stdout(fmt.Sprintf("%sUpdated At: %s\n", display.Indentation(2), display.PrintTime(conn.UpdatedAt))) } } } diff --git a/cmd/conduit/root/pipelines/describe_test.go b/cmd/conduit/root/pipelines/describe_test.go index a9da82570..a52ab9c33 100644 --- a/cmd/conduit/root/pipelines/describe_test.go +++ b/cmd/conduit/root/pipelines/describe_test.go @@ -15,9 +15,18 @@ package pipelines import ( + "bytes" + "context" "testing" + "github.com/conduitio/ecdysis" "github.com/matryer/is" + "go.uber.org/mock/gomock" + + "github.com/conduitio/conduit/cmd/conduit/api" + "github.com/conduitio/conduit/cmd/conduit/api/mock" + "github.com/conduitio/conduit/cmd/conduit/internal/testutils" + apiv1 "github.com/conduitio/conduit/proto/api/v1" ) func TestDescribeExecutionNoArgs(t *testing.T) { @@ -54,3 +63,126 @@ func TestDescribeExecutionCorrectArgs(t *testing.T) { is.NoErr(err) is.Equal(c.args.PipelineID, pipelineID) } + +func TestDescribeCommandExecuteWithClient(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + cmd := &DescribeCommand{args: DescribeArgs{PipelineID: "1"}} + cmd.Output(out) + + mockPipelineService := mock.NewMockPipelineService(ctrl) + mockProcessorService := mock.NewMockProcessorService(ctrl) + mockConnectorService := mock.NewMockConnectorService(ctrl) + + testutils.MockGetPipeline(mockPipelineService, cmd.args.PipelineID, []string{"conn1", "conn2"}, []string{"proc1", "proc2"}) + + testutils.MockGetProcessor( + mockProcessorService, + "proc1", "field.set", "", + nil, + map[string]string{ + ".Payload.After.department": "finance", + }) + + testutils.MockGetProcessor( + mockProcessorService, + "proc2", "custom.javascript", "", + nil, + map[string]string{}) + + testutils.MockGetConnectors(mockConnectorService, cmd.args.PipelineID, []*apiv1.Connector{ + {Id: "conn1", Type: apiv1.Connector_TYPE_SOURCE, Plugin: "plugin1", ProcessorIds: []string{"proc3"}}, + {Id: "conn2", Type: apiv1.Connector_TYPE_DESTINATION, Plugin: "plugin2"}, + }) + + testutils.MockGetConnector( + mockConnectorService, "conn1", "plugin1", cmd.args.PipelineID, apiv1.Connector_TYPE_SOURCE, + &apiv1.Connector_Config{ + Name: "Test Pipeline", + Settings: map[string]string{ + "foo": "bar", + }, + }, []string{"proc3"}) + + testutils.MockGetProcessor( + mockProcessorService, + "proc3", "custom.javascript", "", + nil, + map[string]string{}) + + testutils.MockGetConnector( + mockConnectorService, "conn2", "plugin2", cmd.args.PipelineID, apiv1.Connector_TYPE_DESTINATION, + &apiv1.Connector_Config{ + Name: "Test Pipeline", + Settings: map[string]string{ + "foo": "bar", + }, + }, []string{"proc3"}) + + testutils.MockGetDLQ(mockPipelineService, cmd.args.PipelineID, "dlq-plugin") + + client := &api.Client{ + PipelineServiceClient: mockPipelineService, + ProcessorServiceClient: mockProcessorService, + ConnectorServiceClient: mockConnectorService, + } + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := buf.String() + + is.Equal(output, ""+ + "ID: 1\n"+ + "Status: running\n"+ + "Name: Test Pipeline\n"+ + "Description: A test pipeline description\n"+ + "Sources:\n"+ + " - ID: conn1\n"+ + " Plugin: plugin1\n"+ + " Pipeline ID: 1\n"+ + " Config:\n"+ + " foo: bar\n"+ + " Processors:\n"+ + " - ID: proc3\n"+ + " Plugin: custom.javascript\n"+ + " Config:\n"+ + " Workers: 0\n"+ + " Created At: 1970-01-01T00:00:00Z\n"+ + " Updated At: 1970-01-01T00:00:00Z\n"+ + " Created At: 1970-01-01T00:00:00Z\n"+ + " Updated At: 1970-01-01T00:00:00Z\n"+ + "Processors:\n"+ + " - ID: proc1\n"+ + " Plugin: field.set\n"+ + " Config:\n"+ + " .Payload.After.department: finance\n"+ + " Workers: 0\n"+ + " Created At: 1970-01-01T00:00:00Z\n"+ + " Updated At: 1970-01-01T00:00:00Z\n"+ + " - ID: proc2\n"+ + " Plugin: custom.javascript\n"+ + " Config:\n"+ + " Workers: 0\n"+ + " Created At: 1970-01-01T00:00:00Z\n"+ + " Updated At: 1970-01-01T00:00:00Z\n"+ + "Destinations:\n"+ + " - ID: conn2\n"+ + " Plugin: plugin2\n"+ + " Pipeline ID: 1\n"+ + " Config:\n"+ + " foo: bar\n"+ + " Created At: 1970-01-01T00:00:00Z\n"+ + " Updated At: 1970-01-01T00:00:00Z\n"+ + "Dead-letter queue:\n"+ + " Plugin: dlq-plugin\n"+ + "Created At: 1970-01-01T00:00:00Z\n"+ + "Updated At: 1970-01-01T00:00:00Z\n") +} diff --git a/cmd/conduit/root/pipelines/list.go b/cmd/conduit/root/pipelines/list.go index 432e74899..b10ec9fc4 100644 --- a/cmd/conduit/root/pipelines/list.go +++ b/cmd/conduit/root/pipelines/list.go @@ -19,10 +19,11 @@ import ( "fmt" "sort" + "github.com/conduitio/conduit/cmd/conduit/internal/display" + "github.com/alexeyco/simpletable" "github.com/conduitio/conduit/cmd/conduit/api" "github.com/conduitio/conduit/cmd/conduit/cecdysis" - "github.com/conduitio/conduit/cmd/conduit/internal" apiv1 "github.com/conduitio/conduit/proto/api/v1" "github.com/conduitio/ecdysis" ) @@ -31,9 +32,12 @@ var ( _ cecdysis.CommandWithExecuteWithClient = (*ListCommand)(nil) _ ecdysis.CommandWithAliases = (*ListCommand)(nil) _ ecdysis.CommandWithDocs = (*ListCommand)(nil) + _ ecdysis.CommandWithOutput = (*ListCommand)(nil) ) -type ListCommand struct{} +type ListCommand struct { + output ecdysis.Output +} func (c *ListCommand) Docs() ecdysis.Docs { return ecdysis.Docs{ @@ -45,6 +49,10 @@ be configured via --pipelines.path at the time of running Conduit.`, } } +func (c *ListCommand) Output(output ecdysis.Output) { + c.output = output +} + func (c *ListCommand) Aliases() []string { return []string{"ls"} } func (c *ListCommand) Usage() string { return "list" } @@ -59,14 +67,14 @@ func (c *ListCommand) ExecuteWithClient(ctx context.Context, client *api.Client) return resp.Pipelines[i].Id < resp.Pipelines[j].Id }) - displayPipelines(resp.Pipelines) + c.output.Stdout(getPipelinesTable(resp.Pipelines) + "\n") return nil } -func displayPipelines(pipelines []*apiv1.Pipeline) { +func getPipelinesTable(pipelines []*apiv1.Pipeline) string { if len(pipelines) == 0 { - return + return "" } table := simpletable.New() @@ -83,13 +91,12 @@ func displayPipelines(pipelines []*apiv1.Pipeline) { for _, p := range pipelines { r := []*simpletable.Cell{ {Align: simpletable.AlignLeft, Text: p.Id}, - {Align: simpletable.AlignLeft, Text: internal.PrintStatusFromProtoString(p.State.Status.String())}, - {Align: simpletable.AlignLeft, Text: internal.PrintTime(p.CreatedAt)}, - {Align: simpletable.AlignLeft, Text: internal.PrintTime(p.UpdatedAt)}, + {Align: simpletable.AlignLeft, Text: display.PrintStatusFromProtoString(p.State.Status.String())}, + {Align: simpletable.AlignLeft, Text: display.PrintTime(p.CreatedAt)}, + {Align: simpletable.AlignLeft, Text: display.PrintTime(p.UpdatedAt)}, } table.Body.Cells = append(table.Body.Cells, r) } - table.SetStyle(simpletable.StyleDefault) - fmt.Println(table.String()) + return table.String() } diff --git a/cmd/conduit/root/pipelines/list_test.go b/cmd/conduit/root/pipelines/list_test.go new file mode 100644 index 000000000..b7cf965fc --- /dev/null +++ b/cmd/conduit/root/pipelines/list_test.go @@ -0,0 +1,100 @@ +// Copyright © 2025 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipelines + +import ( + "bytes" + "context" + "strings" + "testing" + + "github.com/conduitio/ecdysis" + "github.com/matryer/is" + "go.uber.org/mock/gomock" + + "github.com/conduitio/conduit/cmd/conduit/api" + "github.com/conduitio/conduit/cmd/conduit/api/mock" + "github.com/conduitio/conduit/cmd/conduit/internal/testutils" + apiv1 "github.com/conduitio/conduit/proto/api/v1" +) + +func TestListCommandExecuteWithClient(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + mockService := mock.NewMockPipelineService(ctrl) + + testutils.MockGetPipelines(mockService, []*apiv1.Pipeline{ + {Id: "1", State: &apiv1.Pipeline_State{Status: apiv1.Pipeline_STATUS_RUNNING}}, + {Id: "2", State: &apiv1.Pipeline_State{Status: apiv1.Pipeline_STATUS_STOPPED}}, + {Id: "3", State: &apiv1.Pipeline_State{Status: apiv1.Pipeline_STATUS_RECOVERING}}, + {Id: "4", State: &apiv1.Pipeline_State{Status: apiv1.Pipeline_STATUS_DEGRADED}}, + }) + + client := &api.Client{ + PipelineServiceClient: mockService, + } + + cmd := &ListCommand{} + cmd.Output(out) + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := buf.String() + + is.Equal(output, ""+ + "+----+------------+----------------------+----------------------+\n"+ + "| ID | STATE | CREATED | LAST_UPDATED |\n"+ + "+----+------------+----------------------+----------------------+\n"+ + "| 1 | running | 1970-01-01T00:00:00Z | 1970-01-01T00:00:00Z |\n"+ + "| 2 | stopped | 1970-01-01T00:00:00Z | 1970-01-01T00:00:00Z |\n"+ + "| 3 | recovering | 1970-01-01T00:00:00Z | 1970-01-01T00:00:00Z |\n"+ + "| 4 | degraded | 1970-01-01T00:00:00Z | 1970-01-01T00:00:00Z |\n"+ + "+----+------------+----------------------+----------------------+\n") +} + +func TestListCommandExecuteWithClient_EmptyResponse(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + mockService := mock.NewMockPipelineService(ctrl) + + testutils.MockGetPipelines(mockService, []*apiv1.Pipeline{}) + client := &api.Client{ + PipelineServiceClient: mockService, + } + + cmd := &ListCommand{} + cmd.Output(out) + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := strings.TrimSpace(buf.String()) + is.True(len(output) == 0) +} diff --git a/cmd/conduit/root/processorplugins/describe.go b/cmd/conduit/root/processorplugins/describe.go index 130068ad6..eec54d6f9 100644 --- a/cmd/conduit/root/processorplugins/describe.go +++ b/cmd/conduit/root/processorplugins/describe.go @@ -18,9 +18,10 @@ import ( "context" "fmt" + "github.com/conduitio/conduit/cmd/conduit/internal/display" + "github.com/conduitio/conduit/cmd/conduit/api" "github.com/conduitio/conduit/cmd/conduit/cecdysis" - "github.com/conduitio/conduit/cmd/conduit/internal" "github.com/conduitio/conduit/pkg/foundation/cerrors" apiv1 "github.com/conduitio/conduit/proto/api/v1" "github.com/conduitio/ecdysis" @@ -31,14 +32,20 @@ var ( _ ecdysis.CommandWithAliases = (*DescribeCommand)(nil) _ ecdysis.CommandWithDocs = (*DescribeCommand)(nil) _ ecdysis.CommandWithArgs = (*DescribeCommand)(nil) + _ ecdysis.CommandWithOutput = (*DescribeCommand)(nil) ) type DescribeArgs struct { - processorPlugin string + processorPluginID string } type DescribeCommand struct { - args DescribeArgs + args DescribeArgs + output ecdysis.Output +} + +func (c *DescribeCommand) Output(output ecdysis.Output) { + c.output = output } func (c *DescribeCommand) Usage() string { return "describe" } @@ -57,20 +64,20 @@ func (c *DescribeCommand) Aliases() []string { return []string{"desc"} } func (c *DescribeCommand) Args(args []string) error { if len(args) == 0 { - return cerrors.Errorf("requires a processor plugin name") + return cerrors.Errorf("requires a processor plugin ID") } if len(args) > 1 { return cerrors.Errorf("too many arguments") } - c.args.processorPlugin = args[0] + c.args.processorPluginID = args[0] return nil } func (c *DescribeCommand) ExecuteWithClient(ctx context.Context, client *api.Client) error { resp, err := client.ProcessorServiceClient.ListProcessorPlugins(ctx, &apiv1.ListProcessorPluginsRequest{ - Name: c.args.processorPlugin, + Name: c.args.processorPluginID, }) if err != nil { return fmt.Errorf("failed to get processor plugin: %w", err) @@ -80,30 +87,30 @@ func (c *DescribeCommand) ExecuteWithClient(ctx context.Context, client *api.Cli return nil } - displayConnectorPluginsDescription(resp.Plugins[0]) + displayConnectorPluginsDescription(c.output, resp.Plugins[0]) return nil } -func displayConnectorPluginsDescription(p *apiv1.ProcessorPluginSpecifications) { - if !internal.IsEmpty(p.Name) { - fmt.Printf("Name: %s\n", p.Name) +func displayConnectorPluginsDescription(out ecdysis.Output, p *apiv1.ProcessorPluginSpecifications) { + if !display.IsEmpty(p.Name) { + out.Stdout(fmt.Sprintf("Name: %s\n", p.Name)) } - if !internal.IsEmpty(p.Summary) { - fmt.Printf("Summary: %s\n", p.Summary) + if !display.IsEmpty(p.Summary) { + out.Stdout(fmt.Sprintf("Summary: %s\n", p.Summary)) } - if !internal.IsEmpty(p.Description) { - fmt.Printf("Description: %s\n", p.Description) + if !display.IsEmpty(p.Description) { + out.Stdout(fmt.Sprintf("Description: %s\n", p.Description)) } - if !internal.IsEmpty(p.Author) { - fmt.Printf("Author: %s\n", p.Author) + if !display.IsEmpty(p.Author) { + out.Stdout(fmt.Sprintf("Author: %s\n", p.Author)) } - if !internal.IsEmpty(p.Version) { - fmt.Printf("Version: %s\n", p.Version) + if !display.IsEmpty(p.Version) { + out.Stdout(fmt.Sprintf("Version: %s\n", p.Version)) } if len(p.Parameters) > 0 { - fmt.Println("\nParameters:") - internal.DisplayConfigParams(p.Parameters) + out.Stdout("Parameters:\n") + display.DisplayConfigParams(out, p.Parameters) } } diff --git a/cmd/conduit/root/processorplugins/describe_test.go b/cmd/conduit/root/processorplugins/describe_test.go index 84fbd6007..a1594734b 100644 --- a/cmd/conduit/root/processorplugins/describe_test.go +++ b/cmd/conduit/root/processorplugins/describe_test.go @@ -15,9 +15,18 @@ package processorplugins import ( + "bytes" + "context" "testing" + configv1 "github.com/conduitio/conduit-commons/proto/config/v1" + "github.com/conduitio/conduit/cmd/conduit/api" + "github.com/conduitio/conduit/cmd/conduit/api/mock" + "github.com/conduitio/conduit/cmd/conduit/internal/testutils" + apiv1 "github.com/conduitio/conduit/proto/api/v1" + "github.com/conduitio/ecdysis" "github.com/matryer/is" + "go.uber.org/mock/gomock" ) func TestDescribeExecutionNoArgs(t *testing.T) { @@ -26,7 +35,7 @@ func TestDescribeExecutionNoArgs(t *testing.T) { c := DescribeCommand{} err := c.Args([]string{}) - expected := "requires a processor plugin name" + expected := "requires a processor plugin ID" is.True(err != nil) is.Equal(err.Error(), expected) @@ -52,5 +61,74 @@ func TestDescribeExecutionCorrectArgs(t *testing.T) { err := c.Args([]string{processorPluginID}) is.NoErr(err) - is.Equal(c.args.processorPlugin, processorPluginID) + is.Equal(c.args.processorPluginID, processorPluginID) +} + +func TestDescribeCommand_ExecuteWithClient(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + cmd := &DescribeCommand{args: DescribeArgs{processorPluginID: "builtin:base64.encode@v0.1.0"}} + cmd.Output(out) + + mockProcessorService := mock.NewMockProcessorService(ctrl) + + testutils.MockGetProcessorPlugins( + mockProcessorService, + cmd.args.processorPluginID, + []*apiv1.ProcessorPluginSpecifications{ + { + Name: cmd.args.processorPluginID, + Summary: "Encode a field to base64", + Description: "The processor will encode the value of the target field to base64 and store the\n" + + "result in the target field. It is not allowed to encode the `.Position` field.\n" + + "If the provided field doesn't exist, the processor will create that field and\n" + + "assign its value. Field is a reference to the target field. " + + "Note that it is not allowed to base64 encode. `.Position` field. ", + Author: "Meroxa, Inc.", + Version: "v0.1.0", + Parameters: map[string]*configv1.Parameter{ + "field": { + Type: configv1.Parameter_Type(apiv1.PluginSpecifications_Parameter_TYPE_STRING), + Description: "Field is a reference to the target field", + Default: "", + Validations: []*configv1.Validation{ + {Type: configv1.Validation_TYPE_REQUIRED}, + {Type: configv1.Validation_TYPE_EXCLUSION}, + }, + }, + }, + }, + }) + + client := &api.Client{ + ProcessorServiceClient: mockProcessorService, + } + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := buf.String() + + is.Equal(output, ""+ + "Name: builtin:base64.encode@v0.1.0\n"+ + "Summary: Encode a field to base64\n"+ + "Description: The processor will encode the value of the target field to base64 and store the\n"+ + "result in the target field. It is not allowed to encode the `.Position` field.\n"+ + "If the provided field doesn't exist, the processor will create that field and\n"+ + "assign its value. Field is a reference to the target field. Note that it is not allowed to base64 encode. `.Position` field. \n"+ + "Author: Meroxa, Inc.\n"+ + "Version: v0.1.0\n"+ + "Parameters:\n"+ + "+-------+--------+------------------------------------------+---------+-------------------------+\n"+ + "| NAME | TYPE | DESCRIPTION | DEFAULT | VALIDATIONS |\n"+ + "+-------+--------+------------------------------------------+---------+-------------------------+\n"+ + "| field | string | Field is a reference to the target field | | [required], [exclusion] |\n"+ + "+-------+--------+------------------------------------------+---------+-------------------------+\n") } diff --git a/cmd/conduit/root/processorplugins/list.go b/cmd/conduit/root/processorplugins/list.go index 7c6a4cf0f..26f75ee25 100644 --- a/cmd/conduit/root/processorplugins/list.go +++ b/cmd/conduit/root/processorplugins/list.go @@ -31,6 +31,7 @@ var ( _ ecdysis.CommandWithDocs = (*ListCommand)(nil) _ ecdysis.CommandWithFlags = (*ListCommand)(nil) _ cecdysis.CommandWithExecuteWithClient = (*ListCommand)(nil) + _ ecdysis.CommandWithOutput = (*ListCommand)(nil) ) type ListFlags struct { @@ -38,7 +39,12 @@ type ListFlags struct { } type ListCommand struct { - flags ListFlags + flags ListFlags + output ecdysis.Output +} + +func (c *ListCommand) Output(output ecdysis.Output) { + c.output = output } func (c *ListCommand) Flags() []ecdysis.Flag { @@ -71,14 +77,14 @@ func (c *ListCommand) ExecuteWithClient(ctx context.Context, client *api.Client) return resp.Plugins[i].Name < resp.Plugins[j].Name }) - displayProcessorPlugins(resp.Plugins) + c.output.Stdout(getProcessorPluginsTable(resp.Plugins) + "\n") return nil } -func displayProcessorPlugins(processorPlugins []*apiv1.ProcessorPluginSpecifications) { +func getProcessorPluginsTable(processorPlugins []*apiv1.ProcessorPluginSpecifications) string { if len(processorPlugins) == 0 { - return + return "" } table := simpletable.New() @@ -98,6 +104,5 @@ func displayProcessorPlugins(processorPlugins []*apiv1.ProcessorPluginSpecificat table.Body.Cells = append(table.Body.Cells, r) } - table.SetStyle(simpletable.StyleDefault) - fmt.Println(table.String()) + return table.String() } diff --git a/cmd/conduit/root/processorplugins/list_test.go b/cmd/conduit/root/processorplugins/list_test.go index 8e95156f2..29c4cc689 100644 --- a/cmd/conduit/root/processorplugins/list_test.go +++ b/cmd/conduit/root/processorplugins/list_test.go @@ -15,11 +15,19 @@ package processorplugins import ( + "bytes" + "context" + "strings" "testing" + "github.com/conduitio/conduit/cmd/conduit/api" + "github.com/conduitio/conduit/cmd/conduit/api/mock" + "github.com/conduitio/conduit/cmd/conduit/internal/testutils" + apiv1 "github.com/conduitio/conduit/proto/api/v1" "github.com/conduitio/ecdysis" "github.com/matryer/is" "github.com/spf13/pflag" + "go.uber.org/mock/gomock" ) func TestListCommandFlags(t *testing.T) { @@ -54,3 +62,129 @@ func TestListCommandFlags(t *testing.T) { is.Equal(cf.Usage, f.usage) } } + +func TestListCommandExecuteWithClient_WithFlags(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + cmd := &ListCommand{ + flags: ListFlags{ + Name: "builtin", + }, + } + cmd.Output(out) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + mockService := mock.NewMockProcessorService(ctrl) + + testutils.MockGetProcessorPlugins( + mockService, + ".*builtin.*", + []*apiv1.ProcessorPluginSpecifications{ + { + Name: "builtin:avro.decode@v0.1.0", + Summary: "Decodes a field's raw data in the Avro format.", + }, + { + Name: "builtin:avro.encode@v0.1.0", + Summary: "Encodes a record's field into the Avro format.", + }, + }) + + client := &api.Client{ProcessorServiceClient: mockService} + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := buf.String() + is.Equal(output, ""+ + "+----------------------------+------------------------------------------------+\n"+ + "| NAME | SUMMARY |\n"+ + "+----------------------------+------------------------------------------------+\n"+ + "| builtin:avro.decode@v0.1.0 | Decodes a field's raw data in the Avro format. |\n"+ + "| builtin:avro.encode@v0.1.0 | Encodes a record's field into the Avro format. |\n"+ + "+----------------------------+------------------------------------------------+\n") +} + +func TestListCommandExecuteWithClient_NoFlags(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + cmd := &ListCommand{} + cmd.Output(out) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + mockService := mock.NewMockProcessorService(ctrl) + + testutils.MockGetProcessorPlugins( + mockService, + ".*.*", + []*apiv1.ProcessorPluginSpecifications{ + { + Name: "builtin:avro.decode@v0.1.0", + Summary: "Decodes a field's raw data in the Avro format.", + }, + { + Name: "builtin:avro.encode@v0.1.0", + Summary: "Encodes a record's field into the Avro format.", + }, + { + Name: "standalone:processor-simple", + Summary: "Example of a standalone processor.", + }, + }) + + client := &api.Client{ProcessorServiceClient: mockService} + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := buf.String() + + is.Equal(output, ""+ + "+-----------------------------+------------------------------------------------+\n"+ + "| NAME | SUMMARY |\n"+ + "+-----------------------------+------------------------------------------------+\n"+ + "| builtin:avro.decode@v0.1.0 | Decodes a field's raw data in the Avro format. |\n"+ + "| builtin:avro.encode@v0.1.0 | Encodes a record's field into the Avro format. |\n"+ + "| standalone:processor-simple | Example of a standalone processor. |\n"+ + "+-----------------------------+------------------------------------------------+\n") +} + +func TestListCommandExecuteWithClient_EmptyResponse(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + mockService := mock.NewMockProcessorService(ctrl) + + testutils.MockGetProcessorPlugins( + mockService, + ".*.*", + []*apiv1.ProcessorPluginSpecifications{}) + + client := &api.Client{ProcessorServiceClient: mockService} + cmd := &ListCommand{} + cmd.Output(out) + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := strings.TrimSpace(buf.String()) + is.True(len(output) == 0) +} diff --git a/cmd/conduit/root/processorplugins/processor_plugins.go b/cmd/conduit/root/processorplugins/processor_plugins.go index fd8a13ba8..08b6b1165 100644 --- a/cmd/conduit/root/processorplugins/processor_plugins.go +++ b/cmd/conduit/root/processorplugins/processor_plugins.go @@ -26,7 +26,9 @@ var ( type ProcessorPluginsCommand struct{} -func (c *ProcessorPluginsCommand) Aliases() []string { return []string{"processor-plugin"} } +func (c *ProcessorPluginsCommand) Aliases() []string { + return []string{"processor-plugin", "processorsplugins", "processorplugins", "processorplugin"} +} func (c *ProcessorPluginsCommand) SubCommands() []ecdysis.Command { return []ecdysis.Command{ diff --git a/cmd/conduit/root/processors/describe.go b/cmd/conduit/root/processors/describe.go index ca82ba19e..0383652b7 100644 --- a/cmd/conduit/root/processors/describe.go +++ b/cmd/conduit/root/processors/describe.go @@ -18,9 +18,10 @@ import ( "context" "fmt" + "github.com/conduitio/conduit/cmd/conduit/internal/display" + "github.com/conduitio/conduit/cmd/conduit/api" "github.com/conduitio/conduit/cmd/conduit/cecdysis" - "github.com/conduitio/conduit/cmd/conduit/internal" "github.com/conduitio/conduit/pkg/foundation/cerrors" apiv1 "github.com/conduitio/conduit/proto/api/v1" "github.com/conduitio/ecdysis" @@ -31,6 +32,7 @@ var ( _ ecdysis.CommandWithAliases = (*DescribeCommand)(nil) _ ecdysis.CommandWithDocs = (*DescribeCommand)(nil) _ ecdysis.CommandWithArgs = (*DescribeCommand)(nil) + _ ecdysis.CommandWithOutput = (*DescribeCommand)(nil) ) type DescribeArgs struct { @@ -38,7 +40,12 @@ type DescribeArgs struct { } type DescribeCommand struct { - args DescribeArgs + args DescribeArgs + output ecdysis.Output +} + +func (c *DescribeCommand) Output(output ecdysis.Output) { + c.output = output } func (c *DescribeCommand) Usage() string { return "describe" } @@ -76,28 +83,30 @@ func (c *DescribeCommand) ExecuteWithClient(ctx context.Context, client *api.Cli return fmt.Errorf("failed to get processor: %w", err) } - displayProcessor(resp.Processor) + displayProcessor(c.output, resp.Processor) return nil } -func displayProcessor(p *apiv1.Processor) { - fmt.Printf("ID: %s\n", p.Id) - fmt.Printf("Plugin: %s\n", p.Plugin) +func displayProcessor(out ecdysis.Output, p *apiv1.Processor) { + out.Stdout(fmt.Sprintf("ID: %s\n", p.Id)) + out.Stdout(fmt.Sprintf("Plugin: %s\n", p.Plugin)) - fmt.Printf("Parent: %s (%s)\n", internal.ProcessorParentToString(p.Parent.Type), p.Parent.Id) + if p.Parent != nil { + out.Stdout(fmt.Sprintf("Parent: %s (%s)\n", display.ProcessorParentToString(p.Parent.Type), p.Parent.Id)) + } - if !internal.IsEmpty(p.Condition) { - fmt.Printf("Condition: %s\n", p.Condition) + if !display.IsEmpty(p.Condition) { + out.Stdout(fmt.Sprintf("Condition: %s\n", p.Condition)) } if len(p.Config.Settings) > 0 { - fmt.Println("Config:") + out.Stdout("Config:\n") for name, value := range p.Config.Settings { - fmt.Printf("%s%s: %s\n", internal.Indentation(1), name, value) + out.Stdout(fmt.Sprintf("%s%s: %s\n", display.Indentation(1), name, value)) } } - fmt.Printf("Workers: %d\n", p.Config.Workers) + out.Stdout(fmt.Sprintf("Workers: %d\n", p.Config.Workers)) - fmt.Printf("Created At: %s\n", internal.PrintTime(p.CreatedAt)) - fmt.Printf("Updated At: %s\n", internal.PrintTime(p.UpdatedAt)) + out.Stdout(fmt.Sprintf("Created At: %s\n", display.PrintTime(p.CreatedAt))) + out.Stdout(fmt.Sprintf("Updated At: %s\n", display.PrintTime(p.UpdatedAt))) } diff --git a/cmd/conduit/root/processors/describe_test.go b/cmd/conduit/root/processors/describe_test.go index ab011bf4b..74639182f 100644 --- a/cmd/conduit/root/processors/describe_test.go +++ b/cmd/conduit/root/processors/describe_test.go @@ -15,9 +15,17 @@ package processors import ( + "bytes" + "context" "testing" + "github.com/conduitio/conduit/cmd/conduit/api" + "github.com/conduitio/conduit/cmd/conduit/api/mock" + "github.com/conduitio/conduit/cmd/conduit/internal/testutils" + apiv1 "github.com/conduitio/conduit/proto/api/v1" + "github.com/conduitio/ecdysis" "github.com/matryer/is" + "go.uber.org/mock/gomock" ) func TestDescribeExecutionNoArgs(t *testing.T) { @@ -54,3 +62,45 @@ func TestDescribeExecutionCorrectArgs(t *testing.T) { is.NoErr(err) is.Equal(c.args.ProcessorID, processorID) } + +func TestDescribeCommand_ExecuteWithClient(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + cmd := &DescribeCommand{args: DescribeArgs{ProcessorID: "processor-id"}} + cmd.Output(out) + + mockProcessorService := mock.NewMockProcessorService(ctrl) + testutils.MockGetProcessor( + mockProcessorService, + cmd.args.ProcessorID, "custom.javascript", `{{ eq .Metadata.filter "true" }}`, + &apiv1.Processor_Parent{ + Type: apiv1.Processor_Parent_TYPE_CONNECTOR, + Id: "source-connector", + }, + map[string]string{}) + + client := &api.Client{ + ProcessorServiceClient: mockProcessorService, + } + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := buf.String() + + is.Equal(output, ""+ + "ID: processor-id\n"+ + "Plugin: custom.javascript\n"+ + "Parent: connector (source-connector)\n"+ + "Condition: {{ eq .Metadata.filter \"true\" }}\n"+ + "Workers: 0\n"+ + "Created At: 1970-01-01T00:00:00Z\n"+ + "Updated At: 1970-01-01T00:00:00Z\n") +} diff --git a/cmd/conduit/root/processors/list.go b/cmd/conduit/root/processors/list.go index 294f7206b..21ffba40f 100644 --- a/cmd/conduit/root/processors/list.go +++ b/cmd/conduit/root/processors/list.go @@ -19,10 +19,11 @@ import ( "fmt" "sort" + "github.com/conduitio/conduit/cmd/conduit/internal/display" + "github.com/alexeyco/simpletable" "github.com/conduitio/conduit/cmd/conduit/api" "github.com/conduitio/conduit/cmd/conduit/cecdysis" - "github.com/conduitio/conduit/cmd/conduit/internal" apiv1 "github.com/conduitio/conduit/proto/api/v1" "github.com/conduitio/ecdysis" ) @@ -31,9 +32,16 @@ var ( _ cecdysis.CommandWithExecuteWithClient = (*ListCommand)(nil) _ ecdysis.CommandWithAliases = (*ListCommand)(nil) _ ecdysis.CommandWithDocs = (*ListCommand)(nil) + _ ecdysis.CommandWithOutput = (*ListCommand)(nil) ) -type ListCommand struct{} +type ListCommand struct { + output ecdysis.Output +} + +func (c *ListCommand) Output(output ecdysis.Output) { + c.output = output +} func (c *ListCommand) Docs() ecdysis.Docs { return ecdysis.Docs{ @@ -58,14 +66,14 @@ func (c *ListCommand) ExecuteWithClient(ctx context.Context, client *api.Client) return resp.Processors[i].Id < resp.Processors[j].Id }) - displayProcessors(resp.Processors) + c.output.Stdout(getProcessorsTable(resp.Processors) + "\n") return nil } -func displayProcessors(processors []*apiv1.Processor) { +func getProcessorsTable(processors []*apiv1.Processor) string { if len(processors) == 0 { - return + return "" } table := simpletable.New() @@ -82,18 +90,17 @@ func displayProcessors(processors []*apiv1.Processor) { } for _, p := range processors { - processorParent := fmt.Sprintf("%s (%s)", internal.ProcessorParentToString(p.Parent.Type), p.Parent.Id) + processorParent := fmt.Sprintf("%s (%s)", display.ProcessorParentToString(p.Parent.Type), p.Parent.Id) r := []*simpletable.Cell{ {Align: simpletable.AlignLeft, Text: p.Id}, {Align: simpletable.AlignLeft, Text: p.Plugin}, {Align: simpletable.AlignLeft, Text: processorParent}, {Align: simpletable.AlignLeft, Text: p.Condition}, - {Align: simpletable.AlignLeft, Text: internal.PrintTime(p.CreatedAt)}, - {Align: simpletable.AlignLeft, Text: internal.PrintTime(p.UpdatedAt)}, + {Align: simpletable.AlignLeft, Text: display.PrintTime(p.CreatedAt)}, + {Align: simpletable.AlignLeft, Text: display.PrintTime(p.UpdatedAt)}, } table.Body.Cells = append(table.Body.Cells, r) } - table.SetStyle(simpletable.StyleDefault) - fmt.Println(table.String()) + return table.String() } diff --git a/cmd/conduit/root/processors/list_test.go b/cmd/conduit/root/processors/list_test.go new file mode 100644 index 000000000..8c90d145c --- /dev/null +++ b/cmd/conduit/root/processors/list_test.go @@ -0,0 +1,119 @@ +// Copyright © 2025 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processors + +import ( + "bytes" + "context" + "strings" + "testing" + + "github.com/conduitio/conduit/cmd/conduit/api" + "github.com/conduitio/conduit/cmd/conduit/api/mock" + "github.com/conduitio/conduit/cmd/conduit/internal/testutils" + apiv1 "github.com/conduitio/conduit/proto/api/v1" + "github.com/conduitio/ecdysis" + "github.com/matryer/is" + "go.uber.org/mock/gomock" +) + +func TestListCommandExecuteWithClient(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + mockService := mock.NewMockProcessorService(ctrl) + + processors := []*apiv1.Processor{ + { + Id: "processor1", + Parent: &apiv1.Processor_Parent{ + Type: apiv1.Processor_Parent_TYPE_CONNECTOR, + Id: "source-connector", + }, + Plugin: "plugin1", + Condition: `{{ eq .Metadata.filter "true" }}`, + Config: &apiv1.Processor_Config{ + Settings: map[string]string{ + "foo": "bar", + }, + }, + CreatedAt: testutils.GetDateTime(), + UpdatedAt: testutils.GetDateTime(), + }, + { + Id: "processor2", + Parent: &apiv1.Processor_Parent{ + Type: apiv1.Processor_Parent_TYPE_PIPELINE, + Id: "pipeline", + }, + Plugin: "plugin2", + Config: &apiv1.Processor_Config{Settings: map[string]string{}}, + CreatedAt: testutils.GetDateTime(), + UpdatedAt: testutils.GetDateTime(), + }, + } + + testutils.MockGetProcessors(mockService, processors) + + client := &api.Client{ + ProcessorServiceClient: mockService, + } + + cmd := &ListCommand{} + cmd.Output(out) + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := buf.String() + is.Equal(output, ""+ + "+------------+---------+------------------------------+----------------------------------+----------------------+----------------------+\n"+ + "| ID | PLUGIN | PARENT | CONDITION | CREATED | LAST_UPDATED |\n"+ + "+------------+---------+------------------------------+----------------------------------+----------------------+----------------------+\n"+ + "| processor1 | plugin1 | connector (source-connector) | {{ eq .Metadata.filter \"true\" }} | 1970-01-01T00:00:00Z | 1970-01-01T00:00:00Z |\n"+ + "| processor2 | plugin2 | pipeline (pipeline) | | 1970-01-01T00:00:00Z | 1970-01-01T00:00:00Z |\n"+ + "+------------+---------+------------------------------+----------------------------------+----------------------+----------------------+\n") +} + +func TestListCommandExecuteWithClient_EmptyResponse(t *testing.T) { + is := is.New(t) + + buf := new(bytes.Buffer) + out := &ecdysis.DefaultOutput{} + out.Output(buf, nil) + + ctx := context.Background() + ctrl := gomock.NewController(t) + + mockService := mock.NewMockProcessorService(ctrl) + + testutils.MockGetProcessors(mockService, []*apiv1.Processor{}) + client := &api.Client{ProcessorServiceClient: mockService} + + cmd := &ListCommand{} + cmd.Output(out) + + err := cmd.ExecuteWithClient(ctx, client) + is.NoErr(err) + + output := strings.TrimSpace(buf.String()) + is.True(len(output) == 0) +} diff --git a/cmd/conduit/root/root_test.go b/cmd/conduit/root/root_test.go index b09791c48..2541ca4be 100644 --- a/cmd/conduit/root/root_test.go +++ b/cmd/conduit/root/root_test.go @@ -65,7 +65,6 @@ func TestRootCommandExecuteWithVersionFlag(t *testing.T) { is := is.New(t) buf := new(bytes.Buffer) - out := &ecdysis.DefaultOutput{} out.Output(buf, nil)