Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Implement Sandbox notifications processor and publisher #595

Merged
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8061ef2
1st version of email publisher
Jul 23, 2023
af73e9a
fix input
Jul 23, 2023
870f382
email feature
Jul 25, 2023
89ccfd2
Update boilerplate version (#589)
flyte-bot Jul 14, 2023
66fb45d
Update boilerplate version (#594)
flyte-bot Jul 26, 2023
b8706bb
sandbox email publisher and processor with test
Jul 27, 2023
c4a0fcf
delete unnecessary comments
Jul 27, 2023
d821aaf
Merge branch 'flyteorg:master' into first-version-email-publihser
Future-Outlier Jul 27, 2023
dc16bb2
update go.sum
Jul 28, 2023
aad4081
Merge branch 'first-version-email-publihser' of https://github.com/Fu…
Jul 28, 2023
8e4c50a
add common.Sandbox type
Jul 29, 2023
e5c12bf
remove emailer setting in factory.go and add mock emailer test in sa…
Jul 30, 2023
be74e6e
update file go.mod and go.sum
Jul 30, 2023
45b050e
Add test for NewNotificationsPublisher and NewNotificationsProcessor …
Aug 1, 2023
742c010
complete test for StartProcessing, Publish and StopProcessing in Test…
Aug 1, 2023
c05a78c
refactor msg channel using singleton pattern and write better unit te…
Aug 4, 2023
44e5e86
refactor the publisher and processor both of their msg channel and re…
Aug 5, 2023
c2a9301
add email error for sandbox processor test, trying to imporove the co…
Aug 9, 2023
22d84e7
improve test code coverage by adding test function in sandbox process…
Aug 9, 2023
85bbe17
make sandbox_processor and sandbox_publisher to 100% code coverage
Aug 9, 2023
19aef7f
Merge branch 'flyteorg:master' into first-version-email-publihser
Future-Outlier Aug 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pkg/async/notifications/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package notifications
import (
"context"
"fmt"
"sync"
"time"

"github.com/flyteorg/flyteadmin/pkg/async"
Expand All @@ -27,6 +28,9 @@ const maxRetries = 3

var enable64decoding = false

var msgChan chan []byte
var once sync.Once

type PublisherConfig struct {
TopicName string
}
Expand All @@ -41,6 +45,13 @@ type EmailerConfig struct {
BaseURL string
}

// For sandbox only
func CreateMsgChan() {
once.Do(func() {
msgChan = make(chan []byte)
})
}

func GetEmailer(config runtimeInterfaces.NotificationsConfig, scope promutils.Scope) interfaces.Emailer {
// If an external email service is specified use that instead.
// TODO: Handling of this is messy, see https://github.com/flyteorg/flyte/issues/1063
Expand Down Expand Up @@ -120,6 +131,9 @@ func NewNotificationsProcessor(config runtimeInterfaces.NotificationsConfig, sco
}
emailer = GetEmailer(config, scope)
return implementations.NewGcpProcessor(sub, emailer, scope)
case common.Sandbox:
emailer = GetEmailer(config, scope)
return implementations.NewSandboxProcessor(msgChan, emailer)
case common.Local:
fallthrough
default:
Expand Down Expand Up @@ -171,6 +185,9 @@ func NewNotificationsPublisher(config runtimeInterfaces.NotificationsConfig, sco
panic(err)
}
return implementations.NewPublisher(publisher, scope)
case common.Sandbox:
CreateMsgChan()
return implementations.NewSandboxPublisher(msgChan)
case common.Local:
fallthrough
default:
Expand Down
34 changes: 34 additions & 0 deletions pkg/async/notifications/factory_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,32 @@
package notifications

import (
"context"
"testing"

"github.com/flyteorg/flyteadmin/pkg/async/notifications/implementations"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/stretchr/testify/assert"
)

var (
scope = promutils.NewScope("test_sandbox_processor")
notificationsConfig = runtimeInterfaces.NotificationsConfig{
Type: "sandbox",
}
testEmail = admin.EmailMessage{
RecipientsEmail: []string{
"[email protected]",
"[email protected]",
},
SenderEmail: "[email protected]",
SubjectLine: "Test email",
Body: "This is a sample email.",
}
)

func TestGetEmailer(t *testing.T) {
defer func() { r := recover(); assert.NotNil(t, r) }()
cfg := runtimeInterfaces.NotificationsConfig{
Expand All @@ -23,3 +42,18 @@ func TestGetEmailer(t *testing.T) {
// shouldn't reach here
t.Errorf("did not panic")
}

func TestNewNotificationPublisherAndProcessor(t *testing.T) {
testSandboxPublisher := NewNotificationsPublisher(notificationsConfig, scope)
assert.IsType(t, testSandboxPublisher, &implementations.SandboxPublisher{})
testSandboxProcessor := NewNotificationsProcessor(notificationsConfig, scope)
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
assert.IsType(t, testSandboxProcessor, &implementations.SandboxProcessor{})

go func() {
testSandboxProcessor.StartProcessing()
}()

assert.Nil(t, testSandboxPublisher.Publish(context.Background(), "TEST_NOTIFICATION", &testEmail))

assert.Nil(t, testSandboxProcessor.StopProcessing())
}
62 changes: 62 additions & 0 deletions pkg/async/notifications/implementations/sandbox_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package implementations

import (
"context"
"time"

"github.com/flyteorg/flyteadmin/pkg/async"
"github.com/flyteorg/flyteadmin/pkg/async/notifications/interfaces"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flytestdlib/logger"
"github.com/golang/protobuf/proto"
)

type SandboxProcessor struct {
email interfaces.Emailer
subChan <-chan []byte
}

func (p *SandboxProcessor) StartProcessing() {
for {
logger.Warningf(context.Background(), "Starting SandBox notifications processor")
err := p.run()
logger.Errorf(context.Background(), "error with running processor err: [%v] ", err)
time.Sleep(async.RetryDelay)
}

Check warning on line 25 in pkg/async/notifications/implementations/sandbox_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/async/notifications/implementations/sandbox_processor.go#L19-L25

Added lines #L19 - L25 were not covered by tests
}

func (p *SandboxProcessor) run() error {
var emailMessage admin.EmailMessage

for {
select {
case msg := <-p.subChan:
err := proto.Unmarshal(msg, &emailMessage)
if err != nil {
logger.Errorf(context.Background(), "error with unmarshalling message [%v]", err)
return err
}

Check warning on line 38 in pkg/async/notifications/implementations/sandbox_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/async/notifications/implementations/sandbox_processor.go#L33-L38

Added lines #L33 - L38 were not covered by tests

err = p.email.SendEmail(context.Background(), emailMessage)
if err != nil {
logger.Errorf(context.Background(), "Error sending an email message for message [%s] with emailM with err: %v", emailMessage.String(), err)
return err
}

Check warning on line 44 in pkg/async/notifications/implementations/sandbox_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/async/notifications/implementations/sandbox_processor.go#L40-L44

Added lines #L40 - L44 were not covered by tests
default:
logger.Debugf(context.Background(), "no message to process")
return nil
}
}
}

func (p *SandboxProcessor) StopProcessing() error {
logger.Debug(context.Background(), "call to sandbox stop processing.")
return nil

Check warning on line 54 in pkg/async/notifications/implementations/sandbox_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/async/notifications/implementations/sandbox_processor.go#L52-L54

Added lines #L52 - L54 were not covered by tests
}

func NewSandboxProcessor(subChan <-chan []byte, emailer interfaces.Emailer) interfaces.Processor {
return &SandboxProcessor{
subChan: subChan,
email: emailer,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package implementations

import (
"context"
"testing"

"github.com/flyteorg/flyteadmin/pkg/async/notifications/mocks"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/stretchr/testify/assert"
)

var mockSandboxEmailer mocks.MockEmailer

func TestSandboxProcessor_StartProcessing(t *testing.T) {
msgChan := make(chan []byte, 1)
testSandboxProcessor := NewSandboxProcessor(msgChan, &mockSandboxEmailer)

sendEmailValidationFunc := func(ctx context.Context, email admin.EmailMessage) error {
assert.Equal(t, testEmail.Body, email.Body)
assert.Equal(t, testEmail.RecipientsEmail, email.RecipientsEmail)
assert.Equal(t, testEmail.SubjectLine, email.SubjectLine)
assert.Equal(t, testEmail.SenderEmail, email.SenderEmail)
return nil
}
mockSandboxEmailer.SetSendEmailFunc(sendEmailValidationFunc)
assert.Nil(t, testSandboxProcessor.(*SandboxProcessor).run())
}
33 changes: 33 additions & 0 deletions pkg/async/notifications/implementations/sandbox_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package implementations

import (
"context"

"github.com/flyteorg/flytestdlib/logger"
"github.com/golang/protobuf/proto"
)

type SandboxPublisher struct {
pubChan chan<- []byte
}

func (p *SandboxPublisher) Publish(ctx context.Context, notificationType string, msg proto.Message) error {
logger.Debugf(ctx, "Publishing the following message [%s]", msg.String())

data, err := proto.Marshal(msg)

if err != nil {
logger.Errorf(ctx, "Failed to publish a message with key [%s] and message [%s] and error: %v", notificationType, msg.String(), err)
return err
}

Check warning on line 22 in pkg/async/notifications/implementations/sandbox_publisher.go

View check run for this annotation

Codecov / codecov/patch

pkg/async/notifications/implementations/sandbox_publisher.go#L20-L22

Added lines #L20 - L22 were not covered by tests

p.pubChan <- data

return nil
}

func NewSandboxPublisher(pubChan chan<- []byte) *SandboxPublisher {
return &SandboxPublisher{
pubChan: pubChan,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package implementations

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
)

func TestSandboxPublisher_Publish(t *testing.T) {
msgChan := make(chan []byte, 1)
publisher := NewSandboxPublisher(msgChan)

err := publisher.Publish(context.Background(), "NOTIFICATION_TYPE", &testEmail)

assert.NotZero(t, len(msgChan))
assert.Nil(t, err)
}
9 changes: 5 additions & 4 deletions pkg/common/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ package common
type CloudProvider = string

const (
AWS CloudProvider = "aws"
GCP CloudProvider = "gcp"
Local CloudProvider = "local"
None CloudProvider = "none"
AWS CloudProvider = "aws"
GCP CloudProvider = "gcp"
Sandbox CloudProvider = "sandbox"
Local CloudProvider = "local"
None CloudProvider = "none"
)
Loading