Skip to content

Commit

Permalink
(Logscribe) send duplicate or logs with issues to slack (#2256)
Browse files Browse the repository at this point in the history
* send duplicate or logs with issues to slack

* reflect feedback
  • Loading branch information
Intizar-T authored Sep 6, 2024
1 parent ba7b1e5 commit 7b8f678
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions node/pkg/logscribe/logprocessor/logprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ func (p *LogProcessor) fetchCurrentIssues(ctx context.Context) ([]string, error)
return issues, nil
}

func (p *LogProcessor) CreateGithubIssue(ctx context.Context, processedLogs []LogInsertModelWithCount, service string) {
func (p *LogProcessor) CreateGithubIssue(ctx context.Context, processedLogs []LogInsertModelWithCount, service string) []LogInsertModelWithCount {
if p.githubOwner == "test" {
return
return nil
}

currentIssues, err := p.fetchCurrentIssues(ctx)
Expand All @@ -201,28 +201,33 @@ func (p *LogProcessor) CreateGithubIssue(ctx context.Context, processedLogs []Lo

issueCount := 0
processedLogHashes := [][]interface{}{}
excludedLogs := []LogInsertModelWithCount{}
for _, entry := range processedLogs {
hash := hashLog(entry.LogInsertModel)
res, err := db.QueryRow[Count](ctx, logAlreadyProcessedQuery, map[string]any{
"hash": hash,
})
if err != nil {
log.Error().Err(err).Msg("Failed to check if log already processed")
excludedLogs = append(excludedLogs, entry)
continue
}
if res.Count > 0 {
log.Debug().Msg("Log already processed, skipping creation of github issue")
excludedLogs = append(excludedLogs, entry)
continue
}
if slices.Contains(currentIssues, entry.Message) {
log.Debug().Msgf("Issue already exists, skipping: %s", entry.Message)
excludedLogs = append(excludedLogs, entry)
continue
}

entryJson, err := json.MarshalIndent(entry, "", " ")
if err != nil {
log.Error().Err(err).Msg("Failed to marshal log")
return
excludedLogs = append(excludedLogs, entry)
continue
}
formattedBody := "```go\n" + string(entryJson) + "\n```"
issueRequest := &github.IssueRequest{
Expand All @@ -247,6 +252,8 @@ func (p *LogProcessor) CreateGithubIssue(ctx context.Context, processedLogs []Lo
if err == nil {
processedLogHashes = append(processedLogHashes, []interface{}{hash})
issueCount++
} else {
excludedLogs = append(excludedLogs, entry)
}
}

Expand All @@ -258,6 +265,8 @@ func (p *LogProcessor) CreateGithubIssue(ctx context.Context, processedLogs []Lo
}

log.Info().Msgf("Created %d github issues", issueCount)

return excludedLogs
}

func (p *LogProcessor) BulkCopyLogs(ctx context.Context, logsChannel <-chan *[]LogInsertModel) {
Expand Down Expand Up @@ -304,8 +313,10 @@ func (p *LogProcessor) StartProcessingCronJob(ctx context.Context) error {
for _, service := range services {
processedLogs := ProcessLogs(ctx, service.Service)
if len(processedLogs) > 0 {
addLogsToSlackMessage(&slackMessage, processedLogs)
p.CreateGithubIssue(ctx, processedLogs, service.Service)
duplicateOrLogsWithIssues := p.CreateGithubIssue(ctx, processedLogs, service.Service)
if len(duplicateOrLogsWithIssues) > 0 {
addLogsToSlackMessage(&slackMessage, duplicateOrLogsWithIssues)
}
}
}
if slackMessage.Len() > 0 {
Expand Down

0 comments on commit 7b8f678

Please sign in to comment.