【线】轨迹相似度

动态时间调整算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import numpy as np

float_formatter = lambda x: "%.2f" % x
np.set_printoptions(formatter={'float_kind': float_formatter})

def DynamicTimeWarping(s1, s2):
l1 = len(s1)
l2 = len(s2)
paths = np.full((l1 + 1, l2 + 1), np.inf) # 全部赋予无穷大
paths[0, 0] = 0
for i in range(l1):
for j in range(l2):
d = s1[i] - s2[j]
cost = d ** 2
paths[i + 1, j + 1] = cost + min(paths[i, j + 1], paths[i + 1, j], paths[i, j])

paths = np.sqrt(paths)
s = paths[l1, l2]
return s, paths.T

if __name__ == '__main__':
s1 = [1, 2, 0, 1, 1, 2]
s2 = [1, 0, 1]

distance, paths = DynamicTimeWarping(s1, s2)
print(paths)
print(distance)

动态时间调整算法(改进)

原始的动态时间调整算法对于具有周期性的序列距离计算不是很好,改进思路是求得一个惩罚系数α,这个α和原算法的距离相乘,得到更新后的距离。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import numpy as np

float_formatter = lambda x: "%.2f" % x
np.set_printoptions(formatter={'float_kind': float_formatter})

def DynamicTimeWarpingImprove(s1, s2):
# 取较大的标准差
sdt = np.std(s1, ddof=1) if np.std(s1, ddof=1) > np.std(s2, ddof=1) else np.std(s2, ddof=1)
# print("两个序列最大标准差:" + str(sdt))
l1 = len(s1)
l2 = len(s2)
paths = np.full((l1 + 1, l2 + 1), np.inf) # 全部赋予无穷大
sub_matrix = np.full((l1, l2), 0) # 全部赋予0
max_sub_len = 0

paths[0, 0] = 0
for i in range(l1):
for j in range(l2):
d = s1[i] - s2[j]
cost = d ** 2
paths[i + 1, j + 1] = cost + min(paths[i, j + 1], paths[i + 1, j], paths[i, j])
if np.abs(s1[i] - s2[j]) < sdt:
if i == 0 or j == 0:
sub_matrix[i][j] = 1
else:
sub_matrix[i][j] = sub_matrix[i - 1][j - 1] + 1
max_sub_len = sub_matrix[i][j] if sub_matrix[i][j] > max_sub_len else max_sub_len

paths = np.sqrt(paths)
s = paths[l1, l2]
return s, paths.T, [max_sub_len]

def calculate_attenuate_weight(seqLen1, seqLen2, com_ls):
weight = 0
for comlen in com_ls:
weight = weight + comlen / seqLen1 * comlen / seqLen2
return 1 - weight

if __name__ == '__main__':
# 测试数据
s1 = [1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1, 1, 2, 0, 1]
s2 = [0, 1, 1, 2, 0, 1, 1.7, 2, 0, 1, 1, 2, 0, 1, 1, 2]
s3 = [0.8, 1.5, 0, 1.2, 0, 0, 0.6, 1, 1.2, 0, 0, 1, 0.2, 2.4, 0.5, 0.4]

# 原始算法
distance12, paths12, max_sub12 = DynamicTimeWarpingImprove(s1, s2)
distance13, paths13, max_sub13 = DynamicTimeWarpingImprove(s1, s3)

print("更新前s1和s2距离:" + str(distance12))
print("更新前s1和s3距离:" + str(distance13))

# 衰减系数
weight12 = calculate_attenuate_weight(len(s1), len(s2), max_sub12)
weight13 = calculate_attenuate_weight(len(s1), len(s3), max_sub13)

# 更新距离
print("更新后s1和s2距离:" + str(distance12 * weight12))
print("更新后s1和s3距离:" + str(distance13 * weight13))

最长公共子序列算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def LongestCommonSubsequence(a, b):
lena = len(a)
lenb = len(b)
# c用来保存对应位置匹配的结果
c = [[0 for i in range(lenb + 1)] for j in range(lena + 1)]
# flag用来记录转移方向
flag = [[0 for i in range(lenb + 1)] for j in range(lena + 1)]
for i in range(lena):
for j in range(lenb):
if a[i] == b[j]: # 字符匹配成功,则该位置的值为左上方的值加1
c[i + 1][j + 1] = c[i][j] + 1
flag[i + 1][j + 1] = 'ok'
elif c[i + 1][j] > c[i][j + 1]: # 左值大于上值,则该位置的值为左值,并标记回溯时的方向
c[i + 1][j + 1] = c[i + 1][j]
flag[i + 1][j + 1] = 'left'
else: # 上值大于左值,则该位置的值为上值,并标记方向up
c[i + 1][j + 1] = c[i][j + 1]
flag[i + 1][j + 1] = 'up'

(p1, p2) = (len(a), len(b))
s = []
while c[p1][p2]: # 不为None时
t = flag[p1][p2]
if t == 'ok': # 匹配成功,插入该字符,并向左上角找下一个
s.append(a[p1-1])
p1 -= 1
p2 -= 1
if t == 'left': # 根据标记,向左找下一个
p2 -= 1
if t == 'up': # 根据标记,向上找下一个
p1 -= 1
s.reverse()
return s

if __name__ == '__main__':
a = [180, 180, 141, 146, 141, 200, 235, 235, 173, 172, 141, 141, 172, 180]
b = [165, 235, 180, 141, 240, 171, 173, 172, 141]

s = LongestCommonSubsequence(a, b)
print(s)
print(len(s)/min(len(a), len(b)))

弗雷歇距离

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import numpy as np

class DiscretFrechetDistance:
def eucl_dist(self, x, y):
dist = np.linalg.norm(x-y)
return dist

def _c(self, dp, i, j, curve_a, curve_b):
if dp[i][j] > -1:
return dp[i][j]
elif i == 0 and j ==0:
dp[i][j] = self.eucl_dist(curve_a[0], curve_b[0])
elif i > 0 and j == 0:
dp[i][j] = max(self._c(dp, i-1, 0, curve_a, curve_b), self.eucl_dist(curve_a[i], curve_b[0]))
elif i == 0 and j > 0:
dp[i][j] = max(self._c(dp, 0, j-1, curve_a,curve_b), self.eucl_dist(curve_a[0],curve_b[j]))
elif i > 0 and j > 0:
dp[i][j] = max(min(self._c(dp, i-1, j, curve_a, curve_b), self._c(dp, i-1, j-1, curve_a, curve_b), self._c(dp, i, j-1, curve_a, curve_b)), self.eucl_dist(curve_a[i], curve_b[j]))
else:
dp[i][j] = float("inf")
return dp[i][j]

def main(self, curve_a, curve_b):
dp = np.ones((len(curve_a), len(curve_b)))
dp = np.multiply(dp, -1)
# dp = [[-1 for _ in range(len(curve_b))] for _ in range(len(curve_a))]
similarity = self._c(dp, len(curve_a)-1, len(curve_b)-1, curve_a, curve_b)
return similarity

if __name__ == '__main__':
a = [180, 180, 141, 146, 141, 200, 235, 235, 173, 172, 141, 141, 172, 180]
b = [180, 182, 141, 146, 149, 200, 235, 211, 173, 172, 130]

f = DiscretFrechetDistance()
s = f.main(a, b)
print(s)
0%