Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate connector name and pipeline description through API #1242

Merged
merged 20 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
03c9b3b
validate connector name when creating a connector through the API
AdamHaffar Oct 17, 2023
fbdad7a
validate pipeline description character limit to be 250 characters
AdamHaffar Oct 17, 2023
05ed470
Merge branch 'main' into adam/validate-connector-names-api
AdamHaffar Oct 17, 2023
7f8a3ba
add validation for name, description and id of pipeline
AdamHaffar Oct 25, 2023
20019ce
add validation for name and id of connectors
AdamHaffar Oct 25, 2023
7da2d57
lint fix
AdamHaffar Oct 25, 2023
74bb0ff
regexp pattern fix
AdamHaffar Oct 25, 2023
47e52a3
regexp pattern reformat to string
AdamHaffar Oct 25, 2023
796092a
resolve pr conversations, use multierr to return err in validateConne…
AdamHaffar Oct 30, 2023
f5107db
Merge branch 'main' into adam/validate-connector-names-api
AdamHaffar Nov 12, 2023
47ec46a
modify regex to correctly match and add tests for validation in conne…
AdamHaffar Nov 13, 2023
8c94f97
Merge remote-tracking branch 'origin/adam/validate-connector-names-ap…
AdamHaffar Nov 13, 2023
9ad0760
modify and add tests for service_test.go
AdamHaffar Nov 14, 2023
8b1a504
update regex to confide with provisioning test
AdamHaffar Nov 14, 2023
52864a3
lint fix
AdamHaffar Nov 14, 2023
1c159ac
Merge branch 'main' into adam/validate-connector-names-api
AdamHaffar Nov 14, 2023
c2d84a7
Update pkg/pipeline/errors.go
AdamHaffar Nov 22, 2023
0d092ee
Merge branch 'main' into adam/validate-connector-names-api
AdamHaffar Nov 23, 2023
49d851b
regex fix
AdamHaffar Nov 29, 2023
e823401
Merge branch 'main' into adam/validate-connector-names-api
AdamHaffar Nov 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/connector/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ var (
ErrInvalidConnectorStateType = cerrors.New("invalid connector state type")
ErrProcessorIDNotFound = cerrors.New("processor ID not found")
ErrConnectorRunning = cerrors.New("connector is running")
ErrInvalidCharacters = cerrors.New("connector ID contains invalid characters")
ErrIDOverLimit = cerrors.New("connector ID is over the character limit (64)")
ErrNameOverLimit = cerrors.New("connector name is over the character limit (64)")
ErrNameMissing = cerrors.New("must provide a connector name")
ErrIDMissing = cerrors.New("must provide a connector ID")
)
2 changes: 1 addition & 1 deletion pkg/connector/persister_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func TestPersister_WaitsForOpenConnectorsAndFlush(t *testing.T) {
persister.Wait()

// we are testing a delay which is not exact, this is the acceptable margin
maxDelay := delay + time.Millisecond*10
maxDelay := delay + time.Millisecond*11
AdamHaffar marked this conversation as resolved.
Show resolved Hide resolved
if gotDelay := time.Since(timeAtStart); gotDelay > maxDelay {
t.Fatalf("wait delay should be between %s and %s, actual delay: %s", delay, maxDelay, gotDelay)
}
Expand Down
35 changes: 34 additions & 1 deletion pkg/connector/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@ package connector

import (
"context"
"regexp"
"strings"
"time"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/database"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/foundation/metrics/measure"
"github.com/conduitio/conduit/pkg/foundation/multierror"
)

var idRegex = regexp.MustCompile(`^[A-Za-z0-9-_:]+$`)

// Service manages connectors.
type Service struct {
logger log.CtxLogger
Expand Down Expand Up @@ -111,6 +115,11 @@ func (s *Service) Create(
cfg Config,
p ProvisionType,
) (*Instance, error) {
err := s.validateConnector(cfg, id)
if err != nil {
return nil, cerrors.Errorf("connector is invalid: %w", err)
}

// determine the path of the Connector binary
if plugin == "" {
return nil, cerrors.New("must provide a plugin")
Expand Down Expand Up @@ -142,7 +151,7 @@ func (s *Service) Create(
}

// persist instance
err := s.store.Set(ctx, id, conn)
err = s.store.Set(ctx, id, conn)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -275,3 +284,27 @@ func (s *Service) SetState(ctx context.Context, id string, state any) (*Instance

return conn, err
}
func (s *Service) validateConnector(cfg Config, id string) error {
// contains all the errors occurred while provisioning configuration files.
var multierr error

if cfg.Name == "" {
multierr = multierror.Append(multierr, ErrNameMissing)
}
if len(cfg.Name) > 64 {
multierr = multierror.Append(multierr, ErrNameOverLimit)
}
if id == "" {
multierr = multierror.Append(multierr, ErrIDMissing)
} else {
AdamHaffar marked this conversation as resolved.
Show resolved Hide resolved
matched := idRegex.MatchString(id)
if !matched {
multierr = multierror.Append(multierr, ErrInvalidCharacters)
}
}
if len(id) > 64 {
multierr = multierror.Append(multierr, ErrIDOverLimit)
}

return multierr
}
125 changes: 123 additions & 2 deletions pkg/connector/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ func TestService_CreateDLQ(t *testing.T) {
TypeDestination,
"test-plugin",
uuid.NewString(),
Config{},
Config{
Name: "test-connector",
Settings: map[string]string{"foo": "bar"},
},
ProvisionTypeDLQ,
)
is.NoErr(err)
Expand Down Expand Up @@ -271,7 +274,8 @@ func TestService_CreateError(t *testing.T) {
Name: "test-connector",
Settings: map[string]string{"foo": "bar"},
},
}}
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -290,6 +294,123 @@ func TestService_CreateError(t *testing.T) {
}
}

func TestService_Create_ValidateSuccess(t *testing.T) {
is := is.New(t)
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}

service := NewService(logger, db, nil)

testCases := []struct {
name string
connID string
data Config
}{{
name: "valid config name",
connID: uuid.NewString(),
data: Config{
Name: "Name#@-/_0%$",
Settings: map[string]string{"foo": "bar"},
},
}, {
name: "valid connector ID",
connID: "Aa0-_",
data: Config{
Name: "test-connector",
Settings: map[string]string{"foo": "bar"},
},
}}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
got, err := service.Create(
ctx,
tt.connID,
TypeSource,
"test-plugin",
uuid.NewString(),
tt.data,
ProvisionTypeAPI,
)
is.True(got != nil)
is.Equal(err, nil)
})
}
}

