This repository has been archived by the owner on Jan 4, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathprocessing.py
983 lines (779 loc) · 41.3 KB
/
processing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
'''Module to process FPP images'''
from __future__ import annotations
import multiprocessing
from multiprocessing import Pool
from typing import Optional, Tuple
import cv2
import numpy as np
from scipy import signal
from scipy.optimize import fsolve
import config
from fpp_structures import FPPMeasurement, PhaseShiftingAlgorithm
def calculate_phase_generic(images: list[np.ndarray], phase_shifts: Optional[list[float]]=None, frequency: Optional[float]=None, phase_shifting_type: PhaseShiftingAlgorithm = PhaseShiftingAlgorithm.n_step, direct_formula: bool = False) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    '''
    Calculate wrapped phase from several Phase Shifting Profilometry images by
    generic formula (8) in https://doi.org/10.1016/j.optlaseng.2018.04.019

    Args:
        images (list of numpy arrays): the list of Phase Shifting Profilometry images
        phase_shifts = None (list): the list of phase shifts for each image from images,
        if phase_shifts is not defined, its calculated automatically for uniform step
        frequency = None (float): the frequency of measurement to add PI for unity frequency images
        phase_shifting_type = n_step (enum(int)): type of phase shifting algorithm should be used for phase calculating
        direct_formula = False (bool): use direct formulas to calculate phases for 3- and 4-step phase shifts

    Returns:
        result_phase (2D numpy array): wrapped phase from images
        average_intensity (2D numpy array): average intensity on images
        modulated_intensity (2D numpy array): modulated intensity on images

    Raises:
        ValueError: if phase_shifting_type is not a supported algorithm
    '''
    def calculate_n_step_phase(imgs: np.ndarray, shifts: list[float]):
        '''Calculate wrapped phase, average and modulated intensity for one
        phase-shifted image set (direct 3/4-step formulas or generic N-step).'''
        if direct_formula and len(shifts) == 3:
            # Direct formulas (14-16) in https://doi.org/10.1016/j.optlaseng.2018.04.019
            sum12 = imgs[1] - imgs[2]
            sum012 = 2 * imgs[0] - imgs[1] - imgs[2]
            result_phase = np.arctan2(np.sqrt(3) * (sum12), sum012)
            average_intensity = (imgs[0] + imgs[1] + imgs[2]) / 3
            modulated_intensity = 1 / 3 * np.sqrt(3 * (sum12) ** 2 + (sum012) ** 2)
        elif direct_formula and len(shifts) == 4:
            # Direct formulas (21-23) in https://doi.org/10.1016/j.optlaseng.2018.04.019
            sum13 = imgs[1] - imgs[3]
            sum02 = imgs[0] - imgs[2]
            result_phase = np.arctan2(sum13, sum02)
            average_intensity = (imgs[0] + imgs[1] + imgs[2] + imgs[3]) / 4
            modulated_intensity = 0.5 * np.sqrt(sum13**2 + sum02**2)
        else:
            # Reshape phase shifts for broadcasted multiplication
            shifts_arr = np.array(shifts).reshape((-1, 1, 1))

            # Supplementary phase to get phase for unity frequency measurement
            phase_sup = np.pi if frequency is not None and frequency == 1 else 0

            # Generic formula (8) in https://doi.org/10.1016/j.optlaseng.2018.04.019
            temp1 = np.multiply(imgs, np.sin(shifts_arr + phase_sup))
            temp2 = np.multiply(imgs, np.cos(shifts_arr + phase_sup))
            sum1 = np.sum(temp1, 0)
            sum2 = np.sum(temp2, 0)
            result_phase = np.arctan2(sum1, sum2)

            # Formulas (9-10) in https://doi.org/10.1016/j.optlaseng.2018.04.019.
            # BUGFIX: normalize by the number of images in THIS set (len(imgs)),
            # not the total image count len(images) — the two differ for the
            # double three-step algorithm, which calls this helper with two
            # 3-image halves.
            average_intensity = np.mean(imgs, 0)
            modulated_intensity = 2 * np.sqrt(np.power(sum1, 2) + np.power(sum2, 2)) / len(imgs)
        return result_phase, average_intensity, modulated_intensity

    # Calculate uniform phase shifts if they are not defined
    if phase_shifts is None:
        phase_shifts = [2 * np.pi / len(images) * n for n in range(len(images))]

    # Stack images into one 3D numpy array for broadcasting
    imgs = np.zeros((len(images), images[0].shape[0], images[0].shape[1]))
    for i in range(len(images)):
        imgs[i] = images[i]

    # Depending on phase shift algorithm calculate wrapped phase field
    if phase_shifting_type == PhaseShiftingAlgorithm.n_step:
        # Classic N-step approach
        result_phase, average_intensity, modulated_intensity = calculate_n_step_phase(imgs, phase_shifts)
    elif phase_shifting_type == PhaseShiftingAlgorithm.double_three_step:
        # Double three-step approach — average of two 3-step phases (second shifted by PI/3),
        # formulas (26-31) from section 3.2 in https://doi.org/10.1016/j.optlaseng.2018.04.019.
        # Assumes the set holds two 3-image halves (6 images total).
        result_phase1, average_intensity1, modulated_intensity1 = calculate_n_step_phase(imgs[:3, :, :], phase_shifts[:3])
        result_phase2, average_intensity2, modulated_intensity2 = calculate_n_step_phase(imgs[3:, :, :], phase_shifts[3:])
        result_phase = (result_phase1 + result_phase2) / 2
        average_intensity = (average_intensity1 + average_intensity2) / 2
        modulated_intensity = (modulated_intensity1 + modulated_intensity2) / 2
    else:
        raise ValueError(f'Unsupported phase shifting algorithm: {phase_shifting_type}')

    return result_phase, average_intensity, modulated_intensity
