# pygmmis.py (forked from pmelchior/pygmmis)
from __future__ import division
import numpy as np
import scipy.special, scipy.stats
import ctypes
import logging
logger = logging.getLogger("pygmmis")
# set up multiprocessing
import multiprocessing
import parmap
def createShared(a, dtype=ctypes.c_double):
"""Create a shared array to be used for multiprocessing's processes.
Taken from http://stackoverflow.com/questions/5549190/
Works only for float, double, int, long types (e.g. no bool).
Args:
numpy array, arbitrary shape
Returns:
numpy array whose container is a multiprocessing.Array
"""
shared_array_base = multiprocessing.Array(dtype, a.size)
shared_array = np.ctypeslib.as_array(shared_array_base.get_obj())
shared_array[:] = a.flatten()
shared_array = shared_array.reshape(a.shape)
return shared_array
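# Illustrative usage sketch (not part of the original module; array shapes are
# arbitrary): createShared copies a numpy array into a multiprocessing.Array
# buffer so that worker processes can access it without pickling, e.g.
#
#   data = np.random.rand(1000, 2)          # any float array
#   shared = createShared(data)             # same values, backed by shared memory
#   assert shared.shape == data.shape and np.allclose(shared, data)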
# this is to allow multiprocessing pools to operate on class methods:
# https://gist.github.com/bnyeggen/1086393
def _pickle_method(method):
func_name = method.__func__.__name__ # func_name = method.im_func.__name__
obj = method.__self__ # obj = method.im_self
cls = method.__self__.__class__# cls = method.im_class
if func_name.startswith('__') and not func_name.endswith('__'): #deal with mangled names
cls_name = cls.__name__.lstrip('_')
func_name = '_' + cls_name + func_name
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.__mro__:
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
import types
# python 2 -> 3 adjustments
try:
import copy_reg
except ImportError:
import copyreg as copy_reg
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
try:
xrange
except NameError:
xrange = range
# Blatant copy from Erin Sheldon's esutil
# https://github.com/esheldon/esutil/blob/master/esutil/numpy_util.py
def match1d(arr1input, arr2input, presorted=False):
"""
NAME:
match1d
CALLING SEQUENCE:
ind1,ind2 = match1d(arr1, arr2, presorted=False)
PURPOSE:
Match two numpy arrays. Return the indices of the matches or empty
arrays if no matches are found. This means arr1[ind1] == arr2[ind2] is
true for all corresponding pairs. arr1 must contain only unique
inputs, but arr2 may be non-unique.
If you know arr1 is sorted, set presorted=True and it will run
even faster
METHOD:
uses searchsorted with some sugar. Much faster than old version
based on IDL code.
REVISION HISTORY:
Created 2015, Eli Rykoff, SLAC.
"""
# make sure 1D
arr1 = np.array(arr1input, ndmin=1, copy=False)
arr2 = np.array(arr2input, ndmin=1, copy=False)
# check for integer data...
if (not issubclass(arr1.dtype.type,np.integer) or
not issubclass(arr2.dtype.type,np.integer)) :
mess="Error: only works with integer types, got %s %s"
mess = mess % (arr1.dtype.type,arr2.dtype.type)
raise ValueError(mess)
if (arr1.size == 0) or (arr2.size == 0) :
mess="Error: arr1 and arr2 must each be non-zero length"
raise ValueError(mess)
# make sure that arr1 has unique values...
test=np.unique(arr1)
if test.size != arr1.size:
raise ValueError("Error: the arr1input must be unique")
# sort arr1 if not presorted
if not presorted:
st1 = np.argsort(arr1)
else:
st1 = None
# search the sorted array
sub1=np.searchsorted(arr1,arr2,sorter=st1)
# check for out-of-bounds at the high end if necessary
if (arr2.max() > arr1.max()) :
bad,=np.where(sub1 == arr1.size)
sub1[bad] = arr1.size-1
if not presorted:
sub2,=np.where(arr1[st1[sub1]] == arr2)
sub1=st1[sub1[sub2]]
else:
sub2,=np.where(arr1[sub1] == arr2)
sub1=sub1[sub2]
return sub1,sub2
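# Illustrative usage sketch (hypothetical values): match1d returns index pairs
# such that arr1[ind1] == arr2[ind2] holds element-wise; unmatched entries of
# arr2 are dropped.
#
#   a1 = np.array([10, 20, 30])             # must be unique
#   a2 = np.array([30, 10, 30, 5])          # may repeat, 5 has no match
#   ind1, ind2 = match1d(a1, a2)
#   # np.all(a1[ind1] == a2[ind2]) is True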
def logsum(logX, axis=0):
"""Computes log of the sum along give axis from the log of the summands.
This method tries hard to avoid over- or underflow.
See appendix A of Bovy, Hogg, Roweis (2009).
Args:
logX: numpy array of logarithmic summands
axis (int): axis to sum over
Returns:
log of the sum, shortened by one axis
Throws:
ValueError if logX has length 0 along given axis
"""
floatinfo = np.finfo(logX.dtype)
underflow = np.log(floatinfo.tiny) - logX.min(axis=axis)
overflow = np.log(floatinfo.max) - logX.max(axis=axis) - np.log(logX.shape[axis])
c = np.where(underflow < overflow, underflow, overflow)
# adjust the shape of c for addition with logX
c_shape = [slice(None) for i in xrange(len(logX.shape))]
c_shape[axis] = None
return np.log(np.exp(logX + c[tuple(c_shape)]).sum(axis=axis)) - c
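# Illustrative usage sketch (hypothetical values): logsum is a stabilized
# log-sum-exp along one axis, comparable to scipy.special.logsumexp.
#
#   logX = np.log(np.array([[1., 2.], [3., 4.]]))
#   logsum(logX, axis=0)                    # approx. np.log([4., 6.])
#   logsum(logX, axis=1)                    # approx. np.log([3., 7.])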
def chi2_cutoff(D, cutoff=3.):
"""D-dimensional eqiuvalent of "n sigma" cut.
Evaluates the quantile function of the chi-squared distribution to determine
the limit for the chi^2 of samples wrt to GMM so that they satisfy the
68-95-99.7 percent rule of the 1D Normal distribution.
Args:
D (int): dimensions of the feature space
cutoff (float): 1D equivalent cut [in units of sigma]
Returns:
float: upper limit for chi-squared in D dimensions
"""
cdf_1d = scipy.stats.norm.cdf(cutoff)
confidence_1d = 1-(1-cdf_1d)*2
cutoff_nd = scipy.stats.chi2.ppf(confidence_1d, D)
return cutoff_nd
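# Illustrative usage sketch (hypothetical values): chi2_cutoff translates a 1D
# "n sigma" cut into a chi^2 limit for D dimensions.
#
#   chi2_cutoff(1, cutoff=1.)               # approx. 1.0 (1 sigma in 1D)
#   chi2_cutoff(2, cutoff=3.)               # approx. 11.8 (3-sigma equivalent in 2D)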
def covar_callback_default(coords, default=None):
N,D = coords.shape
if default.shape != (D,D):
raise RuntimeError("covar_callback received improper default covariance %r" % default)
# no need to copy since a single covariance matrix is sufficient
# return np.tile(default, (N,1,1))
return default
class GMM(object):
"""Gaussian mixture model with K components in D dimensions.
Attributes:
amp: numpy array (K,), component amplitudes
mean: numpy array (K,D), component means
covar: numpy array (K,D,D), component covariances
"""
def __init__(self, K=0, D=0):
"""Create the arrays for amp, mean, covar."""
self.amp = np.zeros((K))
self.mean = np.empty((K,D))
self.covar = np.empty((K,D,D))
@property
def K(self):
"""int: number of components, depends on size of amp."""
return self.amp.size
@property
def D(self):
"""int: dimensions of the feature space."""
return self.mean.shape[1]
def save(self, filename, **kwargs):
"""Save GMM to file.
Args:
filename (str): name for saved file; should end in .npz as the default
of numpy.savez(), which is called here
kwargs: dictionary of additional information to be stored in file.
Returns:
None
"""
np.savez(filename, amp=self.amp, mean=self.mean, covar=self.covar, **kwargs)
def load(self, filename):
"""Load GMM from file.
Additional arguments stored by save() will be ignored.
Args:
filename (str): name of file created with save().
Returns:
None
"""
F = np.load(filename)
self.amp = F["amp"]
self.mean = F["mean"]
self.covar = F["covar"]
F.close()
@staticmethod
def from_file(filename):
"""Load GMM from file.
Additional arguments stored by save() will be ignored.
Args:
filename (str): name of file created with save().
Returns:
GMM
"""
gmm = GMM()
gmm.load(filename)
return gmm
def draw(self, size=1, rng=np.random):
"""Draw samples from the GMM.
Args:
size (int): number of samples to draw
rng: numpy.random.RandomState for deterministic draw
Returns:
numpy array (size,D)
"""
# draw indices for components from the amplitudes; need to make sure they sum to 1
ind = rng.choice(self.K, size=size, p=self.amp/self.amp.sum())
N = np.bincount(ind, minlength=self.K)
# for each component: draw as many points as in ind from a normal
samples = np.empty((size, self.D))
lower = 0
for k in np.flatnonzero(N):
upper = lower + N[k]
samples[lower:upper, :] = rng.multivariate_normal(self.mean[k], self.covar[k], size=N[k])
lower = upper
return samples
def __call__(self, coords, covar=None, as_log=False):
"""Evaluate model PDF at given coordinates.
See logL() for details.
Args:
coords: numpy array (D,) or (N, D) of test coordinates
covar: numpy array (D, D) or (N, D, D) covariance matrix of coords
as_log (bool): return log(p) instead of p
Returns:
numpy array (1,) or (N, 1) of PDF (or its log)
"""
if as_log:
return self.logL(coords, covar=covar)
else:
return np.exp(self.logL(coords, covar=covar))
def _mp_chunksize(self):
# find how many components to distribute over available threads
cpu_count = multiprocessing.cpu_count()
chunksize = max(1, self.K//cpu_count)
n_chunks = min(cpu_count, self.K//chunksize)
return n_chunks, chunksize
def _get_chunks(self):
# split all components into ideal-sized chunks
n_chunks, chunksize = self._mp_chunksize()
left = self.K - n_chunks*chunksize
chunks = []
n = 0
for i in xrange(n_chunks):
n_ = n + chunksize
if left > i:
n_ += 1
chunks.append((n, n_))
n = n_
return chunks
def logL(self, coords, covar=None):
"""Log-likelihood of coords given all (i.e. the sum of) GMM components
Distributes computation over all threads on the machine.
If covar is None, this method returns
log(sum_k(p(x | k)))
of the data values x. If covar is set, the method returns
log(sum_k(p(y | k))),
where y = x + noise and noise ~ N(0, covar).
Args:
coords: numpy array (D,) or (N, D) of test coordinates
covar: numpy array (D, D) or (N, D, D) covariance matrix of coords
Returns:
numpy array (1,) or (N, 1) log(L), depending on shape of data
"""
# Instead of computing log p(x | k) for each k at once (which is huge),
# compute it in stages: first for each chunk, then sum over all chunks
pool = multiprocessing.Pool()
chunks = self._get_chunks()
results = [pool.apply_async(self._logsum_chunk, (chunk, coords, covar)) for chunk in chunks]
log_p_y_chunk = []
for r in results:
log_p_y_chunk.append(r.get())
pool.close()
pool.join()
return logsum(np.array(log_p_y_chunk)) # sum over all chunks = all k
def _logsum_chunk(self, chunk, coords, covar=None):
# helper function to reduce the memory requirement of logL
log_p_y_k = np.empty((chunk[1]-chunk[0], len(coords)))
for i in xrange(chunk[1] - chunk[0]):
k = chunk[0] + i
log_p_y_k[i,:] = self.logL_k(k, coords, covar=covar)
return logsum(log_p_y_k)
def logL_k(self, k, coords, covar=None, chi2_only=False):
"""Log-likelihood of coords given only component k.
Args:
k (int): component index
coords: numpy array (D,) or (N, D) of test coordinates
covar: numpy array (D, D) or (N, D, D) covariance matrix of coords
chi2_only (bool): only compute deltaX^T Sigma_k^-1 deltaX
Returns:
numpy array (1,) or (N, 1) log(L), depending on shape of data
"""
# compute p(x | k)
dx = coords - self.mean[k]
if covar is None:
T_k = self.covar[k]
else:
T_k = self.covar[k] + covar
chi2 = np.einsum('...i,...ij,...j', dx, np.linalg.inv(T_k), dx)
if chi2_only:
return chi2
# prevent tiny negative determinants from messing things up
(sign, logdet) = np.linalg.slogdet(T_k)
log2piD2 = np.log(2*np.pi)*(0.5*self.D)
return np.log(self.amp[k]) - log2piD2 - sign*logdet/2 - chi2/2
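# Illustrative usage sketch (parameter values are made up): build a small GMM
# by hand, draw samples from it, and evaluate the (log-)density.
#
#   gmm = GMM(K=2, D=2)
#   gmm.amp[:] = [0.7, 0.3]
#   gmm.mean[:] = [[0., 0.], [3., 3.]]
#   gmm.covar[:] = np.eye(2)                # broadcast to both components
#   samples = gmm.draw(size=100)            # numpy array (100, 2)
#   logp = gmm(samples, as_log=True)        # same as gmm.logL(samples)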
class Background(object):
"""Background object to be used in conjuction with GMM.
For a normalizable uniform distribution, a support footprint must be set.
It should be sufficiently large to explain all non-clusters samples.
Attributes:
amp (float): mixing amplitude
footprint: numpy array, (2,D) with lower and upper corners of the rectangular volume
adjust_amp (bool): whether amp will be adjusted as part of the fit
amp_max (float): maximum value of amp allowed if adjust_amp=True
"""
def __init__(self, footprint, amp=0):
"""Initialize Background with a footprint.
Args:
footprint: numpy array, (2,D) with lower and upper corners of the rectangular volume
amp (float): initial mixing amplitude
Returns:
None
"""
self.amp = amp
self.footprint = footprint
self.adjust_amp = True
self.amp_max = 1
self.amp_min = 0
@property
def p(self):
"""Probability of the background model.
Returns:
float, equal to 1/volume, where volume is given by footprint.
"""
volume = np.prod(self.footprint[1] - self.footprint[0])
return 1/volume
def draw(self, size=1, rng=np.random):
"""Draw samples from uniform background.
Args:
size (int): number of samples to draw
rng: numpy.random.RandomState for deterministic draw
Returns:
numpy array (size, D)
"""
dx = self.footprint[1] - self.footprint[0]
return self.footprint[0] + dx*rng.rand(size,len(self.footprint[0]))
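# Illustrative usage sketch (footprint and amplitude are made up): a uniform
# background on the unit square that contributes 20% of the samples; pass it
# to fit(..., background=bg) for simultaneous fitting.
#
#   footprint = np.array([[0., 0.], [1., 1.]])   # lower and upper corners
#   bg = Background(footprint, amp=0.2)
#   bg.p                                          # 1/volume = 1.0 here
#   bg_samples = bg.draw(size=50)                 # numpy array (50, 2)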
############################
# Beginning of fit functions
############################
def initFromDataMinMax(gmm, data, covar=None, s=None, k=None, rng=np.random):
"""Initialization callback for uniform random component means.
Component amplitudes are set at 1/gmm.K, covariances are set to
s**2*np.eye(D), and means are distributed randomly over the range that is
covered by data.
If s is not given, it will be set such that the volume of all components
completely fills the space covered by data.
Args:
gmm: A GMM to be initialized
data: numpy array (N,D) to define the range of the component means
covar: ignored in this callback
s (float): if set, sets component variances
k (iterable): list of components to set; if None, sets all components
rng: numpy.random.RandomState for deterministic behavior
Returns:
None
"""
if k is None:
k = slice(None)
gmm.amp[k] = 1/gmm.K
# set model to random positions with equally sized spheres within
# volume spanned by data
min_pos = data.min(axis=0)
max_pos = data.max(axis=0)
gmm.mean[k,:] = min_pos + (max_pos-min_pos)*rng.rand(gmm.K, gmm.D)
# if s is not set: use volume filling argument:
# K spheres of radius s [having volume s^D * pi^D/2 / gamma(D/2+1)]
# should completely fill the volume spanned by data.
if s is None:
vol_data = np.prod(max_pos-min_pos)
s = (vol_data / gmm.K * scipy.special.gamma(gmm.D*0.5 + 1))**(1/gmm.D) / np.sqrt(np.pi)
logger.info("initializing spheres with s=%.2f in data domain" % s)
gmm.covar[k,:,:] = s**2 * np.eye(data.shape[1])
def initFromDataAtRandom(gmm, data, covar=None, s=None, k=None, rng=np.random):
"""Initialization callback for component means to follow data on scales > s.
Component amplitudes are set to 1/gmm.K, covariances are set to
s**2*np.eye(D). For each mean, a data sample is selected at random, and a
multivariate Gaussian offset is added, whose variance is given by s**2.
If s is not given, it will be set such that the volume of all components
completely fills the space covered by data.
Args:
gmm: A GMM to be initialized
data: numpy array (N,D) to define the range of the component means
covar: ignored in this callback
s (float): if set, sets component variances
k (iterable): list of components to set; if None, sets all components
rng: numpy.random.RandomState for deterministic behavior
Returns:
None
"""
if k is None:
k = slice(None)
k_len = gmm.K
else:
try:
k_len = len(gmm.amp[k])
except TypeError:
k_len = 1
gmm.amp[k] = 1/gmm.K
# initialize components around data points with uncertainty s
refs = rng.randint(0, len(data), size=k_len)
D = data.shape[1]
if s is None:
min_pos = data.min(axis=0)
max_pos = data.max(axis=0)
vol_data = np.prod(max_pos-min_pos)
s = (vol_data / gmm.K * scipy.special.gamma(gmm.D*0.5 + 1))**(1/gmm.D) / np.sqrt(np.pi)
logger.info("initializing spheres with s=%.2f near data points" % s)
gmm.mean[k,:] = data[refs] + rng.multivariate_normal(np.zeros(D), s**2 * np.eye(D), size=k_len)
gmm.covar[k,:,:] = s**2 * np.eye(data.shape[1])
def initFromKMeans(gmm, data, covar=None, rng=np.random):
"""Initialization callback from a k-means clustering run.
See Algorithm 1 from Bloemer & Bujna (arXiv:1312.5946)
NOTE: The results of this call are not deterministic even if rng is set
because scipy.cluster.vq.kmeans2 uses its own initialization.
Args:
gmm: A GMM to be initialized
data: numpy array (N,D) to define the range of the component means
covar: ignored in this callback
rng: numpy.random.RandomState for deterministic behavior
Returns:
None
"""
from scipy.cluster.vq import kmeans2
center, label = kmeans2(data, gmm.K)
for k in xrange(gmm.K):
mask = (label == k)
gmm.amp[k] = mask.sum() / len(data)
gmm.mean[k,:] = data[mask].mean(axis=0)
d_m = data[mask] - gmm.mean[k]
# funny way of saying: for each point i, do the outer product
# of d_m with its transpose and sum over i
gmm.covar[k,:,:] = (d_m[:, :, None] * d_m[:, None, :]).sum(axis=0) / len(data)
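# Illustrative usage sketch (data are made up): the init* callbacks modify the
# GMM in place; fit() calls them according to init_method, but they can also
# be invoked directly.
#
#   gmm = GMM(K=3, D=2)
#   data = np.random.rand(500, 2)
#   initFromKMeans(gmm, data)               # amp, mean, covar now set from k-means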
def fit(gmm, data, covar=None, R=None, init_method='random', w=0., cutoff=None, sel_callback=None, oversampling=10, covar_callback=None, background=None, tol=1e-3, miniter=1, maxiter=1000, frozen=None, split_n_merge=False, rng=np.random):
"""Fit GMM to data.
The GMM components are first initialized according to init_method. Then, the
EM sequence is repeated until the mean log-likelihood converges within tol.
Args:
gmm: an instance of GMM
data: numpy array (N,D)
covar: sample noise covariance; numpy array (N,D,D) or (D,D) if i.i.d.
R: sample projection matrix; numpy array (N,D,D)
init_method (string): one of ['random', 'minmax', 'kmeans', 'none']
defines the method to initialize the GMM components
w (float): minimum covariance regularization
cutoff (float): size of component neighborhood [in 1D equivalent sigmas]
sel_callback: completeness callback to generate imputation samples.
oversampling (int): number of imputation samples per data sample.
only used if sel_callback is set.
value of 1 is fine but results are noisy. Set as high as feasible.
covar_callback: covariance callback for imputation samples.
needs to be present if sel_callback and covar are set.
background: an instance of Background if simultaneous fitting is desired
tol (float): tolerance for convergence of mean log-likelihood
miniter (int): minimum number of iterations of EM
maxiter (int): maximum number of iterations of EM
frozen (iterable or dict): index list of components that are not updated
split_n_merge (int): number of split & merge attempts
rng: numpy.random.RandomState for deterministic behavior
Notes:
If frozen is a simple list, it will be assumed that it applies to mean
and covariance of the specified components. It can also be a dictionary
with the keys "mean" and "covar" to specify them separately.
In either case, amplitudes will be updated to reflect any changes made.
If frozen["amp"] is set, it will use this list instead.
Returns:
mean log-likelihood (float), component neighborhoods (list of index arrays)
Throws:
RuntimeError for inconsistent argument combinations
"""
N = len(data)
# if there are data (features) missing, i.e. masked as np.nan, set them to zeros
# and create/set covariance elements to a very large value to reduce their weight
# to effectively zero
missing = np.isnan(data)
if missing.any():
data_ = createShared(data.copy())
data_[missing] = 0 # value does not matter as long as it's not nan
if covar is None:
covar = np.zeros((gmm.D, gmm.D))
# need to create covar_callback if imputation is requested
if sel_callback is not None:
from functools import partial
covar_callback = partial(covar_callback_default, default=np.zeros((gmm.D, gmm.D)))
if covar.shape == (gmm.D, gmm.D):
covar_ = createShared(np.tile(covar, (N,1,1)))
else:
covar_ = createShared(covar.copy())
large = 1e10
for d in range(gmm.D):
covar_[missing[:,d],d,d] += large
else:
data_ = createShared(data.copy())
if covar is None or covar.shape == (gmm.D, gmm.D):
covar_ = covar
else:
covar_ = createShared(covar.copy())
# init components
if init_method.lower() not in ['random', 'minmax', 'kmeans', 'none']:
raise NotImplementedError("init_mehod %s not in ['random', 'minmax', 'kmeans', 'none']" % init_method)
if init_method.lower() == 'random':
initFromDataAtRandom(gmm, data_, covar=covar_, rng=rng)
if init_method.lower() == 'minmax':
initFromDataMinMax(gmm, data_, covar=covar_, rng=rng)
if init_method.lower() == 'kmeans':
initFromKMeans(gmm, data_, covar=covar_, rng=rng)
# test if callbacks are consistent
if sel_callback is not None and covar is not None and covar_callback is None:
raise NotImplementedError("covar is set, but covar_callback is None: imputation samples inconsistent")
# set up pool
pool = multiprocessing.Pool()
n_chunks, chunksize = gmm._mp_chunksize()
# containers
# precautions for cases when some points are treated as outliers
# and not considered as belonging to any component
log_S = createShared(np.zeros(N)) # S = sum_k p(x|k)
log_p = [[] for k in xrange(gmm.K)] # P = p(x|k) for x in U[k]
T_inv = [None for k in xrange(gmm.K)] # T = covar(x) + gmm.covar[k]
U = [None for k in xrange(gmm.K)] # U = {x close to k}
p_bg = None
if background is not None:
gmm.amp *= 1 - background.amp # GMM amp + BG amp = 1
p_bg = [None] # p_bg = p(x|BG), no log because values are larger
if covar is not None:
# check if covar is diagonal and issue warning if not
mess = "background model will only consider diagonal elements of covar"
nondiag = ~np.eye(gmm.D, dtype='bool')
if covar.shape == (gmm.D, gmm.D):
if (covar[nondiag] != 0).any():
logger.warning(mess)
else:
if (covar[np.tile(nondiag,(N,1,1))] != 0).any():
logger.warning(mess)
# check if all component parameters can be changed
changeable = {"amp": slice(None), "mean": slice(None), "covar": slice(None)}
if frozen is not None:
if all(isinstance(item, int) for item in frozen):
changeable['amp'] = changeable['mean'] = changeable['covar'] = np.in1d(xrange(gmm.K), frozen, assume_unique=True, invert=True)
elif hasattr(frozen, 'keys') and np.in1d(["amp","mean","covar"], tuple(frozen.keys()), assume_unique=True).any():
if "amp" in frozen.keys():
changeable['amp'] = np.in1d(xrange(gmm.K), frozen['amp'], assume_unique=True, invert=True)
if "mean" in frozen.keys():
changeable['mean'] = np.in1d(xrange(gmm.K), frozen['mean'], assume_unique=True, invert=True)
if "covar" in frozen.keys():
changeable['covar'] = np.in1d(xrange(gmm.K), frozen['covar'], assume_unique=True, invert=True)
else:
raise NotImplementedError("frozen should be list of indices or dictionary with keys in ['amp','mean','covar']")
try:
log_L, N, N2 = _EM(gmm, log_p, U, T_inv, log_S, data_, covar=covar_, R=R, sel_callback=sel_callback, oversampling=oversampling, covar_callback=covar_callback, w=w, pool=pool, chunksize=chunksize, cutoff=cutoff, background=background, p_bg=p_bg, changeable=changeable, miniter=miniter, maxiter=maxiter, tol=tol, rng=rng)
except Exception:
# cleanup
pool.close()
pool.join()
del data_, covar_, log_S
raise
# should we try to improve by split'n'merge of components?
# if so, keep backup copy
gmm_ = None
if frozen is not None and split_n_merge:
logger.warning("forgoing split'n'merge because some components are frozen")
else:
while split_n_merge and gmm.K >= 3:
if gmm_ is None:
gmm_ = GMM(gmm.K, gmm.D)
gmm_.amp[:] = gmm.amp[:]
gmm_.mean[:] = gmm.mean[:,:]
gmm_.covar[:,:,:] = gmm.covar[:,:,:]
U_ = [U[k].copy() for k in xrange(gmm.K)]
changing, cleanup = _findSNMComponents(gmm, U, log_p, log_S, N+N2, pool=pool, chunksize=chunksize)
logger.info("merging %d and %d, splitting %d" % tuple(changing))
# modify components
_update_snm(gmm, changing, U, N+N2, cleanup)
# run partial EM on changeable components
# NOTE: for a partial run, we'd only need the change to log_S from the
# changeable components. However, the neighborhoods can change from _update_snm
# or because they move, so that operation is ill-defined.
# Thus, we'll always run a full E-step, which is pretty cheap for
# converged neighborhoods.
# The M-step could in principle be run on the changeable components only,
# but there seem to be side effects in what I've tried.
# Similar to the E-step, the imputation step needs to be run on all
# components, otherwise the contribution of the changeable ones to the mixture
# would be over-estimated.
# Effectively, partial runs are as expensive as full runs.
changeable['amp'] = changeable['mean'] = changeable['covar'] = np.in1d(xrange(gmm.K), changing, assume_unique=True)
log_L_, N_, N2_ = _EM(gmm, log_p, U, T_inv, log_S, data_, covar=covar_, R=R, sel_callback=sel_callback, oversampling=oversampling, covar_callback=covar_callback, w=w, pool=pool, chunksize=chunksize, cutoff=cutoff, background=background, p_bg=p_bg, maxiter=maxiter, tol=tol, prefix="SNM_P", changeable=changeable, rng=rng)
changeable['amp'] = changeable['mean'] = changeable['covar'] = slice(None)
log_L_, N_, N2_ = _EM(gmm, log_p, U, T_inv, log_S, data_, covar=covar_, R=R, sel_callback=sel_callback, oversampling=oversampling, covar_callback=covar_callback, w=w, pool=pool, chunksize=chunksize, cutoff=cutoff, background=background, p_bg=p_bg, maxiter=maxiter, tol=tol, prefix="SNM_F", changeable=changeable, rng=rng)
if log_L >= log_L_:
# revert to backup
gmm.amp[:] = gmm_.amp[:]
gmm.mean[:] = gmm_.mean[:,:]
gmm.covar[:,:,:] = gmm_.covar[:,:,:]
U = U_
logger.info("split'n'merge likelihood decreased: reverting to previous model")
break
log_L = log_L_
split_n_merge -= 1
pool.close()
pool.join()
del data_, covar_, log_S
return log_L, U
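# Illustrative usage sketch (data, noise level, and the selection callback are
# hypothetical): fit a 3-component model to noisy data with a selection
# function that only keeps samples inside the unit box.
#
#   gmm = GMM(K=3, D=2)
#   data = np.random.rand(1000, 2)                      # placeholder data
#   covar = np.eye(2) * 0.01                            # i.i.d. noise, shape (D,D)
#
#   def sel_callback(coords):
#       # selection probability: 1 inside the unit box, 0 outside
#       return np.all((coords >= 0) & (coords <= 1), axis=1).astype(float)
#
#   logL, U = fit(gmm, data, covar=covar, init_method='minmax', w=0.01,
#                 cutoff=5, sel_callback=sel_callback,
#                 covar_callback=lambda coords: covar, tol=1e-3)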
# run EM sequence
def _EM(gmm, log_p, U, T_inv, log_S, data, covar=None, R=None, sel_callback=None, oversampling=10, covar_callback=None, background=None, p_bg=None, w=0, pool=None, chunksize=1, cutoff=None, miniter=1, maxiter=1000, tol=1e-3, prefix="", changeable=None, rng=np.random):
# compute effective cutoff for chi2 in D dimensions
if cutoff is not None:
# note: subsequently the cutoff parameter, e.g. in _E(), refers to this:
# chi2 < cutoff,
# while in fit() it means e.g. "cut at 3 sigma".
# These differing conventions need to be documented well.
cutoff_nd = chi2_cutoff(gmm.D, cutoff=cutoff)
# store chi2 cutoff for component shifts, use min(0.1, cutoff/2) sigma
shift_cutoff = chi2_cutoff(gmm.D, cutoff=min(0.1, cutoff/2))
else:
cutoff_nd = None
shift_cutoff = chi2_cutoff(gmm.D, cutoff=0.1)
if sel_callback is not None:
omega = createShared(sel_callback(data).astype("float"))
if np.any(omega == 0):
logger.warning("Selection probability Omega = 0 for an observed sample.")
logger.warning("Selection callback likely incorrect! Bad things will happen!")
else:
omega = None
it = 0
header = "ITER\tSAMPLES"
if sel_callback is not None:
header += "\tIMPUTED\tORIG"
if background is not None:
header += "\tBG_AMP"
header += "\tLOG_L\tSTABLE"
logger.info(header)
# save backup
gmm_ = GMM(gmm.K, gmm.D)
gmm_.amp[:] = gmm.amp[:]
gmm_.mean[:,:] = gmm.mean[:,:]
gmm_.covar[:,:,:] = gmm.covar[:,:,:]
N0 = len(data) # size of original (unobscured) data set (signal and background)
N2 = 0 # size of imputed signal sample
if background is not None:
bg_amp_ = background.amp
while it < maxiter: # limit loop in case of slow convergence
log_L_, N, N2_, N0_ = _EMstep(gmm, log_p, U, T_inv, log_S, N0, data, covar=covar, R=R, sel_callback=sel_callback, omega=omega, oversampling=oversampling, covar_callback=covar_callback, background=background, p_bg=p_bg , w=w, pool=pool, chunksize=chunksize, cutoff=cutoff_nd, tol=tol, changeable=changeable, it=it, rng=rng)
# check if components have moved by more than the shift cutoff
shift2 = np.einsum('...i,...ij,...j', gmm.mean - gmm_.mean, np.linalg.inv(gmm_.covar), gmm.mean - gmm_.mean)
moved = np.flatnonzero(shift2 > shift_cutoff)
status_mess = "%s%d\t%d" % (prefix, it, N)
if sel_callback is not None:
status_mess += "\t%.2f\t%.2f" % (N2_, N0_)
if background is not None:
status_mess += "\t%.3f" % bg_amp_
status_mess += "\t%.3f\t%d" % (log_L_, gmm.K - moved.size)
logger.info(status_mess)
# convergence tests
if it > miniter:
if sel_callback is None:
if np.abs(log_L_ - log_L) < tol * np.abs(log_L) and moved.size == 0:
log_L = log_L_
logger.info("likelihood converged within relative tolerance %r: stopping here." % tol)
break
else:
if np.abs(N0_ - N0) < tol * N0 and np.abs(N2_ - N2) < tol * N2 and moved.size == 0:
log_L = log_L_
logger.info("imputation sample size converged within relative tolerance %r: stopping here." % tol)
break
# force update to U for all moved components
if cutoff is not None:
for k in moved:
U[k] = None
if moved.size:
logger.debug("resetting neighborhoods of moving components: (" + ("%d," * moved.size + ")") % tuple(moved))
# update all important _ quantities for convergence test(s)
log_L = log_L_
N0 = N0_
N2 = N2_
# backup to see if components move or if next step gets worse
# note: not gmm = gmm_ !
gmm_.amp[:] = gmm.amp[:]
gmm_.mean[:,:] = gmm.mean[:,:]
gmm_.covar[:,:,:] = gmm.covar[:,:,:]
if background is not None:
bg_amp_ = background.amp
it += 1
return log_L, N, N2
# run one EM step
def _EMstep(gmm, log_p, U, T_inv, log_S, N0, data, covar=None, R=None, sel_callback=None, omega=None, oversampling=10, covar_callback=None, background=None, p_bg=None, w=0, pool=None, chunksize=1, cutoff=None, tol=1e-3, changeable=None, it=0, rng=np.random):
# NOTE: T_inv (in fact (T_ik)^-1 for all samples i and components k)
# is very large and is unfortunately duplicated in the parallelized _Mstep.
# If memory is too limited, one can recompute T_inv in _Msums() instead.
log_L = _Estep(gmm, log_p, U, T_inv, log_S, data, covar=covar, R=R, omega=omega, background=background, p_bg=p_bg, pool=pool, chunksize=chunksize, cutoff=cutoff, it=it)
A,M,C,N,B = _Mstep(gmm, U, log_p, T_inv, log_S, data, covar=covar, R=R, p_bg=p_bg, pool=pool, chunksize=chunksize)
A2 = M2 = C2 = B2 = N2 = 0
# here the magic happens: imputation from the current model
if sel_callback is not None:
# if there are projections / missing data, we don't know how to
# generate those for the imputation samples
# NOTE: in principle, if there are only missing data, i.e. R is 1_D,
# we could ignore missingness for data2 because we'll do an analytic
# marginalization. This doesn't work if R is a non-trivial matrix.
if R is not None:
raise NotImplementedError("R is not None: imputation samples likely inconsistent")
# create fake data with same mechanism as the original data,
# but invert selection to get the missing part
data2, covar2, N0, omega2 = draw(gmm, len(data)*oversampling, sel_callback=sel_callback, orig_size=N0*oversampling, invert_sel=True, covar_callback=covar_callback, background=background, rng=rng)
data2 = createShared(data2)
if not(covar2 is None or covar2.shape == (gmm.D, gmm.D)):
covar2 = createShared(covar2)
N0 = N0/oversampling
U2 = [None for k in xrange(gmm.K)]
if len(data2) > 0:
log_S2 = np.zeros(len(data2))
log_p2 = [[] for k in xrange(gmm.K)]
T2_inv = [None for k in xrange(gmm.K)]
R2 = None
if background is not None:
p_bg2 = [None]
else:
p_bg2 = None
log_L2 = _Estep(gmm, log_p2, U2, T2_inv, log_S2, data2, covar=covar2, R=R2, omega=None, background=background, p_bg=p_bg2, pool=pool, chunksize=chunksize, cutoff=cutoff, it=it)
A2,M2,C2,N2,B2 = _Mstep(gmm, U2, log_p2, T2_inv, log_S2, data2, covar=covar2, R=R2, p_bg=p_bg2, pool=pool, chunksize=chunksize)
# normalize for oversampling
A2 /= oversampling
M2 /= oversampling
C2 /= oversampling
B2 /= oversampling
N2 = N2/oversampling # need floating point precision in update
# check if components have significant probability mass outside the selection
sel_outside = A2 > tol * A
if sel_outside.any():
logger.debug("component inside fractions: " + ("(" + "%.2f," * gmm.K + ")") % tuple(A/(A+A2)))
# correct the observed likelihood for the overall normalization constant
# of the data process with selection:
# logL(x | gmm) = sum_k p_k(x) / Z(gmm), with Z(gmm) = int dx sum_k p_k(x) = 1
# becomes
# logL(x | gmm) = sum_k Omega(x) p_k(x) / Z'(gmm),
# with Z'(gmm) = int dx Omega(x) sum_k p_k(x), which we can get by MC integration
log_L -= N * np.log((omega.sum() + omega2.sum() / oversampling) / (N + N2))
_update(gmm, A, M, C, N, B, A2, M2, C2, N2, B2, w, changeable=changeable, background=background)
return log_L, N, N2, N0
# perform E step calculations.
# If cutoff is set, this will also set the neighborhoods U
def _Estep(gmm, log_p, U, T_inv, log_S, data, covar=None, R=None, omega=None, background=None, p_bg=None, pool=None, chunksize=1, cutoff=None, it=0, rng=np.random):
# compute p(i | k) for each k independently in the pool
# need S = sum_k p(i | k) for further calculation
log_S[:] = 0
# H = {i | i in neighborhood[k]} for any k, needed for outliers below
# TODO: Use only when cutoff is set
H = np.zeros(len(data), dtype="bool")
k = 0
for log_p[k], U[k], T_inv[k] in \
parmap.starmap(_Esum, zip(xrange(gmm.K), U), gmm, data, covar, R, cutoff, pm_pool=pool, pm_chunksize=chunksize):
log_S[U[k]] += np.exp(log_p[k]) # actually S, not logS
H[U[k]] = 1
k += 1
if background is not None:
p_bg[0] = background.amp * background.p
if covar is not None:
# This is the zeroth moment of a truncated Normal error distribution
# Its calculation is simple only if the covariance is diagonal!
# See e.g. Manjunath & Wilhelm (2012) if not
error = np.ones(len(data))
x0,x1 = background.footprint
for d in range(gmm.D):
if covar.shape == (gmm.D, gmm.D): # one-for-all
denom = np.sqrt(2 * covar[d,d])
else:
denom = np.sqrt(2 * covar[:,d,d])
# CAUTION: The erf is approximate and can return 0.
# Thus, we don't add the logs but multiply the values themselves;
# underrun is not a big problem here
error *= np.real(scipy.special.erf((data[:,d] - x0[d])/denom) - scipy.special.erf((data[:,d] - x1[d])/denom)) / 2
p_bg[0] *= error
log_S[:] = np.log(log_S + p_bg[0])
if omega is not None:
log_S += np.log(omega)
log_L = log_S.sum()
else:
# need log(S), but since log(0) isn't a good idea, need to restrict to H
log_S[H] = np.log(log_S[H])
if omega is not None:
log_S += np.log(omega)
log_L = log_S[H].sum()
return log_L
# compute chi^2, and apply selections on component neighborhood based on chi^2
def _Esum(k, U_k, gmm, data, covar=None, R=None, cutoff=None):
# since U_k could be None, need explicit reshape
d_ = data[U_k].reshape(-1, gmm.D)
if covar is not None:
if covar.shape == (gmm.D, gmm.D): # one-for-all
covar_ = covar
else: # each datum has covariance
covar_ = covar[U_k].reshape(-1, gmm.D, gmm.D)
else:
covar_ = 0
if R is not None:
R_ = R[U_k].reshape(-1, gmm.D, gmm.D)
# p(x | k) for all x in the vicinity of k
# determine all points within cutoff sigma from mean[k]
if R is None:
dx = d_ - gmm.mean[k]
else:
dx = d_ - np.dot(R_, gmm.mean[k])
if covar is None and R is None:
T_inv_k = None
chi2 = np.einsum('...i,...ij,...j', dx, np.linalg.inv(gmm.covar[k]), dx)
else:
# with data errors: need to create and return T_ik = covar_i + C_k
# and weight each datum appropriately
if R is None:
T_inv_k = np.linalg.inv(gmm.covar[k] + covar_)
else: # need to project out missing elements: T_ik = R_i C_k R_i^T + covar_i
T_inv_k = np.linalg.inv(np.einsum('...ij,jk,...lk', R_, gmm.covar[k], R_) + covar_)
chi2 = np.einsum('...i,...ij,...j', dx, T_inv_k, dx)
# NOTE: close to convergence, we could stop applying the cutoff because
# changes to U will be minimal
if cutoff is not None:
indices = chi2 < cutoff
chi2 = chi2[indices]
if (covar is not None and covar.shape != (gmm.D, gmm.D)) or R is not None:
T_inv_k = T_inv_k[indices]
if U_k is None:
U_k = np.flatnonzero(indices)
else: