Semi-Supervised Learning Algorithms (2) -- Classification

Semi-supervised classification trains on labeled samples with the help of unlabeled samples, aiming to obtain a classifier that performs better than one trained on the labeled samples alone, thereby compensating for the shortage of labeled data.

I. Semi-Supervised Support Vector Machines

The semi-supervised support vector machine (Semi-Supervised Support Vector Machine, S3VM) is the extension of the support vector machine to semi-supervised learning. When unlabeled samples are ignored, an SVM seeks the maximum-margin separating hyperplane; once unlabeled samples are taken into account, S3VM instead seeks a hyperplane that separates the two classes of labeled samples while passing through a low-density region of the data. The basic assumption here is "low-density separation".

The best-known semi-supervised SVM is the TSVM (Transductive Support Vector Machine). TSVM considers every possible label assignment to the unlabeled samples and tries to find, among them, the separating hyperplane that maximizes the margin over all samples, labeled and unlabeled.
Since exhaustive search over label assignments is infeasible, TSVM solves this iteratively with a local-search strategy: first train an initial SVM on the labeled set, then use it to assign pseudo-labels to the unlabeled samples so that every sample carries a label, retrain the SVM on the combined set, and then repeatedly look for likely-mislabeled samples and adjust their labels.
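Formally, writing the labeled set as $\{(\mathbf{x}_i, y_i)\}_{i=1}^{l}$ and the unlabeled set as $\{\mathbf{x}_j\}_{j=l+1}^{l+u}$, TSVM jointly optimizes the hyperplane $(\mathbf{w}, b)$ and the pseudo-label assignment $\hat{\mathbf{y}}$. In the standard formulation (the weights $C_l$ and $C_u$ correspond to Cl and Cu in the code below):

$$\min_{\mathbf{w},\,b,\,\hat{\mathbf{y}},\,\boldsymbol{\xi}}\;\frac{1}{2}\lVert\mathbf{w}\rVert^{2} + C_l \sum_{i=1}^{l} \xi_i + C_u \sum_{j=l+1}^{l+u} \xi_j$$

subject to $y_i(\mathbf{w}^{\top}\mathbf{x}_i + b) \ge 1 - \xi_i$ for labeled samples, $\hat{y}_j(\mathbf{w}^{\top}\mathbf{x}_j + b) \ge 1 - \xi_j$ for unlabeled ones, and $\boldsymbol{\xi} \ge 0$. Starting with $C_u \ll C_l$ and gradually raising $C_u$ lets the pseudo-labels influence the hyperplane only as they become reliable. The implementation below follows this local-search recipe.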

# coding:utf-8
import random
import numpy as np
import sklearn.svm as svm
from sklearn.datasets import make_classification
import joblib

class TSVM(object):
    def __init__(self, kernel='linear'):
        # Cl weights labeled samples, Cu weights pseudo-labeled ones;
        # Cu starts small and is annealed up to Cl during training.
        self.Cl, self.Cu = 1.5, 0.001
        self.kernel = kernel
        self.clf = svm.SVC(C=1.5, kernel=self.kernel)

    def train(self, X1, Y1, X2):
        """X1/Y1: labeled samples (labels must be in {-1, +1}); X2: unlabeled samples."""
        Y1 = np.asarray(Y1).ravel()
        N = len(X1) + len(X2)
        # Per-sample weights: full weight for labeled samples,
        # a small weight Cu for the pseudo-labeled ones.
        sample_weight = np.ones(N)
        sample_weight[len(X1):] = self.Cu

        # Train an initial SVM on the labeled portion only.
        self.clf.fit(X1, Y1)

        # Pseudo-label the unlabeled samples with the initial SVM.
        Y2 = self.clf.predict(X2)

        X = np.vstack([X1, X2])
        Y = np.concatenate([Y1, Y2])

        # Indices of the unlabeled samples within X2.
        Y2_id = np.arange(len(X2))

        while self.Cu < self.Cl:
            # Retrain on all samples, then keep flipping likely-wrong pseudo-labels.
            self.clf.fit(X, Y, sample_weight=sample_weight)
            while True:
                # Signed distance of each unlabeled sample to the decision hyperplane.
                Y2_decision = self.clf.decision_function(X2)
                # Margin violation: epsilon > 0 means the sample lies inside the margin.
                epsilon = 1 - Y2 * Y2_decision
                worst_id = Y2_id[np.argmax(epsilon)]
                if epsilon[worst_id] > 1:
                    # Flipping a pseudo-label with epsilon > 1 reduces its hinge loss
                    # (the new violation is 2 - epsilon < epsilon), so flip the
                    # most-violating pseudo-label and retrain.
                    pool = list(set(np.unique(Y1)) - {Y2[worst_id]})
                    Y2[worst_id] = random.choice(pool)
                    Y = np.concatenate([Y1, Y2])
                    self.clf.fit(X, Y, sample_weight=sample_weight)
                else:
                    break
            # Anneal: double the weight of the pseudo-labeled samples, capped at Cl.
            self.Cu = min(2 * self.Cu, self.Cl)
            sample_weight[len(X1):] = self.Cu

    def score(self, X, Y):
        return self.clf.score(X, Y)

    def predict(self, X):
        return self.clf.predict(X)

    def save(self, path='./TSVM.model'):
        joblib.dump(self.clf, path)

    def load(self, model_path='./TSVM.model'):
        self.clf = joblib.load(model_path)

if __name__ == '__main__':
    features, labels = make_classification(n_samples=200, n_features=3, n_redundant=1, n_repeated=0, n_informative=2, n_clusters_per_class=2)
    # Map the 0/1 labels from make_classification to {-1, +1},
    # which the margin computation above requires.
    labels = 2 * labels - 1
    n_given = 70
    # Use the first n_given samples as the labeled set.
    X1 = features[:n_given]
    X2 = features[n_given:]
    Y1 = labels[:n_given]
    Y2_true = labels[n_given:]

    model = TSVM()
    model.train(X1, Y1, X2)

    # Y2_hat = model.predict(X2)
    accuracy = model.score(X2, Y2_true)
    print(accuracy)
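Two details are worth noting. First, Cu doubles after every outer iteration until it reaches Cl, so the pseudo-labels barely influence the hyperplane at first and gain weight only gradually. Second, the margin term 1 - Y2 * Y2_decision assumes labels in {-1, +1}, which is why the driver code remaps the 0/1 labels produced by make_classification.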

II. Label Propagation

Graph-based semi-supervised learning first builds a graph whose node set is the full sample set (labeled and unlabeled samples alike) and whose edges encode pairwise similarity between samples; classification is then viewed as label information diffusing, or propagating, across the graph from labeled nodes to unlabeled ones.
The core idea is to assign similar vertices the same label wherever possible, making the labeling over the graph as smooth as possible.

The Label Propagation Algorithm is the main representative of graph-based semi-supervised learning. Its basic idea is to predict the labels of unlabeled nodes from the labels of labeled nodes, exploiting the pairwise relations between samples to build a fully connected graph model.
Each node propagates its label to its neighbors in proportion to similarity. At every propagation step, each node updates its own label from those of its neighbors: the more similar a neighbor is, the greater its influence, so similar nodes converge toward consistent labels and labels spread easily between them. Throughout propagation, the labels of the labeled data are held fixed, so they keep pushing their labels into the unlabeled data. When the iteration ends, nodes with similar label distributions can be assigned to the same class.
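Concretely (the from-scratch implementation in section 2 below follows exactly this recipe): build a similarity matrix $W$ with RBF weights, row-normalize it into a transition matrix $P$, and propagate:

$$W_{ij} = \exp\!\left(-\frac{\lVert \mathbf{x}_i - \mathbf{x}_j\rVert^{2}}{2\sigma^{2}}\right), \qquad P = D^{-1}W, \qquad F \leftarrow P\,F,$$

where $D$ is the diagonal matrix of row sums of $W$ and $F$ is an $N \times C$ matrix of per-class label scores. After each step, the rows of $F$ belonging to labeled samples are clamped back to their one-hot labels; at convergence, each unlabeled sample takes the class with the largest score in its row.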

1. The sklearn version

# coding:utf-8
import numpy as np
from sklearn.semi_supervised import LabelPropagation
from sklearn.datasets import make_classification
import joblib

class LP(object):
    def __init__(self, kernel='rbf'):
        self.kernel = kernel

    def train(self, X, Y_train):
        # sklearn treats every sample whose label is -1 as unlabeled.
        self.clf = LabelPropagation(max_iter=100, kernel=self.kernel, gamma=0.1)
        self.clf.fit(X, Y_train)

    def score(self, X, Y):
        return self.clf.score(X, Y)

    def predict(self, X):
        return self.clf.predict(X)

    def save(self, path='./LP.model'):
        joblib.dump(self.clf, path)

    def load(self, model_path='./LP.model'):
        self.clf = joblib.load(model_path)

if __name__ == '__main__':
    features, labels = make_classification(n_samples=200, n_features=3, n_redundant=1, n_repeated=0, n_informative=2, n_clusters_per_class=2)
    n_given = 70
    # Shuffle, then use the first n_given samples as the labeled set.
    index = np.random.permutation(len(features))
    X = features[index]
    Y = labels[index]

    # Mark everything after the first n_given samples as unlabeled (-1).
    unlabeled_index = np.arange(len(Y))[n_given:]
    Y_train = np.copy(Y)
    Y_train[unlabeled_index] = -1

    lp = LP()
    lp.train(X, Y_train)
    print(lp.predict(X[unlabeled_index]))
    print(lp.score(X[unlabeled_index], Y[unlabeled_index]))
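With kernel='rbf', LabelPropagation builds a dense, fully connected graph whose edge weights are RBF similarities; its gamma parameter plays the role of $1/(2\sigma^2)$ in the formula above. For larger datasets, kernel='knn' builds a sparse k-nearest-neighbor graph instead, which scales considerably better.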

2. From-scratch version

# coding:utf-8
import numpy as np
from sklearn.datasets import make_classification

class LP(object):
    def __init__(self, kernel='rbf'):
        self.kernel = kernel

    def buildGraph(self, X, rbf_sigma):
        # Dense similarity matrix with RBF weights, row-normalized so that
        # each row is a probability distribution (a transition matrix).
        Graph = np.zeros((len(X), len(X)), np.float32)
        for i in range(len(X)):
            row_sum = 0.0
            for j in range(len(X)):
                diff = X[i, :] - X[j, :]
                Graph[i][j] = np.exp(np.sum(diff**2) / (-2.0 * rbf_sigma**2))
                row_sum += Graph[i][j]
            Graph[i][:] /= row_sum
        return Graph

    def main(self, X1, X2, Y1, rbf_sigma=1.5, max_iter=500):
        N = len(X1) + len(X2)
        num_classes = len(np.unique(Y1))

        X = np.vstack((X1, X2))

        # One-hot label matrix for the labeled samples.
        label_known = np.zeros((len(X1), num_classes), np.float32)
        for i in range(len(X1)):
            label_known[i][Y1[i]] = 1.0

        # Label function: one-hot rows for labeled samples, zeros for unlabeled.
        label_function = np.zeros((N, num_classes), np.float32)
        label_function[:len(X1)] = label_known

        # Build the graph.
        Graph = self.buildGraph(X, rbf_sigma)

        # Propagate until the label function stops changing.
        n_iter = 0
        pre_label_function = np.zeros((N, num_classes), np.float32)
        changed = np.abs(pre_label_function - label_function).sum()
        while n_iter < max_iter and changed > 1e-3:
            print("---> Iteration %d/%d, changed: %f" % (n_iter, max_iter, changed))
            pre_label_function = label_function
            n_iter += 1

            # One propagation step: each row becomes a similarity-weighted
            # average of its neighbors' label distributions.
            label_function = np.dot(Graph, label_function)

            # Clamp the labeled samples back to their known labels.
            label_function[:len(X1)] = label_known
            changed = np.abs(pre_label_function - label_function).sum()

        # Assign each unlabeled sample its highest-scoring class.
        unlabel_data_labels = np.zeros(len(X2))
        for i in range(len(X2)):
            unlabel_data_labels[i] = np.argmax(label_function[i + len(X1)])
        return unlabel_data_labels

if __name__ == '__main__':
    features, labels = make_classification(n_samples=200, n_features=3, n_redundant=1, n_repeated=0, n_informative=2, n_clusters_per_class=2)
    n_given = 100
    # Use the first n_given samples as the labeled set.
    X1 = features[:n_given]
    X2 = features[n_given:]
    Y1 = labels[:n_given]

    lp = LP()
    print(lp.main(X1, X2, Y1))
    print(labels[n_given:])
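Rather than iterating, the fixed point of this propagation can also be computed in closed form (the classic label propagation result of Zhu and Ghahramani): splitting the transition matrix into labeled/unlabeled blocks gives $f_U = (I - P_{UU})^{-1} P_{UL} f_L$. Below is a minimal sketch reusing the Graph and label_known arrays from the code above; closed_form_lp is an illustrative helper, not part of the original code.

import numpy as np

def closed_form_lp(Graph, label_known, n_labeled):
    # Blocks of the row-normalized transition matrix:
    # rows are unlabeled nodes; columns are labeled / unlabeled nodes.
    P_UL = Graph[n_labeled:, :n_labeled]
    P_UU = Graph[n_labeled:, n_labeled:]
    # Solve (I - P_UU) f_U = P_UL f_L, the fixed point of F <- P F with clamping.
    f_U = np.linalg.solve(np.eye(len(P_UU)) - P_UU, P_UL @ label_known)
    return f_U.argmax(axis=1)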