-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRelentlessFractals.py
executable file
·2158 lines (1541 loc) · 87 KB
/
RelentlessFractals.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/python3
EXIT_CODE_FILESYSTEM_ERROR = 3
""" builtin imports """
import time
import math
import itertools
import collections
import copy
import random
import gc
import posixpath
import pathlib
""" third-party imports """
import os; os.environ['OPENBLAS_NUM_THREADS'] = '1'; os.environ['MKL_NUM_THREADS'] = '1'; # https://stackoverflow.com/questions/17053671/how-do-you-stop-numpy-from-multithreading
import numpy
import pygame
try:
import fxpmath
except ImportError:
print("fxpmath is not installed. You might not need it, though.")
fxpmath = summon_cactus("fxpmath_was_never_imported_because_it_is_not_installed")
""" in-project imports """
from inlinetesting.TestingAtoms import assert_equal, summon_cactus
from inlinetesting.PureGenTools import gen_track_previous, take_first_and_iter, gen_track_previous_full, gen_track_recent, ProvisionError, izip_longest, gen_track_recent_trimmed, enumerate_to_depth_packed, iterate_to_depth, izip_shortest, gen_chunks_as_lists
import ComplexGeometry
from ComplexGeometry import real_of, imag_of, inv_abs_of, get_complex_angle, get_normalized, float_range
import SegmentGeometry
from SegmentGeometry import find_left_min, lerp, reals_of, imags_of
from HigherRangeFunctionalTools import higher_range, higher_range_by_corners, corners_to_range_descriptions
import MatrixMath
import CGOL
from ColorTools import atan_squish_to_byteint_unsigned_uniform_nearest
import Trig
sin, cos, tan = (Trig.sin, Trig.cos, Trig.tan) # short names for use only in compilation of mandel methods.
cpx, norm = (complex, get_normalized)
import PygameDashboard
from PygameDashboard import measure_time_nicknamed
def THIS_MODULE_EXEC(string):
    """Execute *string* with this module's globals visible.

    Lets other modules run code against this module's namespace. Note that
    plain exec() inside a function binds new names into the call's locals,
    which are discarded on return — useful mainly for side effects on
    module-level objects.
    """
    exec(string)
# Shared throttles so status printing / display flips don't swamp the console or GPU.
CAPTION_RATE_LIMITER = PygameDashboard.SimpleRateLimiter(1.0)
STATUS_RATE_LIMITER = PygameDashboard.RateLimiter(3.0)
PASSIVE_DISPLAY_FLIP_RATE_LIMITER = PygameDashboard.RateLimiter(30.0)
# Sentinel for "missing/invalid point". NOTE(review): NaN never compares equal,
# so `x == COMPLEX_NAN` is always False — detect it with isnan or `x != x`.
COMPLEX_NAN = complex(math.nan, math.nan)
def shape_of(data_to_test):
    """Return the nested lengths of a (rectangular) nested sequence as a tuple.

    Descends through element 0 of each level until reaching something without
    __len__, or an empty level. Strings stop the descent (with a warning)
    instead of being treated as containers.
    """
    dims = []
    probe = data_to_test
    while hasattr(probe, "__len__"):
        if isinstance(probe, str):
            print("shape_of: warning: a string will not be treated as a storage object, but this behavior is not standard.")
            break
        dims.append(len(probe))
        if dims[-1] == 0:
            break
        probe = probe[0]
    return tuple(dims)
assert shape_of([[0,1,2],[3,4,5]]) == (2, 3)
def enumerate_from_both_ends(data):
    """Yield (index_from_front, index_from_back, item) for each item of a sized sequence."""
    assert hasattr(data, "__len__")
    for i, element in enumerate(data):
        yield (i, len(data) - i - 1, element)
assert [item for item in enumerate_from_both_ends("abc")] == [(0,2,"a"), (1,1,"b"), (2,0,"c")]
assert [item for item in enumerate_from_both_ends("abcd")] == [(0,3,"a"), (1,2,"b"), (2,1,"c"), (3,0,"d")]
def is_round_binary(value):
    """Return True when *value* is an exact power of two (value must be > 0)."""
    assert value > 0
    return value == 1 << (value.bit_length() - 1)
assert all(is_round_binary(testNum) for testNum in [2**i for i in range(2, 33)])
assert not any(is_round_binary(testNum) for testNum in [2**i+chg for i in range(2,33) for chg in (-1,1)])
def assure_round_binary(value):
    """Pass *value* through unchanged, asserting it is an exact power of two."""
    assert is_round_binary(value), "could not assure value is round in binary."
    return value
def enforce_tuple_length(input_tuple, length, default=None):
    """Return *input_tuple* adjusted to exactly *length* items.

    Shorter tuples are right-padded with *default*; longer tuples are
    truncated. The input must be a tuple (now checked with isinstance, so
    tuple subclasses such as namedtuples are accepted too — the original
    `type(...) == tuple` rejected them).
    """
    assert isinstance(input_tuple, tuple)
    if len(input_tuple) < length:
        # pad on the right; every pad slot holds the same default reference.
        return input_tuple + (default,) * (length - len(input_tuple))
    # a full-length slice of a tuple is the tuple itself, so this also covers
    # the already-correct-length case without copying.
    return input_tuple[:length]