func TestService_Create_ValidateError(t *testing.T) {
is := is.New(t)
ctx := context.Background()
logger := log.Nop()
db := &inmemory.DB{}

service := NewService(logger, db, nil)

testCases := []struct {
name string
connID string
errType error
data Config
}{{
name: "empty config name",
connID: uuid.NewString(),
errType: ErrNameMissing,
data: Config{
Name: "",
Settings: map[string]string{"foo": "bar"},
},
}, {
name: "connector name over 64 characters",
connID: uuid.NewString(),
errType: ErrNameOverLimit,
data: Config{
Name: "aaaaaaaaa1bbbbbbbbb2ccccccccc3ddddddddd4eeeeeeeee5fffffffff6ggggg",
Settings: map[string]string{"foo": "bar"},
},
}, {
name: "connector ID over 64 characters",
connID: "aaaaaaaaa1bbbbbbbbb2ccccccccc3ddddddddd4eeeeeeeee5fffffffff6ggggg",
errType: ErrIDOverLimit,
data: Config{
Name: "test-connector",
Settings: map[string]string{"foo": "bar"},
},
}, {
name: "invalid characters in connector ID",
connID: "a%bc",
errType: ErrInvalidCharacters,
data: Config{
Name: "test-connector",
Settings: map[string]string{"foo": "bar"},
},
}, {
name: "empty connector ID",
connID: "",
errType: ErrIDMissing,
data: Config{
Name: "test-connector",
Settings: map[string]string{"foo": "bar"},
},
}}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
got, err := service.Create(
ctx,
tt.connID,
TypeSource,
"test-plugin",
uuid.NewString(),
tt.data,
ProvisionTypeAPI,
)
is.True(cerrors.Is(err, tt.errType))
is.Equal(got, nil)
})
}
}

