1. 程式人生 > >Spark2.0 特征提取、轉換、選擇之二:特征選擇、文本處理,以中文自然語言處理(情感分類)為例

Spark2.0 特征提取、轉換、選擇之二:特征選擇、文本處理,以中文自然語言處理(情感分類)為例

true 方便 linear value taf 文檔 ota ati inter

特征選擇

RFormula

RFormula是一個很方便,也很強大的Feature選擇(自由組合的)工具。
輸入string 進行獨熱編碼(見下面例子country)
輸入數值型轉換為double(見下面例子hour)
label為string,也用StringIndexer進行編號

RFormula produces a vector column of features and a double or string 
column of label. Like when formulas are used in R for linear regression, 
string input columns will be one-hot encoded, and numeric columns will be 
cast to doubles. If the label column 
is of type string, it will be first transformed to double with StringIndexer. If the label column does not exist in the DataFrame, the output label column will be created from the specified response variable in the formula.
如輸入數據為
+---+-------+----+------+-------+
| id|country|hour|salary|clicked|
+---+-------+----+------+-------+
|  7
| US| 18|2500.0| 1.0| | 8| CA| 12|1500.0| 0.0| | 9| NZ| 15|1250.0| 0.0| | 10| US| 10|3200.0| 1.0| +---+-------+----+------+-------+ 使用RFormula: RFormula formula = new RFormula() .setFormula("clicked ~ country + hour + salary") //clicked作為label,~之後的三個為選擇的特征
.setFeaturesCol("features") .setLabelCol("label"); +---------------------+-----+ |features |label| +---------------------+-----+ |[1.0,0.0,18.0,2500.0]|1.0 | |[0.0,0.0,12.0,1500.0]|0.0 | |[0.0,1.0,15.0,1250.0]|0.0 | |[1.0,0.0,10.0,3200.0]|1.0 | +---------------------+-----+ [1.0,0.0]為"US"的獨熱編碼,以此類推

卡方獨立檢驗

ChiSqSelector
參考:
http://www.blogjava.net/zhenandaci/archive/2008/08/31/225966.html
裏面舉得例子很好理解(原文真的很通俗易懂,直接參考原文吧,瞬間明白的感覺)。

在Spark中似乎非常慢???

ChiSqSelector chiSqSelector=new ChiSqSelector()
                .setFeaturesCol("TF")
                .setOutputCol("features")
                .setLabelCol("label")
                .setNumTopFeatures(2);
Dataset<Row> wordsChiSq=chiSqSelector.fit(wordsTF).transform(wordsTF);

文本轉換及特征提取

英文分詞

中文分詞

中文分詞工具比較多,Java,Python版本都有,這裏以IKAnalyzer2012+Java版本為例說明。
使用時參考IKAnalyzer2012自帶的中文幫助文檔(有比較詳細的用法)。
IKAnalyzer2012它 的 安 裝 部 署 十 分 簡 單 , 將 IKAnalyzer2012.jar 部 署 於 項 目 的 lib 目 錄 中 ;IKAnalyzer.cfg.xml 與 stopword.dic 文件放置在 class 根目錄。
依賴Lucene的類org.wltea.analyzer.luceneorg.wltea.analyzer,分詞主類。

//創建分詞對象  
Analyzer anal=new IKAnalyzer(true);       
StringReader reader=new StringReader(row.getString(1));  
//分詞  
TokenStream ts=anal.tokenStream("", reader);  
CharTermAttribute term=(CharTermAttribute) ts
                    .getAttribute(CharTermAttribute.class);  
                    //遍歷分詞數據
                    String words="";
                    while(ts.incrementToken()){  
                        words+=(term.toString()+"|");
                    }  

正則表達式分詞

word2vect

TF-IDF

去停用詞

應用例子

下面是一個綜合的實例子,用到了Spark的一些特征轉換API。由於需要處理中文,還需要一個分詞器。

準備語料

(I)首先準備一個數據集,譚松波老師收集的中文情感分析酒店評論語料
從CSDN上可以下載:http://download.csdn.net/download/x_i_y_u_e/9273533
1、-ChnSentiCorp-Htl-ba-2000: 平衡語料,正負類各1000篇。
2、ChnSentiCorp-Htl-ba-4000: 平衡語料,正負類各2000篇。
3、ChnSentiCorp-Htl-ba-6000: 平衡語料,正負類各3000篇。
4、ChnSentiCorp-Htl-unba-10000: 非平衡語料,正類為7000篇。