def to_portable(path_str):
    # https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file
    # windows forbidden: "<>:\"/\\|?*"
    # slashes are allowed because they are used for saving to a folder.
    """Replace filesystem-hostile characters with two-letter codes and return the result.

    Each substitution that actually fires prints a warning naming the character
    and its replacement.
    """
    substitutions = {
        "<":"LS", ">":"GR", ":":"CN", "\"":"DQ", "\\":"BS", "|":"VP", "?":"QM", "*":"AK",
        "=":"EQ", "'":"SQ", "!":"XM", "@":"AT", "#":"HS", "$":"DS", "%":"PC", "^":"CT", "&":"AP",
        ";":"SN", "~":"TD", "[":"LB", "]":"RB", "{":"LC", "}":"RC",
    }
    for badChar, replacement in substitutions.items():
        if badChar in path_str:
            print("to_portable: Warning: ~{} occurrences of {} will be replaced with {} for portability.".format(path_str.count(badChar), repr(badChar), repr(replacement)))
            path_str = path_str.replace(badChar, replacement)
    return path_str
@measure_time_nicknamed("save_surface_as", end="\n\n", include_lap=True, include_load=True)
def save_surface_as(surface, name_prefix="", name=None,
        # evaluated once at definition time: a timed wrapper around gc.collect shared by all calls.
        _gccollect = measure_time_nicknamed("garbage collection", include_load=True)(gc.collect)
    ):
    """Save a pygame surface as a png under OUTPUT_FOLDER, then run garbage collection.

    When *name* is None, a name is derived from the monotonic clock and the
    surface size. NOTE(review): this also unconditionally writes a hard-coded
    debug copy to "./test file with spaces.png" — looks like leftover
    debugging; confirm whether that extra save is still wanted.
    """
    if name is None:
        size = surface.get_size()
        # square surfaces get the compact form "(NNNx)".
        sizeStr = str(size).replace(", ","x") if (size[0] != size[1]) else "({}x)".format(size[0])
        name = "{}{}.png".format(round(time.monotonic(), ndigits=1), sizeStr)
    usedName = to_portable(OUTPUT_FOLDER + name_prefix + name)
    print("saving file {}.".format(usedName))
    print("WILL SAVE TO TEST FILE!")
    # testName = "./"+usedName.split("/")[-1]
    testName = "./test file with spaces.png"
    print("TEST NAME IS {}.".format(testName))
    pygame.image.save(surface, testName)
    print("DONE SAVING TEST FILE.")
    assert usedName.endswith(".png")
    pygame.image.save(surface, usedName)
    _gccollect()
    #print("{} unreachable objects.".format())
@measure_time_nicknamed("draw_squished_ints_to_surface", include_load=True)
def draw_squished_ints_to_surface(dest_surface, channels, access_order=None):
    """Squish integer channel data through the atan byte-mapper and set every pixel of dest_surface.

    access_order selects the memory layout of *channels*:
      "cyx": channels[c][y][x]    "yxc": channels[y][x][c]
    Only the atan color mode is supported (asserted below).
    """
    # maybe this method shouldn't exist. Maybe image creation should happen in another process, like photo.py in GeodeFractals.
    assert COLOR_SETTINGS_SUMMARY_STR == "color(atan)"
    if access_order == "cyx":
        colorDataGetter = lambda argX, argY, argC: channels[argC][argY][argX]
        xSize, ySize, cSize = (len(channels[0][0]), len(channels[0]), len(channels))
    elif access_order == "yxc":
        colorDataGetter = lambda argX, argY, argC: channels[argY][argX][argC]
        xSize, ySize, cSize = (len(channels[0]), len(channels), len(channels[0][0]))
    else:
        raise ValueError("unsupported access order.")
    assert cSize == 3, shape_of(channels)  # exactly RGB.
    assert xSize > 4
    assert ySize > 4
    try:
        for y in range(ySize):
            for x in range(xSize):
                color = tuple(atan_squish_to_byteint_unsigned_uniform_nearest(colorDataGetter(x, y, chi)) for chi in range(cSize))
                dest_surface.set_at((x, y), color)
    except IndexError as ie:
        # ragged channel data shows up here rather than in the size probes above.
        print("index error when (x, y)=({}, {}): {}.".format(x, y, ie))
        exit(1)
def dv1range(subdivisions):
    """Yield *subdivisions* evenly spaced floats covering [0, 1), starting at 0.0."""
    divisor = float(subdivisions)
    for numerator in range(subdivisions):
        yield numerator / divisor
assert_equal(list(dv1range(2)), [0.0, 0.5])
assert_equal(list(dv1range(4)), [0.0, 0.25, 0.5, 0.75])
def construct_data(size, default_value=None, converter_fun=None, print_status=False):
    """Build a nested list of shape *size*, deep-copying default_value into every cell.

    converter_fun is reserved but not implemented. Progress for the outermost
    dimension is printed at the STATUS_RATE_LIMITER's pace.
    """
    assert len(size) > 0
    if converter_fun is not None:
        raise NotImplementedError("converter_fun")
    if len(size) == 1:
        return [copy.deepcopy(default_value) for _ in range(size[0])]
    result = []
    for layerIndex in range(size[0]):
        if STATUS_RATE_LIMITER.get_judgement():
            print("construct_data: {}%...".format(round(layerIndex*100.0/size[0], ndigits=3)))
        result.append(construct_data(size[1:], default_value=default_value, print_status=False))
    return result
assert_equal(shape_of(construct_data([5,6,7])), (5,6,7))
def construct_numpy_data(size, default_value=None):
    """Shape-driven construction with numpy conversion requested.

    Currently always raises NotImplementedError, because construct_data does
    not yet implement converter_fun.
    """
    return construct_data(size, default_value=default_value, converter_fun=numpy.array)
