Skip to content

Commit

Permalink
Do not re-use connections
Browse files Browse the repository at this point in the history
Processor Edition

In order to avoid stale connections to Salesforce and files.com, do not
re-use those connections (the clients) and rather create new client
connections when needed.

Issue: canonical#177
Signed-off-by: Nicolas Bock <[email protected]>
  • Loading branch information
nicolasbock committed Sep 13, 2024
1 parent 0fa3247 commit d254d5b
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 84 deletions.
4 changes: 2 additions & 2 deletions Dockerfile-debug
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM ubuntu:24.04

LABEL maintainer="Canonical Sustaining Engineering <edward.hope-morley@canonical.com>"
LABEL org.opencontainers.image.description "Athena Monitor"
LABEL maintainer="Canonical Sustaining Engineering <nicolas.bock@canonical.com>"
LABEL org.opencontainers.image.description "Athena Debug Container"

RUN apt-get update
RUN DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends apt-utils
Expand Down
22 changes: 9 additions & 13 deletions cmd/processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,28 @@ func main() {
log.Debug(line)
}

filesClient, err := common.NewFilesComClient(cfg.FilesCom.Key, cfg.FilesCom.Endpoint)
if err != nil {
panic(err)
}

sfClient, err := common.NewSalesforceClient(cfg)
if err != nil {
panic(err)
}

natsClient, err := nats.NewNats("test-cluster", stan.NatsURL(*natsUrl))
if err != nil {
panic(err)
}

p, err := processor.NewProcessor(filesClient, sfClient, natsClient, cfg, nil)
salesforceClientFactory := &common.BaseSalesforceClientFactory{}
filesComClientFactory := &common.BaseFilesComClientFactory{}

p, err := processor.NewProcessor(filesComClientFactory, salesforceClientFactory, natsClient, cfg, nil)
if err != nil {
panic(err)
}

ctx, cancel := context.WithCancel(context.Background())