def calculate_unwraped_phase(phase_l: np.ndarray, phase_h: np.ndarray, lamb_l: float, lamb_h: float) -> np.ndarray:
    '''
    Unwrap a high-frequency phase field with the help of a low-frequency one
    using the standard temporal phase unwrapping (TPU) algorithm, formulas
    (94-95) in https://doi.org/10.1016/j.optlaseng.2018.04.019

    Args:
        phase_l (2D numpy array): the calculated phase for the set of PSP images with low frequency (lamb_l)
        phase_h (2D numpy array): the calculated phase for the set of PSP images with high frequency (lamb_h)
        lamb_l (float): the low spatial frequency of the first phase array (phase_l)
        lamb_h (float): the high spatial frequency of the second phase array (phase_h)

    Returns:
        unwrapped_phase (2D numpy array): unwrapped phase field
    '''
    assert phase_h.shape == phase_l.shape, \
        'Shapes of phase_l and phase_h must be equals'

    # Number of whole fringe orders to add at every pixel — formula (95)
    fringe_orders = np.round((phase_l * (lamb_l / lamb_h) - phase_h) / (2 * np.pi)).astype(int)

    # Formula (94): shift the wrapped high-frequency phase by whole periods
    return phase_h + 2 * np.pi * fringe_orders
def calculate_phase_for_fppmeasurement(measurement: FPPMeasurement):
    '''
    Calculate wrapped and unwrapped phase fields for an FPP measurement
    instance with the help of calculate_phase_generic and
    calculate_unwraped_phase functions. The calculated phase fields are
    stored back into the input measurement argument.

    Args:
        measurement (FPPMeasurement): FPP measurement with images
    '''
    # Measurement-wide data shared by all cameras
    frequencies = measurement.frequencies
    shifts = measurement.shifts

    for cam_result in measurement.camera_results:
        # Accumulators for phase, unwrapped phase, average and modulated intensity
        phases = []
        unwrapped_phases = []
        avg_ints = []
        mod_ints = []

        # Images of the current camera, grouped by frequency
        images = cam_result.imgs_list

        for i, frequency in enumerate(frequencies):
            # Wrapped phase and intensity fields for the current frequency
            phase, avg_int, mod_int = calculate_phase_generic(images[i], shifts, frequency, phase_shifting_type=measurement.phase_shifting_type)

            # Suppress points with low modulation (noise)
            phase = phase * (mod_int > 5)

            phases.append(phase)
            avg_ints.append(avg_int)
            mod_ints.append(mod_int)

            if i == 0:
                # The first (unit-frequency) phase field carries no ambiguity
                unwrapped_phases.append(phase)
            else:
                # Unwrap against the previously unwrapped lower-frequency field
                unwrapped = calculate_unwraped_phase(unwrapped_phases[i - 1], phases[i], 1 / frequencies[i - 1], 1 / frequencies[i])
                unwrapped_phases.append(unwrapped)

        # Store the calculated fields on the current camera results instance
        cam_result.phases = phases
        cam_result.unwrapped_phases = unwrapped_phases
        cam_result.average_intensities = avg_ints
        cam_result.modulated_intensities = mod_ints
def create_polygon(shape: Tuple[int], vertices: np.ndarray) -> np.ndarray:
    '''
    Create a 2D numpy array with a polygon defined by vertices: cells inside
    the polygon are filled with ones, all other cells stay zero.
    From https://stackoverflow.com/a/37123933

    Args:
        shape (tuple of int): the shape of the 2D numpy array to generate
        vertices (2D numpy array): array of shape (N, 2) with polygon vertex coordinates ([[x0, y0], ..., [xn, yn]])

    Returns:
        base_array (2D numpy array): 2D numpy array with the polygon filled by ones
    '''
    def edge_test(start, end, arr):
        '''
        Use the line through points start and end to test the array indices
        against the interpolated edge; returns a boolean array that is True
        on the inner side of the edge and False on the outer side.
        '''
        grid = np.indices(arr.shape)
        # Vertices are given as (x, y); flip to the (row, col) order of the grid
        start = start[::-1].astype(float)
        end = end[::-1].astype(float)

        if start[0] != end[0]:
            # Maximum column index per row interpolated along the edge
            col_limit = (grid[0] - start[0]) / (end[0] - start[0]) * (end[1] - start[1]) + start[1]
            orientation = np.sign(end[0] - start[0])
        else:
            # Edge lies on a single row
            col_limit = (grid[0] - start[0]) * grid.shape[1]
            orientation = np.sign(end[1] - start[1])

        return grid[1] * orientation <= col_limit * orientation

    base_array = np.zeros(shape, dtype=float)

    # Intersect the half-plane tests of all edges (wrapping last -> first vertex)
    inside = np.ones(base_array.shape, dtype=bool)
    for k in range(vertices.shape[0]):
        inside &= edge_test(vertices[k - 1], vertices[k], base_array)

    # Mark all cells inside the polygon with one
    base_array[inside] = 1
    return base_array
def point_inside_polygon(x: int, y: int, poly: list[tuple(int, int)], include_edges: bool = True) -> bool:
    '''
    Check whether the point (x, y) lies inside the polygon poly.

    A point is inside if a horizontal ray going right from it crosses the
    polygon an odd number of times; works for non-convex polygons as well.
    From: https://stackoverflow.com/questions/39660851/deciding-if-a-point-is-inside-a-polygon

    Args:
        x (int): horizontal point coordinate
        y (int): vertical point coordinate
        poly (list[tuple(int, int)]): N-vertices polygon defined as [(x1,y1),...,(xN,yN)] or [(x1,y1),...,(xN,yN),(x1,y1)]
        include_edges (bool): whether points lying exactly on an edge count as inside

    Returns:
        inside (bool): True if the point is inside the polygon
    '''
    count = len(poly)
    inside = False

    ax, ay = poly[0]
    for idx in range(1, count + 1):
        bx, by = poly[idx % count]

        if ay == by:
            # Horizontal edge: check only if the ray lies on it
            if y == ay:
                if min(ax, bx) <= x <= max(ax, bx):
                    # Point lies on this horizontal edge
                    inside = include_edges
                    break
                elif x < min(ax, bx):
                    # Point is to the left of the edge
                    inside = not inside
        else:
            # Non-horizontal edge: does the ray cross its y-span?
            if min(ay, by) <= y <= max(ay, by):
                crossing_x = (y - ay) * (bx - ax) / float(by - ay) + ax
                if x == crossing_x:
                    # Point lies exactly on the edge
                    inside = include_edges
                    break
                if x < crossing_x:
                    # Point is to the left of the crossing: one more intersection
                    inside = not inside

        ax, ay = bx, by

    return inside