def fill_data(data, fill_value):
    """Recursively overwrite every leaf slot of a nested list/ndarray with fill_value, in place.

    Tuples and strings anywhere in the nesting are rejected, and a leaf may
    only be replaced by a value of matching container-ness (both assignable
    containers, or both not), so the nesting depth cannot silently change.
    """
    assert isinstance(data, (list, numpy.ndarray)), type(data)
    for slotIndex in range(len(data)):
        slotValue = data[slotIndex]
        if isinstance(slotValue, (list, numpy.ndarray)):
            fill_data(slotValue, fill_value)
        elif isinstance(slotValue, (tuple, str)):
            raise TypeError("type {} can't be processed!".format(type(slotValue)))
        else:
            # don't test for __getitem__ because numpy.int64 has that.
            slotIsContainer = hasattr(slotValue, "__setitem__")
            fillIsContainer = hasattr(fill_value, "__setitem__")
            if slotIsContainer != fillIsContainer:
                raise NotImplementedError("type can't be changed from {} to {} because only one seems to be a container!".format(repr(type(slotValue)), repr(type(fill_value))))
            data[slotIndex] = fill_value
def gen_assuredly_ascending(input_seq):
    """Pass items through unchanged, asserting each is >= its predecessor."""
    lastSeen = None
    for currentItem in input_seq:
        if lastSeen is not None:
            assert lastSeen <= currentItem , "could not assure ascending! not {} <= {}.".format(repr(lastSeen),repr(currentItem))
        yield currentItem
        lastSeen = currentItem
"""
def mutate_method_consts(fun_to_mutate, replacement_dict):
originalConsts = fun_to_mutate.__code__.co_consts
for key in replacement_dict.keys():
assert originalConsts.count(key) == 1, (key, fun_to_mutate, originalConsts)
fun_to_mutate.__code__ = fun_to_mutate.__code__.replace(co_consts=tuple((replacement_dict[item] if item in replacement_dict else item) for item in originalConsts))
cannot work. modifies original even if deepcopies are made.
"""
_mandelMethodsSourceStrs={
"c_to_mandel_itercount_fast":"""
def c_to_mandel_itercount_fast(c, iter_limit):
${init_formula}
for n in range(iter_limit):
if ${esc_test}:
return n
${iter_formula}
return None""",
"c_to_escstop_mandel_journey":"""
def c_to_escstop_mandel_journey(c):
${init_formula}
for n in itertools.count():
${yield_formula}
if ${esc_test}:
return
${iter_formula}""",
}
# z0="0+0j", exponent="2"
def compile_mandel_method(method_name, init_formula=None, yield_formula=None, esc_test=None, iter_formula=None):
    """Compile one of the _mandelMethodsSourceStrs templates into a function object.

    The formula arguments are spliced in as raw source text, the result is
    exec'd, and the freshly defined function is fished back out of locals().
    NOTE: this execs constructed source — only ever feed it trusted,
    program-internal formula strings.
    NOTE(review): all four formula args must be strings even when a template
    omits that placeholder, since str.replace rejects None — confirm callers
    pass "" for unused formulas.
    """
    sourceStr = _mandelMethodsSourceStrs[method_name]
    assert sourceStr.count("def {}(".format(method_name)) == 1, "bad source code string for name {}!".format(method_name)
    sourceStr = sourceStr.replace("${init_formula}", init_formula).replace("${yield_formula}", yield_formula).replace("${esc_test}", esc_test).replace("${iter_formula}", iter_formula)
    exec(sourceStr)
    # exec defined the new function into this frame's locals.
    assert method_name in locals().keys(), "method name {} wasn't in locals! bad source code string?".format(method_name)
    return locals()[method_name]
def gen_embed_exceptions(input_seq, exception_types):
    """Yield items from input_seq; if it raises one of exception_types, yield the exception object itself and stop."""
    inputGen = iter(input_seq)
    while True:
        try:
            currentItem = next(inputGen)
        except StopIteration:
            return
        except exception_types as caughtErr:
            yield caughtErr
            return
        yield currentItem
def gen_suppress_exceptions(input_seq, exception_types):
    """Yield items from input_seq, ending quietly if it raises one of exception_types."""
    inputGen = iter(input_seq)
    while True:
        try:
            currentItem = next(inputGen)
        except StopIteration:
            return
        except exception_types:
            return
        yield currentItem
"""
def c_to_mandel_journey_OLD(c):
z = 0+0j
while True:
yield z
z = z**2 + c
"""
def c_to_mandel_journey_abberated_by_addition(c, abberation_seq):
    """(disabled) Mandelbrot journey generator that adds a per-step aberration offset.

    Because this is a generator function, the NotImplementedError fires on the
    first next(), not at call time. The unreachable code below is the intended
    implementation, kept for when it is ported to the source-string templates.
    """
    raise NotImplementedError("should probably be moved to the source strings.")
    z = 0+0j
    yield z
    for abber in abberation_seq:
        z = z**2 + c
        z += abber
        yield z
    # aberrations exhausted: continue as a plain mandelbrot journey.
    while True:
        z = z**2 + c
        yield z
"""
def c_must_be_in_mandelbrot(c):
circles = [(complex(-1.0, 0.0), 0.24), (complex(0.5, 0.0),2.45)]
for circle in circles:
if abs(c - circle[0]) < circle[1]:
return True
return False
"""
"""
def gen_constrain_journey(journey, iter_limit, escape_radius):
# assert escape_radius <= 256, "this may break seg intersections."
for i, point in enumerate(journey):
yield point
if i >= iter_limit:
return
if abs(point) > escape_radius:
return
assert False, "incomplete journey."
"""
def get_sum_of_inverse_segment_lengths(constrained_journey):
    """Sum 1/|segment| over consecutive point pairs; any zero-length segment makes the result inf."""
    total = 0.0
    prevPoint = None
    for curPoint in constrained_journey:
        if prevPoint is not None:
            segLen = abs(curPoint - prevPoint)
            if segLen == 0:
                total = math.inf
            else:
                total += 1.0 / segLen
        prevPoint = curPoint
    return total
