diff --git a/pkg/provisioning/errors.go b/pkg/provisioning/errors.go index 75ea22f14..bbb28bd31 100644 --- a/pkg/provisioning/errors.go +++ b/pkg/provisioning/errors.go @@ -17,5 +17,6 @@ package provisioning import "github.com/conduitio/conduit/pkg/foundation/cerrors" var ( - ErrDuplicatedPipelineID = cerrors.New("duplicated pipeline ID") + ErrDuplicatedPipelineID = cerrors.New("duplicated pipeline ID") + ErrNotProvisionedByConfig = cerrors.New("entity was not provisioned by a config file and therefore can't be mutated by the provisioning service") ) diff --git a/pkg/provisioning/service.go b/pkg/provisioning/service.go index cf2c1d4e8..f305ff104 100644 --- a/pkg/provisioning/service.go +++ b/pkg/provisioning/service.go @@ -126,6 +126,28 @@ func (s *Service) Init(ctx context.Context) error { return multierr } +// Delete exposes a way to delete pipelines provisioned using the provisioning +// service. +func (s *Service) Delete(ctx context.Context, id string) error { + pl, err := s.pipelineService.Get(ctx, id) + if err != nil { + return cerrors.Errorf("could not get pipeline %q: %w", id, err) + } + if pl.ProvisionedBy != pipeline.ProvisionTypeConfig { + return ErrNotProvisionedByConfig + } + oldConfig, err := s.Export(ctx, id) + if err != nil { + return cerrors.Errorf("failed to export pipeline: %w", err) + } + actions := s.newActionsBuilder().Build(oldConfig, config.Pipeline{}) + _, err = s.executeActions(ctx, actions) + if err != nil { + return cerrors.Errorf("failed to delete pipeline: %w", err) + } + return nil +} + // getYamlFiles recursively reads folders in the path and collects paths to all // files that end with .yml or .yaml. func (s *Service) getYamlFiles(path string) ([]string, error) { @@ -223,31 +245,22 @@ func (s *Service) provisionPipeline(ctx context.Context, cfg config.Pipeline) er return nil } -func (s *Service) deleteOldPipelines(ctx context.Context, ids []string) []string { +func (s *Service) deleteOldPipelines(ctx context.Context, keepIDs []string) []string { var deletedIDs []string pipelines := s.pipelineService.List(ctx) for id, pl := range pipelines { - if !slices.Contains(ids, id) && pl.ProvisionedBy == pipeline.ProvisionTypeConfig { - oldConfig, err := s.Export(ctx, id) - if err != nil { - s.logger.Warn(ctx). - Err(err). - Str(log.PipelineIDField, id). - Msg("failed to delete a pipeline provisioned by a config file, the pipeline is probably in a broken state, Conduit will try to remove it again next time it runs") - continue - } - actions := s.newActionsBuilder().Build(oldConfig, config.Pipeline{}) - _, err = s.executeActions(ctx, actions) - if err != nil { - s.logger.Warn(ctx). - Err(err). - Str(log.PipelineIDField, id). - Msg("failed to delete a pipeline provisioned by a config file, the pipeline is probably in a broken state, Conduit will try to remove it again next time it runs") - continue - } - - deletedIDs = append(deletedIDs, id) + if slices.Contains(keepIDs, id) || pl.ProvisionedBy != pipeline.ProvisionTypeConfig { + continue + } + err := s.Delete(ctx, id) + if err != nil { + s.logger.Warn(ctx). + Err(err). + Str(log.PipelineIDField, id). + Msg("failed to delete a pipeline provisioned by a config file, the pipeline is probably in a broken state, Conduit will try to remove it again next time it runs") + continue } + deletedIDs = append(deletedIDs, id) } return deletedIDs } diff --git a/pkg/provisioning/service_test.go b/pkg/provisioning/service_test.go index e42306a58..5112c921e 100644 --- a/pkg/provisioning/service_test.go +++ b/pkg/provisioning/service_test.go @@ -102,7 +102,7 @@ var ( } ) -func TestProvision_Create(t *testing.T) { +func TestService_Init_Create(t *testing.T) { is := is.New(t) logger := log.Nop() ctrl := gomock.NewController(t) @@ -135,7 +135,7 @@ func TestProvision_Create(t *testing.T) { is.NoErr(err) } -func TestProvision_Update(t *testing.T) { +func TestService_Init_Update(t *testing.T) { is := is.New(t) logger := log.Nop() ctrl := gomock.NewController(t) @@ -167,7 +167,7 @@ func TestProvision_Update(t *testing.T) { is.NoErr(err) } -func TestProvision_Delete(t *testing.T) { +func TestService_Init_Delete(t *testing.T) { is := is.New(t) logger := log.Nop() ctrl := gomock.NewController(t) @@ -179,7 +179,7 @@ func TestProvision_Delete(t *testing.T) { pipelineService.EXPECT().List(anyCtx).Return(map[string]*pipeline.Instance{oldPipelineInstance.ID: oldPipelineInstance}) // export pipeline - pipelineService.EXPECT().Get(anyCtx, oldPipelineInstance.ID).Return(oldPipelineInstance, nil) + pipelineService.EXPECT().Get(anyCtx, oldPipelineInstance.ID).Return(oldPipelineInstance, nil).Times(2) connService.EXPECT().Get(anyCtx, oldConnector1Instance.ID).Return(oldConnector1Instance, nil) connService.EXPECT().Get(anyCtx, oldConnector2Instance.ID).Return(oldConnector2Instance, nil) procService.EXPECT().Get(anyCtx, oldConnectorProcessorInstance.ID).Return(oldConnectorProcessorInstance, nil) @@ -196,7 +196,7 @@ func TestProvision_Delete(t *testing.T) { is.NoErr(err) } -func TestProvision_NoRollbackOnFailedStart(t *testing.T) { +func TestService_Init_NoRollbackOnFailedStart(t *testing.T) { is := is.New(t) logger := log.Nop() ctrl := gomock.NewController(t) @@ -229,7 +229,7 @@ func TestProvision_NoRollbackOnFailedStart(t *testing.T) { is.True(cerrors.Is(err, wantErr)) } -func TestProvision_RollbackCreate(t *testing.T) { +func TestService_Init_RollbackCreate(t *testing.T) { is := is.New(t) logger := log.Nop() ctrl := gomock.NewController(t) @@ -265,7 +265,7 @@ func TestProvision_RollbackCreate(t *testing.T) { is.True(cerrors.Is(err, wantErr)) } -func TestProvision_RollbackUpdate(t *testing.T) { +func TestService_Init_RollbackUpdate(t *testing.T) { is := is.New(t) logger := log.Nop() ctrl := gomock.NewController(t) @@ -302,7 +302,7 @@ func TestProvision_RollbackUpdate(t *testing.T) { is.True(cerrors.Is(err, wantErr)) } -func TestProvision_MultiplePipelinesDuplicatedPipelineID(t *testing.T) { +func TestService_Init_MultiplePipelinesDuplicatedPipelineID(t *testing.T) { is := is.New(t) logger := log.Nop() ctrl := gomock.NewController(t) @@ -328,7 +328,7 @@ func TestProvision_MultiplePipelinesDuplicatedPipelineID(t *testing.T) { is.True(cerrors.Is(err, ErrDuplicatedPipelineID)) // duplicated pipeline id } -func TestProvision_MultiplePipelines(t *testing.T) { +func TestService_Init_MultiplePipelines(t *testing.T) { is := is.New(t) logger := log.Nop() ctrl := gomock.NewController(t) @@ -366,7 +366,32 @@ func TestProvision_MultiplePipelines(t *testing.T) { is.NoErr(err) } -func TestProvision_IntegrationTestServices(t *testing.T) { +func TestService_Delete(t *testing.T) { + is := is.New(t) + logger := log.Nop() + ctrl := gomock.NewController(t) + + service, pipelineService, connService, procService, plugService := newTestService(ctrl, logger) + + // export pipeline + pipelineService.EXPECT().Get(anyCtx, oldPipelineInstance.ID).Return(oldPipelineInstance, nil).Times(2) + connService.EXPECT().Get(anyCtx, oldConnector1Instance.ID).Return(oldConnector1Instance, nil) + connService.EXPECT().Get(anyCtx, oldConnector2Instance.ID).Return(oldConnector2Instance, nil) + procService.EXPECT().Get(anyCtx, oldConnectorProcessorInstance.ID).Return(oldConnectorProcessorInstance, nil) + procService.EXPECT().Get(anyCtx, oldPipelineProcessorInstance.ID).Return(oldPipelineProcessorInstance, nil) + + // delete pipeline + pipelineService.EXPECT().Delete(anyCtx, oldPipelineInstance.ID).Return(nil) + connService.EXPECT().Delete(anyCtx, oldConnector1Instance.ID, plugService).Return(nil) + connService.EXPECT().Delete(anyCtx, oldConnector2Instance.ID, plugService).Return(nil) + procService.EXPECT().Delete(anyCtx, oldConnectorProcessorInstance.ID).Return(nil) + procService.EXPECT().Delete(anyCtx, oldPipelineProcessorInstance.ID).Return(nil) + + err := service.Delete(context.Background(), oldPipelineInstance.ID) + is.NoErr(err) +} + +func TestService_IntegrationTestServices(t *testing.T) { is := is.New(t) ctx, killAll := context.WithCancel(context.Background()) defer killAll()