Skip to content

Commit

Permalink
[CSE-1] Improve logging of CSV export (#285)
Browse files Browse the repository at this point in the history
* [CSE-1] Improve logging of CSV export

- Standardised the fields coming out of the regular exporter
- Added timing information from the CSV exporter

* Encode ts in JSON logs as ISO timestamps
  • Loading branch information
BCook98 authored Sep 26, 2022
1 parent e10b389 commit 9b32926
Show file tree
Hide file tree
Showing 21 changed files with 183 additions and 152 deletions.
25 changes: 22 additions & 3 deletions internal/app/feed/exporter_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ type CSVExporter struct {

// CreateSchema generated schema for a feed in csv format
func (e *CSVExporter) CreateSchema(feed Feed, rows interface{}) error {
e.Logger.Infof("%s: writing out CSV schema file", feed.Name())
logger := e.Logger.With(
"feed", feed.Name(),
)
logger.Info("writing out CSV schema file")

exportFilePath := filepath.Join(e.ExportPath, fmt.Sprintf("%s.csv", feed.Name()))
_, err := os.Stat(exportFilePath)
Expand All @@ -36,14 +39,17 @@ func (e *CSVExporter) CreateSchema(feed Feed, rows interface{}) error {
return gocsv.Marshal(rows, file)
}

e.Logger.Infof("%s: skipping. CSV file already exists.", feed.Name())
logger.Info("CSV file already exists, skipping")

return nil
}

// FinaliseExport closes out an export
func (e *CSVExporter) FinaliseExport(feed Feed, rows interface{}) error {
e.Logger.Infof("%s: writing out CSV file", feed.Name())
logger := e.Logger.With(
"feed", feed.Name(),
)
logger.Info("writing out CSV file")

err := e.cleanOldFiles(feed.Name())
if err != nil {
Expand All @@ -58,6 +64,7 @@ func (e *CSVExporter) FinaliseExport(feed Feed, rows interface{}) error {
rowsAdded := 0
var file *os.File
for {
preTime := time.Now()
resp := e.DB.Table(feed.Name()).
Order(feed.Order()).
Limit(limit).
Expand All @@ -66,6 +73,7 @@ func (e *CSVExporter) FinaliseExport(feed Feed, rows interface{}) error {
if resp.Error != nil {
return resp.Error
}
postQueryTime := time.Now()

if resp.RowsAffected == 0 || resp.RowsAffected == -1 {
break
Expand Down Expand Up @@ -97,9 +105,20 @@ func (e *CSVExporter) FinaliseExport(feed Feed, rows interface{}) error {
}
}

postWriteTime := time.Now()

offset = offset + limit
rowsAdded += int(resp.RowsAffected)

logger.With(
"rows_added", resp.RowsAffected,
"total_time_ms", postWriteTime.Sub(preTime).Milliseconds(),
"query_time_ms", postQueryTime.Sub(preTime).Milliseconds(),
"write_time_ms", postWriteTime.Sub(postQueryTime).Milliseconds(),
).Info("writing out CSV batch")
}

logger.Info("CSV exported")
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/app/feed/exporter_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func isPrimaryKey(pk int) string {

// WriteSchema writes schema of a feed to output in tabular format
func (e *SchemaExporter) WriteSchema(feed Feed) error {
e.Logger.Infof("Schema for %s:", feed.Name())
e.Logger.With("feed", feed.Name()).Info("writing schema")

schema := &[]*schema{}

Expand Down
4 changes: 3 additions & 1 deletion internal/app/feed/exporter_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func (e *SQLExporter) InitFeed(feed Feed, opts *InitFeedOptions) error {
}

if opts.Truncate {
e.Logger.Infof("%s: truncating", feed.Name())
e.Logger.With(
"feed", feed.Name(),
).Info("truncating")
result := e.DB.Session(&gorm.Session{AllowGlobalUpdate: true}).Unscoped().Delete(model)
if result.Error != nil {
return errors.Wrap(result.Error, "Unable to truncate table")
Expand Down
31 changes: 0 additions & 31 deletions internal/app/feed/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package feed

import (
"context"
"fmt"
"time"

"github.com/SafetyCulture/iauditor-exporter/internal/app/api"
Expand Down Expand Up @@ -44,33 +43,3 @@ type Exporter interface {
SupportsUpsert() bool
ParameterLimit() int
}

// LogStringConfig is the config for GetLogString function
type LogStringConfig struct {
RemainingRecords int64
HTTPDuration time.Duration
ExporterDuration time.Duration
}

// GetLogString build a log string based on input arguments
func GetLogString(feedName string, cfg *LogStringConfig) string {
var args = []any{feedName}
var format = "%s: "

if cfg != nil {
format = format + "%d remaining."
args = append(args, cfg.RemainingRecords)

if cfg.HTTPDuration.Milliseconds() != 0 {
format = format + " Last http call was %dms."
args = append(args, cfg.HTTPDuration.Milliseconds())
}

if cfg.ExporterDuration.Milliseconds() != 0 {
format = format + " Last export operation was %dms."
args = append(args, cfg.ExporterDuration.Milliseconds())
}
}

return fmt.Sprintf(format, args...)
}
20 changes: 12 additions & 8 deletions internal/app/feed/feed_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ func (f *ActionFeed) CreateSchema(exporter Exporter) error {

// Export exports the feed to the supplied exporter
func (f *ActionFeed) Export(ctx context.Context, apiClient *api.Client, exporter Exporter, orgID string) error {
logger := util.GetLogger()
feedName := f.Name()
logger := util.GetLogger().With(
"feed", f.Name(),
"org_id", orgID,
)

exporter.InitFeed(f, &InitFeedOptions{
// Delete data if incremental refresh is disabled so there is no duplicates
Expand All @@ -107,7 +109,9 @@ func (f *ActionFeed) Export(ctx context.Context, apiClient *api.Client, exporter
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, orgID)
util.Check(err, "unable to load modified after")

logger.Infof("%s: exporting for org_id: %s since: %s", feedName, orgID, f.ModifiedAfter.Format(time.RFC1123))
logger.With(
"modified_after", f.ModifiedAfter.Format(time.RFC1123),
).Info("exporting")

err = apiClient.DrainFeed(ctx, &api.GetFeedRequest{
InitialURL: "/feed/actions",
Expand Down Expand Up @@ -136,11 +140,11 @@ func (f *ActionFeed) Export(ctx context.Context, apiClient *api.Client, exporter
}
}

logger.Info(GetLogString(f.Name(), &LogStringConfig{
RemainingRecords: resp.Metadata.RemainingRecords,
HTTPDuration: apiClient.Duration,
ExporterDuration: exporter.GetDuration(),
}))
logger.With(
"estimated_remaining", resp.Metadata.RemainingRecords,
"duration_ms", apiClient.Duration.Milliseconds(),
"export_duration_ms", exporter.GetDuration().Milliseconds(),
).Info("export batch complete")
return nil
})
util.Check(err, "Failed to export feed")
Expand Down
16 changes: 12 additions & 4 deletions internal/app/feed/feed_action_assignees.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ func (f *ActionAssigneeFeed) writeRows(ctx context.Context, exporter Exporter, r

// Export exports the feed to the supplied exporter
func (f *ActionAssigneeFeed) Export(ctx context.Context, apiClient *api.Client, exporter Exporter, orgID string) error {
logger := util.GetLogger()
feedName := f.Name()
logger := util.GetLogger().With(
"feed", f.Name(),
"org_id", orgID,
)

exporter.InitFeed(f, &InitFeedOptions{
// Delete data if incremental refresh is disabled so there is no duplicates
Expand All @@ -109,7 +111,9 @@ func (f *ActionAssigneeFeed) Export(ctx context.Context, apiClient *api.Client,
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, orgID)
util.Check(err, "unable to load modified after")

logger.Infof("%s: exporting for org_id: %s since: %s", feedName, orgID, f.ModifiedAfter.Format(time.RFC1123))
logger.With(
"modified_after", f.ModifiedAfter.Format(time.RFC1123),
).Info("exporting")

err = apiClient.DrainFeed(ctx, &api.GetFeedRequest{
InitialURL: "/feed/action_assignees",
Expand All @@ -127,7 +131,11 @@ func (f *ActionAssigneeFeed) Export(ctx context.Context, apiClient *api.Client,
util.Check(err, "Failed to write data to exporter")
}

logger.Infof("%s: %d remaining", feedName, resp.Metadata.RemainingRecords)
logger.With(
"estimated_remaining", resp.Metadata.RemainingRecords,
"duration_ms", apiClient.Duration.Milliseconds(),
"export_duration_ms", exporter.GetDuration().Milliseconds(),
).Info("export batch complete")
return nil
})
util.Check(err, "Failed to export feed")
Expand Down
14 changes: 10 additions & 4 deletions internal/app/feed/feed_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,12 @@ func (f *GroupFeed) CreateSchema(exporter Exporter) error {

// Export exports the feed to the supplied exporter
func (f *GroupFeed) Export(ctx context.Context, apiClient *api.Client, exporter Exporter, orgID string) error {
logger := util.GetLogger()
feedName := f.Name()
logger := util.GetLogger().With(
"feed", f.Name(),
"org_id", orgID,
)

logger.Infof("%s: exporting for org_id: %s", feedName, orgID)
logger.Info("exporting")

exporter.InitFeed(f, &InitFeedOptions{
// Truncate files if upserts aren't supported.
Expand Down Expand Up @@ -96,7 +98,11 @@ func (f *GroupFeed) Export(ctx context.Context, apiClient *api.Client, exporter
}
}

logger.Infof("%s: %d remaining. Last call was %dms", feedName, resp.Metadata.RemainingRecords, apiClient.Duration.Milliseconds())
logger.With(
"estimated_remaining", resp.Metadata.RemainingRecords,
"duration_ms", apiClient.Duration.Milliseconds(),
"export_duration_ms", exporter.GetDuration().Milliseconds(),
).Info("export batch complete")
return nil
})
util.Check(err, "Failed to export feed")
Expand Down
14 changes: 10 additions & 4 deletions internal/app/feed/feed_group_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ func (f *GroupUserFeed) CreateSchema(exporter Exporter) error {

// Export exports the feed to the supplied exporter
func (f *GroupUserFeed) Export(ctx context.Context, apiClient *api.Client, exporter Exporter, orgID string) error {
logger := util.GetLogger()
feedName := f.Name()
logger := util.GetLogger().With(
"feed", f.Name(),
"org_id", orgID,
)

logger.Infof("%s: exporting for org_id: %s", feedName, orgID)
logger.Info("exporting")

exporter.InitFeed(f, &InitFeedOptions{
// Truncate files if upserts aren't supported.
Expand Down Expand Up @@ -101,7 +103,11 @@ func (f *GroupUserFeed) Export(ctx context.Context, apiClient *api.Client, expor
}
}

logger.Infof("%s: %d remaining. Last call was %dms", feedName, resp.Metadata.RemainingRecords, apiClient.Duration.Milliseconds())
logger.With(
"estimated_remaining", resp.Metadata.RemainingRecords,
"duration_ms", apiClient.Duration.Milliseconds(),
"export_duration_ms", exporter.GetDuration().Milliseconds(),
).Info("export batch complete")
return nil
})
util.Check(err, "Failed to export feed")
Expand Down
22 changes: 14 additions & 8 deletions internal/app/feed/feed_inspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,10 @@ func (f *InspectionFeed) CreateSchema(exporter Exporter) error {

// Export exports the feed to the supplied exporter
func (f *InspectionFeed) Export(ctx context.Context, apiClient *api.Client, exporter Exporter, orgID string) error {
logger := util.GetLogger()
feedName := f.Name()
logger := util.GetLogger().With(
"feed", f.Name(),
"org_id", orgID,
)

exporter.InitFeed(f, &InitFeedOptions{
// Delete data if incremental refresh is disabled so there is no duplicates
Expand All @@ -173,7 +175,10 @@ func (f *InspectionFeed) Export(ctx context.Context, apiClient *api.Client, expo
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, orgID)
util.Check(err, "unable to load modified after")

logger.Infof("%s: exporting for org_id: %s since: %s - %s", feedName, orgID, f.ModifiedAfter.Format(time.RFC1123), f.WebReportLink)
logger.With(
"modified_after", f.ModifiedAfter.Format(time.RFC1123),
"web_report_link_type", f.WebReportLink,
).Info("exporting")

// Process Inspections
err = f.processNewInspections(ctx, apiClient, exporter)
Expand Down Expand Up @@ -214,11 +219,12 @@ func (f *InspectionFeed) processNewInspections(ctx context.Context, apiClient *a
}
}

lg.Info(GetLogString(f.Name(), &LogStringConfig{
RemainingRecords: resp.Metadata.RemainingRecords,
HTTPDuration: apiClient.Duration,
ExporterDuration: exporter.GetDuration(),
}))
lg.With(
"estimated_remaining", resp.Metadata.RemainingRecords,
"duration_ms", apiClient.Duration.Milliseconds(),
"export_duration_ms", exporter.GetDuration().Milliseconds(),
).Info("export batch complete")

return nil
}

Expand Down
20 changes: 12 additions & 8 deletions internal/app/feed/feed_inspection_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,10 @@ func (f *InspectionItemFeed) CreateSchema(exporter Exporter) error {

// Export exports the feed to the supplied exporter
func (f *InspectionItemFeed) Export(ctx context.Context, apiClient *api.Client, exporter Exporter, orgID string) error {
logger := util.GetLogger()
feedName := f.Name()
logger := util.GetLogger().With(
"feed", f.Name(),
"org_id", orgID,
)

exporter.InitFeed(f, &InitFeedOptions{
// Delete data if incremental refresh is disabled so there is no duplicates
Expand All @@ -234,7 +236,9 @@ func (f *InspectionItemFeed) Export(ctx context.Context, apiClient *api.Client,
f.ModifiedAfter, err = exporter.LastModifiedAt(f, f.ModifiedAfter, orgID)
util.Check(err, "unable to load modified after")

logger.Infof("%s: exporting for org_id: %s since: %s", feedName, orgID, f.ModifiedAfter.Format(time.RFC1123))
logger.With(
"modified_after", f.ModifiedAfter.Format(time.RFC1123),
).Info("exporting")

err = apiClient.DrainFeed(ctx, &api.GetFeedRequest{
InitialURL: "/feed/inspection_items",
Expand All @@ -257,11 +261,11 @@ func (f *InspectionItemFeed) Export(ctx context.Context, apiClient *api.Client,
util.Check(err, "Failed to write data to exporter")
}

logger.Info(GetLogString(f.Name(), &LogStringConfig{
RemainingRecords: resp.Metadata.RemainingRecords,
HTTPDuration: apiClient.Duration,
ExporterDuration: exporter.GetDuration(),
}))
logger.With(
"estimated_remaining", resp.Metadata.RemainingRecords,
"duration_ms", apiClient.Duration.Milliseconds(),
"export_duration_ms", exporter.GetDuration().Milliseconds(),
).Info("export batch complete")
return nil
})
util.Check(err, "Failed to export feed")
Expand Down
9 changes: 7 additions & 2 deletions internal/app/feed/feed_issue.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package feed
import (
"context"
"encoding/json"
"time"

"github.com/SafetyCulture/iauditor-exporter/internal/app/api"
"github.com/SafetyCulture/iauditor-exporter/internal/app/util"
"time"
)

const (
Expand Down Expand Up @@ -118,7 +119,11 @@ func (f *IssueFeed) Export(ctx context.Context, apiClient *api.Client, exporter
}
}

logger.Infof("%s: %d remaining. Last call was %dms", f.Name(), resp.Metadata.RemainingRecords, apiClient.Duration.Milliseconds())
logger.With(
"estimated_remaining", resp.Metadata.RemainingRecords,
"duration_ms", apiClient.Duration.Milliseconds(),
"export_duration_ms", exporter.GetDuration().Milliseconds(),
).Info("export batch complete")
return nil
}

Expand Down
Loading

0 comments on commit 9b32926

Please sign in to comment.