From 22eba52f10c872960459a451bb059460daf8d2cc Mon Sep 17 00:00:00 2001 From: Nicolas Bock Date: Fri, 13 Sep 2024 11:42:21 -0600 Subject: [PATCH] fixup! Do not re-use connections --- cmd/processor/main.go | 5 ++++- pkg/processor/processor.go | 39 ++++++++++++++++++++------------- pkg/processor/processor_test.go | 2 ++ 3 files changed, 30 insertions(+), 16 deletions(-) diff --git a/cmd/processor/main.go b/cmd/processor/main.go index abbfd05..3fdc542 100644 --- a/cmd/processor/main.go +++ b/cmd/processor/main.go @@ -66,7 +66,10 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) - if err := p.Run(ctx, func(fc common.FilesComClient, sf common.SalesforceClient, name, topic string, + if err := p.Run(ctx, func(fc common.FilesComClient, sf common.SalesforceClient, + filesComClientFactory common.FilesComClientFactory, + salesforceClientFactory common.SalesforceClientFactory, + name, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber { log.Infof("Subscribing: %s - to topic: %s", name, topic) return processor.NewBaseSubscriber(fc, sf, name, topic, reports, cfg, dbConn) diff --git a/pkg/processor/processor.go b/pkg/processor/processor.go index 91939b8..8953b75 100644 --- a/pkg/processor/processor.go +++ b/pkg/processor/processor.go @@ -23,12 +23,14 @@ import ( ) type Processor struct { - Config *config.Config - Db *gorm.DB - FilesClient common.FilesComClient - Hostname string - Provider pubsub.Provider - SalesforceClient common.SalesforceClient + Config *config.Config + Db *gorm.DB + FilesClient common.FilesComClient + FilesComClientFactory common.FilesComClientFactory + Hostname string + Provider pubsub.Provider + SalesforceClient common.SalesforceClient + SalesforceClientFactory common.SalesforceClientFactory } type BaseSubscriber struct { @@ -162,7 +164,7 @@ func (runner *ReportRunner) UploadAndSaveReport(report *ReportToExecute, caseNum log.Debugf("Uploading script output %s", dst_fname) uploadedFilePath, err := runner.FilescomClient.Upload(string(output), dst_fname) if err != nil { - return fmt.Errorf("Failed to upload file '%s': %s", dst_fname, err.Error()) + return fmt.Errorf("failed to upload file '%s': %s", dst_fname, err.Error()) } log.Debugf("Successfully uploaded file '%s'", uploadedFilePath.Path) @@ -393,12 +395,14 @@ func NewProcessor(filesClient common.FilesComClient, salesforceClient common.Sal } return &Processor{ - Config: cfg, - Db: dbConn, - FilesClient: filesClient, - Hostname: hostname, - Provider: provider, - SalesforceClient: salesforceClient, + Config: cfg, + Db: dbConn, + FilesClient: filesClient, + FilesComClientFactory: filesComClientFactory, + Hostname: hostname, + Provider: provider, + SalesforceClient: salesforceClient, + SalesforceClientFactory: salesforceClientFactory, }, nil } @@ -541,7 +545,11 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time. } func (p *Processor) Run(ctx context.Context, newSubscriberFn func(filesClient common.FilesComClient, - salesforceClient common.SalesforceClient, name, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber) error { + salesforceClient common.SalesforceClient, + filesComClientFactory common.FilesComClientFactory, + salesforceClientFactory common.SalesforceClientFactory, + name, topic string, reports map[string]config.Report, + cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber) error { if ctx == nil { var cancel context.CancelFunc @@ -556,7 +564,8 @@ func (p *Processor) Run(ctx context.Context, newSubscriberFn func(filesClient co }) for event := range p.Config.Processor.SubscribeTo { - go pubsub.Subscribe(newSubscriberFn(p.FilesClient, p.SalesforceClient, p.Hostname, event, p.getReportsByTopic(event), p.Config, p.Db)) + go pubsub.Subscribe(newSubscriberFn(p.FilesClient, p.SalesforceClient, p.FilesComClientFactory, p.SalesforceClientFactory, + p.Hostname, event, p.getReportsByTopic(event), p.Config, p.Db)) } interval, err := time.ParseDuration(p.Config.Processor.BatchCommentsEvery) diff --git a/pkg/processor/processor_test.go b/pkg/processor/processor_test.go index f88cbce..3851499 100644 --- a/pkg/processor/processor_test.go +++ b/pkg/processor/processor_test.go @@ -68,6 +68,8 @@ func (s *ProcessorTestSuite) TestRunProcessor() { var called = 0 _ = processor.Run(ctx, func(fc common.FilesComClient, sf common.SalesforceClient, + filesComClientFactory common.FilesComClientFactory, + salesforceClientFactory common.SalesforceClientFactory, name string, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber { var subscriber = MockSubscriber{Options: pubsub.HandlerOptions{ Topic: topic,