def get_sum_of_inverse_abs_vals(constrained_journey):
    """Sum 1/|point| over the journey; any point at the origin makes the result inf."""
    total = 0.0
    for currentPoint in constrained_journey:
        magnitude = abs(currentPoint)
        if magnitude == 0:
            total = math.inf
        else:
            total += 1.0 / magnitude
    return total
def count_float_local_minima(input_seq): # does not recognize any minimum with more than one identical value in a row.
    """(disabled) Count strict interior local minima of a numeric sequence.

    Gated off pending tests; the raise makes the code below unreachable.
    """
    raise NotImplementedError("tests needed! and maybe rewrite using gen_track_recent...")
    result = 0
    # history holds the previous two items; a minimum is counted when the
    # middle of the three-in-hand is strictly below both neighbors.
    history = [None, None]
    for item in input_seq:
        if None not in history:
            if history[1] < history[0] and history[1] < item:
                result += 1
        history[0] = history[1]
        history[1] = item
    return result
def gen_seg_seq_intersections_with_seg(seg_seq, reference_seg, intersection_fun=None):
    """Yield each non-None result of intersection_fun(reference_seg, seg) over seg_seq."""
    candidateHits = (intersection_fun(reference_seg, candidateSeg) for candidateSeg in seg_seq)
    for hit in candidateHits:
        if hit is not None:
            yield hit
def gen_seg_seq_self_intersections(seg_seq, intersection_fun=None, preloaded_seg_history=None, freeze_seg_history=None, gap_size=None, sort_by_time=None, combine_colinear=None):
    """Yield intersections of each incoming segment with the accumulated earlier segments.

    intersection_fun(segA, segB) -> point or None decides intersection.
    preloaded_seg_history: optional segment list to compare against instead of
        the ones seen so far; requires freeze_seg_history=True.
    freeze_seg_history: when True, incoming segments are NOT appended to the
        history (used for path-vs-path comparison); must be passed explicitly.
    gap_size: how many additional most-recent history entries to exclude from
        comparison (presumably to skip trivial shared-endpoint hits).
    sort_by_time: sort each segment's hits by distance from the segment start.
    combine_colinear: with sort_by_time, emit only the first and last hit.
    """
    assert sort_by_time is not None
    assert freeze_seg_history is not None
    if sort_by_time:
        assert combine_colinear is not None
    else:
        assert not combine_colinear, "bad args"
    if preloaded_seg_history is None:
        segHistory = []
        assert freeze_seg_history is False
    else:
        segHistory = preloaded_seg_history
        assert freeze_seg_history, "are you sure? if so, remove this assertion to use this feature."
    for currentSeg in seg_seq:
        if not freeze_seg_history:
            segHistory.append(currentSeg)
        # skip the newest (1 + gap_size) history entries; when not frozen, the
        # newest is currentSeg itself.
        intersectionGen = gen_seg_seq_intersections_with_seg(segHistory[:-1-gap_size], currentSeg, intersection_fun=intersection_fun)
        if sort_by_time:
            intersectionList = sorted(intersectionGen, key=(lambda point: abs(currentSeg[0]-point))) # WOW, that looks slow.
            if len(intersectionList) > 0:
                if combine_colinear:
                    yield intersectionList[0]
                    if len(intersectionList) > 1:
                        yield intersectionList[-1]
                else:
                    for intersection in intersectionList:
                        yield intersection
        else:
            for intersection in intersectionGen:
                yield intersection
def gen_path_self_intersections(journey, intersection_fun=None, sort_by_time=None, combine_colinear=False): #could use less memory.
    """Yield the self-intersection points of the polyline through the points of *journey*.

    gap_size=1 excludes each segment's immediate predecessor (they always share
    an endpoint) from the comparison.
    """
    return gen_seg_seq_self_intersections(gen_track_previous_full(journey, allow_waste=True), intersection_fun=intersection_fun, freeze_seg_history=False, gap_size=1, sort_by_time=sort_by_time, combine_colinear=combine_colinear)
assert_equal(list(gen_path_self_intersections([complex(0,0),complex(0,4),complex(2,2),complex(-2,2), complex(-2,3),complex(10,3)], intersection_fun=SegmentGeometry.segment_intersection, sort_by_time=False)), [complex(0,2), complex(0,3),complex(1,3)])
def gen_path_pair_mutual_intersections(point_seq_0, point_seq_1, intersection_fun=None):
    """Yield intersections between the polyline of point_seq_1 and the polyline of point_seq_0.

    Self-intersections of either path are NOT reported: path 0's segments are
    preloaded as a frozen history, so path 1's segments are only compared
    against them.
    NOTE(review): the history slice [:-1-gap_size] with gap_size=0 still drops
    segList0's final segment from every comparison — confirm that is intended.
    """
    segList0 = list(gen_track_previous_full(point_seq_0, allow_waste=True))
    segGen1 = gen_track_previous_full(point_seq_1, allow_waste=True)
    # let the gap size be 0 because regardless of gap size, segs are never actually compared to their predecessor in this method!
    return gen_seg_seq_self_intersections(segGen1, intersection_fun=intersection_fun, preloaded_seg_history=segList0, freeze_seg_history=True, gap_size=0, sort_by_time=False, combine_colinear=False)
