Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Implement Sandbox notifications processor and publisher #595

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8061ef2
1st version of email publisher
Jul 23, 2023
af73e9a
fix input
Jul 23, 2023
870f382
email feature
Jul 25, 2023
89ccfd2
Update boilerplate version (#589)
flyte-bot Jul 14, 2023
66fb45d
Update boilerplate version (#594)
flyte-bot Jul 26, 2023
b8706bb
sandbox email publisher and processor with test
Jul 27, 2023
c4a0fcf
delete unnecessary comments
Jul 27, 2023
d821aaf
Merge branch 'flyteorg:master' into first-version-email-publihser
Future-Outlier Jul 27, 2023
dc16bb2
update go.sum
Jul 28, 2023
aad4081
Merge branch 'first-version-email-publihser' of https://github.com/Fu…
Jul 28, 2023
8e4c50a
add common.Sandbox type
Jul 29, 2023
e5c12bf
remove emailer setting in factory.go and add mock emailer test in sa…
Jul 30, 2023
be74e6e
update file go.mod and go.sum
Jul 30, 2023
45b050e
Add test for NewNotificationsPublisher and NewNotificationsProcessor …
Aug 1, 2023
742c010
complete test for StartProcessing, Publish and StopProcessing in Test…
Aug 1, 2023
c05a78c
refactor msg channel using singleton pattern and write better unit te…
Aug 4, 2023
44e5e86
refactor the publisher and processor both of their msg channel and re…
Aug 5, 2023
c2a9301
add email error for sandbox processor test, trying to imporove the co…
Aug 9, 2023
22d84e7
improve test code coverage by adding test function in sandbox process…
Aug 9, 2023
85bbe17
make sandbox_processor and sandbox_publisher to 100% code coverage
Aug 9, 2023
19aef7f
Merge branch 'flyteorg:master' into first-version-email-publihser
Future-Outlier Aug 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/async/notifications/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@
}
emailer = GetEmailer(config, scope)
return implementations.NewGcpProcessor(sub, emailer, scope)
case common.Sandbox:
emailer = GetEmailer(config, scope)
return implementations.NewSandboxProcessor(emailer)

Check warning on line 125 in pkg/async/notifications/factory.go

View check run for this annotation

Codecov / codecov/patch

pkg/async/notifications/factory.go#L123-L125

Added lines #L123 - L125 were not covered by tests
case common.Local:
fallthrough
default:
Expand Down Expand Up @@ -171,6 +174,8 @@
panic(err)
}
return implementations.NewPublisher(publisher, scope)
case common.Sandbox:
return implementations.NewSandboxPublisher()

Check warning on line 178 in pkg/async/notifications/factory.go

View check run for this annotation

Codecov / codecov/patch

pkg/async/notifications/factory.go#L177-L178

Added lines #L177 - L178 were not covered by tests
case common.Local:
fallthrough
default:
Expand Down
60 changes: 60 additions & 0 deletions pkg/async/notifications/implementations/sandbox_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package implementations

import (
"context"
"time"

"github.com/flyteorg/flyteadmin/pkg/async"
"github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
"github.com/golang/protobuf/proto"
)

type SandboxProcessor struct {
email interfaces.Emailer
}

func (p *SandboxProcessor) StartProcessing() {
for {
logger.Warningf(context.Background(), "Starting SandBox notifications processor")
err := p.run()
logger.Errorf(context.Background(), "error with running processor err: [%v] ", err)
time.Sleep(async.RetryDelay)
}

Check warning on line 24 in pkg/async/notifications/implementations/sandbox_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/async/notifications/implementations/sandbox_processor.go#L18-L24

Added lines #L18 - L24 were not covered by tests
}

func (p *SandboxProcessor) run() error {
var emailMessage admin.EmailMessage

for {
select {
case msg := <-msgChan:
err := proto.Unmarshal(msg, &emailMessage)
if err != nil {
logger.Errorf(context.Background(), "error with unmarshalling message [%v]", err)
return err
}

Check warning on line 37 in pkg/async/notifications/implementations/sandbox_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/async/notifications/implementations/sandbox_processor.go#L32-L37

Added lines #L32 - L37 were not covered by tests

err = p.email.SendEmail(context.Background(), emailMessage)
if err != nil {
logger.Errorf(context.Background(), "error with sendemail message [%v] ", err)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
return err
}

Check warning on line 43 in pkg/async/notifications/implementations/sandbox_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/async/notifications/implementations/sandbox_processor.go#L39-L43

Added lines #L39 - L43 were not covered by tests
default:
logger.Debugf(context.Background(), "no message to process")
return nil
}
}
}