def triangulate_points(calibration_data: dict, image1_points: np.ndarray, image2_points: np.ndarray) -> tuple[np.ndarray, float, float, np.ndarray, np.ndarray]:
    '''
    Triangulate two set of 2D point in one set of 3D points

    Args:
        calibration_data (dictionary): calibration data used for triangulating;
            expects keys 'camera_0' and 'camera_1' (each with 'mtx' and 'dist')
            plus the stereo extrinsics 'R' and 'T'
        image1_points (numpy arrray [N, 2]): first set of 2D points
        image2_points (numpy arrray [N, 2]): second set of 2D points

    Returns:
        points_3d (numpy arrray [N, 3]): triangulated 3D points
        rms1 (float): overall reprojection error for first camera
        rms2 (float): overall reprojection error for second camera
        reproj_err1, reproj_err2 (numpy arrray [N]): reprojected error for each triangulated
            point for first and second camera (sum of squared x/y differences per point)
    '''
    # Calculate the projective matrices according to the stereo calibration data
    cam1_mtx = np.array(calibration_data['camera_0']['mtx'])
    cam2_mtx = np.array(calibration_data['camera_1']['mtx'])
    dist1_mtx = np.array(calibration_data['camera_0']['dist'])
    dist2_mtx = np.array(calibration_data['camera_1']['dist'])

    # Calculate projective matrices for cameras: camera 1 is the world origin,
    # camera 2 is placed by the stereo extrinsics [R|T]
    proj_mtx_1 = np.dot(cam1_mtx, np.hstack((np.identity(3), np.zeros((3,1)))))
    proj_mtx_2 = np.dot(cam2_mtx, np.hstack((calibration_data['R'], calibration_data['T'])))

    # Undistort 2d points (P=cam_mtx keeps the undistorted points in pixel coordinates)
    points_2d_1 = np.array(image1_points, dtype=np.float32)
    points_2d_2 = np.array(image2_points, dtype=np.float32)
    undist_points_2d_1 = cv2.undistortPoints(points_2d_1, cam1_mtx, dist1_mtx, P=cam1_mtx)
    undist_points_2d_2 = cv2.undistortPoints(points_2d_2, cam2_mtx, dist2_mtx, P=cam2_mtx)

    # Calculate the triangulation of 3D points (homogeneous), then convert to
    # Euclidean coordinates and flatten to shape [N, 3]
    points_hom = cv2.triangulatePoints(proj_mtx_1, proj_mtx_2, undist_points_2d_1, undist_points_2d_2)
    points_3d = cv2.convertPointsFromHomogeneous(points_hom.T)
    points_3d = np.reshape(points_3d, (points_3d.shape[0], points_3d.shape[2]))

    # Reproject triangulated points back onto both image planes
    reproj_points, _ = cv2.projectPoints(points_3d, np.identity(3), np.zeros((3,1)), cam1_mtx, dist1_mtx)
    reproj_points2, _ = cv2.projectPoints(points_3d, np.array(calibration_data['R']), np.array(calibration_data['T']), cam2_mtx, dist2_mtx)

    # Calculate reprojection error: squared Euclidean distance per point, then
    # the RMS over all points
    reproj_err1 = np.sum(np.square(points_2d_1[:,np.newaxis,:] - reproj_points), axis=2)
    rms1 = np.sqrt(np.sum(reproj_err1)/reproj_points.shape[0])
    reproj_err2 = np.sum(np.square(points_2d_2[:,np.newaxis,:] - reproj_points2), axis=2)
    # NOTE(review): divisor reuses reproj_points.shape[0] (camera 1 count); both
    # cameras reproject the same N points, so the value equals reproj_points2.shape[0]
    rms2 = np.sqrt(np.sum(reproj_err2)/reproj_points.shape[0])

    # Flatten per-point errors to 1D arrays of length N
    reproj_err1 = np.reshape(reproj_err1, (reproj_err1.shape[0]))
    reproj_err2 = np.reshape(reproj_err2, (reproj_err2.shape[0]))

    return points_3d, rms1, rms2, reproj_err1, reproj_err2
def calculate_bilinear_interpolation_coeficients(points: tuple[tuple]) -> np.ndarray:
    '''
    Compute the four coefficients of the bilinear interpolation
    f(x, y) = a0 + a1 * x + a2 * y + a3 * x * y over a 2D cell.
    Equations are taken from https://en.wikipedia.org/wiki/Bilinear_interpolation

    Args:
        points (tuple[tuple]): four elements in format (x, y, f(x, y))

    Returns:
        bilinear_coeficients (numpy array): four coefficients a0..a3 for the input points
    '''
    # Sorting orders the corners as (x1,y1), (x1,y2), (x2,y1), (x2,y2)
    (x1, y1, q11), (_, y2, q12), (x2, _, q21), (_, _, q22) = sorted(points)

    # Corner values vector
    corner_values = np.array([q11, q12, q21, q22])

    # Closed-form inverse of the bilinear system (see wiki link above)
    coefficient_matrix = np.array([
        [x2 * y2, -x2 * y1, -x1 * y2, x1 * y1],
        [-y2, y1, y2, -y1],
        [-x2, x2, x1, -x1],
        [1, -1, -1, 1],
    ])

    # Scale by the inverse cell area to obtain the interpolation coefficients
    return (1 / ((x2 - x1) * (y2 - y1))) * coefficient_matrix.dot(corner_values)