assert_equal(list(gen_path_pair_mutual_intersections([1+2j, 3+2j, 30+2j, 30+3j, 29+3j, 29+1j], [2+1j, 2+3j, 2+30j, 3+30j, 3+29j, 1+29j], intersection_fun=SegmentGeometry.segment_intersection)), [2+2j])
# print("tests needed for path pair windowed mutual intersections.")
def gen_path_pair_windowed_mutual_intersections(point_seq_0, point_seq_1, intersection_fun=None, window_distance=None, skip_count=0):
    """(disabled) Yield intersections between two paths, comparing only segments whose indices are within a sliding window.

    Gated off pending tests; the raise makes the code below unreachable.
    """
    raise NotImplementedError("tests needed! also, verify usage of izip_shortest is correct.")
    assert window_distance >= 1
    assert 0 <= skip_count < window_distance # this window distance test I'm not so sure about.
    segGenPair = [gen_track_previous_full(pointSeq, allow_waste=True) for pointSeq in (point_seq_0, point_seq_1)]
    segWindowGenPair = [gen_track_recent_trimmed(segGen, count=window_distance+1) for segGen in segGenPair]
    for leftWindow, rightWindow in izip_shortest(*segWindowGenPair):
        leftOnRightGen = (intersection_fun(leftWindow[0], otherSeg) for otherSeg in rightWindow[skip_count:])
        rightOnLeftGen = (intersection_fun(rightWindow[0], otherSeg) for otherSeg in leftWindow[max(skip_count, 1):])
        intersectionGen = (item for item in itertools.chain(leftOnRightGen, rightOnLeftGen) if item is not None)
        for intersection in intersectionGen:
            yield intersection
"""
def gen_path_self_non_intersections(journey, intersection_fun=None): # code duplication, but there's no other fast way.
knownSegs = []
for currentSeg in gen_track_previous_full(journey):
knownSegs.append(currentSeg)
for oldKnownSeg in knownSegs[:-2]:
if intersection_fun(currentSeg, oldKnownSeg) is not None:
break
else:
yield currentSeg[1]
"""
# disabled because it is probably better to zip them elsewhere to avoid confusion.
"""
def gen_ladder_rung_self_intersections(journey0, journey1, intersection_fun=None):
return gen_seg_seq_self_intersections(izip(journey0, journey1), intersection_fun=intersection_fun)
"""
def gen_path_intersections_with_seg(journey, reference_seg, intersection_fun=None):
    """(disabled) Yield intersections of the polyline through *journey* with a single reference segment.

    Gated off pending a rewrite in terms of the seg-seq helpers, plus tests.
    """
    raise NotImplementedError("possibly redefine using call to seg seq methods, and create new tests.")
    for currentSeg in gen_track_previous_full(journey):
        intersection = intersection_fun(currentSeg, reference_seg)
        if intersection is not None:
            yield intersection
def gen_path_zipped_multi_seg_intersections(journey, reference_segs, intersection_fun=None):
    """For each path segment that hits at least one reference segment, yield the full
    per-reference-segment intersection list (None entries mark the misses)."""
    for currentSeg in gen_track_previous_full(journey):
        intersections = [intersection_fun(currentSeg, referenceSeg) for referenceSeg in reference_segs]
        if any(intersection is not None for intersection in intersections):
            yield intersections
"""
def gen_path_pair_mutual_intersections(journies, intersection_fun=None):
assert len(journies) == 2
knownSegsByJourney = [[] for i in range(len(journies))]
for currentSegs in zip(gen_track_previous_full(
"""
def gen_record_breakers(input_seq, score_fun=None):
    """Yield each item whose score strictly exceeds every earlier item's score.

    The first item always sets the initial record and is yielded; empty input
    yields nothing.
    """
    inputGen = iter(input_seq)
    try:
        recordHolder = next(inputGen)
    except StopIteration:
        return
    recordScore = score_fun(recordHolder)
    yield recordHolder
    for candidate in inputGen:
        candidateScore = score_fun(candidate)
        if candidateScore > recordScore:
            recordScore = candidateScore
            yield candidate
def gen_flag_multi_record_breakers(input_seq, score_funs=None):
    """Yield (item, flags) for every item breaking at least one per-score-function record.

    flags[i] says whether score_funs[i]'s record was broken by this item. The
    first item is yielded with all flags True as a list; later flag packs are
    tuples (preserving the original's mixed types).
    """
    inputGen = iter(input_seq)
    try:
        firstItem = next(inputGen)
    except StopIteration:
        return
    records = [scoreFun(firstItem) for scoreFun in score_funs]
    yield (firstItem, [True for _ in range(len(score_funs))])
    for candidate in inputGen:
        candidateScores = [scoreFun(candidate) for scoreFun in score_funs]
        brokenFlags = tuple((newScore > oldRecord) for newScore, oldRecord in zip(candidateScores, records))
        if any(brokenFlags):
            for funIndex, (newScore, wasBroken) in enumerate(zip(candidateScores, brokenFlags)):
                if wasBroken:
                    records[funIndex] = newScore
            yield (candidate, brokenFlags)
"""
if not any((score > record) for score, record in zip(scores, records)):
continue
currentResult =
for i, (score, record) in enumerate(zip(scores, records)):
if score > record:
records[i] = score
currentResult[i] = item
yield currentResult
"""
def gen_track_sum(input_seq):
    """Yield (running_sum, item) pairs over input_seq; empty input yields nothing."""
    inputGen = iter(input_seq)
    try:
        runningTotal = next(inputGen)
    except StopIteration:
        return
    yield (runningTotal, runningTotal)
    for currentItem in inputGen:
        runningTotal = runningTotal + currentItem
        yield (runningTotal, currentItem)
assert_equal(list(gen_track_sum([1,2,3,4.5])), [(1,1),(3,2),(6,3),(10.5,4.5)])
def gen_track_mean(input_seq):
    """Yield (running_mean, item) pairs over input_seq; empty input yields nothing."""
    runningTotal = 0
    for itemCount, currentItem in enumerate(input_seq, 1):
        runningTotal += currentItem
        yield (runningTotal / float(itemCount), currentItem)
