diff --git a/lib/carbon/cache.py b/lib/carbon/cache.py index cf8b7d1e..72ccf6cf 100644 --- a/lib/carbon/cache.py +++ b/lib/carbon/cache.py @@ -16,7 +16,7 @@ import threading from operator import itemgetter from random import choice -from collections import defaultdict +from collections import defaultdict, deque from carbon.conf import settings from carbon import events, log @@ -189,6 +189,7 @@ class _MetricCache(defaultdict): def __init__(self, strategy=None): self.lock = threading.Lock() self.size = 0 + self.new_metrics = deque() self.strategy = None if strategy: self.strategy = strategy(self) @@ -253,6 +254,8 @@ def store(self, metric, datapoint): log.msg("MetricCache is full: self.size=%d" % self.size) events.cacheFull() else: + if not self[metric]: + self.new_metrics.append(metric) self.size += 1 self[metric][timestamp] = value if self.strategy: diff --git a/lib/carbon/util.py b/lib/carbon/util.py index 2b670824..a7eade8d 100644 --- a/lib/carbon/util.py +++ b/lib/carbon/util.py @@ -287,7 +287,7 @@ def drain(self, cost, blocking=False): '''Given a number of tokens (or fractions) drain will return True and drain the number of tokens from the bucket if the capacity allows, otherwise we return false and leave the contents of the bucket.''' - if cost <= self.tokens: + if self.peek(cost): self._tokens -= cost return True @@ -310,16 +310,16 @@ def setCapacityAndFillRate(self, new_capacity, new_fill_rate): self.fill_rate = float(new_fill_rate) self._tokens = delta + self._tokens - @property - def tokens(self): - '''The tokens property will return the current number of tokens in the - bucket.''' - if self._tokens < self.capacity: + def peek(self, cost): + '''Return true if the bucket can drain cost without blocking.''' + if self._tokens >= cost: + return True + else: now = time() delta = self.fill_rate * (now - self.timestamp) self._tokens = min(self.capacity, self._tokens + delta) self.timestamp = now - return self._tokens + return self._tokens >= cost class PluginRegistrar(type): diff --git a/lib/carbon/writer.py b/lib/carbon/writer.py index 9161d95a..adcc0a01 100644 --- a/lib/carbon/writer.py +++ b/lib/carbon/writer.py @@ -95,24 +95,25 @@ def writeCachedDataPoints(): cache = MetricCache() while cache: - (metric, datapoints) = cache.drain_metric() - if metric is None: - # end the loop - break + # First, create new metrics files, which is helpful for graphite-web + while cache.new_metrics and (not CREATE_BUCKET or CREATE_BUCKET.peek(1)): + metric = cache.new_metrics.popleft() - dbFileExists = state.database.exists(metric) + if metric not in cache: + # This metric has already been drained. There's no sense in creating it. + continue - if not dbFileExists: - if CREATE_BUCKET and not CREATE_BUCKET.drain(1): - # If our tokenbucket doesn't have enough tokens available to create a new metric - # file then we'll just drop the metric on the ground and move on to the next - # metric. - # XXX This behavior should probably be configurable to no tdrop metrics - # when rate limiting unless our cache is too big or some other legit - # reason. - instrumentation.increment('droppedCreates') + if state.database.exists(metric): continue + if CREATE_BUCKET and not CREATE_BUCKET.drain(1): + # This should never actually happen as no other thread should be + # draining our tokens, and we just checked for a token. + # Just put the new metric back in the create list and we'll try again + # after writing an update. + cache.new_metrics.appendleft(metric) + break + archiveConfig = None xFilesFactor, aggregationMethod = None, None @@ -150,6 +151,18 @@ def writeCachedDataPoints(): instrumentation.increment('errors') continue + # now drain and persist some data + (metric, datapoints) = cache.drain_metric() + if metric is None: + # end the loop + break + + if not state.database.exists(metric): + # If we get here, the metric must still be in new_metrics. We're + # creating too fast, and we'll drop this data. + instrumentation.increment('droppedCreates') + continue + # If we've got a rate limit configured lets makes sure we enforce it waitTime = 0 if UPDATE_BUCKET: