From 6ac6f604e4bcf29bc0f7a9a614d086cb7e7f6186 Mon Sep 17 00:00:00 2001 From: Steve Varner Date: Tue, 3 May 2016 12:45:10 -0400 Subject: [PATCH] Adding changes to support nanmean, nanmin, nanmax, etc. --- bolt/spark/array.py | 105 +++++++++++++++++++ bolt/spark/statcounter.py | 209 ++++++++++++++++++++++++++++++++++---- 2 files changed, 292 insertions(+), 22 deletions(-) diff --git a/bolt/spark/array.py b/bolt/spark/array.py index d180abd..44e5165 100644 --- a/bolt/spark/array.py +++ b/bolt/spark/array.py @@ -334,6 +334,21 @@ def mean(self, axis=None, keepdims=False): """ return self._stat(axis, name='mean', keepdims=keepdims) + def nanmean(self, axis=None, keepdims=False): + """ + Return the nanmean of the array over the given axis. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nanmean', keepdims=keepdims) + def var(self, axis=None, keepdims=False): """ Return the variance of the array over the given axis. @@ -349,6 +364,21 @@ def var(self, axis=None, keepdims=False): """ return self._stat(axis, name='variance', keepdims=keepdims) + def nanvar(self, axis=None, keepdims=False): + """ + Return the variance of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nanvariance', keepdims=keepdims) + def std(self, axis=None, keepdims=False): """ Return the standard deviation of the array over the given axis. 
@@ -364,6 +394,21 @@ def std(self, axis=None, keepdims=False): """ return self._stat(axis, name='stdev', keepdims=keepdims) + def nanstd(self, axis=None, keepdims=False): + """ + Return the standard deviation of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nanstdev', keepdims=keepdims) + def sum(self, axis=None, keepdims=False): """ Return the sum of the array over the given axis. @@ -380,6 +425,21 @@ def sum(self, axis=None, keepdims=False): from operator import add return self._stat(axis, func=add, keepdims=keepdims) + def nansum(self, axis=None, keepdims=False): + """ + Return the sum of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nansum', keepdims=keepdims) + def max(self, axis=None, keepdims=False): """ Return the maximum of the array over the given axis. @@ -396,6 +456,21 @@ def max(self, axis=None, keepdims=False): from numpy import maximum return self._stat(axis, func=maximum, keepdims=keepdims) + def nanmax(self, axis=None, keepdims=False): + """ + Return the maximum of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. 
+ """ + return self._stat(axis, name='nanmax', keepdims=keepdims) + def min(self, axis=None, keepdims=False): """ Return the minimum of the array over the given axis. @@ -412,6 +487,36 @@ def min(self, axis=None, keepdims=False): from numpy import minimum return self._stat(axis, func=minimum, keepdims=keepdims) + def nanmin(self, axis=None, keepdims=False): + """ + Return the minimum of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nanmin', keepdims=keepdims) + + def nancount(self, axis=None, keepdims=False): + """ + Return the count of non NaN values. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nancount', keepdims=keepdims) + def concatenate(self, arry, axis=0): """ Join this array with another array. diff --git a/bolt/spark/statcounter.py b/bolt/spark/statcounter.py index 162f6eb..616e8e4 100644 --- a/bolt/spark/statcounter.py +++ b/bolt/spark/statcounter.py @@ -20,29 +20,52 @@ # This code is based on pyspark's statcounter.py and used under the ASF 2.0 license. 
import copy +import math from itertools import chain -from numpy import sqrt +from numpy import zeros, maximum, minimum, sqrt, isnan, fmin, fmax, shape, reshape, invert, amax, amin, nansum, dstack class StatCounter(object): REQUIRED_FOR = { + 'count': ('n',), 'mean': ('mu',), - 'sum': ('mu',), - 'variance': ('mu', 'm2'), - 'stdev': ('mu', 'm2'), - 'all': ('mu', 'm2') + 'sum': ('mu','n'), + 'min': ('minValue',), + 'max': ('maxValue',), + 'variance': ('mu', 'n', 'm2'), + 'sampleVariance': ('mu', 'n', 'm2'), + 'stdev': ('mu', 'n', 'm2'), + 'sampleStdev': ('mu', 'n', 'm2'), + 'nancount': ('n_n',), + 'nanmean': ('mu_n',), + 'nansum': ('mu_n', 'n_n'), + 'nanmin': ('minValue_n',), + 'nanmax': ('maxValue_n',), + 'nanvariance': ('mu_n', 'n_n', 'm2_n'), + 'nansampleVariance': ('mu_n', 'n_n', 'm2_n'), + 'nanstdev': ('mu_n', 'n_n', 'm2_n'), + 'nansampleStdev': ('mu_n', 'n_n', 'm2_n'), + 'all': ('n', 'mu', 'm2', 'minValue', 'maxValue', 'n_n', 'mu_n', 'm2_n', 'minValue_n', 'maxValue_n') } def __init__(self, values=(), stats='all'): - self.n = 0 - self.mu = 0.0 - self.m2 = 0.0 + self.n = 0L # Running count of our values + self.mu = 0.0 # Running mean of our values + self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2) + self.maxValue = None + self.minValue = None + self.n_n = None # Running count of our values + self.mu_n = None # Running mean of our values + self.m2_n = None # Running variance numerator (sum of (x - mean)^2) + self.maxValue_n = None + self.minValue_n = None - if isinstance(stats, str): + if isinstance(stats, basestring): stats = [stats] - self.required = frozenset(chain().from_iterable([StatCounter.REQUIRED_FOR[stat] for stat in stats])) + + self.requiredAttrs = frozenset(chain().from_iterable([StatCounter.REQUIRED_FOR[stat] for stat in stats])) for v in values: self.merge(v) @@ -55,30 +78,53 @@ def merge(self, value): self.mu += delta / self.n if self.__requires('m2'): self.m2 += delta * (value - self.mu) + if self.__requires('maxValue'): + 
self.maxValue = maximum(self.maxValue, value) if not self.maxValue is None else value + if self.__requires('minValue'): + self.minValue = minimum(self.minValue, value) if not self.minValue is None else value + + if self.n_n is None: + #Create the initial counter and set it to zeros + self.n_n = zeros(value.shape) + self.mu_n = zeros(value.shape) + self.m2_n = zeros(value.shape) + + self.n_n += ~isnan(value) + if self.__requires('mu_n'): + delta = value - self.mu_n + delta[isnan(value)] = 0 + self.mu_n = nansum(dstack((self.mu_n, (delta / self.n_n))),axis=2) + if self.__requires('m2_n'): + #Since value can have nans - replace with zeros + tmpVal = value; + tmpVal[isnan(tmpVal)] = 0 + self.m2_n += delta * (tmpVal - self.mu_n) + if self.__requires('maxValue_n'): + self.maxValue_n = fmax(self.maxValue_n, value) if not self.maxValue_n is None else value + if self.__requires('minValue_n'): + self.minValue_n = fmin(self.minValue_n, value) if not self.minValue_n is None else value return self # checks whether the passed attribute name is required to be updated in order to support the # statistics requested in self.requested def __requires(self, attrname): - return attrname in self.required + return attrname in self.requiredAttrs # merge another StatCounter into this one, adding up the statistics def combine(self, other): if not isinstance(other, StatCounter): - raise Exception("can only merge StatCounters!") + raise Exception("Can only merge Statcounters!") - # reference equality holds - if other is self: - # avoid overwriting fields in a weird order - self.merge(copy.deepcopy(other)) + if other is self: # reference equality holds + self.merge(copy.deepcopy(other)) # Avoid overwriting fields in a weird order else: - # accumulator should only be updated if it's valid in both statcounters - self.required = set(self.required).intersection(set(other.required)) + # accumulator should only be updated if it's valid in both statcounters: + self.requiredAttrs = 
set(self.requiredAttrs).intersection(set(other.requiredAttrs)) if self.n == 0: self.n = other.n - for attrname in ('mu', 'm2'): + for attrname in ('mu', 'm2', 'maxValue', 'minValue', 'n_n', 'mu_n', 'm2_n', 'maxValue_n', 'minValue_n'): if self.__requires(attrname): setattr(self, attrname, getattr(other, attrname)) @@ -95,16 +141,50 @@ def combine(self, other): if self.__requires('m2'): self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n) + if self.__requires('maxValue'): + self.maxValue = maximum(self.maxValue, other.maxValue) + if self.__requires('minValue'): + self.minValue = minimum(self.minValue, other.minValue) + self.n += other.n + + if self.__requires('mu_n'): + delta = other.mu_n - self.mu_n + self.mu_n = (self.mu_n * self.n_n + other.mu_n * other.n_n) / (self.n_n + other.n_n) + + #Set areas with no data to zero + self.mu_n[isnan(self.mu_n)] = 0 + + + if self.__requires('m2_n'): + tmpAdd = (delta * delta * self.n_n * other.n_n) / (self.n_n + other.n_n) + tmpAdd[isnan(tmpAdd)] = 0 + self.m2_n += other.m2_n + tmpAdd + + if self.__requires('maxValue_n'): + self.maxValue_n = fmax(self.maxValue_n, other.maxValue_n) + if self.__requires('minValue_n'): + self.minValue_n = fmin(self.minValue_n, other.minValue_n) + + self.n_n += other.n_n + + + return self - def count(self): - return self.n + # Clone this StatCounter + def copy(self): + return copy.deepcopy(self) + def __isavail(self, attrname): - if not all(attr in self.required for attr in StatCounter.REQUIRED_FOR[attrname]): + if not all(attr in self.requiredAttrs for attr in StatCounter.REQUIRED_FOR[attrname]): raise ValueError("'%s' stat not available, must be requested at " "StatCounter instantiation" % attrname) + @property + def count(self): + self.__isavail('count') + return self.n @property def mean(self): @@ -116,6 +196,17 @@ def sum(self): self.__isavail('sum') return self.n * self.mu + @property + def min(self): + self.__isavail('min') + return self.minValue + + @property + def 
max(self): + self.__isavail('max') + return self.maxValue + + # Return the variance of the values. @property def variance(self): self.__isavail('variance') @@ -128,3 +219,77 @@ def variance(self): def stdev(self): self.__isavail('stdev') return sqrt(self.variance) + + # + # Return the sample standard deviation of the values, which corrects for bias in estimating the + # variance by dividing by N-1 instead of N. + # + @property + def sampleStdev(self): + self.__isavail('sampleStdev') + return sqrt(self.sampleVariance) + + @property + def nancount(self): + self.__isavail('nancount') + return self.n_n + + @property + def nanmean(self): + self.__isavail('nanmean') + return self.mu_n + + @property + def nansum(self): + self.__isavail('nansum') + return self.n_n * self.mu_n + + @property + def nanmin(self): + self.__isavail('nanmin') + return self.minValue_n + + @property + def nanmax(self): + self.__isavail('nanmax') + return self.maxValue_n + + # Return the variance of the values. + @property + def nanvariance(self): + self.__isavail('nanvariance') + tmpVar = self.m2_n / self.n_n + #set areas with no data to zero + tmpVar[isnan(tmpVar)] = 0 + return tmpVar + + # + # Return the sample variance, which corrects for bias in estimating the variance by dividing + # by N-1 instead of N. + # + @property + def nansampleVariance(self): + self.__isavail('nansampleVariance') + tmpVar = self.m2_n / (self.n_n - 1) + #set areas with no data to zero + tmpVar[isnan(tmpVar)] = 0 + return tmpVar + + # Return the standard deviation of the values. + @property + def nanstdev(self): + self.__isavail('nanstdev') + return sqrt(self.nanvariance) + + # + # Return the sample standard deviation of the values, which corrects for bias in estimating the + # variance by dividing by N-1 instead of N. 
+ # + @property + def nansampleStdev(self): + self.__isavail('nansampleStdev') + return sqrt(self.nansampleVariance) + + def __repr__(self): + return ("(count: %s, mean: %s, stdev: %s, max: %s, min: %s, required: %s, nancount: %s, nanmean: %s, nanstdev: %s, nanmax: %s, nanmin: %s)" % + (self.count(), self.mean(), self.stdev(), self.max(), self.min(), str(tuple(self.requiredAttrs)), self.nancount(), self.nanmean(), self.nanstdev(), self.nanmax(), self.nanmin()))