assert_equal(list(gen_track_mean([1,2,3,2])), [(1.0,1),(1.5,2),(2.0,3),(2.0,2)])
assert_equal(list(gen_track_mean([complex(4,40),complex(0,0)])), [(complex(4,40), complex(4,40)), (complex(2,20), complex(0,0))])
def gen_track_decaying_mean(input_seq, feedback=None):
    """Yield (exponentially_decayed_mean, item) pairs; *feedback* weights the old mean.

    Each new item contributes (1 - feedback); empty input yields nothing.
    """
    newWeight = 1.0 - feedback
    inputGen = iter(input_seq)
    try:
        firstItem = next(inputGen)
    except StopIteration:
        return
    decayedMean = newWeight * firstItem
    yield (decayedMean, firstItem)
    for currentItem in inputGen:
        decayedMean = (feedback * decayedMean) + (newWeight * currentItem)
        yield (decayedMean, currentItem)
def gen_change_basis_using_embedded_triplets(input_seq):
    """Treat each complex item as coefficients over its neighbors: item.real scales
    its left neighbor and item.imag scales its right neighbor.

    gen_track_recent supplies sliding (left, middle, right) windows left-padded
    with 0j; the i == 0 window is skipped because its middle is still padding.
    After the loop the final item (left bound to `right`) contributes only its
    real*middle term, having no right neighbor. The pre-initialization to None
    keeps the post-loop check safe on empty input.
    """
    left, middle, right = (None, None, None)
    for i, (left, middle, right) in enumerate(gen_track_recent(input_seq, count=3, default=0j)):
        if i == 0:
            continue
        yield middle.real*left + middle.imag*right
    if right is not None:
        yield right.real*middle
assert_equal(list(gen_change_basis_using_embedded_triplets([1+2j, 20+30j, 11+12j])), [2*(20+30j), 20*(1+2j)+30*(11+12j), 11*(20+30j)])
assert_equal(list(gen_change_basis_using_embedded_triplets([1+2j, 3+4j])), [2*(3+4j), 3*(1+2j)])
assert_equal(list(gen_change_basis_using_embedded_triplets([1+2j])), [0+0j])
def gen_change_basis_using_zipped_triplets(input_seq):
    """(stub) Planned variant of gen_change_basis_using_embedded_triplets taking pre-zipped triplets."""
    raise NotImplementedError()
class SetMathProvisionError(Exception):
    """Raised by the set-math helpers (mean, median, farcancel_median) when given an empty collection."""
    pass
def mean(input_seq):
    """Arithmetic mean of a one-pass-friendly sequence; raises SetMathProvisionError when empty."""
    runningTotal = 0
    itemCount = 0
    for currentItem in input_seq:
        runningTotal += currentItem
        itemCount += 1
    if itemCount == 0:
        raise SetMathProvisionError("This used to return 0 here. Is that allowed?")
        # return 0
    assert itemCount > 0
    return runningTotal / float(itemCount)
assert mean([3,4,5]) == 4
assert mean([1,1,1,5]) == 2
assert mean([1,2]) == 1.5
def median(input_seq):
    """Median of input_seq; the mean of the two middle values for even lengths.

    Raises SetMathProvisionError when empty.
    """
    orderedValues = sorted(input_seq)
    if not orderedValues:
        raise SetMathProvisionError()
    middle = len(orderedValues) // 2
    if len(orderedValues) % 2 == 0:
        return (orderedValues[middle] + orderedValues[middle - 1]) / 2.0
    return orderedValues[middle]
assert median([1,2,3,50,400,500,600]) == 50
def complex_decomposed_median(input_seq):
    """Componentwise median of complex points: median of the real parts paired with median of the imaginary parts."""
    points = list(input_seq)
    realMid = median([p.real for p in points])
    imagMid = median([p.imag for p in points])
    return complex(realMid, imagMid)
assert complex_decomposed_median([1+600j,4+500j,5+55j,9+400j,125+43j,126+44j,127+45j]) == 9+55j
def farcancel_median(input_seq, _enumerateToDepthTwoPacked=(lambda thing: enumerate_to_depth_packed(thing, depth=2))):
    """Median-like center found by repeatedly cancelling the two mutually farthest remaining points.

    Unordered pairs are visited in descending order of separation; both
    endpoints of each still-uncancelled pair are struck out. With an even
    count everything pairs off and the midpoint of the last (closest)
    cancelled pair is returned; with an odd count the single survivor is
    returned. Works on complex points since only abs() distances are used.
    """
    inputList = [item for item in input_seq]
    if len(inputList) == 0:
        raise SetMathProvisionError()
    if len(inputList) == 1:
        return inputList[0]
    if len(inputList) == 2:
        return mean(inputList)
    # full pairwise distance table; the packed enumeration yields ((i, j), distance) entries.
    distances = [[abs(itemA-itemB) for itemB in inputList] for itemA in inputList]
    descendingDistanceSegs = sorted(_enumerateToDepthTwoPacked(distances), key=(lambda thing: -thing[1]))
    # keep only i < j so each unordered pair appears once (this also drops the zero diagonal).
    demirroredDescendingDistanceSegs = [item for item in descendingDistanceSegs if item[0][0] < item[0][1]]
    lastValidSegment = None
    cancelledIndicesSet = set()
    for segment in demirroredDescendingDistanceSegs:
        if segment[1] == math.inf:
            raise NotImplementedError("can't handle infinite distances yet!")
        if segment[0][0] in cancelledIndicesSet or segment[0][1] in cancelledIndicesSet:
            continue
        else:
            if lastValidSegment is not None:
                assert segment[1] <= lastValidSegment[1]  # pairs must arrive in non-increasing distance order.
            lastValidSegment = segment
            for index in segment[0]:
                assert index not in cancelledIndicesSet
                cancelledIndicesSet.add(index)
    assert lastValidSegment is not None
    if len(cancelledIndicesSet) == len(inputList):
        # even count: return the midpoint of the final cancelled pair.
        lastValidSegmentEndpoints = [inputList[index] for index in lastValidSegment[0]]
        assert len(lastValidSegmentEndpoints) == 2
        assert abs(lastValidSegmentEndpoints[1] - lastValidSegmentEndpoints[0]) == lastValidSegment[1]
        return mean(lastValidSegmentEndpoints)
    else:
        # odd count: exactly one point survived cancellation.
        assert len(cancelledIndicesSet) == len(inputList)-1
        for i, point in enumerate(inputList):
            if i not in cancelledIndicesSet:
                return point
        assert False, "failed somehow."
    assert False