def bilinear_phase_fields_approximation(p: tuple[float, float], *data: tuple) -> tuple[float, float]:
    '''
    Residuals of the bilinear approximations of the horizontal and vertical
    phase fields at point p. Used as the target function for fsolve in
    find_phasogrammetry_corresponding_point.

    Args:
        p (tuple[float, float]): x and y coordinates of the point where residuals are evaluated
        data (tuple): data for the residual calculation
        - a (numpy array): four coefficients of the bilinear approximation of the horizontal phase field
        - b (numpy array): four coefficients of the bilinear approximation of the vertical phase field
        - p_h (float): horizontal phase to match in the interpolated field
        - p_v (float): vertical phase to match in the interpolated field

    Returns:
        res_h, res_v (tuple[float, float]): residuals of the horizontal and vertical fields at point (x, y)
    '''
    x, y = p
    a, b, p_h, p_v = data

    # Evaluate both bilinear polynomials and subtract the target phases
    horizontal_residual = a[0] + a[1] * x + a[2] * y + a[3] * x * y - p_h
    vertical_residual = b[0] + b[1] * x + b[2] * y + b[3] * x * y - p_v
    return horizontal_residual, vertical_residual
def find_phasogrammetry_corresponding_point(p1_h: np.ndarray, p1_v: np.ndarray, p2_h: np.ndarray, p2_v: np.ndarray, x: int, y: int, LUT: Optional[list[list[list[int]]]] = None) -> tuple[float, float]:
    '''
    Finds the corresponding point coordinates for the second image using the phasogrammetry approach

    For the given coordinates x and y, the phase values on the fields for the vertical and horizontal fringes
    for the images of the first camera are determined. Then two isolines with defined values of the phase on
    the corresponding fields for the second camera are found. The intersection of the isolines gives the
    coordinates of the corresponding point on the image from the second camera.

    Args:
        p1_h (numpy array): phase field for horizontal fringes for first camera
        p1_v (numpy array): phase field for vertical fringes for first camera
        p2_h (numpy array): phase field for horizontal fringes for second camera
        p2_v (numpy array): phase field for vertical fringes for second camera
        x (int): horizontal coordinate of point for first camera
        y (int): vertical coordinate of point for first camera
        LUT (list[list[list[int]]]): optional LUT (see get_phase_field_LUT) used to get
        a fast first approximation of the corresponding point

    Returns:
        x2, y2 (tuple[float, float]): horizontal and vertical coordinate of corresponding point
        for second camera, or (-1, -1) on failure.
        NOTE: when LUT is not None the function returns a THIRD element — the list of
        residuals [h_res, v_res] of the bilinear approximation ([inf, inf] on failure).
        This asymmetry is preserved for backward compatibility with existing callers.
    '''
    # The phase values on vertical and horizontal phase fields for the searched point
    phase_h = p1_h[y, x]
    phase_v = p1_v[y, x]

    retval = [np.inf, np.inf]

    # If LUT available calculate corresponding point with it
    if LUT is not None:
        # Get the nearest phase indices in the LUT ranges as a first approximation
        try:
            phase_h_index = np.argmin(np.abs(LUT[-2] - phase_h))
            phase_v_index = np.argmin(np.abs(LUT[-1] - phase_v))
        except Exception:
            # Phase values not found in LUT (malformed LUT or phases out of range)
            return -1, -1, retval

        cor_points = np.array(LUT[phase_v_index][phase_h_index])

        # Proceed only for a non-empty but reasonably small candidate set
        if 0 < len(cor_points) < 20:
            # Start from the candidate point with the minimum phase difference
            p2_h_d = np.abs(p2_h[cor_points[:, 1], cor_points[:, 0]] - phase_h)
            p2_v_d = np.abs(p2_v[cor_points[:, 1], cor_points[:, 0]] - phase_v)
            x0, y0 = cor_points[np.argmin(p2_h_d + p2_v_d)]

            iter_num = 0
            # Iterate through cells where the fields are close to phase_v and phase_h
            while iter_num < 5:
                # Nearest integer coords around the current estimate of x and y
                if int(np.round(x0)) - x0 == 0:
                    x1 = int(x0 - 1)
                    x2 = int(x0 + 1)
                else:
                    x1 = int(np.floor(x0))
                    x2 = int(np.ceil(x0))
                if int(np.round(y0)) - y0 == 0:
                    y1 = int(y0 - 1)
                    y2 = int(y0 + 1)
                else:
                    y1 = int(np.floor(y0))
                    y2 = int(np.ceil(y0))

                # Check that the cell is inside the field (positive and less than field shape)
                if x1 > 0 and x2 > 0 and y1 > 0 and y2 > 0 and x1 < p1_h.shape[1] and x2 < p1_h.shape[1] and y1 < p1_h.shape[0] and y2 < p1_h.shape[0]:
                    # Bilinear approximation coefficients for the horizontal phase field.
                    # BUGFIX: the (x2, y1) corner previously sampled p2_h[y2, x1];
                    # it must sample p2_h[y1, x2] (same fix for the vertical field below)
                    aa = calculate_bilinear_interpolation_coeficients(((x1, y1, p2_h[y1, x1]), (x1, y2, p2_h[y2, x1]),
                                                                      (x2, y2, p2_h[y2, x2]), (x2, y1, p2_h[y1, x2])))
                    # Bilinear approximation coefficients for the vertical phase field
                    bb = calculate_bilinear_interpolation_coeficients(((x1, y1, p2_v[y1, x1]), (x1, y2, p2_v[y2, x1]),
                                                                      (x2, y2, p2_v[y2, x2]), (x2, y1, p2_v[y1, x2])))

                    # Find where both bilinear approximations are equal to phase_h and phase_v
                    x0, y0 = fsolve(bilinear_phase_fields_approximation, (x1, y1), args=(aa, bb, phase_h, phase_v))

                    # Residuals of the found solution
                    h_res, v_res = bilinear_phase_fields_approximation((x0, y0), aa, bb, phase_h, phase_v)

                    # Accept the solution only if it lies inside the current cell
                    if x2 >= x0 >= x1 and y2 >= y0 >= y1:
                        return x0, y0, [h_res, v_res]
                    iter_num = iter_num + 1
                else:
                    return -1, -1, [np.inf, np.inf]
            return -1, -1, [np.inf, np.inf]
        else:
            return -1, -1, [np.inf, np.inf]

    # Without LUT: find coords of the isophase curves on the second camera fields
    y_h, x_h = np.where(np.isclose(p2_h, phase_h, atol=10**-1))
    y_v, x_v = np.where(np.isclose(p2_v, phase_v, atol=10**-1))

    # Break if isoline not found
    if y_h.size == 0 or y_v.size == 0:
        return -1, -1

    # Find ROI of coords for intersection
    y_h_min = np.min(y_h)
    y_h_max = np.max(y_h)
    x_v_min = np.min(x_v)
    x_v_max = np.max(x_v)

    # Apply ROI for coords of isophase curves
    y_h = y_h[(x_h >= x_v_min) & (x_h <= x_v_max)]
    x_h = x_h[(x_h >= x_v_min) & (x_h <= x_v_max)]
    x_v = x_v[(y_v >= y_h_min) & (y_v <= y_h_max)]
    y_v = y_v[(y_v >= y_h_min) & (y_v <= y_h_max)]

    # Break if too many points in the isophase lines (match is ambiguous)
    if len(y_h) > 500 or len(y_v) > 500:
        return -1, -1

    # Break if no points found
    if x_h.size == 0 or x_v.size == 0:
        return -1, -1

    # Reshape coords to use broadcasting for pairwise distances
    x_h = x_h[:, np.newaxis]
    y_h = y_h[:, np.newaxis]
    y_v = y_v[np.newaxis, :]
    x_v = x_v[np.newaxis, :]

    # Distance between every pair of points on the two isolines
    distance = np.sqrt((x_h - x_v) ** 2 + (y_h - y_v) ** 2)

    # Indices of the closest pair
    i_h_min, i_v_min = np.where(distance == distance.min())
    i_v_min = i_v_min[0]
    i_h_min = i_h_min[0]

    # The midpoint of the closest pair is the corresponding point
    x2, y2 = ((x_v[0, i_v_min] + x_h[i_h_min, 0]) / 2, (y_v[0, i_v_min] + y_h[i_h_min, 0]) / 2)

    return x2, y2
