diff --git a/cmd/monitor/main.go b/cmd/monitor/main.go index 343645a..4ee5134 100644 --- a/cmd/monitor/main.go +++ b/cmd/monitor/main.go @@ -40,22 +40,14 @@ func main() { log.Debug(line) } - filesClient, err := common.NewFilesComClient(cfg.FilesCom.Key, cfg.FilesCom.Endpoint) - if err != nil { - panic(err) - } - - sfClient, err := common.NewSalesforceClient(cfg) - if err != nil { - panic(err) - } - natsClient, err := nats.NewNats("test-cluster", stan.NatsURL(*natsUrl)) if err != nil { panic(err) } - m, err := monitor.NewMonitor(filesClient, sfClient, natsClient, cfg, nil) + salesforceClientFactory := &common.BaseSalesforceClientFactory{} + filesComClientFactory := &common.BaseFilesComClientFactory{} + m, err := monitor.NewMonitor(natsClient, cfg, nil, salesforceClientFactory, filesComClientFactory) if err != nil { panic(err) } diff --git a/docker-compose.yml b/docker-compose.yml index ba7c5ae..8a1f1e6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,7 +19,7 @@ services: container_name: athena-monitor image: athena/athena-monitor:${BRANCH:-main} volumes: - - ./creds.yaml:/etc/athena/main.yaml + - ./creds-athena.yaml:/etc/athena/main.yaml - ./athena-monitor.yaml:/etc/athena/monitor.yaml - ./athena-monitor-directories.yaml:/etc/athena/monitor-directories.yaml - ./tmp:/tmp/athena @@ -39,7 +39,7 @@ services: container_name: athena-processor image: athena/athena-processor:${BRANCH:-main} volumes: - - ./creds.yaml:/etc/athena/main.yaml + - ./creds-athena.yaml:/etc/athena/main.yaml - ./athena-processor.yaml:/etc/athena/processor.yaml - ./athena-processor-upload.yaml:/etc/athena/processor-upload.yaml - ./tmp:/tmp/athena diff --git a/pkg/common/files-com.go b/pkg/common/files-com.go index e202779..84fc893 100644 --- a/pkg/common/files-com.go +++ b/pkg/common/files-com.go @@ -22,11 +22,21 @@ type FilesComClient interface { Upload(contents, destinationPath string) (*filessdk.File, error) } +type FilesComClientFactory interface { + NewFilesComClient(apiKey, endpoint string) (FilesComClient, error) +} + type BaseFilesComClient struct { FilesComClient ApiClient file.Client } +type BaseFilesComClientFactory struct{} + +func (client *BaseFilesComClientFactory) NewFilesComClient(apiKey, endpoint string) (FilesComClient, error) { + return NewFilesComClient(apiKey, endpoint) +} + func (client *BaseFilesComClient) Upload(contents, destinationPath string) (*filessdk.File, error) { log.Infof("Uploading to '%s'", destinationPath) tmpfile, err := os.CreateTemp("", "upload") diff --git a/pkg/common/salesforce.go b/pkg/common/salesforce.go index 668d25c..0434933 100644 --- a/pkg/common/salesforce.go +++ b/pkg/common/salesforce.go @@ -30,10 +30,16 @@ type SalesforceClient interface { SObject(objectName ...string) *simpleforce.SObject } +type SalesforceClientFactory interface { + NewSalesforceClient(config *config.Config) (SalesforceClient, error) +} + type BaseSalesforceClient struct { *simpleforce.Client } +type BaseSalesforceClientFactory struct{} + func NewSalesforceClient(config *config.Config) (SalesforceClient, error) { log.Infof("Creating new Salesforce client") client := simpleforce.NewClient(config.Salesforce.Endpoint, simpleforce.DefaultClientID, simpleforce.DefaultAPIVersion) @@ -43,6 +49,10 @@ func NewSalesforceClient(config *config.Config) (SalesforceClient, error) { return &BaseSalesforceClient{client}, nil } +func (sf *BaseSalesforceClientFactory) NewSalesforceClient(config *config.Config) (SalesforceClient, error) { + return NewSalesforceClient(config) +} + type Case struct { Id, CaseNumber, AccountId, Customer string } diff --git a/pkg/common/test/client.go b/pkg/common/test/client.go index 5133953..a0a4b4e 100644 --- a/pkg/common/test/client.go +++ b/pkg/common/test/client.go @@ -1,10 +1,12 @@ package test import ( + "time" + files_sdk "github.com/Files-com/files-sdk-go" "github.com/canonical/athena-core/pkg/common" "github.com/canonical/athena-core/pkg/common/db" - "time" + "github.com/canonical/athena-core/pkg/config" ) type SalesforceClient struct { @@ -15,10 +17,22 @@ func (sf *SalesforceClient) GetCaseByNumber(number string) (*common.Case, error) return nil, nil } +type SalesforceClientFactory struct{} + +func (sf *SalesforceClientFactory) NewSalesforceClient(config *config.Config) (common.SalesforceClient, error) { + return &SalesforceClient{}, nil +} + type FilesComClient struct { common.BaseFilesComClient } +type FilesComClientFactory struct{} + +func (fc *FilesComClientFactory) NewFilesComClient(apiKey, endpoint string) (common.FilesComClient, error) { + return &FilesComClient{}, nil +} + var files = []db.File{ {Path: "/uploads/sosreport-testing-1.tar.xz"}, {Path: "/uploads/sosreport-testing-2.tar.xz"}, diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 5c82cbc..25e3793 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -18,12 +18,12 @@ import ( ) type Monitor struct { - Db *gorm.DB // Database connection - Config *config.Config // Configuration instance - FilesClient common.FilesComClient // Files.com client - SalesforceClient common.SalesforceClient // SalesForce client - Provider pubsub.Provider // Messaging provider - mu *sync.Mutex // A mutex + Config *config.Config // Configuration instance + Db *gorm.DB // Database connection + FilesComClientFactory common.FilesComClientFactory // How to create a new Files.com client + mu *sync.Mutex // A mutex + Provider pubsub.Provider // Messaging provider + SalesforceClientFactory common.SalesforceClientFactory // How to create a new Salesforce client } func (m *Monitor) GetMatchingProcessors(filename string, c *common.Case) ([]string, error) { @@ -50,14 +50,18 @@ func (m *Monitor) GetMatchingProcessors(filename string, c *common.Case) ([]stri } } if len(processors) <= 0 { - return nil, fmt.Errorf("No processor found for file=%s", filename) + return nil, fmt.Errorf("no processor found for file=%s", filename) } return processors, nil } func (m *Monitor) GetLatestFiles(dirs []string, duration time.Duration) ([]db.File, error) { log.Debugf("Getting files in %v", dirs) - files, err := m.FilesClient.GetFiles(dirs) + filesClient, err := m.FilesComClientFactory.NewFilesComClient(m.Config.FilesCom.Key, m.Config.FilesCom.Endpoint) + if err != nil { + panic(err) + } + files, err := filesClient.GetFiles(dirs) if err != nil { return nil, err } @@ -74,25 +78,19 @@ func (m *Monitor) GetMatchingProcessorByFile(files []db.File) (map[string][]db.F var sfCase = &common.Case{} var results = make(map[string][]db.File) + salesforceClient, err := m.SalesforceClientFactory.NewSalesforceClient(m.Config) + if err != nil { + panic(err) + } + for _, file := range files { var processors []string caseNumber, err := common.GetCaseNumberFromFilename(file.Path) if err == nil { - sfCase, err = m.SalesforceClient.GetCaseByNumber(caseNumber) + sfCase, err = salesforceClient.GetCaseByNumber(caseNumber) if err != nil { - // The SalesForce connection possibly died on us. Let's try to - // revive it and then try again. - log.Warn("Creating new SF client since current one is failing") - m.SalesforceClient, err = common.NewSalesforceClient(m.Config) - if err != nil { - log.Errorf("Failed to reconnect to salesforce: %s", err) - panic(err) - } - sfCase, err = m.SalesforceClient.GetCaseByNumber(caseNumber) - if err != nil { - log.Error(err) - } + log.Warningf("Failed to get a case from number: '%s'", caseNumber) } } else { log.Warningf("Failed to identify case from filename '%s': %s", file.Path, err) @@ -117,8 +115,9 @@ func (m *Monitor) GetMatchingProcessorByFile(files []db.File) (map[string][]db.F return results, nil } -func NewMonitor(filesClient common.FilesComClient, salesforceClient common.SalesforceClient, provider pubsub.Provider, - cfg *config.Config, dbConn *gorm.DB) (*Monitor, error) { +func NewMonitor(provider pubsub.Provider, cfg *config.Config, dbConn *gorm.DB, + salesforceClientFactory common.SalesforceClientFactory, + filesComClientFactory common.FilesComClientFactory) (*Monitor, error) { var err error if dbConn == nil { dbConn, err = db.GetDBConn(cfg) @@ -128,12 +127,13 @@ func NewMonitor(filesClient common.FilesComClient, salesforceClient common.Sales } return &Monitor{ - Provider: provider, - Db: dbConn, - FilesClient: filesClient, - SalesforceClient: salesforceClient, - Config: cfg, - mu: new(sync.Mutex)}, nil + Config: cfg, + Db: dbConn, + FilesComClientFactory: filesComClientFactory, + mu: new(sync.Mutex), + Provider: provider, + SalesforceClientFactory: salesforceClientFactory, + }, nil } func (m *Monitor) PollNewFiles(ctx *context.Context, duration time.Duration) { @@ -155,6 +155,11 @@ func (m *Monitor) PollNewFiles(ctx *context.Context, duration time.Duration) { return } + filesClient, err := m.FilesComClientFactory.NewFilesComClient(m.Config.FilesCom.Key, m.Config.FilesCom.Endpoint) + if err != nil { + panic(err) + } + log.Infof("Found %d new files, %d to be processed", len(latestFiles), len(processors)) for processor, files := range processors { for _, file := range files { @@ -175,7 +180,7 @@ func (m *Monitor) PollNewFiles(ctx *context.Context, duration time.Duration) { } } log.Debugf("Using temporary base path: %s", basePath) - fileEntry, err := m.FilesClient.Download(&file, basePath) + fileEntry, err := filesClient.Download(&file, basePath) if err != nil { log.Errorf("Failed to download %s: %s - skipping", file.Path, err) continue diff --git a/pkg/monitor/monitor_test.go b/pkg/monitor/monitor_test.go index a75173c..78b8494 100644 --- a/pkg/monitor/monitor_test.go +++ b/pkg/monitor/monitor_test.go @@ -35,7 +35,7 @@ func (s *MonitorTestSuite) SetupTest() { func (s *MonitorTestSuite) TestRunMonitor() { provider := &memory.MemoryProvider{} - monitor, err := NewMonitor(&test.FilesComClient{}, &test.SalesforceClient{}, provider, s.config, s.db) + monitor, err := NewMonitor(provider, s.config, s.db, &test.SalesforceClientFactory{}, &test.FilesComClientFactory{}) assert.Nil(s.T(), err) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() diff --git a/pkg/processor/processor.go b/pkg/processor/processor.go index 50d2649..e23bb4f 100644 --- a/pkg/processor/processor.go +++ b/pkg/processor/processor.go @@ -23,22 +23,22 @@ import ( ) type Processor struct { - Db *gorm.DB Config *config.Config + Db *gorm.DB FilesClient common.FilesComClient - SalesforceClient common.SalesforceClient - Provider pubsub.Provider Hostname string + Provider pubsub.Provider + SalesforceClient common.SalesforceClient } type BaseSubscriber struct { + Config *config.Config Db *gorm.DB + FilesComClient common.FilesComClient + Name string Options pubsub.HandlerOptions Reports map[string]config.Report SalesforceClient common.SalesforceClient - FilesComClient common.FilesComClient - Config *config.Config - Name string } func (s *BaseSubscriber) Setup(c *pubsub.Client) { @@ -46,20 +46,20 @@ func (s *BaseSubscriber) Setup(c *pubsub.Client) { } type ReportToExecute struct { - Name, BaseDir, Subscriber, FileName string File *db.File + Name, BaseDir, Subscriber, FileName string + Output []byte Scripts map[string]string Timeout time.Duration - Output []byte } type ReportRunner struct { Config *config.Config - Reports []ReportToExecute - SalesforceClient common.SalesforceClient + Db *gorm.DB FilescomClient common.FilesComClient Name, Subscriber, Basedir string - Db *gorm.DB + Reports []ReportToExecute + SalesforceClient common.SalesforceClient } func RunWithTimeout(baseDir string, timeout time.Duration, command string) ([]byte, error) { @@ -391,12 +391,13 @@ func NewProcessor(filesClient common.FilesComClient, salesforceClient common.Sal } return &Processor{ + Config: cfg, + Db: dbConn, + FilesClient: filesClient, Hostname: hostname, Provider: provider, - FilesClient: filesClient, SalesforceClient: salesforceClient, - Db: dbConn, - Config: cfg}, nil + }, nil } func (p *Processor) getReportsByTopic(topic string) map[string]config.Report {