Skip to content

Commit

Permalink
Do not re-use connections
Browse files Browse the repository at this point in the history
In order to avoid stale connections to Salesforce and files.com, do not
re-use those connections (the clients) and rather create new client
connections when needed.

Signed-off-by: Nicolas Bock <[email protected]>
  • Loading branch information
nicolasbock committed Sep 11, 2024
1 parent 8d3f247 commit e5987d5
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 45 deletions.
14 changes: 3 additions & 11 deletions cmd/monitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,14 @@ func main() {
log.Debug(line)
}

filesClient, err := common.NewFilesComClient(cfg.FilesCom.Key, cfg.FilesCom.Endpoint)
if err != nil {
panic(err)
}

sfClient, err := common.NewSalesforceClient(cfg)
if err != nil {
panic(err)
}

natsClient, err := nats.NewNats("test-cluster", stan.NatsURL(*natsUrl))
if err != nil {
panic(err)
}

m, err := monitor.NewMonitor(filesClient, sfClient, natsClient, cfg, nil)
salesforceClientFactory := &common.BaseSalesforceClientFactory{}
filesComClientFactory := &common.BaseFilesComClientFactory{}
m, err := monitor.NewMonitor(natsClient, cfg, nil, salesforceClientFactory, filesComClientFactory)
if err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ services:
container_name: athena-monitor
image: ghcr.io/canonical/athena-core/athena-monitor:${BRANCH:-main}
volumes:
- ./creds.yaml:/etc/athena/main.yaml
- ./creds-athena.yaml:/etc/athena/main.yaml
- ./athena-monitor.yaml:/etc/athena/monitor.yaml
- ./athena-monitor-directories.yaml:/etc/athena/monitor-directories.yaml
- ./tmp:/tmp/athena
Expand All @@ -39,7 +39,7 @@ services:
container_name: athena-processor
image: ghcr.io/canonical/athena-core/athena-processor:${BRANCH:-main}
volumes:
- ./creds.yaml:/etc/athena/main.yaml
- ./creds-athena.yaml:/etc/athena/main.yaml
- ./athena-processor.yaml:/etc/athena/processor.yaml
- ./athena-processor-upload.yaml:/etc/athena/processor-upload.yaml
- ./tmp:/tmp/athena
Expand Down
10 changes: 10 additions & 0 deletions pkg/common/files-com.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,21 @@ type FilesComClient interface {
Upload(contents, destinationPath string) (*filessdk.File, error)
}

type FilesComClientFactory interface {
NewFilesComClient(apiKey, endpoint string) (FilesComClient, error)
}

type BaseFilesComClient struct {
FilesComClient
ApiClient file.Client
}

type BaseFilesComClientFactory struct{}

func (client *BaseFilesComClientFactory) NewFilesComClient(apiKey, endpoint string) (FilesComClient, error) {
return NewFilesComClient(apiKey, endpoint)
}