assert_equal(farcancel_median([1+1j,3+3j,7+7j,5+5j,4+4j,2+2j,6+6j]), 4+4j)
assert_equal(farcancel_median([100+100j,300+300j,100+105j,100+95j,-200-200j,110+110j,90+90j]), 100+100j)
assert_equal(farcancel_median([0+1j,0+0j, complex(100,100)]), 0+1j)
def gen_linear_downsample(input_seq, count=None, analysis_fun=None):
    """Collapse input_seq into consecutive buckets of *count* items and yield analysis_fun(bucket) for each.

    The final bucket may be shorter than *count* when the input runs out.
    """
    inputGen = iter(input_seq)
    while True:
        bucket = list(itertools.islice(inputGen, count))
        if not bucket:
            return
        yield analysis_fun(bucket)
        if len(bucket) < count:
            return
assert_equal(list(gen_linear_downsample([1,3,2,4,3,5,10,20], count=2, analysis_fun=mean)), [2,3,4,15])
def gen_shrinking_selections_as_lists(input_seq):
    """Yield every nonempty combination of input_seq, largest selection sizes first.

    Despite the name, the yielded selections are tuples (as the assert below
    pins down), in itertools.combinations order within each size.
    """
    pool = list(input_seq)
    for selectionSize in range(len(pool), 0, -1):
        for combo in itertools.combinations(pool, selectionSize):
            yield combo
assert list(gen_shrinking_selections_as_lists(range(0,3))) == [(0,1,2),(0,1),(0,2),(1,2),(0,),(1,),(2,)]
def gen_shrinking_selection_analyses(input_seq, analysis_fun=None):
    """Yield analysis_fun applied to each shrinking selection of input_seq."""
    for selection in gen_shrinking_selections_as_lists(input_seq):
        yield analysis_fun(selection)
def gen_path_seg_lerps(input_seq, t=None):
raise NotImplementedError("tests needed!")
assert 0.0 <= t <= 1.0
for pointA, pointB in gen_track_previous_full(input_seq):
yield lerp(pointA, pointB, t)
"""
def gen_path_seg_multi_lerps(input_seq, t_seq):
# assert isinstance(t_list, (tuple, list))
return itertools.chain.from_iterable(izip(gen_path_seg_lerps(input_seq, t) for t in t_seq))
"""
def gen_path_seg_multi_lerps(input_seq, t_list=None): # could easily be faster with a multi lerp method.
raise NotImplementedError("tests needed!")
assert isinstance(t_list, (tuple, list))
# assert all(0.0 <= t <= 1.0 for t in t_list)
for pointA, pointB in gen_track_previous_full(input_seq):
for t in t_list:
yield lerp(pointA, pointB, t)
# gen_path_seg_multi_lerps_12x = SegmentGeometry.compose_single_arg_function(gen_path_seg_multi_lerps, depth=12)
# gen_path_seg_quarterbevel_12x = (lambda input_seq: gen_path_seg_multi_lerps(input_seq, t_list=[0.25, 0.75]))
"""
def gen_path_seg_midpoints(input_seq):
return gen_path_seg_lerps(input_seq, t=0.5)
"""
def make_list_copier_from_list_mutator(input_mutator):
    """Wrap an in-place list mutator into a function that returns a mutated copy.

    The returned function copies its input sequence to a list, applies
    input_mutator to the copy, and returns the copy, leaving the input
    untouched.

    Bug fixed: the inner function accepted **kwargs but never forwarded them
    to the mutator, silently discarding caller options; they are now passed
    through. Existing no-kwarg callers are unaffected.
    """
    def inner(input_seq, **kwargs):
        workingList = list(input_seq)
        input_mutator(workingList, **kwargs)
        return workingList
    return inner
def sort_with_greedy_neighbor_distance_minimizer(input_list, distance_fun=None):
    """Greedily reorder input_list in place so each item is followed by its nearest remaining item.

    Nearest-neighbor chaining starts from the current first element; distance
    ties go to the earliest candidate. Returns None (in-place mutation).
    """
    for anchorIndex in range(len(input_list) - 1):
        anchor = input_list[anchorIndex]
        candidateDistances = [distance_fun(anchor, other) for other in input_list[anchorIndex + 1:]]
        # leftmost minimum: min over indices keyed by distance keeps the first on ties.
        bestRelIndex = min(range(len(candidateDistances)), key=candidateDistances.__getitem__)
        bestIndex = bestRelIndex + anchorIndex + 1
        if bestIndex != anchorIndex + 1:
            input_list[anchorIndex + 1], input_list[bestIndex] = (input_list[bestIndex], input_list[anchorIndex + 1])
def sort_to_greedy_shortest_path_order(input_list):
    """In-place greedy reorder so consecutive items are close together (by absolute difference)."""
    closeness = lambda pointA, pointB: abs(pointA - pointB)
    return sort_with_greedy_neighbor_distance_minimizer(input_list, closeness)
