-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathDeviationTimeWarp.py
84 lines (64 loc) · 3.17 KB
/
DeviationTimeWarp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import math
import vtk
from PythonMetricsCalculator import PerkEvaluatorMetric
class DeviationTimeWarp( PerkEvaluatorMetric ):
# This metric computes the average Dynamic Time Warping distance from the analyzed trajectory to a reference trajectory
# Use dynamic programming to get this to work in "real-time"
# AddTimestamp works in O( n ) time, where n is the number of points in the reference trajectory (which is fixed)
# Inspired by: Despinoy, F., Zemiti, N., Forestier, G. et al. Int J CARS (2018) 13: 13. https://doi.org/10.1007/s11548-017-1666-6.
# Divided by dtw path length: Sakoe, H., Chiba, S., IEEE Transactions on Acoustics, Speech, and Signal Processing, Volume: 26, Issue: 1, Feb 1978, Page(s): 43 - 49.
# Static methods
@staticmethod
def GetMetricName():
return "Deviation from Trajectory - Time Warp"
@staticmethod
def GetMetricUnit():
return "mm"
@staticmethod
def GetAnatomyRoles():
return { "Trajectory": "vtkMRMLSequenceNode" }
# Instance methods
def __init__( self ):
PerkEvaluatorMetric.__init__( self )
self.distanceMatrix = []
self.dtwDistance = 0
self.trajectory = None
self.dtwPathLength = 0
self.pointPrev = None
def SetAnatomy( self, role, node ):
if ( role == "Trajectory" ):
self.trajectory = node
return True
return False
def AddTimestamp( self, time, matrix, point, role ):
if ( self.trajectory is None ):
return
# Build up the matrix
newRow = [ 0 ] * self.trajectory.GetNumberOfDataNodes()
self.distanceMatrix.append( newRow )
i = len( self.distanceMatrix ) - 1
# Use dynamic programming to compute the next row
for j in range( self.trajectory.GetNumberOfDataNodes() ):
currTrajectoryNode = self.trajectory.GetNthDataNode( j )
currTrajectoryMatrix = vtk.vtkMatrix4x4()
currTrajectoryNode.GetMatrixTransformToWorld( currTrajectoryMatrix )
currTrajectoryPoint = [ currTrajectoryMatrix.GetElement( 0, 3 ), currTrajectoryMatrix.GetElement( 1, 3 ), currTrajectoryMatrix.GetElement( 2, 3 ) ]
currDistance = math.sqrt( vtk.vtkMath.Distance2BetweenPoints( point[ 0:3 ], currTrajectoryPoint ) )
if ( i == 0 ):
if ( j == 0 ):
self.distanceMatrix[ i ][ j ] = currDistance
else:
self.distanceMatrix[ i ][ j ] = currDistance + self.distanceMatrix[ i ][ j - 1 ]
else:
if ( j == 0 ):
self.distanceMatrix[ i ][ j ] = currDistance + self.distanceMatrix[ i - 1 ][ j ]
else:
self.distanceMatrix[ i ][ j ] = currDistance + min( self.distanceMatrix[ i ][ j - 1 ], self.distanceMatrix[ i - 1 ][ j ], self.distanceMatrix[ i - 1 ][ j - 1 ] )
self.dtwDistance = self.distanceMatrix[ i ][ self.trajectory.GetNumberOfDataNodes() - 1 ]
# Assume the dtw path length is the sum of the number of points in each sequence minus 1
# Note: this counts diagonal jumps as 2
self.dtwPathLength = len( self.distanceMatrix ) + len( self.distanceMatrix[ i ] ) - 1
def GetMetric( self ):
if ( self.dtwPathLength != 0 ):
return self.dtwDistance / self.dtwPathLength
return 0