func (client *BaseFilesComClient) Upload(contents, destinationPath string) (*filessdk.File, error) {
log.Infof("Uploading to '%s'", destinationPath)
tmpfile, err := os.CreateTemp("", "upload")
Expand Down
10 changes: 10 additions & 0 deletions pkg/common/salesforce.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,16 @@ type SalesforceClient interface {
SObject(objectName ...string) *simpleforce.SObject
}

type SalesforceClientFactory interface {
NewSalesforceClient(config *config.Config) (SalesforceClient, error)
}

type BaseSalesforceClient struct {
*simpleforce.Client
}

type BaseSalesforceClientFactory struct{}

func NewSalesforceClient(config *config.Config) (SalesforceClient, error) {
log.Infof("Creating new Salesforce client")
client := simpleforce.NewClient(config.Salesforce.Endpoint, simpleforce.DefaultClientID, simpleforce.DefaultAPIVersion)
Expand All @@ -43,6 +49,10 @@ func NewSalesforceClient(config *config.Config) (SalesforceClient, error) {
return &BaseSalesforceClient{client}, nil
}

func (sf *BaseSalesforceClientFactory) NewSalesforceClient(config *config.Config) (SalesforceClient, error) {
return NewSalesforceClient(config)
}

type Case struct {
Id, CaseNumber, AccountId, Customer string
}
Expand Down
16 changes: 15 additions & 1 deletion pkg/common/test/client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package test

import (
"time"

files_sdk "github.com/Files-com/files-sdk-go"
"github.com/canonical/athena-core/pkg/common"
"github.com/canonical/athena-core/pkg/common/db"
"time"
"github.com/canonical/athena-core/pkg/config"
)

type SalesforceClient struct {
Expand All @@ -15,10 +17,22 @@ func (sf *SalesforceClient) GetCaseByNumber(number string) (*common.Case, error)
return nil, nil
}

type SalesforceClientFactory struct{}

func (sf *SalesforceClientFactory) NewSalesforceClient(config *config.Config) (common.SalesforceClient, error) {
return &SalesforceClient{}, nil
}

type FilesComClient struct {
common.BaseFilesComClient
}

type FilesComClientFactory struct{}

func (fc *FilesComClientFactory) NewFilesComClient(apiKey, endpoint string) (common.FilesComClient, error) {
return &FilesComClient{}, nil
}

var files = []db.File{
{Path: "/uploads/sosreport-testing-1.tar.xz"},
{Path: "/uploads/sosreport-testing-2.tar.xz"},
Expand Down
65 changes: 35 additions & 30 deletions pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
)

type Monitor struct {
Db *gorm.DB // Database connection
Config *config.Config // Configuration instance
FilesClient common.FilesComClient // Files.com client
SalesforceClient common.SalesforceClient // SalesForce client
Provider pubsub.Provider // Messaging provider
mu *sync.Mutex // A mutex
Config *config.Config // Configuration instance
Db *gorm.DB // Database connection
FilesComClientFactory common.FilesComClientFactory // How to create a new Files.com client
mu *sync.Mutex // A mutex
Provider pubsub.Provider // Messaging provider
SalesforceClientFactory common.SalesforceClientFactory // How to create a new Salesforce client
}

func (m *Monitor) GetMatchingProcessors(filename string, c *common.Case) ([]string, error) {
Expand All @@ -50,14 +50,18 @@ func (m *Monitor) GetMatchingProcessors(filename string, c *common.Case) ([]stri
}
}
if len(processors) <= 0 {
return nil, fmt.Errorf("No processor found for file=%s", filename)
return nil, fmt.Errorf("no processor found for file=%s", filename)
}
return processors, nil
}

func (m *Monitor) GetLatestFiles(dirs []string, duration time.Duration) ([]db.File, error) {
log.Debugf("Getting files in %v", dirs)
files, err := m.FilesClient.GetFiles(dirs)
filesClient, err := m.FilesComClientFactory.NewFilesComClient(m.Config.FilesCom.Key, m.Config.FilesCom.Endpoint)
if err != nil {
panic(err)
}
files, err := filesClient.GetFiles(dirs)
if err != nil {
return nil, err
}
Expand All @@ -74,25 +78,19 @@ func (m *Monitor) GetMatchingProcessorByFile(files []db.File) (map[string][]db.F
var sfCase = &common.Case{}
var results = make(map[string][]db.File)

salesforceClient, err := m.SalesforceClientFactory.NewSalesforceClient(m.Config)
if err != nil {
panic(err)
}

for _, file := range files {
var processors []string

caseNumber, err := common.GetCaseNumberFromFilename(file.Path)
if err == nil {
sfCase, err = m.SalesforceClient.GetCaseByNumber(caseNumber)
sfCase, err = salesforceClient.GetCaseByNumber(caseNumber)
if err != nil {
// The SalesForce connection possibly died on us. Let's try to
// revive it and then try again.
log.Warn("Creating new SF client since current one is failing")
m.SalesforceClient, err = common.NewSalesforceClient(m.Config)
if err != nil {
log.Errorf("Failed to reconnect to salesforce: %s", err)
panic(err)
}
sfCase, err = m.SalesforceClient.GetCaseByNumber(caseNumber)
if err != nil {
log.Error(err)
}
log.Warningf("Failed to get a case from number: '%s'", caseNumber)
}
} else {
log.Warningf("Failed to identify case from filename '%s': %s", file.Path, err)
Expand All @@ -117,8 +115,9 @@ func (m *Monitor) GetMatchingProcessorByFile(files []db.File) (map[string][]db.F
return results, nil
}

func NewMonitor(filesClient common.FilesComClient, salesforceClient common.SalesforceClient, provider pubsub.Provider,
cfg *config.Config, dbConn *gorm.DB) (*Monitor, error) {
func NewMonitor(provider pubsub.Provider, cfg *config.Config, dbConn *gorm.DB,
salesforceClientFactory common.SalesforceClientFactory,
filesComClientFactory common.FilesComClientFactory) (*Monitor, error) {
var err error
if dbConn == nil {
dbConn, err = db.GetDBConn(cfg)
Expand All @@ -128,12 +127,13 @@ func NewMonitor(filesClient common.FilesComClient, salesforceClient common.Sales
}

return &Monitor{
Provider: provider,
Db: dbConn,
FilesClient: filesClient,
SalesforceClient: salesforceClient,
Config: cfg,
mu: new(sync.Mutex)}, nil
Config: cfg,
Db: dbConn,
FilesComClientFactory: filesComClientFactory,
mu: new(sync.Mutex),
Provider: provider,
SalesforceClientFactory: salesforceClientFactory,
}, nil
}

func (m *Monitor) PollNewFiles(ctx *context.Context, duration time.Duration) {
Expand All @@ -155,6 +155,11 @@ func (m *Monitor) PollNewFiles(ctx *context.Context, duration time.Duration) {
return
}

filesClient, err := m.FilesComClientFactory.NewFilesComClient(m.Config.FilesCom.Key, m.Config.FilesCom.Endpoint)
if err != nil {
panic(err)
}

log.Infof("Found %d new files, %d to be processed", len(latestFiles), len(processors))
for processor, files := range processors {
for _, file := range files {
Expand All @@ -175,7 +180,7 @@ func (m *Monitor) PollNewFiles(ctx *context.Context, duration time.Duration) {
}
}
log.Debugf("Using temporary base path: %s", basePath)
fileEntry, err := m.FilesClient.Download(&file, basePath)
fileEntry, err := filesClient.Download(&file, basePath)
if err != nil {
log.Errorf("Failed to download %s: %s - skipping", file.Path, err)
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/monitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *MonitorTestSuite) SetupTest() {

func (s *MonitorTestSuite) TestRunMonitor() {
provider := &memory.MemoryProvider{}
monitor, err := NewMonitor(&test.FilesComClient{}, &test.SalesforceClient{}, provider, s.config, s.db)
monitor, err := NewMonitor(provider, s.config, s.db, &test.SalesforceClientFactory{}, &test.FilesComClientFactory{})
assert.Nil(s.T(), err)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand Down

0 comments on commit e5987d5

Please sign in to comment.