Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connections #198

Merged
merged 1 commit into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile-debug
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
FROM ubuntu:24.04

LABEL maintainer="Canonical Sustaining Engineering <edward.hope-morley@canonical.com>"
LABEL org.opencontainers.image.description "Athena Monitor"
LABEL maintainer="Canonical Sustaining Engineering <nicolas.bock@canonical.com>"
LABEL org.opencontainers.image.description "Athena Debug Container"

RUN apt-get update
RUN DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends apt-utils
Expand Down
22 changes: 9 additions & 13 deletions cmd/processor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,28 @@ func main() {
log.Debug(line)
}

filesClient, err := common.NewFilesComClient(cfg.FilesCom.Key, cfg.FilesCom.Endpoint)
if err != nil {
panic(err)
}

sfClient, err := common.NewSalesforceClient(cfg)
if err != nil {
panic(err)
}

natsClient, err := nats.NewNats("test-cluster", stan.NatsURL(*natsUrl))
if err != nil {
panic(err)
}

p, err := processor.NewProcessor(filesClient, sfClient, natsClient, cfg, nil)
salesforceClientFactory := &common.BaseSalesforceClientFactory{}
filesComClientFactory := &common.BaseFilesComClientFactory{}

p, err := processor.NewProcessor(filesComClientFactory, salesforceClientFactory, natsClient, cfg, nil)
if err != nil {
panic(err)
}

ctx, cancel := context.WithCancel(context.Background())

