Data Mining Notes - Clustering - Canopy - Parallel Processing Analysis

Canopy's parallelization is implemented nicely in Mahout, and plenty of people online have already analyzed it, some in great detail. Originally I only intended to read the Mahout Canopy source code, but I figured it was worth writing down my own notes as well. I am reading the mahout-distribution-0.9 release. Start with the CanopyDriver class: its run(String[] args) method just handles the option/parameter setup; the real work happens in the run() overload below (a short usage sketch follows it).
public static void run(Configuration conf, Path input, Path output, DistanceMeasure measure, double t1, double t2, double t3, double t4, 
      int clusterFilter, boolean runClustering, double clusterClassificationThreshold, boolean runSequential)
    throws IOException, InterruptedException, ClassNotFoundException {
    Path clustersOut = buildClusters(conf, input, output, measure, t1, t2, t3,
        t4, clusterFilter, runSequential);
    if (runClustering) {
      clusterData(conf, input, clustersOut, output, clusterClassificationThreshold, runSequential);
    }
  }
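For orientation, here is a minimal sketch of how this run() overload could be invoked from application code. It is my own example, not part of Mahout: the input/output paths are hypothetical and the threshold and flag values are arbitrary; only the method signature comes from the source above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.clustering.canopy.CanopyDriver;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;

public class CanopyRunSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path input = new Path("canopy/input");    // hypothetical: SequenceFile of VectorWritable points
    Path output = new Path("canopy/output");  // hypothetical output directory
    // T1 > T2; here the reducer thresholds T3/T4 simply reuse T1/T2.
    CanopyDriver.run(conf, input, output, new EuclideanDistanceMeasure(),
        3.0, 1.5, 3.0, 1.5,
        0,      // clusterFilter: only keep canopies with more than this many observations
        true,   // runClustering: also run the cluster-data step afterwards
        0.0,    // clusterClassificationThreshold: outlier-removal threshold
        false); // runSequential: false = run as MapReduce jobs
  }
}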
buildClusters: builds a directory of Canopy clusters; the runSequential flag decides how, either sequentially on a single machine or as a MapReduce job. clusterData: takes the Canopy cluster directory produced by buildClusters and clusters the actual data; this part is executed mainly by the ClusterClassificationDriver class.
public static Path buildClusters(Configuration conf, Path input, Path output,
      DistanceMeasure measure, double t1, double t2, double t3, double t4,
      int clusterFilter, boolean runSequential) throws IOException,
      InterruptedException, ClassNotFoundException {
    log.info("Build Clusters Input: {} Out: {} Measure: {} t1: {} t2: {}",
             input, output, measure, t1, t2);
    if (runSequential) {
      return buildClustersSeq(input, output, measure, t1, t2, clusterFilter);
    } else {
      return buildClustersMR(conf, input, output, measure, t1, t2, t3, t4,
          clusterFilter);
    }
  }
buildClustersSeq: simply runs the sequential single-machine Canopy algorithm and serializes the resulting canopies to HDFS. buildClustersMR: the core of the parallel Canopy processing, built on Hadoop MapReduce and involving two classes, CanopyMapper and CanopyReducer. The idea is that each mapper runs Canopy locally over its input split and emits only its canopy centers (all under the same key), and a single reduce pass then runs Canopy again over those centers to obtain the global canopies. The toy sketch below illustrates this two-phase scheme; after that we look at the CanopyMapper class itself.
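To make the two-phase idea concrete, here is a self-contained toy simulation in plain Java. It is my own sketch, not Mahout code: it works on 1-D points, uses absolute distance, and keeps only the seed point as each canopy's center (Mahout recomputes centers via computeParameters and uses T3/T4 in the reduce phase).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TwoPhaseCanopySketch {

  // One pass of the Canopy assignment rule over 1-D points: a point that is not
  // within t2 of any existing center becomes a new center itself.
  static List<Double> canopyCenters(List<Double> points, double t1, double t2) {
    List<Double> centers = new ArrayList<Double>();
    for (double p : points) {
      boolean stronglyBound = false;
      for (double c : centers) {
        double dist = Math.abs(p - c);
        // dist < t1 would also add p to that canopy's statistics; omitted in this toy version
        stronglyBound = stronglyBound || dist < t2;
      }
      if (!stronglyBound) {
        centers.add(p);
      }
    }
    return centers;
  }

  public static void main(String[] args) {
    // Two input splits, as two mappers would see them.
    List<List<Double>> splits = Arrays.asList(
        Arrays.asList(1.0, 1.2, 5.0, 5.3),
        Arrays.asList(1.1, 5.1, 9.0));

    // "Map" phase: local canopies per split, only the centers are emitted.
    List<Double> emittedCenters = new ArrayList<Double>();
    for (List<Double> split : splits) {
      emittedCenters.addAll(canopyCenters(split, 3.0, 1.5));
    }

    // "Reduce" phase: one global Canopy pass over all the emitted centers.
    List<Double> globalCenters = canopyCenters(emittedCenters, 3.0, 1.5);
    System.out.println("local centers:  " + emittedCenters);   // [1.0, 5.0, 1.1, 5.1, 9.0]
    System.out.println("global centers: " + globalCenters);    // [1.0, 5.0, 9.0]
  }
}

Now the CanopyMapper class, starting with its setup() method: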
protected void setup(Context context) throws IOException,
      InterruptedException {
    super.setup(context);
    canopyClusterer = new CanopyClusterer(context.getConfiguration());
    clusterFilter = Integer.parseInt(context.getConfiguration().get(
        CanopyConfigKeys.CF_KEY));
  }
setup: just parameter initialization; it rebuilds a CanopyClusterer from the job Configuration and reads the clusterFilter value.
protected void map(WritableComparable<?> key, VectorWritable point,
      Context context) throws IOException, InterruptedException {
    canopyClusterer.addPointToCanopies(point.get(), canopies);
  }
map: runs the Canopy algorithm on each incoming point vector, folding it into the mapper-local canopies.
protected void cleanup(Context context) throws IOException,
      InterruptedException {
    for (Canopy canopy : canopies) {
      canopy.computeParameters();
      if (canopy.getNumObservations() > clusterFilter) {
        context.write(new Text("centroid"), new VectorWritable(canopy.getCenter()));
      }
    }
    super.cleanup(context);
  }
cleanup: iterates over all local canopies, calls computeParameters on each to compute and update its parameters, and writes out the canopies whose observation count passes the clusterFilter. Note that every center is written under the same key, "centroid", so they all end up in a single reduce() call later. Next, a few core methods:
public void addPointToCanopies(Vector point, Collection<Canopy> canopies) {
    boolean pointStronglyBound = false;
    for (Canopy canopy : canopies) {
      double dist = measure.distance(canopy.getCenter().getLengthSquared(), canopy.getCenter(), point);
      if (dist < t1) {
        if (log.isDebugEnabled()) {
          log.debug("Added point: {} to canopy: {}", AbstractCluster.formatVector(point, null), canopy.getIdentifier());
        }
        canopy.observe(point);
      }
      pointStronglyBound = pointStronglyBound || dist < t2;
    }
    if (!pointStronglyBound) {
      if (log.isDebugEnabled()) {
        log.debug("Created new Canopy:{} at center:{}", nextCanopyId, AbstractCluster.formatVector(point, null));
      }
      canopies.add(new Canopy(point, nextCanopyId++, measure));
    }
  }
This computes the distance from the point to every existing canopy. If the distance is less than T1, the point is added to that canopy via observe(), which updates the statistics S0, S1 and S2. If the distance is also less than T2, pointStronglyBound becomes true and no new canopy will be created for this point; otherwise, once the loop is done, a new canopy is created. In other words, a point whose distance to every existing canopy is greater than or equal to T2 becomes the seed of a new canopy. A small demo of this follows.
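Here is a small standalone demo of this behavior with concrete thresholds. It is my own example: I am assuming CanopyClusterer also exposes a (DistanceMeasure, t1, t2) constructor (the mapper instead builds it from the job Configuration), and the specific points and thresholds are arbitrary.

import java.util.ArrayList;
import java.util.List;
import org.apache.mahout.clustering.canopy.Canopy;
import org.apache.mahout.clustering.canopy.CanopyClusterer;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

public class AddPointDemo {
  public static void main(String[] args) {
    // t1 = 3.0, t2 = 1.5
    CanopyClusterer clusterer = new CanopyClusterer(new EuclideanDistanceMeasure(), 3.0, 1.5);
    List<Canopy> canopies = new ArrayList<Canopy>();
    Vector[] points = {
        new DenseVector(new double[] {0.0, 0.0}), // no canopies yet: seeds the first canopy
        new DenseVector(new double[] {1.0, 0.0}), // dist 1.0 < t2: observed by the first canopy only
        new DenseVector(new double[] {2.0, 0.0}), // t2 <= dist 2.0 < t1: observed by the first canopy AND seeds a second one
        new DenseVector(new double[] {8.0, 0.0})  // dist > t1 to both existing centers: seeds a third canopy
    };
    for (Vector point : points) {
      clusterer.addPointToCanopies(point, canopies);
    }
    for (Canopy canopy : canopies) {
      canopy.computeParameters(); // finalize center and radius, as the mapper's cleanup() does
      System.out.println(canopy.getIdentifier() + " center = " + canopy.getCenter());
    }
  }
}

Next, the observe() method that addPointToCanopies calls: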
public void observe(Vector x) {
    setS0(getS0() + 1);
    if (getS1() == null) {
      setS1(x.clone());
    } else {
      getS1().assign(x, Functions.PLUS);
    }
    Vector x2 = x.times(x);
    if (getS2() == null) {
      setS2(x2);
    } else {
      getS2().assign(x2, Functions.PLUS);
    }
  }
Every time a point is added to a canopy these statistics are updated. S0: the sum of the weights of the points in the canopy (here each point has weight 1). S1: the weighted sum of the points. S2: the weighted sum of the component-wise squares of the points.
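A quick scalar sanity check (my own sketch, not Mahout code) of what these statistics give us: with unit weights, S1/S0 is the mean and sqrt(S0*S2 - S1^2)/S0 is the population standard deviation, which is exactly what computeParameters() below computes component-wise for the center and radius.

public class SStatsSketch {
  public static void main(String[] args) {
    double[] xs = {1.0, 2.0, 4.0};
    double s0 = 0, s1 = 0, s2 = 0;
    for (double x : xs) {   // what observe() accumulates, one point at a time
      s0 += 1;              // weight of the point
      s1 += x;              // weighted sum
      s2 += x * x;          // weighted sum of squares
    }
    double center = s1 / s0;                           // 7/3 = 2.333...
    double radius = Math.sqrt(s0 * s2 - s1 * s1) / s0; // sqrt(14)/3 = 1.247..., the population std
    System.out.println("center=" + center + " radius=" + radius);
  }
}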
public void computeParameters() {
    if (getS0() == 0) {
      return;
    }
    setNumObservations((long) getS0());
    setTotalObservations(getTotalObservations() + getNumObservations());
    setCenter(getS1().divide(getS0()));
    // compute the component stds
    if (getS0() > 1) {
      setRadius(getS2().times(getS0()).minus(getS1().times(getS1())).assign(new SquareRootFunction()).divide(getS0()));
    }
    setS0(0);
    setS1(center.like());
    setS2(center.like());
  }
computeParameters: computes the center (S1/S0) and the radius (component-wise sqrt(S0*S2 - S1*S1)/S0, i.e. the per-component standard deviation), and then resets S0, S1 and S2. Next, the CanopyReducer class:
protected void setup(Context context) throws IOException,
      InterruptedException {
    super.setup(context);
    canopyClusterer = new CanopyClusterer(context.getConfiguration());
    canopyClusterer.useT3T4();
    clusterFilter = Integer.parseInt(context.getConfiguration().get(
        CanopyConfigKeys.CF_KEY));
  }
setup: the same parameter initialization as in the Mapper, but note the call to useT3T4(): the reducer clusters with T3/T4, whereas the mapper uses T1/T2, and the two pairs of thresholds may differ.
protected void reduce(Text arg0, Iterable<VectorWritable> values,
      Context context) throws IOException, InterruptedException {
    for (VectorWritable value : values) {
      Vector point = value.get();
      canopyClusterer.addPointToCanopies(point, canopies);
    }
    for (Canopy canopy : canopies) {
      canopy.computeParameters();
      if (canopy.getNumObservations() > clusterFilter) {
        ClusterWritable clusterWritable = new ClusterWritable();
        clusterWritable.setValue(canopy);
        context.write(new Text(canopy.getIdentifier()), clusterWritable);
      }
    }
  }
reduce: iterates over the centroid vectors emitted by the mappers and folds each into the appropriate canopy, then iterates over all the resulting canopies and writes the qualifying ones, the global canopies, to a sequence file. That completes the MapReduce part of Canopy clustering. Next, the follow-up method:
private static void clusterData(Configuration conf,
                                  Path points,
                                  Path canopies,
                                  Path output,
                                  double clusterClassificationThreshold,
                                  boolean runSequential)
    throws IOException, InterruptedException, ClassNotFoundException {
    ClusterClassifier.writePolicy(new CanopyClusteringPolicy(), canopies);
    ClusterClassificationDriver.run(conf, points, output, new Path(output, PathDirectory.CLUSTERED_POINTS_DIRECTORY),
                                    clusterClassificationThreshold, true, runSequential);
  }
writePolicy: serializes the Canopy clustering policy, i.e. the algorithm's T1 and T2 (a CanopyClusteringPolicy), to HDFS. ClusterClassificationDriver then launches another job to cluster the data.
public static void run(Configuration conf, Path input, Path clusteringOutputPath, Path output,
      double clusterClassificationThreshold, boolean emitMostLikely, boolean runSequential) throws IOException,
      InterruptedException, ClassNotFoundException {
    if (runSequential) {
      classifyClusterSeq(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely);
    } else {
      classifyClusterMR(conf, input, clusteringOutputPath, output, clusterClassificationThreshold, emitMostLikely);
    }
   
  }
As before, there is a sequential single-machine version and a MapReduce version; here we only look at the MR version. It consists of a single Mapper and no Reducer. Next, the ClusterClassificationMapper class:
protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
   
    Configuration conf = context.getConfiguration();
    String clustersIn = conf.get(ClusterClassificationConfigKeys.CLUSTERS_IN);
    threshold = conf.getFloat(ClusterClassificationConfigKeys.OUTLIER_REMOVAL_THRESHOLD, 0.0f);
    emitMostLikely = conf.getBoolean(ClusterClassificationConfigKeys.EMIT_MOST_LIKELY, false);
   
    clusterModels = Lists.newArrayList();
   
    if (clustersIn != null && !clustersIn.isEmpty()) {
      Path clustersInPath = new Path(clustersIn);
      clusterModels = populateClusterModels(clustersInPath, conf);
      ClusteringPolicy policy = ClusterClassifier
          .readPolicy(finalClustersPath(clustersInPath));
      clusterClassifier = new ClusterClassifier(clusterModels, policy);
    }
    clusterId = new IntWritable();
  }
setup: parameter initialization again, including loading the global canopies produced by the previous MR step and reading the clustering policy in order to build the clusterClassifier.
protected void map(WritableComparable<?> key, VectorWritable vw, Context context)
    throws IOException, InterruptedException {
    if (!clusterModels.isEmpty()) {
      Class<? extends Vector> vectorClass = vw.get().getClass();
      Vector vector = vw.get();
      if (!vectorClass.equals(NamedVector.class)) {
        if (key.getClass().equals(Text.class)) {
          vector = new NamedVector(vector, key.toString());
        } else if (key.getClass().equals(IntWritable.class)) {
          vector = new NamedVector(vector, Integer.toString(((IntWritable) key).get()));
        }
      }
      Vector pdfPerCluster = clusterClassifier.classify(vector);
      if (shouldClassify(pdfPerCluster)) {
        if (emitMostLikely) {
          int maxValueIndex = pdfPerCluster.maxValueIndex();
          write(new VectorWritable(vector), context, maxValueIndex, 1.0);
        } else {
          writeAllAboveThreshold(new VectorWritable(vector), context, pdfPerCluster);
        }
      }
    }
  }
The map method assigns each input vector to clusters (either only the most likely one, or all clusters above the threshold) and writes the results out to HDFS. That covers the whole Canopy processing pipeline in Mahout. It is a rough walkthrough, but it gives a good overall picture of how the execution flows. To inspect the final output, the clustered points can be dumped from the sequence files, for example with the sketch below.
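As a closing note, here is a minimal sketch (my own, with a hypothetical path) for inspecting the clusteredPoints SequenceFiles that the classification step writes. The key and value classes are instantiated reflectively, so it does not assume the exact Writable types used by this Mahout version.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class DumpClusteredPoints {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Hypothetical location; the real subdirectory name comes from PathDirectory.CLUSTERED_POINTS_DIRECTORY.
    Path dir = new Path("canopy/output/clusteredPoints");
    for (FileStatus status : fs.listStatus(dir)) {
      if (!status.getPath().getName().startsWith("part-")) {
        continue; // skip _SUCCESS and other non-data files
      }
      SequenceFile.Reader reader = new SequenceFile.Reader(fs, status.getPath(), conf);
      Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
      Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
      while (reader.next(key, value)) {
        System.out.println(key + "\t" + value); // cluster id and the clustered vector
      }
      reader.close();
    }
  }
}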