Ingester: add tracing to shipper.Sync

I see a lot of unattached spans from `storage.Object.Attrs` and
`storage.Object.Writer`; creating one large span for `shipper.Sync`
should join them all up into something more meaningful.
bboreham committed Oct 17, 2024
1 parent a98e096 commit 28d5c61
Showing 2 changed files with 15 additions and 10 deletions.
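
The pattern behind the commit is a standard one: open a single parent span at the top of `Sync`, store it in the context, and let every downstream call that receives that context start its spans as children, so the previously unattached `storage.Object.Attrs` and `storage.Object.Writer` spans end up grouped under one trace. A minimal sketch of that pattern follows, using `spanlogger` the same way the diff does; the wrapper function, go-kit logger wiring, and tenant ID are illustrative assumptions, not code from the commit.

package shipperspan

import (
	"context"

	"github.com/go-kit/log"

	"github.com/grafana/mimir/pkg/util/spanlogger"
)

// syncWithSpan is a hypothetical wrapper demonstrating the pattern: one span
// covers the whole operation, and the returned ctx carries that span so any
// spans started further down (for example by the object-storage client) attach
// to it instead of appearing as unattached roots in the trace view.
func syncWithSpan(ctx context.Context, logger log.Logger, userID string, doWork func(context.Context) error) error {
	// NewWithLogger starts a span named "Ingester.Shipper.Sync" and returns a
	// logger that writes key/values both to that span and to the wrapped logger.
	spanLog, ctx := spanlogger.NewWithLogger(ctx, logger, "Ingester.Shipper.Sync")
	spanLog.SetTag("organization", userID) // tag the span with the tenant, as the diff does
	defer spanLog.Finish()                 // close the span when the operation returns

	spanLog.DebugLog("msg", "sync started") // recorded on the span and as a debug log line

	// Everything invoked with this ctx now nests under the Sync span.
	return doWork(ctx)
}

With the span stored in the context, the bucket operations performed inside `block.Upload` show up as children of `Ingester.Shipper.Sync`, which is the "one large span ... to join them all up" described in the commit message.
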
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -45,6 +45,7 @@
  * `-memberlist.acquire-writer-timeout`
* [ENHANCEMENT] memberlist: Notifications can now be processed once per interval specified by `-memberlist.notify-interval` to reduce notify storm CPU activity in large clusters. #9594
* [ENHANCEMENT] Return server-side total bytes processed statistics as a header through query frontend. #9645
+* [ENHANCEMENT] Ingester: Emit traces for block syncing, to join up block-upload traces. #9656
* [BUGFIX] Fix issue where functions such as `rate()` over native histograms could return incorrect values if a float stale marker was present in the selected range. #9508
* [BUGFIX] Fix issue where negation of native histograms (eg. `-some_native_histogram_series`) did nothing. #9508
* [BUGFIX] Fix issue where `metric might not be a counter, name does not end in _total/_sum/_count/_bucket` annotation would be emitted even if `rate` or `increase` did not have enough samples to compute a result. #9508
24 changes: 14 additions & 10 deletions pkg/ingester/shipper.go
@@ -27,6 +27,7 @@ import (

	mimir_tsdb "github.com/grafana/mimir/pkg/storage/tsdb"
	"github.com/grafana/mimir/pkg/storage/tsdb/block"
+	"github.com/grafana/mimir/pkg/util/spanlogger"
)

// shipperMetrics holds the shipper metrics. Mimir runs 1 shipper for each tenant but
@@ -103,13 +104,16 @@ func newShipper(
//
// It is not concurrency-safe, however it is compactor-safe (running concurrently with compactor is ok).
func (s *shipper) Sync(ctx context.Context) (shipped int, err error) {
+	log, ctx := spanlogger.NewWithLogger(ctx, s.logger, "Ingester.Shipper.Sync")
+	log.SetTag("organization", s.userID)
+	defer log.Finish()
	shippedBlocks, err := readShippedBlocks(s.dir)
	if err != nil {
		// If we encounter any error, proceed with a new list of shipped blocks.
		// The meta file will be overridden later. Note that the meta file is only
		// used to avoid unnecessary bucket.Exists calls, which are properly handled
		// by the system if they occur anyway.
-		level.Warn(s.logger).Log("msg", "reading meta file failed, will override it", "err", err)
+		level.Warn(log).Log("msg", "reading meta file failed, will override it", "err", err)

		// Reset the shipped blocks slice, so we can rebuild it only with blocks that still exist locally.
		shippedBlocks = map[ulid.ULID]time.Time{}
@@ -132,7 +136,7 @@ func (s *shipper) Sync(ctx context.Context) (shipped int, err error) {

	if m.Stats.NumSamples == 0 {
		// Ignore empty blocks.
-		level.Debug(s.logger).Log("msg", "ignoring empty block", "block", m.ULID)
+		log.DebugLog("msg", "ignoring empty block", "block", m.ULID)
		continue
	}

@@ -155,24 +159,24 @@ func (s *shipper) Sync(ctx context.Context) (shipped int, err error) {
			continue
		}

-		level.Info(s.logger).Log("msg", "uploading new block to long-term storage", "block", m.ULID)
-		if err := s.upload(ctx, m); err != nil {
+		level.Info(log).Log("msg", "uploading new block to long-term storage", "block", m.ULID)
+		if err := s.upload(ctx, log, m); err != nil {
			// No error returned, just log line. This is because we want other blocks to be shipped even
			// though this one failed. It will be retried on the next Sync iteration.
-			level.Error(s.logger).Log("msg", "uploading new block to long-term storage failed", "block", m.ULID, "err", err)
+			level.Error(log).Log("msg", "uploading new block to long-term storage failed", "block", m.ULID, "err", err)
			uploadErrs++
			continue
		}
-		level.Info(s.logger).Log("msg", "finished uploading new block to long-term storage", "block", m.ULID)
+		level.Info(log).Log("msg", "finished uploading new block to long-term storage", "block", m.ULID)

		meta.Shipped[m.ULID] = model.Now()
		shipped++
		s.metrics.uploads.Inc()
		s.metrics.lastSuccessfulUploadTime.SetToCurrentTime()
	}

-	if err := writeShipperMetaFile(s.logger, s.dir, meta); err != nil {
-		level.Warn(s.logger).Log("msg", "updating meta file failed", "err", err)
+	if err := writeShipperMetaFile(log, s.dir, meta); err != nil {
+		level.Warn(log).Log("msg", "updating meta file failed", "err", err)
	}

	if uploadErrs > 0 {
@@ -186,7 +190,7 @@ func (s *shipper) Sync(ctx context.Context) (shipped int, err error) {
// upload method uploads the block to blocks storage. The block is uploaded with an updated meta.json file with extra details.
// This updated version of meta.json is however not persisted locally on disk, to avoid a race condition where the TSDB
// library could unload the block if it found the meta.json file missing.
-func (s *shipper) upload(ctx context.Context, meta *block.Meta) error {
+func (s *shipper) upload(ctx context.Context, logger log.Logger, meta *block.Meta) error {
	blockDir := filepath.Join(s.dir, meta.ULID.String())

	meta.Thanos.Source = s.source
@@ -198,7 +202,7 @@ func (s *shipper) upload(ctx context.Context, meta *block.Meta) error {
	}

	// Upload block with custom metadata.
-	return block.Upload(ctx, s.logger, s.bucket, blockDir, meta)
+	return block.Upload(ctx, logger, s.bucket, blockDir, meta)
}

// blockMetasFromOldest returns the block meta of each block found in dir
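
For completeness, this is why threading the span-carrying `ctx` (and the span logger) through `upload` and into `block.Upload` is enough to re-parent the storage spans: OpenTracing-style clients look up the active span in the context they receive and start their own spans as its children. A rough, generic illustration with `opentracing-go` follows; the operation names mirror the commit message, and none of this is Mimir's actual bucket-client code.

package spantree

import (
	"context"

	opentracing "github.com/opentracing/opentracing-go"
)

// parentOp opens a span and stores it in the context it passes on.
func parentOp(ctx context.Context) {
	span, ctx := opentracing.StartSpanFromContext(ctx, "Ingester.Shipper.Sync")
	defer span.Finish()

	childOp(ctx) // the call below inherits this span as its parent via ctx
}

// childOp stands in for a storage-client call, e.g. writing an object.
func childOp(ctx context.Context) {
	// StartSpanFromContext finds the parent span in ctx (if any) and makes this
	// span its child; with no parent in ctx it would show up unattached.
	span, _ := opentracing.StartSpanFromContext(ctx, "storage.Object.Writer")
	defer span.Finish()
	// ... perform the actual upload work here ...
}
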