func (p *SandboxProcessor) StopProcessing() error {
logger.Debug(context.Background(), "call to sandbox stop processing.")
return nil

Check warning on line 53 in pkg/async/notifications/implementations/sandbox_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/async/notifications/implementations/sandbox_processor.go#L51-L53

Added lines #L51 - L53 were not covered by tests
}

func NewSandboxProcessor(emailer interfaces.Emailer) interfaces.Processor {
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
return &SandboxProcessor{
email: emailer,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package implementations

import (
"context"
"testing"

"github.com/flyteorg/flyteadmin/pkg/async/notifications/mocks"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
)

var mockSandboxEmailer mocks.MockEmailer

func TestSandboxProcessor_UnmarshalMessage(t *testing.T) {
var emailMessage admin.EmailMessage
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
err := proto.Unmarshal(msg, &emailMessage)
assert.Nil(t, err)

assert.Equal(t, emailMessage.Body, testEmail.Body)
assert.Equal(t, emailMessage.RecipientsEmail, testEmail.RecipientsEmail)
assert.Equal(t, emailMessage.SubjectLine, testEmail.SubjectLine)
assert.Equal(t, emailMessage.SenderEmail, testEmail.SenderEmail)
}

func TestSandboxProcessor_StartProcessing(t *testing.T) {

testSandboxProcessor := NewSandboxProcessor(&mockSandboxEmailer)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

sendEmailValidationFunc := func(ctx context.Context, email admin.EmailMessage) error {
assert.Equal(t, testEmail.Body, email.Body)
assert.Equal(t, testEmail.RecipientsEmail, email.RecipientsEmail)
assert.Equal(t, testEmail.SubjectLine, email.SubjectLine)
assert.Equal(t, testEmail.SenderEmail, email.SenderEmail)
return nil
}
mockSandboxEmailer.SetSendEmailFunc(sendEmailValidationFunc)
assert.Nil(t, testSandboxProcessor.(*SandboxProcessor).run())
}
31 changes: 31 additions & 0 deletions pkg/async/notifications/implementations/sandbox_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package implementations

import (
"context"

"github.com/flyteorg/flytestdlib/logger"
"github.com/golang/protobuf/proto"
)

type SandboxPublisher struct{}

var msgChan = make(chan []byte)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

func (p *SandboxPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error {
logger.Debugf(ctx, "Publishing the following message [%s]", msg.String())

data, err := proto.Marshal(msg)

if err != nil {
logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err)
return err
}

Check warning on line 22 in pkg/async/notifications/implementations/sandbox_publisher.go

View check run for this annotation

Codecov / codecov/patch

pkg/async/notifications/implementations/sandbox_publisher.go#L20-L22

Added lines #L20 - L22 were not covered by tests

msgChan <- data

return nil
}

func NewSandboxPublisher() *SandboxPublisher {
return &SandboxPublisher{}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package implementations

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestSandboxPublisher_Publish(t *testing.T) {
publisher := NewSandboxPublisher()
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

errChan := make(chan string)

go func() {
select {
case <-msgChan:
// if message received, no need to send an error
case <-time.After(time.Second * 5):
errChan <- "No data was received in the channel within the expected time frame"
}
}()

err := publisher.Publish(context.Background(), "NOTIFICATION_TYPE", &testEmail)

// Check if there was an error in the goroutine
select {
case errMsg := <-errChan:
t.Fatal(errMsg)
default:
// no error from the goroutine
}

assert.Nil(t, err)
}
9 changes: 5 additions & 4 deletions pkg/common/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ package common
type CloudProvider = string

const (
AWS CloudProvider = "aws"
GCP CloudProvider = "gcp"
Local CloudProvider = "local"
None CloudProvider = "none"
AWS CloudProvider = "aws"
GCP CloudProvider = "gcp"
Sandbox CloudProvider = "sandbox"
Local CloudProvider = "local"
None CloudProvider = "none"
)
Loading