(II) 在linux下是亂碼的,需要轉換:
編碼查看:
%file -i neg.9.txt
neg.9.txt: text/plain; charset=iso-8859-1
需要轉換為utf8
%iconv -f gb18030 -t utf8 neg.9.txt -o neg.9.o.txt
-f :from -t :to
批量轉如下:
(1)復制文件目錄 find ChnSen* -type d -exec mkdir -p utf/{} \;
(2)轉換 find ChnSen* -type f -exec iconv -f GBK -t UTF-8 {} -o utf/{} \;
ChnSen*是單前文件夾下的目錄,utf是輸出目錄

(III) python 處理文件,合並為一個文件,去掉一條評論中所有換行

# -*- coding: utf-8 -*-
#將所有評論讀入到一個文件中,每個原始文件文件為一條評論,中間不要出現換行
#repalce("\r"," "),是將^M(window下產生的 linux不認識的換行)去掉
#輸出csv格式

path_out="E:/data/utf/ChnSentiCorp_htl_ba_2000/all.csv"
fw=open(path_out,w+)

#負樣本
for i in range(999):    

    path1="E:/data/utf/ChnSentiCorp_htl_ba_2000/neg."+str(i+1)+".txt"
    fr=open(path1)
    lines=fr.readlines()

    fw.write("0.0,")#label    

    for line in lines:
        #repalce("\r"," "),是將^M(window下產生的 linux不認識的換行)去掉
        #replace(",",""),是為輸出CSV格式做準備
        line=line.replace("\r",‘‘).strip().replace(",","")
        fw.write(line)

    fw.write("\n")   
    fr.close()

#正樣本
for i in range(999):   

    path2="E:/data/utf/ChnSentiCorp_htl_ba_2000/pos."+str(i+1)+".txt"
    fr=open(path2)

    fw.write("1.0,")#label    

    lines=fr.readlines()
    for line in lines:
        line=line.replace("\r",‘‘).strip().replace(",","")
        fw.write(line)        
    fw.write("\n")

    fr.close()

fw.close

完整流程

可參考論文(只是分詞工具不同):
基於 Spark 的文本情感分析
http://www.ibm.com/developerworks/cn/cognitive/library/cc-1606-spark-seniment-analysis/index.html
思路是一樣的,不過我是用Java實現的,寫起來遠遠不如Python簡潔。

//初步完整的流程,還需要進一步優化
//IKAnalyzer2012分詞->TF-IDF特征->NaiveBayes ML

package my.spark.ml.practice.classification;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.ml.classification.NaiveBayes;
import org.apache.spark.ml.classification.NaiveBayesModel;

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator;

import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.io.IOException;
import java.io.StringReader;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

//引用IKAnalyzer2012的類
import org.wltea.analyzer.lucene.IKAnalyzer;

import my.spark.ml.practice.classification.LabelValue;;

//文本處理,酒店評論
public class myHotelTextClassifer3 {        

    public static void main(String[] args) throws IOException {

        SparkSession spark=SparkSession
                .builder()
                .appName("Chinese Text Processing")
                .master("local[4]")
                .config("spark.sql.warehouse.dir",
                        "file///:G:/Projects/Java/Spark/spark-warehouse" )
                .getOrCreate();  

        //屏蔽日誌
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF);
        //--------------------------(0)讀入數據,數據預處理--------------------------------
        //原始數據文件包含兩行,一行是label,一行是sentence,csv格式的
        Dataset<Row> raw=spark.read().format("csv")
                .load("E:/data/utf/ChnSentiCorp_htl_ba_2000/all.csv");        

        //去掉空值,不然一定後面一定拋出nullpointer異常
        //distinct數據去重 ,並打亂數據的順序:不然數據是先負樣本後正樣本有規律排列的,預測效果偏高
        Dataset<Row> rawFilterNaN=raw
                .filter(raw.col("_c0").isNotNull())
                .filter(raw.col("_c1").isNotNull())
                .distinct();              

        //--------------------------(1)分詞----------------------------------------
        //為Map自定義了Class LabelValue,見後面       
        Encoder<LabelValue> LongStringEncoder=Encoders.bean(LabelValue.class);
        Dataset<LabelValue> wordsDF=rawFilterNa.map(new MapFunction<Row,LabelValue>() {         
            @Override           
            public LabelValue call(Row row) throws Exception {  
                if (row!=null) {
                    LabelValue ret = new LabelValue();  
                    double Label=1.0;
                    if (row.getString(0).equals("0.0")) {
                        Label=0.0;
                    }else{
                        Label=1.0;
                    }                   
                    ret.setLabel(Label);

                    //-------------KAnalyzer分詞--------------------
                    //創建分詞對象  
                    Analyzer anal=new IKAnalyzer(true);       
                    StringReader reader=new StringReader(row.getString(1));  
                    //分詞  
                    TokenStream ts=anal.tokenStream("", reader);  
                    CharTermAttribute term=(CharTermAttribute) ts
                            .getAttribute(CharTermAttribute.class);  
                    //遍歷分詞數據
                    String words="";
                    while(ts.incrementToken()){  
                        words+=(term.toString()+"|");
                    }  
                    ret.setValue(words);
                    reader.close();

                    return ret;                 
                }  
                else {
                    return null;
                }
            }

        }, LongStringEncoder);        


