Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not re-use connections #189

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions cmd/monitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,14 @@ func main() {
log.Debug(line)
}

filesClient, err := common.NewFilesComClient(cfg.FilesCom.Key, cfg.FilesCom.Endpoint)
if err != nil {
panic(err)
}

sfClient, err := common.NewSalesforceClient(cfg)
if err != nil {
panic(err)
}

natsClient, err := nats.NewNats("test-cluster", stan.NatsURL(*natsUrl))
if err != nil {
panic(err)
}

m, err := monitor.NewMonitor(filesClient, sfClient, natsClient, cfg, nil)
salesforceClientFactory := &common.BaseSalesforceClientFactory{}
filesComClientFactory := &common.BaseFilesComClientFactory{}
m, err := monitor.NewMonitor(natsClient, cfg, nil, salesforceClientFactory, filesComClientFactory)
if err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ services:
container_name: athena-monitor
image: ghcr.io/canonical/athena-core/athena-monitor:${BRANCH:-main}
volumes:
- ./creds.yaml:/etc/athena/main.yaml
- ./creds-athena.yaml:/etc/athena/main.yaml
- ./athena-monitor.yaml:/etc/athena/monitor.yaml
- ./athena-monitor-directories.yaml:/etc/athena/monitor-directories.yaml
- ./tmp:/tmp/athena
Expand All @@ -39,7 +39,7 @@ services:
container_name: athena-processor
image: ghcr.io/canonical/athena-core/athena-processor:${BRANCH:-main}
volumes:
- ./creds.yaml:/etc/athena/main.yaml
- ./creds-athena.yaml:/etc/athena/main.yaml
- ./athena-processor.yaml:/etc/athena/processor.yaml
- ./athena-processor-upload.yaml:/etc/athena/processor-upload.yaml
- ./tmp:/tmp/athena
Expand Down
10 changes: 10 additions & 0 deletions pkg/common/files-com.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,21 @@ type FilesComClient interface {
Upload(contents, destinationPath string) (*filessdk.File, error)
}

type FilesComClientFactory interface {
NewFilesComClient(apiKey, endpoint string) (FilesComClient, error)
}

type BaseFilesComClient struct {
FilesComClient
ApiClient file.Client
}

type BaseFilesComClientFactory struct{}

func (client *BaseFilesComClientFactory) NewFilesComClient(apiKey, endpoint string) (FilesComClient, error) {
return NewFilesComClient(apiKey, endpoint)
}

