Skip to content

Commit

Permalink
Data Change Management - Toggle feature (#29)
Browse files Browse the repository at this point in the history
* feat: toggle-feature for schema change

* chore: removing print statements

* unit test added

* refactoring

* refactoring
  • Loading branch information
Vaishnavi190900 authored Apr 30, 2024
1 parent 9d3690a commit 962b9a2
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 26 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type StatsDConfig struct {
type SchemaChangeConfig struct {
KafkaTopic string
Depth int32
Enable bool `default:"false"`
}

// Config Server config
Expand Down
2 changes: 1 addition & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ statsd:
schemachange:
kafkatopic: "schema_change"
depth: 5

enable: false

47 changes: 27 additions & 20 deletions core/schema/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,29 +123,36 @@ func (s *Service) Create(ctx context.Context, nsName string, schemaName string,
Compatibility: compatibility,
}
versionID := getIDforSchema(nsName, schemaName, sf.ID)
_, prevSchemaData, dataErr := s.GetLatest(ctx, nsName, schemaName)
version, err := s.repo.Create(ctx, nsName, schemaName, mergedMetadata, versionID, sf)
if dataErr == nil {
changeRequest := &changedetector.ChangeRequest{
NamespaceID: nsName,
SchemaName: schemaName,
Version: version,
VersionID: versionID,
OldData: prevSchemaData,
NewData: data,
Depth: s.config.SchemaChange.Depth,
}
go func() {
newCtx, cancel := context.WithTimeout(context.Background(), 1*time.Hour)
defer cancel()
err := s.identifySchemaChangeWithContext(newCtx, changeRequest)
if err != nil {
log.Printf("got error while identifying schema change event %s", err.Error())
if err != nil {
log.Printf("got error while creating schema %s in namespace %s -> %s", schemaName, nsName, err.Error())
}

if s.config.SchemaChange.Enable {
_, prevSchemaData, err2 := s.GetLatest(ctx, nsName, schemaName)
if err2 == nil {
changeRequest := &changedetector.ChangeRequest{
NamespaceID: nsName,
SchemaName: schemaName,
Version: version,
VersionID: versionID,
OldData: prevSchemaData,
NewData: data,
Depth: s.config.SchemaChange.Depth,
}
}()
} else {
log.Printf("got error while getting previous schema data %s", dataErr.Error())
go func() {
newCtx, cancel := context.WithTimeout(context.Background(), 1*time.Hour)
defer cancel()
err := s.identifySchemaChangeWithContext(newCtx, changeRequest)
if err != nil {
log.Printf("got error while identifying schema change event %s", err.Error())
}
}()
} else {
log.Printf("got error while getting previous schema data %s", err2.Error())
}
}

return SchemaInfo{
Version: version,
ID: versionID,
Expand Down
63 changes: 61 additions & 2 deletions core/schema/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ func getSvc() (*schema.Service, *mocks.NamespaceService, *mocks.SchemaProvider,
cache.On("Get", mock.Anything).Return("", false)
cache.On("Set", mock.Anything, mock.Anything, mock.Anything).Return(false)
producer := &mocks.Producer{}
config := &config.Config{}
svc := schema.NewService(schemaRepo, schemaProvider, nsService, cache, newRelic, cdService, producer, config, neRepo)
conf := &config.Config{
SchemaChange: config.SchemaChangeConfig{
Enable: true,
},
}
svc := schema.NewService(schemaRepo, schemaProvider, nsService, cache, newRelic, cdService, producer, conf, neRepo)
return svc, nsService, schemaProvider, schemaRepo, newRelic, cdService, producer, neRepo
}

Expand Down Expand Up @@ -160,6 +164,7 @@ func TestSchemaCreate(t *testing.T) {

t.Run("should identify schema change event and not push to kafka and db when updated schemas is zero", func(t *testing.T) {
svc, nsService, schemaProvider, schemaRepo, newrelic, cdService, producer, neRepo := getSvc()

scFile := &schema.SchemaFile{}
parsedSchema := &mocks.ParsedSchema{}
nsName := "testNamespace"
Expand Down Expand Up @@ -201,6 +206,60 @@ func TestSchemaCreate(t *testing.T) {
assert.True(t, dataCheck)
})

t.Run("should not trigger identify schema change if the feature flag is OFF", func(t *testing.T) {
nsService := &mocks.NamespaceService{}
schemaProvider := &mocks.SchemaProvider{}
schemaRepo := &mocks.SchemaRepository{}
cache := &mocks.SchemaCache{}
newrelic := &mocks2.NewRelic{}
cdService := &mocks.ChangeDetectorService{}
cache.On("Get", mock.Anything).Return("", false)
cache.On("Set", mock.Anything, mock.Anything, mock.Anything).Return(false)
neRepo := &mocks.NotificationEventRepository{}
producer := &mocks.Producer{}
conf := &config.Config{
SchemaChange: config.SchemaChangeConfig{
Enable: false,
},
}
svc := schema.NewService(schemaRepo, schemaProvider, nsService, cache, newrelic, cdService, producer, conf, neRepo)
ctx := context.Background()
scFile := &schema.SchemaFile{}
parsedSchema := &mocks.ParsedSchema{}
nsName := "testNamespace"
data := []byte("data")
nsService.On("Get", mock.Anything, nsName).Return(namespace.Namespace{Format: "protobuf"}, nil)
schemaProvider.On("ParseSchema", "protobuf", data).Return(parsedSchema, nil)
schemaRepo.On("GetLatestVersion", mock.Anything, nsName, "a").Return(int32(3), nil)
schemaRepo.On("Get", mock.Anything, nsName, "a", int32(3)).Return(data, nil)
schemaRepo.On("GetMetadata", mock.Anything, nsName, "a").Return(&schema.Metadata{Format: "protobuf"}, nil)
parsedSchema.On("GetCanonicalValue").Return(scFile)
schemaRepo.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int32(1), nil)

var called, compatibility, metadata, dataCheck bool
newrelic.On("StartGenericSegment", mock.Anything, "Create Schema Info").Return(func() { called = true })
newrelic.On("StartGenericSegment", mock.Anything, "Compatibility checker").Return(func() { compatibility = true })
newrelic.On("StartGenericSegment", mock.Anything, "GetMetaData").Return(func() { metadata = true })
newrelic.On("StartGenericSegment", mock.Anything, "GetData").Return(func() { dataCheck = true })
scInfo, err := svc.Create(ctx, nsName, "a", &schema.Metadata{}, data)
time.Sleep(100 * time.Millisecond)
assert.NoError(t, err)
assert.Equal(t, scInfo.Version, int32(1))

schemaRepo.AssertExpectations(t)
nsService.AssertExpectations(t)
cdService.AssertNotCalled(t, "IdentifySchemaChange")
producer.AssertNotCalled(t, "Write")
neRepo.AssertNotCalled(t, "GetByNameSpaceSchemaVersionAndSuccess")
neRepo.AssertNotCalled(t, "Create")
neRepo.AssertNotCalled(t, "Update")
newrelic.AssertExpectations(t)
assert.True(t, called)
assert.True(t, compatibility)
assert.True(t, metadata)
assert.True(t, dataCheck)
})

t.Run("should return error if unable to get prev latest schema", func(t *testing.T) {
svc, nsService, schemaProvider, schemaRepo, newrelic, _, _, _ := getSvc()
parsedSchema := &mocks.ParsedSchema{}
Expand Down
8 changes: 5 additions & 3 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,22 @@ func Start(cfg config.Config) {
panic(err)
}
newRelic := &newRelic2.NewRelic{}

changeDetectorService := changedetector.NewService(newRelic)

statsDconfig := &statsd.ClientConfig{
statsdConfig := &statsd.ClientConfig{
Address: cfg.StatsD.Address,
Prefix: cfg.StatsD.Prefix,
}
fmt.Printf("Kafka Adress %s", cfg.KafkaProducer.BootstrapServer)
statsDClient, err := statsd.NewClientWithConfig(statsDconfig)
statsdClient, err := statsd.NewClientWithConfig(statsdConfig)
if err != nil {
log.Fatal("Error creating StatsD client:", err)
}
producer := kafka.NewWriter(cfg.KafkaProducer.BootstrapServer, cfg.KafkaProducer.Timeout, cfg.KafkaProducer.Retries, statsDClient)
producer := kafka.NewWriter(cfg.KafkaProducer.BootstrapServer, cfg.KafkaProducer.Timeout, cfg.KafkaProducer.Retries, statsdClient)

notificationEventRepo := postgres.NewNotificationEventRepository(db)

schemaService := schema.NewService(schemaRepository, provider.NewSchemaProvider(), namespaceService, cache, newRelic, changeDetectorService, producer, &cfg, notificationEventRepo)

searchRepository := postgres.NewSearchRepository(db)
Expand Down

0 comments on commit 962b9a2

Please sign in to comment.