        //--------------------------(1)-2 RegexTokenizer分詞器-----------------------------
        RegexTokenizer regexTokenizer = new RegexTokenizer()
                      .setInputCol("value")
                      .setOutputCol("words")
                      .setPattern("\\|");

        Dataset<Row> wordsDF2 = regexTokenizer.transform(wordsDF); 


        //--------------------------(2) HashingTF訓練詞頻矩陣---------------------------------       

        HashingTF tf=new HashingTF()
                .setInputCol("words")
                .setOutputCol("TF");
        Dataset<Row> wordsTF=tf.transform(wordsDF2).select("TF","label");  
        wordsTF.show();wordsTF.printSchema();
        Dataset<Row> wordsTF2=wordsTF
                .filter(wordsTF.col("TF").isNotNull())
                .filter(wordsTF.col("label").isNotNull());        


        //------------------------- (4)計算 TF-IDF 矩陣--------------------------------------
        IDFModel idf=new IDF()
                .setInputCol("TF")
                .setOutputCol("features")
                .fit(wordsTF2);
        Dataset<Row> wordsTfidf=idf.transform(wordsTF2);         


       //----------------------------(5)NaiveBayesModel ML---------------------
        Dataset<Row>[] split=wordsTfidf.randomSplit(new double[]{0.6,0.4});
        Dataset<Row> training=split[0];
        Dataset<Row> test=split[1];          

        NaiveBayes naiveBayes=new NaiveBayes()
                .setLabelCol("label")
                .setFeaturesCol("features");  
        NaiveBayesModel naiveBayesModel=naiveBayes.fit(training);

        Dataset<Row> predictDF=naiveBayesModel.transform(test);

        //自定義計算accuracy        
        double total=(double) predictDF.count();
        Encoder<Double> doubleEncoder=Encoders.DOUBLE();

        Dataset<Double> accuracyDF=predictDF.map(new MapFunction<Row,Double>() {            
            @Override
            public Double call(Row row) throws Exception {
                if((double)row.get(1)==(double)row.get(5)){return 1.0;}
                else {return 0.0;}
            }
        }, doubleEncoder);       

        accuracyDF.createOrReplaceTempView("view");
        double correct=(double) spark.sql("SELECT value FROM view WHERE value=1.0").count();
        System.out.println("accuracy "+(correct/total));

        //計算areaUnderRoc    
        double areaUnderRoc=new BinaryClassificationEvaluator()
                    .setLabelCol("label")
                    .setRawPredictionCol("prediction")
                    .evaluate(predictDF);
        //(areaUnderROC|areaUnderPR) (default: areaUnderROC)                            
        System.out.println("areaUnderRoc "+areaUnderRoc);  
   }
}



//結果分析
//accuracy 0.7957860615883307
//areaUnderRoc 0.7873761854583772
//應該還有提升的空間

//Class LabelValue
package my.spark.ml.practice.classification;

import java.io.Serializable;

public class LabelValue implements Serializable {
  private String value; 
  private double label;

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }


  public double getLabel() {
        return label;
      }

  public void setLabel(double label) {
        this.label = label;
      }
}

先使用word2vect,然後將詞產生的向量作為特征,分別用隨機森林,GBTC,
LogisticRegression,其中GBTC效果最好。但是普遍不如Naive-Bayes,可能還需要某些地方進行改進。

//轉換為詞向量,並進行標準化
Word2Vec word2Vec=new Word2Vec()
                .setInputCol("words")
                .setOutputCol("vect")
                .setVectorSize(10);
        Dataset<Row> vect=word2Vec
                .fit(wordsDF2)
                .transform(wordsDF2);
        //vect.show();vect.printSchema();
        //正則化
        Dataset<Row> vect2=new MinMaxScaler()
                .setInputCol("vect")
                .setOutputCol("features")
                .setMax(1.0)
                .setMin(0.0)
                .fit(vect)
                .transform(vect);   
        //vect2.show();vect2.printSchema();

//GBTC分類
GBTClassifier gbtc=new GBTClassifier()
                .setLabelCol("label")
                .setFeaturesCol("vect")
                .setMaxDepth(10)
                .setMaxIter(10)
                .setStepSize(0.1);
        Dataset<Row> predictDF=gbtc.fit(training0).transform(test0);
 //其余代碼是一樣的,可以嘗試不同的參數組合。 

Spark2.0 特征提取、轉換、選擇之二:特征選擇、文本處理,以中文自然語言處理(情感分類)為例