func TestService_GetInstanceNotFound(t *testing.T) {
is := is.New(t)
ctx := context.Background()
Expand Down
23 changes: 14 additions & 9 deletions pkg/pipeline/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,18 @@ package pipeline
import "github.com/conduitio/conduit/pkg/foundation/cerrors"

var (
ErrTimeout = cerrors.New("operation timed out")
ErrGracefulShutdown = cerrors.New("graceful shutdown")
ErrPipelineRunning = cerrors.New("pipeline is running")
ErrPipelineNotRunning = cerrors.New("pipeline not running")
ErrInstanceNotFound = cerrors.New("pipeline instance not found")
ErrNameMissing = cerrors.New("must provide a pipeline name")
ErrNameAlreadyExists = cerrors.New("pipeline name already exists")
ErrConnectorIDNotFound = cerrors.New("connector ID not found")
ErrProcessorIDNotFound = cerrors.New("processor ID not found")
ErrTimeout = cerrors.New("operation timed out")
ErrGracefulShutdown = cerrors.New("graceful shutdown")
ErrPipelineRunning = cerrors.New("pipeline is running")
ErrPipelineNotRunning = cerrors.New("pipeline not running")
ErrInstanceNotFound = cerrors.New("pipeline instance not found")
ErrNameMissing = cerrors.New("must provide a pipeline name")
ErrIDMissing = cerrors.New("must provide a connector ID")
AdamHaffar marked this conversation as resolved.
Show resolved Hide resolved
ErrNameAlreadyExists = cerrors.New("pipeline name already exists")
ErrInvalidCharacters = cerrors.New("pipeline ID contains invalid characters")
ErrNameOverLimit = cerrors.New("pipeline name is over the character limit (64)")
ErrIDOverLimit = cerrors.New("pipeline ID is over the character limit (64)")
ErrDescriptionOverLimit = cerrors.New("pipeline description is over the character limit (8192)")
ErrConnectorIDNotFound = cerrors.New("connector ID not found")
ErrProcessorIDNotFound = cerrors.New("processor ID not found")
)
44 changes: 38 additions & 6 deletions pkg/pipeline/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,19 @@ package pipeline

import (
"context"
"regexp"
"strings"
"time"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/database"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/foundation/metrics/measure"
"github.com/conduitio/conduit/pkg/foundation/multierror"
)

var idRegex = regexp.MustCompile(`^[A-Za-z0-9-_:]+$`)

type FailureEvent struct {
// ID is the ID of the pipeline which failed.
ID string
Expand Down Expand Up @@ -111,11 +115,9 @@ func (s *Service) Get(_ context.Context, id string) (*Instance, error) {
// Create will create a new pipeline instance with the given config and return
// it if it was successfully saved to the database.
func (s *Service) Create(ctx context.Context, id string, cfg Config, p ProvisionType) (*Instance, error) {
if cfg.Name == "" {
return nil, ErrNameMissing
}
if s.instanceNames[cfg.Name] {
return nil, ErrNameAlreadyExists
err := s.validatePipeline(cfg, id)
if err != nil {
return nil, cerrors.Errorf("pipeline is invalid: %w", err)
}

t := time.Now()
Expand All @@ -129,7 +131,7 @@ func (s *Service) Create(ctx context.Context, id string, cfg Config, p Provision
DLQ: DefaultDLQ,
}

err := s.store.Set(ctx, pl.ID, pl)
err = s.store.Set(ctx, pl.ID, pl)
if err != nil {
return nil, cerrors.Errorf("failed to save pipeline with ID %q: %w", pl.ID, err)
}
Expand Down Expand Up @@ -326,3 +328,33 @@ func (s *Service) notify(pipelineID string, err error) {
handler(e)
}
}
func (s *Service) validatePipeline(cfg Config, id string) error {
AdamHaffar marked this conversation as resolved.
Show resolved Hide resolved
// contains all the errors occurred while provisioning configuration files.
var multierr error

if cfg.Name == "" {
multierr = multierror.Append(multierr, ErrNameMissing)
}
if s.instanceNames[cfg.Name] {
multierr = multierror.Append(multierr, ErrNameAlreadyExists)
}
if len(cfg.Name) > 64 {
multierr = multierror.Append(multierr, ErrNameOverLimit)
}
if len(cfg.Description) > 8192 {
multierr = multierror.Append(multierr, ErrDescriptionOverLimit)
}
if id == "" {
multierr = multierror.Append(multierr, ErrIDMissing)
} else {
matched := idRegex.MatchString(id)
if !matched {
multierr = multierror.Append(multierr, ErrInvalidCharacters)
}
}
if len(id) > 64 {
multierr = multierror.Append(multierr, ErrIDOverLimit)
}

return multierr
}
Loading
Loading