if err := p.Run(ctx, func(fc common.FilesComClient, sf common.SalesforceClient, name, topic string,
if err := p.Run(ctx, func(
filesComClientFactory common.FilesComClientFactory,
salesforceClientFactory common.SalesforceClientFactory,
name, topic string,
reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber {
log.Infof("Subscribing: %s - to topic: %s", name, topic)
return processor.NewBaseSubscriber(fc, sf, name, topic, reports, cfg, dbConn)
return processor.NewBaseSubscriber(filesComClientFactory, salesforceClientFactory, name, topic, reports, cfg, dbConn)
}); err != nil {
panic(err)
}
Expand Down
142 changes: 78 additions & 64 deletions pkg/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ import (
)

type Processor struct {
Config *config.Config
Db *gorm.DB
FilesClient common.FilesComClient
Hostname string
Provider pubsub.Provider
SalesforceClient common.SalesforceClient
Config *config.Config
Db *gorm.DB
FilesComClientFactory common.FilesComClientFactory
Hostname string
Provider pubsub.Provider
SalesforceClientFactory common.SalesforceClientFactory
}

type BaseSubscriber struct {
Config *config.Config
Db *gorm.DB
FilesComClient common.FilesComClient
Name string
Options pubsub.HandlerOptions
Reports map[string]config.Report
SalesforceClient common.SalesforceClient
Config *config.Config
Db *gorm.DB
FilesComClientFactory common.FilesComClientFactory
Name string
Options pubsub.HandlerOptions
Reports map[string]config.Report
SalesforceClientFactory common.SalesforceClientFactory
}

func (s *BaseSubscriber) Setup(c *pubsub.Client) {
Expand All @@ -56,10 +56,10 @@ type ReportToExecute struct {
type ReportRunner struct {
Config *config.Config
Db *gorm.DB
FilescomClient common.FilesComClient
FilesComClientFactory common.FilesComClientFactory
Name, Subscriber, Basedir string
Reports []ReportToExecute
SalesforceClient common.SalesforceClient
SalesforceClientFactory common.SalesforceClientFactory
}

func RunWithTimeout(baseDir string, timeout time.Duration, command string) ([]byte, error) {
Expand Down Expand Up @@ -118,36 +118,30 @@ func (runner *ReportRunner) UploadAndSaveReport(report *ReportToExecute, caseNum
log.Debugf("Fetching files for path '%s' from db", filePath)
result := runner.Db.Where("path = ?", filePath).First(&file)
if result.Error != nil {
return fmt.Errorf("File not found with path '%s' in database", filePath)
return fmt.Errorf("file not found with path '%s' in database", filePath)
}

log.Infof("Fetching case with number '%s' from Salesforce", caseNumber)
sfCase, err := runner.SalesforceClient.GetCaseByNumber(caseNumber)
salesforceClient, err := runner.SalesforceClientFactory.NewSalesforceClient(runner.Config)
if err != nil {
// The SalesForce connection possibly died on us. Let's try to
// revive it and then try again.
log.Warn("Creating new SF client since current one is failing")
runner.SalesforceClient, err = common.NewSalesforceClient(runner.Config)
if err != nil {
log.Errorf("Failed to reconnect to salesforce: %s", err)
panic(err)
}
sfCase, err = runner.SalesforceClient.GetCaseByNumber(caseNumber)
if err != nil {
log.Error(err)
return err
}
log.Errorf("failed to get Salesforce connection: %s", err)
return err
}
sfCase, err := salesforceClient.GetCaseByNumber(caseNumber)
if err != nil {
log.Error(err)
return err
}

log.Debugf("Case %s successfully fetched from Salesforce", sfCase)
var newReport = new(db.Report)

newReport.Created = time.Now()
newReport.CaseID = sfCase.Id
newReport.FilePath = file.Path
newReport.Created = time.Now()
newReport.FileID = file.ID
newReport.FileName = filepath.Base(file.Path)
newReport.FilePath = file.Path
newReport.Name = report.Name
newReport.FileID = file.ID
newReport.Subscriber = report.Subscriber

if runner.Config.Processor.ReportsUploadPath == "" {
Expand All @@ -156,13 +150,18 @@ func (runner *ReportRunner) UploadAndSaveReport(report *ReportToExecute, caseNum
uploadPath = path.Join(runner.Config.Processor.ReportsUploadPath, newReport.FileName)
}

filesComClient, err := runner.FilesComClientFactory.NewFilesComClient(runner.Config.FilesCom.Key, runner.Config.FilesCom.Endpoint)
if err != nil {
log.Errorf("failed to get new file.com client: %s", err)
return err
}
log.Debugf("Uploading script output(s) to files.com")
for scriptName, output := range scriptOutputs {
dst_fname := fmt.Sprintf(DefaultReportOutputFormat, uploadPath, report.Name, scriptName)
log.Debugf("Uploading script output %s", dst_fname)
uploadedFilePath, err := runner.FilescomClient.Upload(string(output), dst_fname)
uploadedFilePath, err := filesComClient.Upload(string(output), dst_fname)
if err != nil {
return fmt.Errorf("Failed to upload file '%s': %s", dst_fname, err.Error())
return fmt.Errorf("failed to upload file '%s': %s", dst_fname, err.Error())
}

log.Debugf("Successfully uploaded file '%s'", uploadedFilePath.Path)
Expand Down Expand Up @@ -229,7 +228,10 @@ func renderTemplate(ctx *pongo2.Context, data string) (string, error) {
return out, nil
}

func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceClient, fc common.FilesComClient, subscriber, name string,
func NewReportRunner(cfg *config.Config, dbConn *gorm.DB,
salesforceClientFactory common.SalesforceClientFactory,
filesComClientFactory common.FilesComClientFactory,
subscriber, name string,
file *db.File, reports map[string]config.Report) (*ReportRunner, error) {

var reportRunner ReportRunner
Expand Down Expand Up @@ -259,13 +261,13 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl
}
log.Debugf("Moved file to %s", dir)

reportRunner.Config = cfg
reportRunner.Subscriber = subscriber
reportRunner.Name = name
reportRunner.Basedir = dir
reportRunner.Config = cfg
reportRunner.Db = dbConn
reportRunner.SalesforceClient = sf
reportRunner.FilescomClient = fc
reportRunner.FilesComClientFactory = filesComClientFactory
reportRunner.Name = name
reportRunner.SalesforceClientFactory = salesforceClientFactory
reportRunner.Subscriber = subscriber

//TODO: document the template variables
tplContext := pongo2.Context{
Expand All @@ -277,7 +279,7 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl
var scripts = make(map[string]string)

for reportName, report := range reports {
log.Debugf("Running %s script(s) (num=%d)", reportName, len(report.Scripts))
log.Debugf("running %d '%s' script(s)", len(report.Scripts), reportName)
for scriptName, script := range report.Scripts {
if script.Run == "" {
log.Errorf("No script provided to run on '%s'", scriptName)
Expand Down Expand Up @@ -313,13 +315,13 @@ func NewReportRunner(cfg *config.Config, dbConn *gorm.DB, sf common.SalesforceCl
}

reportToExecute := ReportToExecute{}
reportToExecute.Timeout = timeout
reportToExecute.BaseDir = reportRunner.Basedir
reportToExecute.Subscriber = reportRunner.Subscriber
reportToExecute.Name = reportName
reportToExecute.File = file
reportToExecute.FileName = file.Path
reportToExecute.Name = reportName
reportToExecute.Scripts = scripts
reportToExecute.Subscriber = reportRunner.Subscriber
reportToExecute.Timeout = timeout
reportRunner.Reports = append(reportRunner.Reports, reportToExecute)
}

Expand All @@ -336,7 +338,7 @@ func (runner *ReportRunner) Clean() error {
}

func (s *BaseSubscriber) Handler(_ context.Context, file *db.File, msg *pubsub.Msg) error {
runner, err := NewReportRunner(s.Config, s.Db, s.SalesforceClient, s.FilesComClient, s.Name, s.Options.Topic, file, s.Reports)
runner, err := NewReportRunner(s.Config, s.Db, s.SalesforceClientFactory, s.FilesComClientFactory, s.Name, s.Options.Topic, file, s.Reports)
if err != nil {
log.Errorf("Failed to get new runner: %s", err)
msg.Ack()
Expand All @@ -354,7 +356,8 @@ func (s *BaseSubscriber) Handler(_ context.Context, file *db.File, msg *pubsub.M

const defaultHandlerDeadline = 10 * time.Minute

func NewBaseSubscriber(filesClient common.FilesComClient, salesforceClient common.SalesforceClient,
func NewBaseSubscriber(
filesComClientFactory common.FilesComClientFactory, salesforceClientFactory common.SalesforceClientFactory,
name, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) *BaseSubscriber {
var subscriber = BaseSubscriber{
Options: pubsub.HandlerOptions{
Expand All @@ -363,19 +366,21 @@ func NewBaseSubscriber(filesClient common.FilesComClient, salesforceClient commo
AutoAck: false,
JSON: true,
Deadline: defaultHandlerDeadline,
}, Reports: reports,
},
Reports: reports,
}

subscriber.FilesComClient = filesClient
subscriber.SalesforceClient = salesforceClient
subscriber.Options.Handler = subscriber.Handler
subscriber.Config = cfg
subscriber.Name = topic
subscriber.Db = dbConn
subscriber.FilesComClientFactory = filesComClientFactory
subscriber.Name = topic
subscriber.Options.Handler = subscriber.Handler
subscriber.SalesforceClientFactory = salesforceClientFactory
return &subscriber
}

func NewProcessor(filesClient common.FilesComClient, salesforceClient common.SalesforceClient,
func NewProcessor(
filesComClientFactory common.FilesComClientFactory, salesforceClientFactory common.SalesforceClientFactory,
provider pubsub.Provider, cfg *config.Config, dbConn *gorm.DB) (*Processor, error) {
var err error
if dbConn == nil {
Expand All @@ -391,12 +396,12 @@ func NewProcessor(filesClient common.FilesComClient, salesforceClient common.Sal
}

return &Processor{
Config: cfg,
Db: dbConn,
FilesClient: filesClient,
Hostname: hostname,
Provider: provider,
SalesforceClient: salesforceClient,
Config: cfg,
Db: dbConn,
FilesComClientFactory: filesComClientFactory,
Hostname: hostname,
Provider: provider,
SalesforceClientFactory: salesforceClientFactory,
}, nil
}

Expand Down Expand Up @@ -477,6 +482,11 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time.
reportMap[report.Subscriber][report.CaseID][report.Name] = append(reportMap[report.Subscriber][report.CaseID][report.Name], report)
}

salesforceClient, err := p.SalesforceClientFactory.NewSalesforceClient(p.Config)
if err != nil {
log.Errorf("failed to get Salesforce client: %s", err)
return
}
for subscriberName, caseMap := range reportMap {
for caseId, reportsByType := range caseMap {
for _, reports := range reportsByType {
Expand Down Expand Up @@ -515,10 +525,10 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time.
}
var comment *simpleforce.SObject
if p.Config.Salesforce.EnableChatter {
comment = p.SalesforceClient.PostChatter(caseId,
comment = salesforceClient.PostChatter(caseId,
chunkHeader+chunk, subscriber.SFCommentIsPublic)
} else {
comment = p.SalesforceClient.PostComment(caseId,
comment = salesforceClient.PostComment(caseId,
chunkHeader+chunk, subscriber.SFCommentIsPublic)
}
if comment == nil {
Expand All @@ -538,8 +548,11 @@ func (p *Processor) BatchSalesforceComments(ctx *context.Context, interval time.
}
}

func (p *Processor) Run(ctx context.Context, newSubscriberFn func(filesClient common.FilesComClient,
salesforceClient common.SalesforceClient, name, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber) error {
func (p *Processor) Run(ctx context.Context, newSubscriberFn func(
filesComClientFactory common.FilesComClientFactory,
salesforceClientFactory common.SalesforceClientFactory,
name, topic string, reports map[string]config.Report,
cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber) error {

if ctx == nil {
var cancel context.CancelFunc
Expand All @@ -554,7 +567,8 @@ func (p *Processor) Run(ctx context.Context, newSubscriberFn func(filesClient co
})

for event := range p.Config.Processor.SubscribeTo {
go pubsub.Subscribe(newSubscriberFn(p.FilesClient, p.SalesforceClient, p.Hostname, event, p.getReportsByTopic(event), p.Config, p.Db))
go pubsub.Subscribe(newSubscriberFn(p.FilesComClientFactory, p.SalesforceClientFactory,
p.Hostname, event, p.getReportsByTopic(event), p.Config, p.Db))
}

interval, err := time.ParseDuration(p.Config.Processor.BatchCommentsEvery)
Expand Down
9 changes: 4 additions & 5 deletions pkg/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,8 @@ func (s *MockSubscriber) Setup(c *pubsub.Client) {
}

func (s *ProcessorTestSuite) TestRunProcessor() {
filesComClient := test.FilesComClient{}
salesforceClient := test.SalesforceClient{}

provider := &memory.MemoryProvider{}
processor, _ := NewProcessor(&filesComClient, &salesforceClient, provider, s.config, s.db)
processor, _ := NewProcessor(&test.FilesComClientFactory{}, &test.SalesforceClientFactory{}, provider, s.config, s.db)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
Expand All @@ -67,7 +64,9 @@ func (s *ProcessorTestSuite) TestRunProcessor() {

var called = 0

_ = processor.Run(ctx, func(fc common.FilesComClient, sf common.SalesforceClient,
_ = processor.Run(ctx, func(
filesComClientFactory common.FilesComClientFactory,
salesforceClientFactory common.SalesforceClientFactory,
name string, topic string, reports map[string]config.Report, cfg *config.Config, dbConn *gorm.DB) pubsub.Subscriber {
var subscriber = MockSubscriber{Options: pubsub.HandlerOptions{
Topic: topic,
Expand Down
Loading