Skip to content

Commit

Permalink
Add creation and modification timestamp to objects (#381)
Browse files Browse the repository at this point in the history
* Add creation and modification timestamp to objects

added CreatedAt and UpdatedAt timestamps for pipelines, connectors and processor
* still working on the tests

* proto conflict

* lint fix

* reviews + fix tests

* address reviews

* rename tn to now
  • Loading branch information
maha-hajja authored May 5, 2022
1 parent fcb7cca commit 31e2fa2
Show file tree
Hide file tree
Showing 23 changed files with 1,228 additions and 829 deletions.
12 changes: 10 additions & 2 deletions pkg/connector/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package connector

import (
"context"
"time"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
Expand Down Expand Up @@ -50,11 +51,18 @@ func NewDefaultBuilder(logger log.CtxLogger, persister *Persister, service *plug

func (b *DefaultBuilder) Build(t Type) (Connector, error) {
var c Connector
now := time.Now()
switch t {
case TypeSource:
c = &source{}
c = &source{
XCreatedAt: now,
XUpdatedAt: now,
}
case TypeDestination:
c = &destination{}
c = &destination{
XCreatedAt: now,
XUpdatedAt: now,
}
default:
return nil, ErrInvalidConnectorType
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package connector

import (
"context"
"time"

"github.com/conduitio/conduit/pkg/record"
)
Expand All @@ -42,6 +43,10 @@ type Connector interface {
Config() Config
SetConfig(Config)

CreatedAt() time.Time
UpdatedAt() time.Time
SetUpdatedAt(time.Time)

// IsRunning returns true if the connector is running and ready to accept
// calls to Read or Write (depending on the connector type).
IsRunning() bool
Expand Down
17 changes: 17 additions & 0 deletions pkg/connector/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package connector
import (
"context"
"sync"
"time"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
Expand All @@ -36,6 +37,10 @@ type destination struct {
// logger is used for logging and is set when destination is created.
logger log.CtxLogger

// timestamps
XCreatedAt time.Time
XUpdatedAt time.Time

// persister is used for persisting the connector state when it changes.
persister *Persister

Expand Down Expand Up @@ -71,6 +76,18 @@ func (s *destination) SetConfig(d Config) {
s.XConfig = d
}

func (s *destination) CreatedAt() time.Time {
return s.XCreatedAt
}

func (s *destination) UpdatedAt() time.Time {
return s.XUpdatedAt
}

func (s *destination) SetUpdatedAt(t time.Time) {
s.XUpdatedAt = t
}

func (s *destination) State() DestinationState {
return s.XState
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/connector/mock/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func (b Builder) NewDestinationMock(id string, d connector.Config) *Destination
m.EXPECT().Type().Return(connector.TypeDestination).AnyTimes()
m.EXPECT().ID().Return(id).AnyTimes()
m.EXPECT().Config().Return(d).AnyTimes()
m.EXPECT().CreatedAt().AnyTimes()
m.EXPECT().UpdatedAt().AnyTimes()
if b.SetupDestination != nil {
b.SetupDestination(m)
}
Expand All @@ -78,6 +80,8 @@ func (b Builder) NewSourceMock(id string, d connector.Config) *Source {
m.EXPECT().Type().Return(connector.TypeSource).AnyTimes()
m.EXPECT().ID().Return(id).AnyTimes()
m.EXPECT().Config().Return(d).AnyTimes()
m.EXPECT().CreatedAt().AnyTimes()
m.EXPECT().UpdatedAt().AnyTimes()
if b.SetupSource != nil {
b.SetupSource(m)
}
Expand Down
81 changes: 81 additions & 0 deletions pkg/connector/mock/connector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/connector/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package connector
import (
"context"
"strings"
"time"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/database"
Expand Down Expand Up @@ -147,6 +148,7 @@ func (s *Service) Update(ctx context.Context, id string, data Config) (Connector
}

conn.SetConfig(data)
conn.SetUpdatedAt(time.Now())

// persist conn
err = s.store.Set(ctx, id, conn)
Expand All @@ -167,6 +169,7 @@ func (s *Service) AddProcessor(ctx context.Context, connectorID string, processo
d := conn.Config()
d.ProcessorIDs = append(d.ProcessorIDs, processorID)
conn.SetConfig(d)
conn.SetUpdatedAt(time.Now())

// persist conn
err = s.store.Set(ctx, connectorID, conn)
Expand Down Expand Up @@ -198,6 +201,7 @@ func (s *Service) RemoveProcessor(ctx context.Context, connectorID string, proce

d.ProcessorIDs = d.ProcessorIDs[:processorIndex+copy(d.ProcessorIDs[processorIndex:], d.ProcessorIDs[processorIndex+1:])]
conn.SetConfig(d)
conn.SetUpdatedAt(time.Now())

// persist conn
err = s.store.Set(ctx, connectorID, conn)
Expand Down
9 changes: 9 additions & 0 deletions pkg/connector/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/conduitio/conduit/pkg/connector"
"github.com/conduitio/conduit/pkg/connector/mock"
Expand Down Expand Up @@ -349,11 +350,19 @@ func TestService_UpdateSuccess(t *testing.T) {
PipelineID: uuid.NewString(),
}

beforeUpdate := time.Now()

connBuilder := mock.Builder{
Ctrl: ctrl,
SetupSource: func(source *mock.Source) {
source.EXPECT().Validate(ctx, want.Settings).Return(nil)
source.EXPECT().SetConfig(want)
source.
EXPECT().
SetUpdatedAt(gomock.AssignableToTypeOf(time.Time{})).
Do(func(got time.Time) {
assert.Equal(t, got.After(beforeUpdate), true)
})
},
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/connector/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ type source struct {
XID string
XConfig Config
XState SourceState
// timestamps
XCreatedAt time.Time
XUpdatedAt time.Time

// logger is used for logging and is set when source is created.
logger log.CtxLogger
Expand Down Expand Up @@ -73,6 +76,18 @@ func (s *source) SetConfig(d Config) {
s.XConfig = d
}

func (s *source) CreatedAt() time.Time {
return s.XCreatedAt
}

func (s *source) UpdatedAt() time.Time {
return s.XUpdatedAt
}

func (s *source) SetUpdatedAt(t time.Time) {
s.XUpdatedAt = t
}

func (s *source) State() SourceState {
return s.XState
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/pipeline/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package pipeline

import (
"time"

"github.com/conduitio/conduit/pkg/pipeline/stream"
"gopkg.in/tomb.v2"
)
Expand All @@ -35,10 +37,12 @@ type Status int
// can be either Destination or Source. The pipeline sets up its publishers and
// subscribers based on whether the Connector in question is a Destination or a Source.
type Instance struct {
ID string
Config Config
Status Status
Error string
ID string
Config Config
Status Status
Error string
CreatedAt time.Time
UpdatedAt time.Time

ConnectorIDs []string
ProcessorIDs []string
Expand Down
Loading

0 comments on commit 31e2fa2

Please sign in to comment.