def get_phasogrammetry_correlation(p1_h: np.ndarray, p1_v: np.ndarray, p2_h: np.ndarray, p2_v: np.ndarray, x1: int, y1: int, x2: int, y2: int, window_size: int) -> tuple[float, float]:
    '''
    Refine the corresponding point near (x2, y2) on the second camera by
    maximizing a correlation function of the horizontal and vertical phase
    fields inside windows of size window_size, with subpixel peak fitting.

    Args:
        p1_h (numpy array): phase field for horizontal fringes for first camera
        p1_v (numpy array): phase field for vertical fringes for first camera
        p2_h (numpy array): phase field for horizontal fringes for second camera
        p2_v (numpy array): phase field for vertical fringes for second camera
        x1, y1 (int): point coordinates on the first camera image
        x2, y2 (int): approximate corresponding point coordinates on the second camera image
        window_size (int): size of window to calculate correlation function

    Returns:
        (x, y) (tuple[float, float]): refined corresponding point with subpixel
        precision, or (-1, -1) if the subpixel peak could not be determined
    '''
    half = window_size // 2

    # Reference window around (x1, y1) on the first camera fields
    p1_h_ij = p1_h[int(y1 - half):int(y1 + half), int(x1 - half):int(x1 + half)]
    p1_v_ij = p1_v[int(y1 - half):int(y1 + half), int(x1 - half):int(x1 + half)]
    t1_h = (p1_h_ij - np.mean(p1_h_ij)) ** 2
    t1_v = (p1_v_ij - np.mean(p1_v_ij)) ** 2

    corelation_field = np.zeros((window_size, window_size))

    # Candidate window centers around (x2, y2) on the second camera fields
    xx = np.linspace(x2 - half, x2 + half, window_size)
    yy = np.linspace(y2 - half, y2 + half, window_size)

    for j in range(yy.shape[0]):
        for i in range(xx.shape[0]):
            x0 = xx[i]
            y0 = yy[j]
            p2_h_ij = p2_h[int(y0 - half):int(y0 + half), int(x0 - half):int(x0 + half)]
            p2_v_ij = p2_v[int(y0 - half):int(y0 + half), int(x0 - half):int(x0 + half)]
            # Skip incomplete windows clipped at the field borders
            if p2_h_ij.size == p1_h_ij.size and p2_v_ij.size == p1_v_ij.size:
                t2_h = (p2_h_ij - np.mean(p2_h_ij)) ** 2
                t2_v = (p2_v_ij - np.mean(p2_v_ij)) ** 2
                corelation_field[j, i] = np.sum(t1_h * t1_v * t2_h * t2_v) / np.sqrt(np.sum(t1_h * t1_v) * np.sum(t2_h * t2_v))

    # Find maximum indexes for x and y
    maximum = np.unravel_index(corelation_field.argmax(), corelation_field.shape)

    # Neighborhood pixels of the maximum at X axis. The correlation field is
    # treated as periodic, so neighbors wrap around the borders (index -1 wraps
    # automatically in numpy).
    # BUGFIX: the original condition compared with shape[1] (never true, and would
    # index out of bounds if it were), so the right neighbor was always taken from
    # column 0; wrap only when the maximum is in the LAST column.
    cx0 = np.fabs(corelation_field[maximum[0], maximum[1] - 1])
    cx1 = np.fabs(corelation_field[maximum[0], maximum[1]])
    if maximum[1] == corelation_field.shape[1] - 1:
        cx2 = np.fabs(corelation_field[maximum[0], 0])
    else:
        cx2 = np.fabs(corelation_field[maximum[0], maximum[1] + 1])

    # Neighborhood pixels of the maximum at Y axis (same wrapping rule)
    cy0 = np.fabs(corelation_field[maximum[0] - 1, maximum[1]])
    cy1 = np.fabs(corelation_field[maximum[0], maximum[1]])
    if maximum[0] == corelation_field.shape[0] - 1:
        cy2 = np.fabs(corelation_field[0, maximum[1]])
    else:
        cy2 = np.fabs(corelation_field[maximum[0] + 1, maximum[1]])

    # 3-point gauss fit for the subpixel peak position. Numpy scalars do not
    # raise ZeroDivisionError — invalid values propagate as inf/NaN (silenced
    # here) and are rejected by the isnan check below.
    with np.errstate(divide='ignore', invalid='ignore'):
        try:
            x_max = maximum[1] + (np.log(np.abs(cx0)) - np.log(np.abs(cx2))) / (2 * np.log(np.abs(cx0)) - 4 * np.log(np.abs(cx1)) + 2 * np.log(np.abs(cx2)))
        except (ZeroDivisionError, ValueError):
            x_max = 0
        try:
            y_max = maximum[0] + (np.log(np.abs(cy0)) - np.log(np.abs(cy2))) / (2 * np.log(np.abs(cy0)) - 4 * np.log(np.abs(cy1)) + 2 * np.log(np.abs(cy2)))
        except (ZeroDivisionError, ValueError):
            y_max = 0

    # Shift maximum back due to periodicity of the correlation function.
    # BUGFIX: use the matching axis length for each coordinate (the original
    # swapped shape[0]/shape[1]; harmless only because the field is square).
    if x_max > corelation_field.shape[1] / 2:
        x_max = x_max - corelation_field.shape[1]
    elif np.fabs(x_max) < 0.01:
        x_max = 0
    if y_max > corelation_field.shape[0] / 2:
        y_max = y_max - corelation_field.shape[0]
    elif np.fabs(y_max) < 0.01:
        y_max = 0

    if not np.isnan(x_max) and not np.isnan(y_max):
        return x2 + x_max, y2 + y_max
    return -1, -1
