Merge pull request prometheus#513 from prometheus/beorn7/ingestion-tweaks

Ingestion tweaks
juliusv committed Feb 6, 2015
2 parents 1c9b3c4 + 16a1a6d commit 4e6a807
Showing 3 changed files with 35 additions and 23 deletions.
10 changes: 6 additions & 4 deletions main.go
@@ -47,7 +47,7 @@ var (
alertmanagerURL = flag.String("alertmanager.url", "", "The URL of the alert manager to send notifications to.")
notificationQueueCapacity = flag.Int("alertmanager.notification-queue-capacity", 100, "The capacity of the queue for pending alert manager notifications.")

metricsStoragePath = flag.String("storage.local.path", "/tmp/metrics", "Base path for metrics storage.")
persistenceStoragePath = flag.String("storage.local.path", "/tmp/metrics", "Base path for metrics storage.")

remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.")
remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.")
@@ -56,7 +56,8 @@ var (

numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")

storageRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
persistenceRetentionPeriod = flag.Duration("storage.local.retention", 15*24*time.Hour, "How long to retain samples in the local storage.")
persistenceQueueCapacity = flag.Int("storage.local.persistence-queue-capacity", 128*1024, "How many chunks can be waiting for being persisted before sample ingestion will stop.")

checkpointInterval = flag.Duration("storage.local.checkpoint-interval", 5*time.Minute, "The period at which the in-memory index of time series is checkpointed.")
checkpointDirtySeriesLimit = flag.Int("storage.local.checkpoint-dirty-series-limit", 5000, "If approx. that many time series are in a state that would require a recovery operation after a crash, a checkpoint is triggered, even if the checkpoint interval hasn't passed yet. A recovery operation requires a disk seek. The default limit intends to keep the recovery time below 1min even on spinning disks. With SSD, recovery is much faster, so you might want to increase this value in that case to avoid overly frequent checkpoints.")
@@ -116,8 +117,9 @@ func NewPrometheus() *prometheus {

o := &local.MemorySeriesStorageOptions{
MemoryChunks: *numMemoryChunks,
PersistenceStoragePath: *metricsStoragePath,
PersistenceRetentionPeriod: *storageRetentionPeriod,
PersistenceStoragePath: *persistenceStoragePath,
PersistenceRetentionPeriod: *persistenceRetentionPeriod,
PersistenceQueueCapacity: *persistenceQueueCapacity,
CheckpointInterval: *checkpointInterval,
CheckpointDirtySeriesLimit: *checkpointDirtySeriesLimit,
Dirty: *storageDirty,
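The new -storage.local.persistence-queue-capacity flag above says ingestion stops once too many chunks are waiting to be persisted. As a rough illustration of that back-pressure mechanism (not code from this diff; it simply models the persist queue as a buffered Go channel, with made-up names and timings):

package main

import (
	"fmt"
	"time"
)

// persistRequest stands in for a chunk waiting to be written to disk.
type persistRequest struct{ id int }

func main() {
	// Illustrative stand-in for -storage.local.persistence-queue-capacity.
	const queueCapacity = 4

	// A buffered channel: sends succeed immediately while there is room and
	// block once the buffer is full, which is how a full persist queue makes
	// upstream sample ingestion stop.
	persistQueue := make(chan persistRequest, queueCapacity)
	done := make(chan struct{})

	// Slow consumer, playing the role of the persistence layer.
	go func() {
		for req := range persistQueue {
			time.Sleep(20 * time.Millisecond) // simulate a disk write
			fmt.Println("persisted chunk", req.id)
		}
		close(done)
	}()

	for i := 0; i < 10; i++ {
		start := time.Now()
		persistQueue <- persistRequest{id: i} // blocks while the queue is full
		if wait := time.Since(start); wait > time.Millisecond {
			fmt.Printf("ingestion of chunk %d stalled for %v\n", i, wait)
		}
	}
	close(persistQueue)
	<-done
}

The previously hard-coded persistQueueCap of 1024 (removed from storage.go below) becomes the flag's default of 128*1024 chunks, trading memory headroom for more tolerance of a slow disk.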
17 changes: 16 additions & 1 deletion retrieval/target.go
@@ -265,10 +265,25 @@ func (t *target) RunScraper(ingester extraction.Ingester, interval time.Duration
case <-t.scraperStopping:
return
case <-ticker.C:
targetIntervalLength.WithLabelValues(interval.String()).Observe(float64(time.Since(t.lastScrape) / time.Second))
took := time.Since(t.lastScrape)
t.Lock() // Write t.lastScrape requires locking.
t.lastScrape = time.Now()
t.Unlock()
targetIntervalLength.WithLabelValues(interval.String()).Observe(
float64(took) / float64(time.Second), // Sub-second precision.
)
// Throttle the scrape if it took longer than interval - by
// sleeping for the time it took longer. This will make the
// actual scrape interval increase as long as a scrape takes
// longer than the interval we are aiming for.
time.Sleep(took - interval)
// After the sleep, we should check again if we have been stopped.
select {
case <-t.scraperStopping:
return
default:
// Do nothing.
}
t.scrape(ingester)
}
}
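The comments added to RunScraper above describe two changes: the observed interval is now recorded with sub-second precision, and a scrape that overruns its interval delays the next one by the overshoot. A small self-contained sketch of that behaviour with invented timings (an illustration only, not the actual retrieval code, which additionally takes t.Lock() around the write to t.lastScrape):

package main

import (
	"fmt"
	"time"
)

// scrape pretends to be a slow target scrape.
func scrape() {
	time.Sleep(150 * time.Millisecond)
}

func main() {
	interval := 100 * time.Millisecond
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	lastScrape := time.Now()

	for i := 0; i < 3; i++ {
		<-ticker.C

		took := time.Since(lastScrape)
		lastScrape = time.Now()

		// Same conversion as in the diff: dividing two Durations as float64
		// keeps sub-second precision, unlike the old integer division
		// time.Since(t.lastScrape) / time.Second.
		fmt.Printf("observed interval: %.3fs\n", float64(took)/float64(time.Second))

		// Throttle: if the last cycle overran the interval, sleep off the
		// overshoot. time.Sleep returns immediately for negative or zero
		// durations, so on-time scrapes pay no penalty.
		time.Sleep(took - interval)

		scrape()
	}
}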
31 changes: 13 additions & 18 deletions storage/local/storage.go
@@ -29,7 +29,6 @@ import (
)

const (
persistQueueCap = 1024
evictRequestsCap = 1024
chunkLen = 1024

@@ -82,6 +81,7 @@ type memorySeriesStorage struct {

persistLatency prometheus.Summary
persistErrors prometheus.Counter
persistQueueCapacity prometheus.Metric
persistQueueLength prometheus.Gauge
numSeries prometheus.Gauge
seriesOps *prometheus.CounterVec
@@ -97,6 +97,7 @@ type MemorySeriesStorageOptions struct {
MemoryChunks int // How many chunks to keep in memory.
PersistenceStoragePath string // Location of persistence files.
PersistenceRetentionPeriod time.Duration // Chunks at least that old are purged.
PersistenceQueueCapacity int // Capacity of queue for chunks to be persisted.
CheckpointInterval time.Duration // How often to checkpoint the series map and head chunks.
CheckpointDirtySeriesLimit int // How many dirty series will trigger an early checkpoint.
Dirty bool // Force the storage to consider itself dirty on startup.
@@ -134,7 +135,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
checkpointInterval: o.CheckpointInterval,
checkpointDirtySeriesLimit: o.CheckpointDirtySeriesLimit,

persistQueue: make(chan persistRequest, persistQueueCap),
persistQueue: make(chan persistRequest, o.PersistenceQueueCapacity),
persistStopped: make(chan struct{}),
persistence: p,

@@ -157,6 +158,14 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) (Storage, error) {
Name: "persist_errors_total",
Help: "The total number of errors while persisting chunks.",
}),
persistQueueCapacity: prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "persist_queue_capacity"),
"The total capacity of the persist queue.",
nil, nil,
),
prometheus.GaugeValue, float64(o.PersistenceQueueCapacity),
),
persistQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
@@ -837,32 +846,19 @@ func (s *memorySeriesStorage) loadChunkDescs(fp clientmodel.Fingerprint, beforeT
return s.persistence.loadChunkDescs(fp, beforeTime)
}

// To expose persistQueueCap as metric:
var (
persistQueueCapDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "persist_queue_capacity"),
"The total capacity of the persist queue.",
nil, nil,
)
persistQueueCapGauge = prometheus.MustNewConstMetric(
persistQueueCapDesc, prometheus.GaugeValue, persistQueueCap,
)
)

// Describe implements prometheus.Collector.
func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) {
s.persistence.Describe(ch)

ch <- s.persistLatency.Desc()
ch <- s.persistErrors.Desc()
ch <- s.persistQueueCapacity.Desc()
ch <- s.persistQueueLength.Desc()
ch <- s.numSeries.Desc()
s.seriesOps.Describe(ch)
ch <- s.ingestedSamplesCount.Desc()
ch <- s.invalidPreloadRequestsCount.Desc()

ch <- persistQueueCapDesc

ch <- numMemChunksDesc
}

@@ -872,14 +868,13 @@ func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) {

ch <- s.persistLatency
ch <- s.persistErrors
ch <- s.persistQueueCapacity
ch <- s.persistQueueLength
ch <- s.numSeries
s.seriesOps.Collect(ch)
ch <- s.ingestedSamplesCount
ch <- s.invalidPreloadRequestsCount

ch <- persistQueueCapGauge

count := atomic.LoadInt64(&numMemChunks)
ch <- prometheus.MustNewConstMetric(numMemChunksDesc, prometheus.GaugeValue, float64(count))
}
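The storage.go hunks above replace the package-level persistQueueCapDesc and persistQueueCapGauge with a per-storage constant metric built from the configured capacity. For reference, here is a minimal collector using the same client_golang pattern (prometheus.NewDesc plus prometheus.MustNewConstMetric); the type and metric names are invented, and the explicit registry used in main requires a current client_golang release, postdating the 2015 code in this diff:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// boundedQueue is a toy component that exposes its fixed capacity as a
// constant gauge, analogous to persist_queue_capacity in the diff.
type boundedQueue struct {
	queue    chan int
	capacity prometheus.Metric
}

func newBoundedQueue(capacity int) *boundedQueue {
	return &boundedQueue{
		queue: make(chan int, capacity),
		capacity: prometheus.MustNewConstMetric(
			prometheus.NewDesc(
				prometheus.BuildFQName("example", "queue", "capacity"),
				"The total capacity of the queue.",
				nil, nil,
			),
			prometheus.GaugeValue, float64(capacity),
		),
	}
}

// Describe implements prometheus.Collector.
func (q *boundedQueue) Describe(ch chan<- *prometheus.Desc) {
	ch <- q.capacity.Desc()
}

// Collect implements prometheus.Collector.
func (q *boundedQueue) Collect(ch chan<- prometheus.Metric) {
	ch <- q.capacity
}

func main() {
	reg := prometheus.NewRegistry()
	reg.MustRegister(newBoundedQueue(128 * 1024))

	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		fmt.Println(mf.GetName(), mf.GetMetric()[0].GetGauge().GetValue())
	}
}

Building the metric in the constructor lets its value follow the configured capacity instead of a package-level constant, which is exactly what the diff does with PersistenceQueueCapacity.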