# inline self-test: greedy shortest-path ordering of four points.
testList = [complex(1,1),complex(3,1),complex(2,5),complex(2,2)]
sort_to_greedy_shortest_path_order(testList)
assert_equal(testList, [complex(1,1),complex(2,2),complex(3,1),complex(2,5)])
del testList
def sort_to_greedy_longest_path_order(input_list):
    """In-place greedy reorder so consecutive items are far apart (negated absolute difference)."""
    farness = lambda pointA, pointB: -abs(pointA - pointB)
    return sort_with_greedy_neighbor_distance_minimizer(input_list, farness)
# inline self-test: greedy longest-path ordering of the same four points.
testList = [complex(1,1),complex(3,1),complex(2,5),complex(2,2)]
sort_to_greedy_longest_path_order(testList)
assert_equal(testList, [complex(1,1),complex(2,5),complex(3,1),complex(2,2)])
del testList
# copy-returning variants built from the in-place sorters.
sorted_to_greedy_shortest_path_order = make_list_copier_from_list_mutator(sort_to_greedy_shortest_path_order)
sorted_to_greedy_longest_path_order = make_list_copier_from_list_mutator(sort_to_greedy_longest_path_order)
def eat_in_greedy_shortest_path_order(input_list):
    """(stub) Planned consuming counterpart of sort_to_greedy_shortest_path_order."""
    raise NotImplementedError()
"""
def sorted_to_greedy_shortest_path_order(input_seq):
workingList = [item for item in input_seq]
sort_to_greedy_shortest_path_order(workingList)
return workingList
"""
def parallel_div_complex_by_floats(view_size, screen_size):
    """Divide a complex componentwise by an (x, y) pair of numbers."""
    realQuotient = view_size.real / screen_size[0]
    imagQuotient = view_size.imag / screen_size[1]
    return complex(realQuotient, imagQuotient)
def parallel_mul_complex_by_floats(complex_val, float_pair):
    """Multiply a complex componentwise by an (x, y) pair of numbers."""
    scaledParts = (complex_val.real * float_pair[0], complex_val.imag * float_pair[1])
    return complex(*scaledParts)
def parallel_div_complex_by_complex(val0, val1):
    """Divide val0 by val1 componentwise (real/real, imag/imag) — NOT true complex division."""
    quotientParts = (val0.real / val1.real, val0.imag / val1.imag)
    return complex(*quotientParts)
def ordify(string):
    """Return a generator over the unicode code points of *string*."""
    for character in string:
        yield ord(character)
def scaled_size(input_size, input_scale):
    """Scale a 2-tuple (width, height) by an integer factor, returning a new tuple."""
    assert len(input_size) == 2
    assert isinstance(input_scale, int)
    width, height = input_size
    return (width * input_scale, height * input_scale)
"""
class CoordinateError(Exception):
pass
"""
"""
class ExtremeScaleWarning(Exception):
pass
"""
"""
class ViewOutOfBoundsError(Exception):
pass
"""
# Exception hierarchy for coordinate-out-of-range conditions in View coordinate conversions.
class ViewBaseCoordinateError(Exception):
    pass
# used as the default `default` of tunnel_absolutecpx; relates to the [0,1) square of relativecpx_is_in_bounds.
class ViewOutOfStrictBoundsError(ViewBaseCoordinateError):
    pass
# integer (x, y) pixel coordinates outside the grid (see inttup_is_in_bounds).
class ViewOutOfInttupBoundsError(ViewBaseCoordinateError):
    pass
# coordinate outside the backing matrix bounds.
class ViewOutOfMatrixBoundsError(ViewBaseCoordinateError):
    pass
def relativecpx_is_in_bounds(value):
    """Return True when a relative complex coordinate lies in the half-open unit square [0,1) x [0,1)."""
    return (0 <= value.real < 1) and (0 <= value.imag < 1)
def inttup_is_in_bounds(int_tup, size):
    """Return True when an (x, y) integer pair lies within a (width, height) grid."""
    return (0 <= int_tup[0] < size[0]) and (0 <= int_tup[1] < size[1])
def tunnel_absolutecpx(value, view0, view1, bound=True, default=ViewOutOfStrictBoundsError):
    """Map an absolute coordinate out of view0's frame and into view1's frame via relative coordinates."""
    relativeValue = view0.absolutecpx_to_relativecpx(value, bound=bound, default=default)
    return view1.relativecpx_to_absolutecpx(relativeValue, bound=bound, default=default)
def gen_tunnel_absolutecpx(input_seq, *args, **kwargs):
    """Tunnel each point of input_seq between two views, dropping out-of-bounds points.

    tunnel_absolutecpx is told to return COMPLEX_NAN for out-of-bounds input
    (default=COMPLEX_NAN), and such results are skipped. Input points already
    containing NaN trigger a warning, since they are indistinguishable from
    out-of-bounds results and will vanish.

    Bug fixed: the original tested `x == COMPLEX_NAN`, which is always False
    because NaN never compares equal (even to itself), so neither the warning
    nor the drop ever fired and NaN results leaked through. NaN is now
    detected with the self-inequality test, which for a complex is True
    exactly when a component is NaN.
    """
    assert "default" not in kwargs  # the NaN sentinel default is owned by this wrapper.
    for item in input_seq:
        if item != item:  # NaN check: only NaN-containing values are unequal to themselves.
            print_and_reduce_repetition("gen_tunnel_absolutecpx: warning: complex nan was already in the data. it will disappear.")
        result = tunnel_absolutecpx(item, *args, **kwargs, default=COMPLEX_NAN)
        if result != result:
            continue
        yield result
class View:
def __init__(self, *, center_pos=None, corner_pos=None, sizer=None):
self.sizer = sizer
assert self.sizer.real > 0
assert self.sizer.imag > 0
if corner_pos is not None:
self.corner_pos = corner_pos
assert center_pos is None
else:
assert center_pos is not None
self.corner_pos = center_pos - 0.5*self.sizer
@property
def center_pos(self):
return self.corner_pos + 0.5*self.sizer