def get_phase_field_ROI(fpp_measurement: FPPMeasurement, signal_to_nose_threshold: float = 0.25):
    '''
    Determine the ROI for an FPP measurement by thresholding the signal-to-noise ratio.

    The ROI is stored as a mask (signal_to_noise_mask) with 1 for points whose
    signal-to-noise ratio is above the threshold and 0 otherwise. Additionally
    the ROI is stored as a quadrangle built from the minimum and maximum x and y
    coordinates of the above-threshold points. Results are written back into the
    input fpp_measurement argument.

    Args:
        fpp_measurement (FPPMeasurement): FPP measurement for calculating ROI
        signal_to_nose_threshold (float) = 0.25: threshold for the signal-to-noise ratio
    '''
    for camera_result in fpp_measurement.camera_results:
        # Signal-to-noise ratio from the highest-frequency intensity fields
        snr = camera_result.modulated_intensities[-1] / camera_result.average_intensities[-1]

        above_threshold = snr > signal_to_nose_threshold

        # Binary ROI mask: 1 where the ratio exceeds the threshold
        camera_result.signal_to_noise_mask = above_threshold.astype(int)

        # Bounding quadrangle around all above-threshold points
        coords = np.argwhere(above_threshold)
        x_min, x_max = np.min(coords[:, 1]), np.max(coords[:, 1])
        y_min, y_max = np.min(coords[:, 0]), np.max(coords[:, 0])
        camera_result.ROI = np.array([[x_min, y_min], [x_max, y_min], [x_max, y_max], [x_min, y_max]])
def get_phase_field_LUT(measurement: FPPMeasurement) -> list[list[list]]:
    '''
    Build a LUT for the horizontal and vertical phase fields to speed up the
    phasogrammetry calculation.

    The LUT is a two-dimensional array of point coordinates whose indices
    correspond to the rounded values of the horizontal and vertical phase at
    those coordinates: knowing both phase values, the coordinates of points
    with these values can be looked up directly. The last two elements of the
    returned list are the horizontal and vertical phase ranges.

    Args:
        measurement (FPPMeasurement): FPP measurement with horizontal and vertical fringes

    Returns:
        LUT (list[list[list]]): LUT structure containing the coordinates of points for the horizontal and vertical phase values
    '''
    assert len(measurement.camera_results) == 4, 'It should be four (4) camera results in camera_results of fpp_measurement'

    # Horizontal/vertical results for the first and second cameras
    cam1_h, cam1_v = measurement.camera_results[2], measurement.camera_results[0]
    cam2_h, cam2_v = measurement.camera_results[3], measurement.camera_results[1]

    # Highest-frequency unwrapped phases of the second camera are indexed
    phase_h = cam2_h.unwrapped_phases[-1]
    phase_v = cam2_v.unwrapped_phases[-1]

    # Phase ranges: prefer the first camera ROI when it is defined
    if cam1_h.ROI_mask is not None:
        roi_h = cam1_h.unwrapped_phases[-1][cam1_h.ROI_mask == 1]
        roi_v = cam1_v.unwrapped_phases[-1][cam1_v.ROI_mask == 1]
        ph_min, ph_max = np.min(roi_h), np.max(roi_h)
        pv_min, pv_max = np.min(roi_v), np.max(roi_v)
    else:
        ph_min, ph_max = np.min(phase_h), np.max(phase_h)
        pv_min, pv_max = np.min(phase_v), np.max(phase_v)

    step = 1.0
    h_range = np.arange(ph_min, ph_max, step)
    v_range = np.arange(pv_min, pv_max, step)

    # LUT cell grid is one larger than the ranges to fit boundary indices
    LUT = [[[] for _ in range(h_range.shape[0] + 1)] for _ in range(v_range.shape[0] + 1)]

    # Round phases to integer LUT indices offset to start from zero
    h_idx = np.round((phase_h - ph_min) / step).astype(int).tolist()
    v_idx = np.round((phase_v - pv_min) / step).astype(int).tolist()

    h_bound = (ph_max - ph_min) / step
    v_bound = (pv_max - pv_min) / step

    # Fill the LUT with point coordinates keyed by phase indices
    rows, cols = phase_h.shape[0], phase_h.shape[1]
    for row in range(rows):
        for col in range(cols):
            # Use only points that are reliable on both phase fields
            if cam2_h.signal_to_noise_mask[row, col] == 1 and cam2_v.signal_to_noise_mask[row, col] == 1:
                if v_bound >= v_idx[row][col] >= 0 and h_bound >= h_idx[row][col] >= 0:
                    LUT[v_idx[row][col]][h_idx[row][col]].append([col, row])

    # Append the phase ranges at the end of the LUT for index lookup
    LUT.append(h_range)
    LUT.append(v_range)

    return LUT
