The Canopy Clustering Algorithm and Its Implementation in Mahout
The kmeans algorithm discussed earlier requires the number of clusters to be set in advance. We can make a rough estimate of that number from the data, but there is also a family of techniques, called approximate clustering algorithms, that can estimate both the number of clusters and their approximate center locations from a given dataset. A typical example is the canopy generation algorithm.
Mahout's kmeans implementation uses the RandomSeedGenerator class to produce a SequenceFile containing k vectors. Although random center generation is fast, it cannot guarantee good center estimates for the k clusters, and center estimation strongly affects kmeans running time: good estimates help the algorithm converge faster, with fewer passes over the data.
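As an illustration, seeding kmeans with random centers looks roughly like this. This is a minimal sketch assuming Mahout 0.7+, where RandomSeedGenerator.buildRandom takes a Configuration argument; the paths are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;

public class RandomSeedExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path input = new Path("vectors");       // SequenceFile of VectorWritable (placeholder path)
    Path output = new Path("random-seeds"); // where the k seed vectors are written
    // Choose k = 10 random input vectors as the initial kmeans centers.
    RandomSeedGenerator.buildRandom(conf, input, output, 10, new EuclideanDistanceMeasure());
  }
}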
The canopy generation algorithm, also known as canopy clustering, is a fast, approximate clustering technique. It divides the input points into a set of overlapping clusters called canopies; in this context a canopy is simply a group of nearby points, i.e. a cluster. Based on two distance thresholds, canopy clustering tries to estimate the likely cluster centers (canopy centers).
The strength of canopy clustering is its speed: a single pass over the data produces the result. That strength is also its weakness: the algorithm cannot produce precise clusters. It does, however, yield an estimate of the number of clusters, without requiring the cluster count k to be specified in advance as kmeans does.
Algorithm flow: the algorithm uses a fast distance measure and two distance thresholds, T1 and T2, where T1 > T2. It starts with a dataset of points and an empty list of canopies, then iterates over the dataset, generating canopies along the way. In each round it removes one point from the dataset and adds a canopy centered on that point to the list. It then walks through the remaining points; for each point it computes the distance to the center of every canopy in the list. If the distance to a canopy center is below T1, the point is added to that canopy; if the distance is below T2, the point is also removed from the dataset, so it cannot seed a new canopy in a later round. This repeats until the dataset is empty. For example, with T1 = 3.0 and T2 = 1.5, a point 2.0 away from a canopy center joins that canopy but may still found its own canopy later, while a point 1.0 away joins it and is taken out of contention.
Removing points within T2 prevents points lying right next to an existing canopy center (closer than T2) from becoming new canopy centers themselves.
In Mahout, the canopy algorithm is implemented by the CanopyClusterer and CanopyDriver classes: the former performs in-memory clustering, while the latter implements the algorithm as a MapReduce job. These implementations can read and write data on local disk, or run on a Hadoop cluster reading and writing data in HDFS.
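For the MapReduce path, a driver invocation looks roughly like this. This is a minimal sketch assuming the Mahout 0.7–0.9 CanopyDriver.run signature, which may differ in other versions; paths and thresholds are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;

public class CanopyJobExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path input = new Path("vectors");        // SequenceFile of VectorWritable
    Path output = new Path("canopy-output"); // canopies are written under here
    // t1 = 3.0, t2 = 1.5; runClustering = false (only generate canopy centers);
    // clusterClassificationThreshold = 0.0; runSequential = false (use MapReduce).
    CanopyDriver.run(conf, input, output, new EuclideanDistanceMeasure(),
        3.0, 1.5, false, 0.0, false);
  }
}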
First, a class describing a canopy is defined:
package org.apache.mahout.clustering.canopy;

import org.apache.mahout.clustering.iterator.DistanceMeasureCluster;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.math.Vector;

/**
 * This class models a canopy as a center point, the number of points that are contained within it according
 * to the application of some distance metric, and a point total which is the sum of all the points and is
 * used to compute the centroid when needed.
 */
public class Canopy extends DistanceMeasureCluster {

  /** Used for deserialization as a writable */
  public Canopy() {
  }

  /**
   * Create a new Canopy containing the given point and canopyId
   *
   * @param center a point in vector space
   * @param canopyId an int identifying the canopy local to this process only
   * @param measure a DistanceMeasure to use
   */
  public Canopy(Vector center, int canopyId, DistanceMeasure measure) {
    super(center, canopyId, measure);
    observe(center);
  }

  public String asFormatString() {
    return "C" + this.getId() + ": " + this.computeCentroid().asFormatString();
  }

  @Override
  public String toString() {
    return getIdentifier() + ": " + getCenter().asFormatString();
  }

  @Override
  public String getIdentifier() {
    return "C-" + getId();
  }
}
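As a quick illustration of this class, here is a hypothetical fragment; the exact printed output depends on Vector.asFormatString:

import org.apache.mahout.clustering.canopy.Canopy;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;

public class CanopyExample {
  public static void main(String[] args) {
    DistanceMeasure measure = new EuclideanDistanceMeasure();
    // A canopy centered on (1.0, 2.0); the id is local to this process.
    Canopy canopy = new Canopy(new DenseVector(new double[] {1.0, 2.0}), 0, measure);
    canopy.observe(new DenseVector(new double[] {1.5, 2.5}));
    canopy.computeParameters(); // folds the observed points into center and radius
    System.out.println(canopy); // prints something like "C-0: {0:1.25,1:2.25}"
  }
}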
The core of the algorithm is implemented in CanopyClusterer (a usage sketch follows the listing):
/**
* Iterate through the points, adding new canopies. Return the canopies.
*
* @param points
* a list<Vector> defining the points to be clustered
* @param measure
* a DistanceMeasure to use
* @param t1
* the T1 distance threshold
* @param t2
* the T2 distance threshold
* @return the List<Canopy> created
*/
public static List<Canopy> createCanopies(List<Vector> points,
DistanceMeasure measure,
double t1,
double t2) {
List<Canopy> canopies = Lists.newArrayList();
/**
* Reference Implementation: Given a distance metric, one can create
* canopies as follows: Start with a list of the data points in any
* order, and with two distance thresholds, T1 and T2, where T1 > T2.
* (These thresholds can be set by the user, or selected by
* cross-validation.) Pick a point on the list and measure its distance
* to all other points. Put all points that are within distance
* threshold T1 into a canopy. Remove from the list all points that are
* within distance threshold T2. Repeat until the list is empty.
*/
int nextCanopyId = 0;
while (!points.isEmpty()) {
Iterator<Vector> ptIter = points.iterator();
Vector p1 = ptIter.next();
ptIter.remove();
Canopy canopy = new Canopy(p1, nextCanopyId++, measure);
canopies.add(canopy);
while (ptIter.hasNext()) {
Vector p2 = ptIter.next();
double dist = measure.distance(p1, p2);
// Put all points that are within distance threshold T1 into the
// canopy
if (dist < t1) {
canopy.observe(p2);
}
// Remove from the list all points that are within distance
// threshold T2
if (dist < t2) {
ptIter.remove();
}
}
for (Canopy c : canopies) {
c.computeParameters();
}
}
return canopies;
}
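A small usage sketch of this method, on hypothetical data: with T1 = 3.0 and T2 = 1.5, the two tight groups below end up as two canopies. Note that createCanopies consumes the input list, so it must be mutable:

import java.util.List;

import com.google.common.collect.Lists;

import org.apache.mahout.clustering.canopy.Canopy;
import org.apache.mahout.clustering.canopy.CanopyClusterer;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

public class CreateCanopiesExample {
  public static void main(String[] args) {
    List<Vector> points = Lists.newArrayList();
    points.add(new DenseVector(new double[] {1.0, 1.0}));
    points.add(new DenseVector(new double[] {1.5, 1.5}));
    points.add(new DenseVector(new double[] {5.0, 5.0}));
    points.add(new DenseVector(new double[] {5.5, 5.5}));
    // Two tight groups, far apart: expect two canopies.
    List<Canopy> canopies =
        CanopyClusterer.createCanopies(points, new EuclideanDistanceMeasure(), 3.0, 1.5);
    for (Canopy canopy : canopies) {
      System.out.println(canopy);
    }
  }
}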
Looking at the code, the two key Canopy operations are the observe and computeParameters methods.
As seen above, the Canopy class extends, through several layers of inheritance, from the Model interface, which looks like this:
package org.apache.mahout.clustering;
import org.apache.hadoop.io.Writable;
import org.apache.mahout.math.VectorWritable;
/**
* A model is a probability distribution over observed data points and allows
* the probability of any data point to be computed. All Models have a
 * persistent representation and extend Writable.
*/
public interface Model<O> extends Writable {
/**
* Return the probability that the observation is described by this model
*
* @param x
* an Observation from the posterior
* @return the probability that x is in the receiver
*/
double pdf(O x);
/**
* Observe the given observation, retaining information about it
*
* @param x
* an Observation from the posterior
*/
void observe(O x);
/**
* Observe the given observation, retaining information about it
*
* @param x
* an Observation from the posterior
* @param weight
* a double weighting factor
*/
void observe(O x, double weight);
/**
* Observe the given model, retaining information about its observations
*
* @param x
 * a Model<O>
*/
void observe(Model<O> x);
/**
* Compute a new set of posterior parameters based upon the Observations that
* have been observed since my creation
*/
void computeParameters();
/**
* Return the number of observations that this model has seen since its
* parameters were last computed
*
* @return a long
*/
long getNumObservations();
/**
* Return the number of observations that this model has seen over its
* lifetime
*
* @return a long
*/
long getTotalObservations();
/**
* @return a sample of my posterior model
*/
Model<VectorWritable> sampleFromPosterior();
}
Concretely, the AbstractCluster class implements them as follows:
public void observe(Vector x) {
  // s0: the count of points observed so far
  setS0(getS0() + 1);
  // s1: the running vector sum of the observed points
  if (getS1() == null) {
    setS1(x.clone());
  } else {
    getS1().assign(x, Functions.PLUS);
  }
  // s2: the running element-wise sum of squares
  Vector x2 = x.times(x);
  if (getS2() == null) {
    setS2(x2);
  } else {
    getS2().assign(x2, Functions.PLUS);
  }
}
@Override
public void computeParameters() {
  if (getS0() == 0) {
    return; // nothing observed since the last call
  }
  setNumObservations((long) getS0());
  setTotalObservations(getTotalObservations() + getNumObservations());
  // center = s1 / s0, i.e. the mean of the observed points
  setCenter(getS1().divide(getS0()));
  // compute the component stds: radius = sqrt(s0 * s2 - s1^2) / s0
  if (getS0() > 1) {
    setRadius(getS2().times(getS0()).minus(getS1().times(getS1())).assign(new SquareRootFunction()).divide(getS0()));
  }
  // reset the running sums for the next round of observations
  setS0(0);
  setS1(center.like());
  setS2(center.like());
}
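To make the bookkeeping concrete, here is a tiny standalone sketch (plain Java, no Mahout types) of the same running statistics for one-dimensional points; the class and variable names are illustrative only:

public class RunningStats {
  double s0; // count of observed points
  double s1; // sum of observed values
  double s2; // sum of squared values

  void observe(double x) {
    s0 += 1;
    s1 += x;
    s2 += x * x;
  }

  public static void main(String[] args) {
    RunningStats stats = new RunningStats();
    stats.observe(1.0);
    stats.observe(3.0);
    double center = stats.s1 / stats.s0; // (1 + 3) / 2 = 2.0
    double radius = Math.sqrt(stats.s0 * stats.s2 - stats.s1 * stats.s1) / stats.s0;
    // s0*s2 - s1^2 = 2*10 - 16 = 4, so radius = sqrt(4) / 2 = 1.0 (the population std)
    System.out.println("center=" + center + " radius=" + radius);
  }
}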
In short, these methods store and update several running statistics for a cluster (chiefly the mean and the variance). Canopy clustering does not require the number of clusters to be specified; the number of centers depends mainly on the choice of the distance thresholds T1 and T2. When the dataset is too large to fit in memory, the MapReduce implementation is needed; it uses approximation, so for the same dataset its results differ slightly from the in-memory results, but on large datasets the difference is negligible. The canopy centers it outputs are well suited as starting points for kmeans: because these initial centers are more accurate than randomly chosen ones, they both improve the clustering result and speed up the algorithm.
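A sketch of that canopy-then-kmeans pipeline, assuming the Mahout 0.7–0.9 driver signatures as in the CanopyDriver example above; paths, thresholds, and iteration counts are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;

public class CanopyThenKMeans {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path input = new Path("vectors");
    Path canopyOut = new Path("canopy-output");
    Path kmeansOut = new Path("kmeans-output");
    // 1) Generate canopy centers (no point-to-cluster assignment yet).
    CanopyDriver.run(conf, input, canopyOut, new EuclideanDistanceMeasure(),
        3.0, 1.5, false, 0.0, false);
    // 2) Seed kmeans with the canopy centers instead of random seeds.
    //    "clusters-0-final" is where recent Mahout versions write the canopy centers.
    KMeansDriver.run(conf, input, new Path(canopyOut, "clusters-0-final"), kmeansOut,
        0.01, 10, true, 0.0, false);
  }
}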
Canopy clustering is a good approximate clustering technique, but it is memory-constrained: if the distance thresholds are set too tight, too many canopy centers are produced and they may no longer fit in memory. In practice, the parameters need to be tuned to the dataset and the clustering problem at hand.
In the MapReduce implementation:
Each mapper processes its share of the data, meaning it runs the canopy algorithm over all the points it sees to produce canopies. Concretely: it takes a sample vector from its input as the center of a new canopy, then scans the remaining vectors; any vector within T1 of that center is added to the canopy, and any vector within T2 is also removed from the input set. This repeats until the mapper's input set is empty, at which point the mapper emits the centers of all its canopies. The reducer then processes the mappers' output: it merges the canopy centers produced by all the map tasks and clusters them again into a new set of canopy centers, which is the final result.
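A simplified sketch of this two-stage scheme follows. It is not the actual Mahout CanopyMapper/CanopyReducer source; it assumes CanopyClusterer provides an addPointToCanopies(Vector, Collection<Canopy>) helper and a (measure, t1, t2) constructor, and it omits the configuration plumbing a real job needs:

import java.io.IOException;
import java.util.Collection;

import com.google.common.collect.Lists;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.mahout.clustering.canopy.Canopy;
import org.apache.mahout.clustering.canopy.CanopyClusterer;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.VectorWritable;

public class CanopyMRSketch {

  // Stage 1: each mapper builds canopies over its split and emits their centers.
  public static class SketchMapper
      extends Mapper<WritableComparable<?>, VectorWritable, Text, VectorWritable> {
    private final Collection<Canopy> canopies = Lists.newArrayList();
    private final CanopyClusterer clusterer =
        new CanopyClusterer(new EuclideanDistanceMeasure(), 3.0, 1.5);

    @Override
    protected void map(WritableComparable<?> key, VectorWritable point, Context context) {
      // Clone because Hadoop reuses the Writable instance between calls.
      clusterer.addPointToCanopies(point.get().clone(), canopies);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
      for (Canopy canopy : canopies) {
        canopy.computeParameters();
        // Single key so one reducer sees every mapper's centers.
        context.write(new Text("centroid"), new VectorWritable(canopy.getCenter()));
      }
    }
  }

  // Stage 2: the reducer clusters the mappers' centers into the final canopies.
  public static class SketchReducer
      extends Reducer<Text, VectorWritable, Text, VectorWritable> {
    @Override
    protected void reduce(Text key, Iterable<VectorWritable> centers, Context context)
        throws IOException, InterruptedException {
      Collection<Canopy> canopies = Lists.newArrayList();
      CanopyClusterer clusterer =
          new CanopyClusterer(new EuclideanDistanceMeasure(), 3.0, 1.5);
      for (VectorWritable center : centers) {
        clusterer.addPointToCanopies(center.get().clone(), canopies);
      }
      for (Canopy canopy : canopies) {
        canopy.computeParameters();
        context.write(new Text(canopy.getIdentifier()), new VectorWritable(canopy.getCenter()));
      }
    }
  }
}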