非监督学习之聚类算法(5)-- 基于模型

基于模型的方法(Model-based methods)主要是指基于概率模型的方法和基于神经网络模型的方法。前者的代表是高斯混合模型(GMM,Gaussian Mixture Models),后者的代表是自组织映射网络(SOM,Self Organized Maps)。

高斯混合模型

1、高斯分布

关于高斯分布有一个经典例子,“假设知道全班男生的身高,要统计身高大于1米8的概率”,也就是,统计全班男生的身高分布。不难发现一个现象,在数据空间中某个值出现的概率都比较高,越偏离这个值,样本出现的概率越少。比如全班男生的身高,分布在1.7m左右的居多,越矮的越少,更高的也不多。
现实生活中很多数据的分布都符合这样的规律,样本集中在某个区段,越偏离这个区段,样本分布越少,比如房子的价格、图片背景的像素,在数学概念里,它们都符合中心极限定理。
高斯在1733年发现了这种分布,并且给这种分布命名为”正态分布”,后人也称其为”高斯分布”。

现在的问题变成了,已知一堆样本数据并且可以假设这个样本空间符合高斯分布,要解的是,它是符合什么样参数(μ和σ)的正态分布。

2、高斯混合模型

高斯混合模型,英文全称:​​Gaussian mixture model,简称GMM。高斯混合模型就是用高斯概率密度函数(二维时也称为:正态分布曲线)精确的量化事物,将一个事物分解为若干基于高斯概率密度函数行程的模型。这句话看起来有些深奥,这样去理解,事物的数学表现形式就是曲线,其意思就是任何一个曲线,无论多么复杂,我们都可以用若干个高斯曲线来无限逼近它,这就是高斯混合模型的基本思想。

换一种方式理解,曲线是模拟一组数据的结果,而这些数据分布情况如下图所示。那么此时GMM模拟出的曲线就有了现实的意义,这时就可以用构造好的GMM模型来表达这些数据,相比于存储数据,使用GMM中的参数来表达数据要方便简单的多,并且是数学上有完整的表达式。

增加数据维度,得到更为复杂一点的结果如下所示​,这也是我们经常看到GMM情况。

在二维的情况下,一个复杂的曲线可以用若干个组合起来的高斯函数​来逼近。
在三维的情况下,同样的可理解为任何一个曲面都可以用高斯函数来逼近。
推广到在N维的情况下,任何一个模型都可以用高斯函数来逼近。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
'''
高斯混合模型+EM算法
以alpha(k)的概率选择第k个高斯模型,再以该高斯模型概率分布产生数据。其中alpha(k)就是隐变量
'''
import numpy as np
import math
import copy
from collections import defaultdict
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
from itertools import cycle
colors = cycle('grbk')

class GMM:
def __init__(self, maxstep=1000, epsilon=1e-3, K=4):
self.maxstep = maxstep
self.epsilon = epsilon
self.K = K # 混合模型中的分模型的个数

self.alpha = None # 每个分模型前系数
self.mu = None # 每个分模型的均值向量
self.sigma = None # 每个分模型的协方差
self.gamma_all_final = None # 存储最终的每个样本对分模型的响应度,用于最终的聚类

self.D = None # 输入数据的维度
self.N = None # 输入数据总量

def inin_param(self, data):
# 初始化参数
self.D = data.shape[1]
self.N = data.shape[0]
self.init_param_helper(data)
return

def init_param_helper(self, data):
# KMeans初始化模型参数
KMEANS = KMeans(n_clusters=self.K).fit(data)
clusters = defaultdict(list)
for ind, label in enumerate(KMEANS.labels_):
clusters[label].append(ind)
mu = []
alpha = []
sigma = []
for inds in clusters.values():
partial_data = data[inds]
mu.append(partial_data.mean(axis=0)) # 分模型的均值向量
alpha.append(len(inds) / self.N) # 权重
sigma.append(np.cov(partial_data.T)) # 协方差,D个维度间的协方差
self.mu = np.array(mu)
self.alpha = np.array(alpha)
self.sigma = np.array(sigma)
return

def _phi(self, y, mu, sigma):
# 获取分模型的概率
s1 = 1.0 / math.sqrt(np.linalg.det(sigma))
s2 = np.linalg.inv(sigma) # d*d
delta = np.array([y - mu]) # 1*d
return s1 * math.exp(-1.0 / 2 * delta @ s2 @ delta.T)

def fit(self, data):
# 迭代训练
self.inin_param(data)
step = 0
gamma_all_arr = None
while step < self.maxstep:
step += 1
old_alpha = copy.copy(self.alpha)
# E步
gamma_all = []
for j in range(self.N):
gamma_j = [] # 依次求每个样本对K个分模型的响应度

for k in range(self.K):
gamma_j.append(self.alpha[k] * self._phi(data[j], self.mu[k], self.sigma[k]))

s = sum(gamma_j)
gamma_j = [item/s for item in gamma_j]
gamma_all.append(gamma_j)

gamma_all_arr = np.array(gamma_all)
# M步
for k in range(self.K):
gamma_k = gamma_all_arr[:, k]
SUM = np.sum(gamma_k)
# 更新权重
self.alpha[k] = SUM / self.N # 更新权重
# 更新均值向量
new_mu = sum([gamma * y for gamma, y in zip(gamma_k, data)]) / SUM # 1*d
self.mu[k] = new_mu
# 更新协方差阵
delta_ = data - new_mu # n*d
self.sigma[k] = sum([gamma * (np.outer(np.transpose([delta]), delta)) for gamma, delta in zip(gamma_k, delta_)]) / SUM # d*d
alpha_delta = self.alpha - old_alpha
if np.linalg.norm(alpha_delta, 1) < self.epsilon:
break
self.gamma_all_final = gamma_all_arr
return

def predict(self):
cluster = defaultdict(list)
for j in range(self.N):
max_ind = np.argmax(self.gamma_all_final[j])
cluster[max_ind].append(j)
return cluster

if __name__ == '__main__':

def generate_data(N=500):
X = np.zeros((N, 2)) # N*2, 初始化X
mu = np.array([[5, 35], [20, 40], [20, 35], [45, 15]])
sigma = np.array([[30, 0], [0, 25]])
for i in range(N): # alpha_list=[0.3, 0.2, 0.3, 0.2]
prob = np.random.random(1)
if prob < 0.1: # 生成0-1之间随机数
X[i, :] = np.random.multivariate_normal(mu[0], sigma, 1) # 用第一个高斯模型生成2维数据
elif 0.1 <= prob < 0.3:
X[i, :] = np.random.multivariate_normal(mu[1], sigma, 1) # 用第二个高斯模型生成2维数据
elif 0.3 <= prob < 0.6:
X[i, :] = np.random.multivariate_normal(mu[2], sigma, 1) # 用第三个高斯模型生成2维数据
else:
X[i, :] = np.random.multivariate_normal(mu[3], sigma, 1) # 用第四个高斯模型生成2维数据
return X

data = generate_data()
gem = GMM()
gem.fit(data)
# print(gem.alpha, '\n', gem.sigma, '\n', gem.mu)
cluster = gem.predict()

for color, inds in zip(colors, cluster.values()):
partial_data = data[inds]
plt.scatter(partial_data[:,0], partial_data[:, 1], edgecolors=color)
plt.show()
0%