def process_fppmeasurement_with_phasogrammetry(measurement: FPPMeasurement, step_x: float, step_y: float, LUT: list[list[list[int]]] = None) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    '''
    Find 2D corresponding points for two phase fields sets with phasogrammetry approach.

    A regular grid of points (with steps step_x, step_y) is built on the first camera
    image inside its ROI mask. For each grid point the corresponding point on the
    second camera image is searched by matching horizontal and vertical unwrapped
    phase values. Points with no match or with a large phase residual are dropped,
    and distance outliers (more than 10 sigma) are removed at the end.

    Args:
        measurement (FPPMeasurement): FPPMeasurement instance with horizontal and
            vertical phase fields for two cameras (camera_results[0..3])
        step_x, step_y (float): horizontal and vertical steps of the grid of points
            on the first camera image
        LUT (list[list[list]]): optional LUT structure containing the coordinates of
            points for the horizontal and vertical phase values (speeds up search)
    Returns:
        image1_points (numpy array [N, 2]): corresponding 2D points from first camera
        image2_points (numpy array [N, 2]): corresponding 2D points from second camera
        errors (numpy array [N, 2]): phase residuals for each found correspondence
    '''
    # Take phases with highest frequencies
    p1_h = measurement.camera_results[2].unwrapped_phases[-1]
    p2_h = measurement.camera_results[3].unwrapped_phases[-1]
    p1_v = measurement.camera_results[0].unwrapped_phases[-1]
    p2_v = measurement.camera_results[1].unwrapped_phases[-1]
    # Cut ROI from phase fields for second camera (currently the whole field,
    # but ROIx.start/ROIy.start are kept as the offset of the cut region)
    ROIx = slice(0, measurement.camera_results[1].unwrapped_phases[-1].shape[1])
    ROIy = slice(0, measurement.camera_results[1].unwrapped_phases[-1].shape[0])
    p2_h = p2_h[ROIy, ROIx]
    p2_v = p2_v[ROIy, ROIx]
    # Calculation of the coordinate grid on first image, keeping only points
    # inside the ROI mask of the first camera
    xx = np.arange(0, p1_h.shape[1], step_x, dtype=np.int32)
    yy = np.arange(0, p1_h.shape[0], step_y, dtype=np.int32)
    coords1 = [(x, y) for y in yy for x in xx
               if measurement.camera_results[0].ROI_mask[y, x] == 1]
    coords2 = []
    errors = []
    coords_to_delete = []
    if config.USE_MULTIPROCESSING:
        # Use parallel calculation to increase processing speed
        with multiprocessing.Pool(config.POOLS_NUMBER) as p:
            results = p.starmap(find_phasogrammetry_corresponding_point,
                                [(p1_h, p1_v, p2_h, p2_v, x0, y0, LUT) for (x0, y0) in coords1])
        # Apply the same filtering as in the sequential branch and collect the
        # phase residuals, so points and errors stay the same length
        for i, (x, y, err) in enumerate(results):
            if (x == -1 and y == -1) or (abs(err[0]) > 0.1 or abs(err[1]) > 0.1):
                coords_to_delete.append(i)
            else:
                # Add ROI left and top coordinates
                coords2.append((x + ROIx.start, y + ROIy.start))
                errors.append(err)
    else:
        for i, (x0, y0) in enumerate(coords1):
            # Find corresponding point coordinate on second image
            x, y, err = find_phasogrammetry_corresponding_point(p1_h, p1_v, p2_h, p2_v, x0, y0, LUT)
            # If no point found or phase residual is too large, drop the grid point
            if (x == -1 and y == -1) or (abs(err[0]) > 0.1 or abs(err[1]) > 0.1):
                coords_to_delete.append(i)
            else:
                coords2.append((x + ROIx.start, y + ROIy.start))
                errors.append(err)
    # Delete points in grid with no corresponding point on second image
    for index in reversed(coords_to_delete):
        coords1.pop(index)
    # Form a set of coordinates of corresponding points on the first and second images
    image1_points = []
    image2_points = []
    distance = []
    for point1, point2 in zip(coords1, coords2):
        image1_points.append([point1[0], point1[1]])
        image2_points.append([point2[0], point2[1]])
        distance.append(((point1[0] - point2[0]) ** 2 + (point1[1] - point2[1]) ** 2) ** 0.5)
    # Remove outliers (points further than 10 sigma of the distance distribution);
    # guard against an empty result to avoid np.std([]) returning NaN
    if distance:
        std_d = np.std(distance)
        indicies_to_delete = [i for i, d in enumerate(distance) if d > std_d * 10]
        for index in reversed(indicies_to_delete):
            image1_points.pop(index)
            image2_points.pop(index)
            errors.pop(index)
    # Convert lists to arrays before returning result from function
    image1_points = np.array(image1_points, dtype=np.float32)
    image2_points = np.array(image2_points, dtype=np.float32)
    errors = np.array(errors, dtype=np.float32)
    return image1_points, image2_points, errors