func (client *BaseFilesComClient) Upload(contents, destinationPath string) (*filessdk.File, error) {
log.Infof("Uploading to '%s'", destinationPath)
tmpfile, err := os.CreateTemp("", "upload")
Expand Down
10 changes: 10 additions & 0 deletions pkg/common/salesforce.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,16 @@ type SalesforceClient interface {
SObject(objectName ...string) *simpleforce.SObject
}

type SalesforceClientFactory interface {
NewSalesforceClient(config *config.Config) (SalesforceClient, error)
}

type BaseSalesforceClient struct {
*simpleforce.Client
}

type BaseSalesforceClientFactory struct{}

func NewSalesforceClient(config *config.Config) (SalesforceClient, error) {
log.Infof("Creating new Salesforce client")
client := simpleforce.NewClient(config.Salesforce.Endpoint, simpleforce.DefaultClientID, simpleforce.DefaultAPIVersion)
Expand All @@ -43,6 +49,10 @@ func NewSalesforceClient(config *config.Config) (SalesforceClient, error) {
return &BaseSalesforceClient{client}, nil
}

func (sf *BaseSalesforceClientFactory) NewSalesforceClient(config *config.Config) (SalesforceClient, error) {
return NewSalesforceClient(config)
}

type Case struct {
Id, CaseNumber, AccountId, Customer string
}
Expand Down
16 changes: 15 additions & 1 deletion pkg/common/test/client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package test

import (
"time"

files_sdk "github.com/Files-com/files-sdk-go"
"github.com/canonical/athena-core/pkg/common"
"github.com/canonical/athena-core/pkg/common/db"
"time"
"github.com/canonical/athena-core/pkg/config"
)

type SalesforceClient struct {
Expand All @@ -15,10 +17,22 @@ func (sf *SalesforceClient) GetCaseByNumber(number string) (*common.Case, error)
return nil, nil
}

type SalesforceClientFactory struct{}

func (sf *SalesforceClientFactory) NewSalesforceClient(config *config.Config) (common.SalesforceClient, error) {
return &SalesforceClient{}, nil
}

type FilesComClient struct {
common.BaseFilesComClient
}

type FilesComClientFactory struct{}

func (fc *FilesComClientFactory) NewFilesComClient(apiKey, endpoint string) (common.FilesComClient, error) {
return &FilesComClient{}, nil
}

var files = []db.File{
{Path: "/uploads/sosreport-testing-1.tar.xz"},
{Path: "/uploads/sosreport-testing-2.tar.xz"},
Expand Down
65 changes: 35 additions & 30 deletions pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
)

type Monitor struct {
Db *gorm.DB // Database connection
Config *config.Config // Configuration instance
FilesClient common.FilesComClient // Files.com client
SalesforceClient common.SalesforceClient // SalesForce client
Provider pubsub.Provider // Messaging provider
mu *sync.Mutex // A mutex
Config *config.Config // Configuration instance
Db *gorm.DB // Database connection
FilesComClientFactory common.FilesComClientFactory // How to create a new Files.com client
mu *sync.Mutex // A mutex
Provider pubsub.Provider // Messaging provider
SalesforceClientFactory common.SalesforceClientFactory // How to create a new Salesforce client
}

func (m *Monitor) GetMatchingProcessors(filename string, c *common.Case) ([]string, error) {
Expand All @@ -50,14 +50,18 @@ func (m *Monitor) GetMatchingProcessors(filename string, c *common.Case) ([]stri
}
}
if len(processors) <= 0 {
return nil, fmt.Errorf("No processor found for file=%s", filename)
return nil, fmt.Errorf("no processor found for file=%s", filename)
}
return processors, nil
}

func (m *Monitor) GetLatestFiles(dirs []string, duration time.Duration) ([]db.File, error) {
log.Debugf("Getting files in %v", dirs)
files, err := m.FilesClient.GetFiles(dirs)
filesClient, err := m.FilesComClientFactory.NewFilesComClient(m.Config.FilesCom.Key, m.Config.FilesCom.Endpoint)
if err != nil {
panic(err)
}
files, err := filesClient.GetFiles(dirs)
if err != nil {
return nil, err
}
Expand All @@ -74,25 +78,19 @@ func (m *Monitor) GetMatchingProcessorByFile(files []db.File) (map[string][]db.F
var sfCase = &common.Case{}
var results = make(map[string][]db.File)

salesforceClient, err := m.SalesforceClientFactory.NewSalesforceClient(m.Config)
if err != nil {
panic(err)
}

for _, file := range files {
var processors []string

caseNumber, err := common.GetCaseNumberFromFilename(file.Path)
if err == nil {
sfCase, err = m.SalesforceClient.GetCaseByNumber(caseNumber)
sfCase, err = salesforceClient.GetCaseByNumber(caseNumber)
if err != nil {
// The SalesForce connection possibly died on us. Let's try to
// revive it and then try again.
log.Warn("Creating new SF client since current one is failing")
m.SalesforceClient, err = common.NewSalesforceClient(m.Config)
if err != nil {
log.Errorf("Failed to reconnect to salesforce: %s", err)
panic(err)
}
sfCase, err = m.SalesforceClient.GetCaseByNumber(caseNumber)
if err != nil {
log.Error(err)
}
log.Warningf("Failed to get a case from number: '%s'", caseNumber)
}
} else {
log.Warningf("Failed to identify case from filename '%s': %s", file.Path, err)
Expand All @@ -117,8 +115,9 @@ func (m *Monitor) GetMatchingProcessorByFile(files []db.File) (map[string][]db.F
return results, nil
}

func NewMonitor(filesClient common.FilesComClient, salesforceClient common.SalesforceClient, provider pubsub.Provider,
cfg *config.Config, dbConn *gorm.DB) (*Monitor, error) {
func NewMonitor(provider pubsub.Provider, cfg *config.Config, dbConn *gorm.DB,
salesforceClientFactory common.SalesforceClientFactory,
filesComClientFactory common.FilesComClientFactory) (*Monitor, error) {
var err error
if dbConn == nil {
dbConn, err = db.GetDBConn(cfg)
Expand All @@ -128,12 +127,13 @@ func NewMonitor(filesClient common.FilesComClient, salesforceClient common.Sales
}

return &Monitor{
Provider: provider,
Db: dbConn,
FilesClient: filesClient,
SalesforceClient: salesforceClient,
Config: cfg,
mu: new(sync.Mutex)}, nil
Config: cfg,
Db: dbConn,
FilesComClientFactory: filesComClientFactory,
mu: new(sync.Mutex),
Provider: provider,
SalesforceClientFactory: salesforceClientFactory,
}, nil
}

func (m *Monitor) PollNewFiles(ctx *context.Context, duration time.Duration) {
Expand All @@ -155,6 +155,11 @@ func (m *Monitor) PollNewFiles(ctx *context.Context, duration time.Duration) {
return
}

filesClient, err := m.FilesComClientFactory.NewFilesComClient(m.Config.FilesCom.Key, m.Config.FilesCom.Endpoint)
if err != nil {
panic(err)
}

log.Infof("Found %d new files, %d to be processed", len(latestFiles), len(processors))
for processor, files := range processors {
for _, file := range files {
Expand All @@ -175,7 +180,7 @@ func (m *Monitor) PollNewFiles(ctx *context.Context, duration time.Duration) {
}
}
log.Debugf("Using temporary base path: %s", basePath)
fileEntry, err := m.FilesClient.Download(&file, basePath)
fileEntry, err := filesClient.Download(&file, basePath)
if err != nil {
log.Errorf("Failed to download %s: %s - skipping", file.Path, err)
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/monitor/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *MonitorTestSuite) SetupTest() {

func (s *MonitorTestSuite) TestRunMonitor() {
provider := &memory.MemoryProvider{}
monitor, err := NewMonitor(&test.FilesComClient{}, &test.SalesforceClient{}, provider, s.config, s.db)
monitor, err := NewMonitor(provider, s.config, s.db, &test.SalesforceClientFactory{}, &test.FilesComClientFactory{})
assert.Nil(s.T(), err)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
Expand Down
29 changes: 15 additions & 14 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,43 @@ import (
)

type Processor struct {
Db *gorm.DB
Config *config.Config
Db *gorm.DB
FilesClient common.FilesComClient
SalesforceClient common.SalesforceClient
Provider pubsub.Provider
Hostname string
Provider pubsub.Provider
SalesforceClient common.SalesforceClient
}

type BaseSubscriber struct {
Config *config.Config
Db *gorm.DB
FilesComClient common.FilesComClient
Name string
Options pubsub.HandlerOptions
Reports map[string]config.Report
SalesforceClient common.SalesforceClient
FilesComClient common.FilesComClient
Config *config.Config
Name string
}

func (s *BaseSubscriber) Setup(c *pubsub.Client) {
c.On(s.Options)
}

type ReportToExecute struct {
Name, BaseDir, Subscriber, FileName string
File *db.File
Name, BaseDir, Subscriber, FileName string
Output []byte
Scripts map[string]string
Timeout time.Duration
Output []byte
}

type ReportRunner struct {
Config *config.Config
Reports []ReportToExecute
SalesforceClient common.SalesforceClient
Db *gorm.DB
FilescomClient common.FilesComClient
Name, Subscriber, Basedir string
Db *gorm.DB
Reports []ReportToExecute
SalesforceClient common.SalesforceClient
}

func RunWithTimeout(baseDir string, timeout time.Duration, command string) ([]byte, error) {
Expand Down Expand Up @@ -391,12 +391,13 @@ func NewProcessor(filesClient common.FilesComClient, salesforceClient common.Sal
}

return &Processor{
Config: cfg,
Db: dbConn,
FilesClient: filesClient,
Hostname: hostname,
Provider: provider,
FilesClient: filesClient,
SalesforceClient: salesforceClient,
Db: dbConn,
Config: cfg}, nil
}, nil
}

func (p *Processor) getReportsByTopic(topic string) map[string]config.Report {
Expand Down
Loading