非监督学习之聚类算法(6)-- 基于核函数

核函数是能够将低维不可分的数据映射到高维空间进行线性可分时能够降低数据处理难度的重要手段。核函数的本质就是一种将一个空间转化为另一个空间的变化。
核函数聚类源于支持向量机(SVM)理论,主要思想是引入基于核函数的相似性测度,从而分辨、提取并放大有用的特征,实现更为准确的聚类。
核聚类方法是普适的,并在性能上优于经典的聚类算法;同时,算法的收敛速度也较快。在经典聚类算法失效的情况下,核聚类算法仍能够得到正确的聚类。

mean-shift算法

均值漂移算法(MeanShift)是一种旨在发现团(blobs)的聚类算法。核心思想是寻找核密度极值点并作为簇的质心,然后根据最近邻原则将样本点赋予质心

1、执行步骤

Step1:获取可以作为起始质心的样本点
Step2:对每个起始质心进行漂移,漂移终止条件就是漂移距离小于epsilon。若漂移结束最终的质心与已存在的质心距离小于带宽则合并。
Step3:分类。将样本点归属到距离最近的质心中。

2、优缺点

(1)优点

  • 不需要事先制定聚类数量
  • 对聚类的形状没有限制
  • 对异常值不敏感

(2)缺点

  • 因为无法控制簇个数,可能会产生过多的簇而导致聚类失去意义
  • 时间复杂度较高O(n²),计算开销大
  • 带宽参数的选择影响很大,带宽选的小时收敛速度慢;选的大时虽然会加速收敛但是聚类效果不会很好

3、代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# coding=utf-8
'''
meanshift算法
'''
from collections import defaultdict
from sklearn.datasets import make_blobs
import matplotlib.pyplot as plt
from itertools import cycle
import numpy as np
import math

#==============================================
# 数据可视化
#==============================================

def visualize(data, labels):
color = 'bgrymk'
unique_label = np.unique(labels)
for col, label in zip(cycle(color), unique_label):
partial_data = data[np.where(labels == label)]
plt.scatter(partial_data[:, 0], partial_data[:, 1], color=col)
plt.show()
return

#==============================================
# 算法核心部分
#==============================================

class MeanShift:
'''
均值漂移算法
'''
def __init__(self, band_width=2.0, min_fre=3, epsilon=None, bin_seeding=False, bin_size=None):
self.epsilon = epsilon if epsilon else 1e-3 * band_width
self.bin_size = bin_size if bin_size else self.band_width
self.band_width = band_width
self.min_fre = min_fre # 可以作为起始质心的球体内最少的样本数目
self.bin_seeding = bin_seeding
self.radius2 = self.band_width ** 2 # 高维球体半径的平方

self.N = None
self.labels = None
self.centers = []

def init_param(self, data):
# 初始化参数
self.N = data.shape[0]
self.labels = -1 * np.ones(self.N)
return

def get_seeds(self, data):
# 获取可以作为起始质心的点(seed)
if not self.bin_seeding:
return data
seed_list = []
seeds_fre = defaultdict(int)
for sample in data:
seed = tuple(np.round(sample / self.bin_size)) # 将数据粗粒化,以防止非常近的样本点都作为起始质心
seeds_fre[seed] += 1
for seed, fre in seeds_fre.items():
if fre >= self.min_fre:
seed_list.append(np.array(seed))
if not seed_list:
raise ValueError('the bin size and min_fre are not proper')
if len(seed_list) == data.shape[0]:
return data
return np.array(seed_list) * self.bin_size

def euclidean_dis2(self, center, sample):
# 计算均值点到每个样本点的欧式距离(平方)
delta = center - sample
return delta @ delta

def gaussian_kel(self, dis2):
# 计算高斯核
return 1.0 / self.band_width * (2 * math.pi) ** (-1.0 / 2) * math.exp(-dis2 / (2 * self.band_width ** 2))

def shift_center(self, current_center, data):
# 计算下一个漂移的坐标
denominator = 0 # 分母
numerator = np.zeros_like(current_center) # 分子, 一维数组形式
for sample in data:
dis2 = self.euclidean_dis2(current_center, sample)
if dis2 <= self.radius2:
d = self.gaussian_kel(dis2)
denominator += d
numerator += d * sample
if denominator > 0:
return numerator / denominator
else:
return None

def classify(self, data):
# 根据最近邻将数据分类到最近的簇中
center_arr = np.array(self.centers)
for i in range(self.N):
delta = center_arr - data[i]
dis2 = np.sum(delta * delta, axis=1)
self.labels[i] = np.argmin(dis2)
return

def fit(self, data):
# 训练主函数
self.init_param(data)
seed_list = self.get_seeds(data)
for seed in seed_list:
bad_seed = False
current_center = seed
# 进行一次独立的均值漂移
while True:
next_center = self.shift_center(current_center, data)
if next_center is None:
bad_seed = True
break
delta_dis = np.linalg.norm(next_center - current_center, 2)
if delta_dis < self.epsilon:
break
current_center = next_center
if not bad_seed:
# 若该次漂移结束后,最终的质心与已存在的质心距离小于带宽,则合并
for i in range(len(self.centers)):
if np.linalg.norm(current_center - self.centers[i], 2) < self.band_width:
break
else:
self.centers.append(current_center)
self.classify(data)
return

#========================================================
# 主程序
#========================================================

if __name__ == '__main__':

data, label = make_blobs(n_samples=500, centers=5, cluster_std=1.2, random_state=7)
MS = MeanShift(band_width=3, min_fre=3, bin_size=4, bin_seeding=True)
MS.fit(data)
labels = MS.labels

# print(MS.centers, np.unique(labels))

visualize(data, labels)
0%