def calculate_displacement_field(field1: np.ndarray, field2: np.ndarray, win_size_x: int, win_size_y: int, step_x: int, step_y: int) -> tuple[np.ndarray, np.ndarray]:
    '''
    Calculate displacement field between two scalar fields thru correlation.

    The fields are divided into interrogation windows, each pair of windows is
    apodized with a gaussian kernel, normalized and correlated via FFT, and the
    correlation maximum is located with subpixel accuracy by a 3-point gaussian fit.

    Args:
        field1 (2D numpy array): first scalar field
        field2 (2D numpy array): second scalar field
        win_size_x (int): interrogation window horizontal size
        win_size_y (int): interrogation window vertical size
        step_x (int): horizontal step for dividing on interrogation windows
        step_y (int): vertical step for dividing on interrogation windows
    Returns:
        coords (numpy array [N, 2]): (x, y) centers of the interrogation windows
        maximums (numpy array [N, 3]): horizontal shift, vertical shift and
            correlation peak value for each window
    '''
    assert field1.shape == field2.shape, 'Shapes of field1 and field2 must be equals'
    assert win_size_x > 4 and win_size_y > 4, 'Size of interrogation windows should be greater than 4 pixels'
    assert step_x > 0 and step_y > 0, 'Horizontal and vertical steps should be greater than zero'
    # Get interrogation windows
    list_of_windows = [[], []]
    list_of_coords = []
    height, width = field1.shape
    num_win_x = range(int(np.floor((width - win_size_x) / step_x + 1)))
    num_win_y = range(int(np.floor((height - win_size_y) / step_y + 1)))
    for i in num_win_x:
        start_x = step_x * i
        end_x = start_x + win_size_x
        center_x = np.round(end_x - win_size_x / 2)
        for j in num_win_y:
            start_y = step_y * j
            end_y = start_y + win_size_y
            center_y = np.round(end_y - win_size_y / 2)
            list_of_windows[0].append(field1[start_y:end_y, start_x:end_x])
            list_of_windows[1].append(field2[start_y:end_y, start_x:end_x])
            list_of_coords.append([center_x, center_y])
    # Calculate correlation function
    correlation_list = []
    # Create 2D Gauss kernel. Windows have shape (win_size_y, win_size_x),
    # so rows of the kernel correspond to Y and columns to X
    # (the original outer(gaussian(x), gaussian(y)) order broke rectangular windows)
    gauss = np.outer(signal.windows.gaussian(win_size_y, win_size_y / 2),
                     signal.windows.gaussian(win_size_x, win_size_x / 2))
    for w1, w2 in zip(list_of_windows[0], list_of_windows[1]):
        # Apodize interrogation windows to suppress edge effects
        w1 = w1 * gauss
        w2 = w2 * gauss
        mean1, std1 = np.mean(w1), np.std(w1)
        mean2, std2 = np.mean(w2), np.std(w2)
        # Circular cross-correlation via FFT; pass s= so odd window widths
        # are restored to the original shape by the inverse transform
        a = np.fft.rfft2(w1 - mean1, norm='ortho')
        b = np.fft.rfft2(w2 - mean2, norm='ortho')
        d = np.fft.irfft2(a * b.conjugate(), s=w1.shape)
        # Guard against flat (zero variance) windows
        if std1 == 0:
            std1 = 1
        if std2 == 0:
            std2 = 1
        correlation_list.append(d / (std1 * std2))
    # Find maximums
    maximums_list = []
    for corr in correlation_list:
        rows, cols = corr.shape
        # Maximum indexes: my is the row (Y), mx is the column (X)
        my, mx = np.unravel_index(corr.argmax(), corr.shape)
        # Neighborhood pixels of maximum along X axis with circular wrap-around
        # (negative index wraps automatically; (mx + 1) % cols handles the right edge)
        cx0 = np.fabs(corr[my, mx - 1])
        cx1 = np.fabs(corr[my, mx])
        cx2 = np.fabs(corr[my, (mx + 1) % cols])
        # Neighborhood pixels of maximum along Y axis
        cy0 = np.fabs(corr[my - 1, mx])
        cy1 = cx1
        cy2 = np.fabs(corr[(my + 1) % rows, mx])
        # 3-point gauss fit. Numpy scalars do not raise ZeroDivisionError/ValueError,
        # so degenerate fits are detected via isfinite instead of try/except
        with np.errstate(divide='ignore', invalid='ignore'):
            dx = (np.log(cx0) - np.log(cx2)) / (2 * np.log(cx0) - 4 * np.log(cx1) + 2 * np.log(cx2))
            dy = (np.log(cy0) - np.log(cy2)) / (2 * np.log(cy0) - 4 * np.log(cy1) + 2 * np.log(cy2))
        # Fall back to the integer peak position if the fit is degenerate
        x_max = mx + (float(dx) if np.isfinite(dx) else 0.0)
        y_max = my + (float(dy) if np.isfinite(dy) else 0.0)
        # Shift maximum due to periodicity of correlation function:
        # peaks in the second half of the axis correspond to negative shifts
        # (X uses the column count, Y the row count)
        if x_max > cols / 2:
            x_max = x_max - cols
        elif np.fabs(x_max) < 0.01:
            x_max = 0
        if y_max > rows / 2:
            y_max = y_max - rows
        elif np.fabs(y_max) < 0.01:
            y_max = 0
        # Third element is the correlation peak value
        maximums_list.append([x_max, y_max, np.max(corr)])
    return np.array(list_of_coords), np.array(maximums_list)