if err := p.Run(ctx, func(fc common.FilesComClient, sf common.SalesforceClient, name, topic string,
if err := p.Run(ctx, func(
filesComClientFactory common.FilesComClientFactory,
salesforceClientFactory common.SalesforceClientFactory,
name, topic string,
reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber {
log.Infof("Subscribing: %s - to topic: %s", name, topic)
return processor.NewBaseSubscriber(fc, sf, name, topic, reports, cfg, dbConn)
return processor.NewBaseSubscriber(filesComClientFactory, salesforceClientFactory, name, topic, reports, cfg, dbConn)
}); err != nil {
panic(err)
}
Expand Down
142 changes: 78 additions & 64 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ import (
)

type Processor struct {
Config *config.Config
Db *gorm.DB
FilesClient common.FilesComClient
Hostname string
Provider pubsub.Provider
SalesforceClient common.SalesforceClient
Config *config.Config
Db *gorm.DB
FilesComClientFactory common.FilesComClientFactory
Hostname string
Provider pubsub.Provider
SalesforceClientFactory common.SalesforceClientFactory
}

type BaseSubscriber struct {
Config *config.Config
Db *gorm.DB
FilesComClient common.FilesComClient
Name string
Options pubsub.HandlerOptions
Reports map[string]config.Report
SalesforceClient common.SalesforceClient
Config *config.Config
Db *gorm.DB
FilesComClientFactory common.FilesComClientFactory
Name string
Options pubsub.HandlerOptions
Reports map[string]config.Report
SalesforceClientFactory common.SalesforceClientFactory
}

func (s *BaseSubscriber) Setup(c *pubsub.Client) {
Expand All @@ -56,10 +56,10 @@ type ReportToExecute struct {
type ReportRunner struct {
Config *config.Config
Db *gorm.DB
FilescomClient common.FilesComClient
FilesComClientFactory common.FilesComClientFactory
Name, Subscriber, Basedir string
Reports []ReportToExecute
SalesforceClient common.SalesforceClient
SalesforceClientFactory common.SalesforceClientFactory
}

func RunWithTimeout(baseDir string, timeout time.Duration, command string) ([]byte, error) {
Expand Down Expand Up @@ -118,36 +118,30 @@ func (runner *ReportRunner) UploadAndSaveReport(report *ReportToExecute, caseNum
log.Debugf("Fetching files for path '%s' from db", filePath)
result := runner.Db.Where("path = ?", filePath).First(&file)
if result.Error != nil {
return fmt.Errorf("File not found with path '%s' in database", filePath)
return fmt.Errorf("file not found with path '%s' in database", filePath)
}

log.Infof("Fetching case with number '%s' from Salesforce", caseNumber)
sfCase, err := runner.SalesforceClient.GetCaseByNumber(caseNumber)
salesforceClient, err := runner.SalesforceClientFactory.NewSalesforceClient(runner.Config)
if err != nil {
// The SalesForce connection possibly died on us. Let's try to
// revive it and then try again.
log.Warn("Creating new SF client since current one is failing")
runner.SalesforceClient, err = common.NewSalesforceClient(runner.Config)
if err != nil {
log.Errorf("Failed to reconnect to salesforce: %s", err)
panic(err)
}
sfCase, err = runner.SalesforceClient.GetCaseByNumber(caseNumber)
if err != nil {
log.Error(err)
return err
}
log.Errorf("failed to get Salesforce connection: %s", err)
return err
}
sfCase, err := salesforceClient.GetCaseByNumber(caseNumber)
if err != nil {
log.Error(err)
return err
}

log.Debugf("Case %s successfully fetched from Salesforce", sfCase)
var newReport = new(db.Report)

newReport.Created = time.Now()
newReport.CaseID = sfCase.Id
newReport.FilePath = file.Path
newReport.Created = time.Now()
newReport.FileID = file.ID
newReport.FileName = filepath.Base(file.Path)
newReport.FilePath = file.Path
newReport.Name = report.Name
newReport.FileID = file.ID
newReport.Subscriber = report.Subscriber

if runner.Config.Processor.ReportsUploadPath == "" {
Expand All @@ -156,13 +150,18 @@ func (runner *ReportRunner) UploadAndSaveReport(report *ReportToExecute, caseNum
uploadPath = path.Join(runner.Config.Processor.ReportsUploadPath, newReport.FileName)
}

filesComClient, err := runner.FilesComClientFactory.NewFilesComClient(runner.Config.FilesCom.Key, runner.Config.FilesCom.Endpoint)
if err != nil {
log.Errorf("failed to get new file.com client: %s", err)
return err
}
log.Debugf("Uploading script output(s) to files.com")
for scriptName, output := range scriptOutputs {
dst_fname := fmt.Sprintf(DefaultReportOutputFormat, uploadPath, report.Name, scriptName)
log.Debugf("Uploading script output %s", dst_fname)
uploadedFilePath, err := runner.FilescomClient.Upload(string(output), dst_fname)
uploadedFilePath, err := filesComClient.Upload(string(output), dst_fname)
if err != nil {
return fmt.Errorf("Failed to upload file '%s': %s", dst_fname, err.Error())
return fmt.Errorf("failed to upload file '%s': %s", dst_fname, err.Error())
}

log.Debugf("Successfully uploaded file '%s'", uploadedFilePath.Path)
Expand Down Expand Up @@ -229,7 +228,10 @@ func renderTemplate(ctx *pongo2.Context, data string) (string, error) {
return out, nil
}

func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceClient, fc common.FilesComClient, subscriber, name string,
func NewReportRunner(cfg *config.Config, dbConn *gorm.DB,
salesforceClientFactory common.SalesforceClientFactory,
filesComClientFactory common.FilesComClientFactory,
subscriber, name string,
file *db.File, reports map[string]config.Report) (*ReportRunner, error) {

var reportRunner ReportRunner
Expand Down Expand Up @@ -259,13 +261,13 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl
}
log.Debugf("Moved file to %s", dir)

reportRunner.Config = cfg
reportRunner.Subscriber = subscriber
reportRunner.Name = name
reportRunner.Basedir = dir
reportRunner.Config = cfg
reportRunner.Db = dbConn
reportRunner.SalesforceClient = sf
reportRunner.FilescomClient = fc
reportRunner.FilesComClientFactory = filesComClientFactory
reportRunner.Name = name
reportRunner.SalesforceClientFactory = salesforceClientFactory
reportRunner.Subscriber = subscriber

//TODO: document the template variables
tplContext := pongo2.Context{
Expand All @@ -277,7 +279,7 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl
var scripts = make(map[string]string)

for reportName, report := range reports {
log.Debugf("Running %s script(s) (num=%d)", reportName, len(report.Scripts))
log.Debugf("running %d '%s' script(s)", len(report.Scripts), reportName)
for scriptName, script := range report.Scripts {
if script.Run == "" {
log.Errorf("No script provided to run on '%s'", scriptName)
Expand Down Expand Up @@ -313,13 +315,13 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl
}

reportToExecute := ReportToExecute{}
reportToExecute.Timeout = timeout
reportToExecute.BaseDir = reportRunner.Basedir
reportToExecute.Subscriber = reportRunner.Subscriber
reportToExecute.Name = reportName
reportToExecute.File = file
reportToExecute.FileName = file.Path
reportToExecute.Name = reportName
reportToExecute.Scripts = scripts
reportToExecute.Subscriber = reportRunner.Subscriber
reportToExecute.Timeout = timeout
reportRunner.Reports = append(reportRunner.Reports, reportToExecute)
}

Expand All @@ -336,7 +338,7 @@ func (runner *ReportRunner) Clean() error {
}

func (s *BaseSubscriber) Handler(_ context.Context, file *db.File, msg *pubsub.Msg) error {
runner, err := NewReportRunner(s.Config, s.Db, s.SalesforceClient, s.FilesComClient, s.Name, s.Options.Topic, file, s.Reports)
runner, err := NewReportRunner(s.Config, s.Db, s.SalesforceClientFactory, s.FilesComClientFactory, s.Name, s.Options.Topic, file, s.Reports)
if err != nil {
log.Errorf("Failed to get new runner: %s", err)
msg.Ack()
Expand All @@ -354,7 +356,8 @@ func (s *BaseSubscriber) Handler(_ context.Context, file *db.File, msg *pubsub.M

const defaultHandlerDeadline = 10 * time.Minute

func NewBaseSubscriber(filesClient common.FilesComClient, salesforceClient common.SalesforceClient,
func NewBaseSubscriber(
filesComClientFactory common.FilesComClientFactory, salesforceClientFactory common.SalesforceClientFactory,
name, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) *BaseSubscriber {
var subscriber = BaseSubscriber{
Options: pubsub.HandlerOptions{
Expand All @@ -363,19 +366,21 @@ func NewBaseSubscriber(filesClient common.FilesComClient, salesforceClient commo
AutoAck: false,
JSON: true,
Deadline: defaultHandlerDeadline,
}, Reports: reports,
},
Reports: reports,
}

subscriber.FilesComClient = filesClient
subscriber.SalesforceClient = salesforceClient
subscriber.Options.Handler = subscriber.Handler
subscriber.Config = cfg
subscriber.Name = topic
subscriber.Db = dbConn
subscriber.FilesComClientFactory = filesComClientFactory
subscriber.Name = topic
subscriber.Options.Handler = subscriber.Handler
subscriber.SalesforceClientFactory = salesforceClientFactory
return &subscriber
}

func NewProcessor(filesClient common.FilesComClient, salesforceClient common.SalesforceClient,
func NewProcessor(
filesComClientFactory common.FilesComClientFactory, salesforceClientFactory common.SalesforceClientFactory,
provider pubsub.Provider, cfg *config.Config, dbConn *gorm.DB) (*Processor, error) {
var err error
if dbConn == nil {
Expand All @@ -391,12 +396,12 @@ func NewProcessor(filesClient common.FilesComClient, salesforceClient common.Sal
}

return &Processor{
Config: cfg,
Db: dbConn,
FilesClient: filesClient,
Hostname: hostname,
Provider: provider,
SalesforceClient: salesforceClient,
Config: cfg,
Db: dbConn,
FilesComClientFactory: filesComClientFactory,
Hostname: hostname,
Provider: provider,
SalesforceClientFactory: salesforceClientFactory,
}, nil
}

Expand Down Expand Up @@ -477,6 +482,11 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time.
reportMap[report.Subscriber][report.CaseID][report.Name] = append(reportMap[report.Subscriber][report.CaseID][report.Name], report)
}

salesforceClient, err := p.SalesforceClientFactory.NewSalesforceClient(p.Config)
if err != nil {
log.Errorf("failed to get Salesforce client: %s", err)
return
}
for subscriberName, caseMap := range reportMap {
for caseId, reportsByType := range caseMap {
for _, reports := range reportsByType {
Expand Down Expand Up @@ -515,10 +525,10 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time.
}
var comment *simpleforce.SObject
if p.Config.Salesforce.EnableChatter {
comment = p.SalesforceClient.PostChatter(caseId,
comment = salesforceClient.PostChatter(caseId,
chunkHeader+chunk, subscriber.SFCommentIsPublic)
} else {
comment = p.SalesforceClient.PostComment(caseId,
comment = salesforceClient.PostComment(caseId,
chunkHeader+chunk, subscriber.SFCommentIsPublic)
}
if comment == nil {
Expand All @@ -538,8 +548,11 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time.
}
}

func (p *Processor) Run(ctx context.Context, newSubscriberFn func(filesClient common.FilesComClient,
salesforceClient common.SalesforceClient, name, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber) error {
func (p *Processor) Run(ctx context.Context, newSubscriberFn func(
filesComClientFactory common.FilesComClientFactory,
salesforceClientFactory common.SalesforceClientFactory,
name, topic string, reports map[string]config.Report,
cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber) error {

if ctx == nil {
var cancel context.CancelFunc
Expand All @@ -554,7 +567,8 @@ func (p *Processor) Run(ctx context.Context, newSubscriberFn func(filesClient co
})

for event := range p.Config.Processor.SubscribeTo {
go pubsub.Subscribe(newSubscriberFn(p.FilesClient, p.SalesforceClient, p.Hostname, event, p.getReportsByTopic(event), p.Config, p.Db))
go pubsub.Subscribe(newSubscriberFn(p.FilesComClientFactory, p.SalesforceClientFactory,
p.Hostname, event, p.getReportsByTopic(event), p.Config, p.Db))
}

interval, err := time.ParseDuration(p.Config.Processor.BatchCommentsEvery)
Expand Down
9 changes: 4 additions & 5 deletions pkg/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@ func (s *MockSubscriber) Setup(c *pubsub.Client) {
}

func (s *ProcessorTestSuite) TestRunProcessor() {
filesComClient := test.FilesComClient{}
salesforceClient := test.SalesforceClient{}

provider := &memory.MemoryProvider{}
processor, _ := NewProcessor(&filesComClient, &salesforceClient, provider, s.config, s.db)
processor, _ := NewProcessor(&test.FilesComClientFactory{}, &test.SalesforceClientFactory{}, provider, s.config, s.db)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
Expand All @@ -67,7 +64,9 @@ func (s *ProcessorTestSuite) TestRunProcessor() {

var called = 0

_ = processor.Run(ctx, func(fc common.FilesComClient, sf common.SalesforceClient,
_ = processor.Run(ctx, func(
filesComClientFactory common.FilesComClientFactory,
salesforceClientFactory common.SalesforceClientFactory,
name string, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber {
var subscriber = MockSubscriber{Options: pubsub.HandlerOptions{
Topic: topic,
Expand Down

0 comments on commit d254d5b

Please sign in to comment.