1. 程式人生 > >聚類——標籤傳播演算法以及Python實現

聚類——標籤傳播演算法以及Python實現

標籤傳播演算法(label propagation)是典型的半監督聚類演算法。半監督是指訓練資料集中小部分樣本點已知標籤,大部分樣本點未知標籤。

核心思想

相似性較大的樣本點間應該具有相同的標籤,將已知標籤通過相似性矩陣傳播到未知的標籤。

演算法簡介

基本概念

轉化矩陣:用來更新標籤,實質就是度量樣本點間相似性程度的矩陣(圖的邊的權重)。

Yi+1TYi
通常使用高斯徑向基以及k近鄰方法度量。
高斯徑向基計算兩樣本點間權重:wij=e(xixj)2σ2採用徑向基時,σ的影響非常大,且不好設定。一種啟發式的方法就是找到距離最近(
d0
)的兩個不同標籤的樣本點(具體怎麼找可以使用Kruskal最小生成樹演算法),設定σ>3d0/2.因為這樣滿足3σ準則,不同標籤樣本點間權重幾乎為0。
k近鄰方法計算兩樣本點間權重:(1)wij={1KjNi0jNi其中,Ni為樣本i的K近鄰節點集合。
clamp:標籤矩陣更新過程中,原資料集中已知標籤樣本點的標籤不能改變,故需要將其”夾住”不讓其改變。

演算法流程

  • Input: 訓練資料集X_data,y_data(未知標籤的設為-1), 閾值epsilon, 最大迭代次數maxstep, 權值計算函式即相應引數
  • Output: 樣本標籤陣列
  • Step1: 計算轉換矩陣T。
  • Step2: 更新標籤,然後clamp。若標籤矩陣更新前後差異小於epsilon或超出迭代超出maxstep則終止。

程式碼

"""
標籤傳播聚類演算法, 典型的半監督學習演算法
核心思想:相似的資料應該具有相同的標籤,構建節點間的相似性矩陣(邊的權重)
"""
import numpy as np


class LablePropagation:
    def __init__(self, epsilon=1e-3, maxstep=500, kernel_option='rbf', sigma=1.0, k=10):
        self.epsilon = epsilon
        self.maxstep = maxstep
        self.kernel_option = kernel_option
        self.sigma = sigma  # rbf 核函式的引數
self.k = k # knn 核函式引數 self.T = None # 未標記點間的轉換矩陣 self.Y = None # 標籤數矩陣 self.Y_clamp = None # 已知標籤資料點的標籤矩陣 self.N = None self.labeled_inds = None # 已知標籤樣本的索引 self.labels = None def init_param(self, X_data, y_data): # 初始化引數 self.N = X_data.shape[0] self.labeled_inds = np.where(y_data >= 0)[0] # 未知標籤設為-1 n_class = len(np.unique(y_data[self.labeled_inds])) self.Y = np.zeros((self.N, n_class)) for i in self.labeled_inds: self.Y[i][int(y_data[i])] = 1.0 # 啞編碼,對應標籤設為1 self.Y_clamp = self.Y[self.labeled_inds] # n*l self.T = self.cal_tran_mat(X_data) # n*n return def cal_dis2(self, node1, node2): # 計算節點間的歐式距離平方 return (node1 - node2) @ (node1 - node2) def cal_tran_mat(self, data): # 計算轉換矩陣, 即構建圖 dis_mat = np.zeros((self.N, self.N)) for i in range(self.N): for j in range(i + 1, self.N): dis_mat[i, j] = self.cal_dis2(data[i], data[j]) dis_mat[j, i] = dis_mat[i, j] if self.kernel_option == 'rbf': assert (self.sigma is not None) T = np.exp(-dis_mat / self.sigma ** 2) normalizer = T.sum(axis=0) T = T / normalizer elif self.kernel_option == 'knn': assert (self.k is not None) T = np.zeros((self.N, self.N)) for i in range(self.N): inds = np.argpartition(dis_mat[i], self.k + 1)[:self.k + 1] T[i][inds] = 1.0 / self.k # 最近的k個擁有相同的權重 T[i][i] = 0 else: raise ValueError('kernel is not supported') return T def fit(self, X_data, y_data): # 訓練主函式 self.init_param(X_data, y_data) step = 0 while step < self.maxstep: step += 1 new_Y = self.T @ self.Y # 更新標籤矩陣 new_Y[self.labeled_inds] = self.Y_clamp # clamp if np.abs(new_Y - self.Y).sum() < self.epsilon: break self.Y = new_Y self.labels = np.argmax(self.Y, axis=1) return if __name__ == '__main__': from sklearn.datasets import make_circles n_samples = 100 X, y = make_circles(n_samples=n_samples, shuffle=False) outer, inner = 0, 1 labels = -np.ones(n_samples) labels[0] = outer labels[-1] = inner LPA = LablePropagation(maxstep=1000, kernel_option='knn', k=2, sigma=0.07) LPA.fit(X, labels) labels = LPA.labels import matplotlib.pyplot as plt def visualize(data, labels): color = 'bg' unique_label = np.unique(labels) for col, label in zip(color, unique_label): partial_data = data[np.where(labels == label)] plt.scatter(partial_data[:, 0], partial_data[:, 1], color=col, alpha=1) plt.scatter(data[0, 0], data[0, 1], color='b', marker='*', s=200, alpha=0.5) # outer plt.scatter(data[-1, 0], data[-1, 1], color='g', marker='*', s=200, alpha=0.5) # inner plt.show() return visualize(X, labels)

我的GitHub
注:如有不當之處,請指正。