diff --git a/node/pkg/logscribe/logprocessor/logprocessor.go b/node/pkg/logscribe/logprocessor/logprocessor.go index f75c6ed9c..8ceef56f0 100644 --- a/node/pkg/logscribe/logprocessor/logprocessor.go +++ b/node/pkg/logscribe/logprocessor/logprocessor.go @@ -186,9 +186,9 @@ func (p *LogProcessor) fetchCurrentIssues(ctx context.Context) ([]string, error) return issues, nil } -func (p *LogProcessor) CreateGithubIssue(ctx context.Context, processedLogs []LogInsertModelWithCount, service string) { +func (p *LogProcessor) CreateGithubIssue(ctx context.Context, processedLogs []LogInsertModelWithCount, service string) []LogInsertModelWithCount { if p.githubOwner == "test" { - return + return nil } currentIssues, err := p.fetchCurrentIssues(ctx) @@ -201,6 +201,7 @@ func (p *LogProcessor) CreateGithubIssue(ctx context.Context, processedLogs []Lo issueCount := 0 processedLogHashes := [][]interface{}{} + excludedLogs := []LogInsertModelWithCount{} for _, entry := range processedLogs { hash := hashLog(entry.LogInsertModel) res, err := db.QueryRow[Count](ctx, logAlreadyProcessedQuery, map[string]any{ @@ -208,21 +209,25 @@ func (p *LogProcessor) CreateGithubIssue(ctx context.Context, processedLogs []Lo }) if err != nil { log.Error().Err(err).Msg("Failed to check if log already processed") + excludedLogs = append(excludedLogs, entry) continue } if res.Count > 0 { log.Debug().Msg("Log already processed, skipping creation of github issue") + excludedLogs = append(excludedLogs, entry) continue } if slices.Contains(currentIssues, entry.Message) { log.Debug().Msgf("Issue already exists, skipping: %s", entry.Message) + excludedLogs = append(excludedLogs, entry) continue } entryJson, err := json.MarshalIndent(entry, "", " ") if err != nil { log.Error().Err(err).Msg("Failed to marshal log") - return + excludedLogs = append(excludedLogs, entry) + continue } formattedBody := "```go\n" + string(entryJson) + "\n```" issueRequest := &github.IssueRequest{ @@ -247,6 +252,8 @@ func (p *LogProcessor) CreateGithubIssue(ctx context.Context, processedLogs []Lo if err == nil { processedLogHashes = append(processedLogHashes, []interface{}{hash}) issueCount++ + } else { + excludedLogs = append(excludedLogs, entry) } } @@ -258,6 +265,8 @@ func (p *LogProcessor) CreateGithubIssue(ctx context.Context, processedLogs []Lo } log.Info().Msgf("Created %d github issues", issueCount) + + return excludedLogs } func (p *LogProcessor) BulkCopyLogs(ctx context.Context, logsChannel <-chan *[]LogInsertModel) { @@ -304,8 +313,10 @@ func (p *LogProcessor) StartProcessingCronJob(ctx context.Context) error { for _, service := range services { processedLogs := ProcessLogs(ctx, service.Service) if len(processedLogs) > 0 { - addLogsToSlackMessage(&slackMessage, processedLogs) - p.CreateGithubIssue(ctx, processedLogs, service.Service) + duplicateOrLogsWithIssues := p.CreateGithubIssue(ctx, processedLogs, service.Service) + if len(duplicateOrLogsWithIssues) > 0 { + addLogsToSlackMessage(&slackMessage, duplicateOrLogsWithIssues) + } } } if slackMessage.Len() > 0 {