XGBoost及CNN演算法的文字分類試驗
2018年6月份SMP會議有一個比賽,是對頭條抓取的新聞進行分類,判斷其作者是人類作者、機器翻譯、自動摘要,還是機器作者。多年沒有做過這方面的工作,看到朋友的介紹,就想拿那個資料來練習一兩個演算法。
正好在網路上看到有牛人介紹他以前參加類似文字分類比賽的經驗,以及他在github上共享的原始碼,於是就clone了他的程式碼來做測試。但是因為我是小白,基本上看不太懂,加上我裝的是python 3以上的版本,所以錯誤百出。一番除錯之後,重寫了載入資料的部分,然後簡單選取了一些特徵用上xgboost演算法。比賽中效果最好的達到了99%得分,而我只是簡單選取了五十來個維度的特徵,主要包括詞法分析後文章中包含不同pos的數量,xgboost迭代後演算法也能夠達到95%的得分。可能現在主流的效果好的方法都是深度學習,但是這個演算法簡單,所以在這裡也貼上作為呼叫的參考吧。程式碼很亂,但是對於xgboost的用法可以參考:
# -*- coding: utf-8 -*-
"""Classify news articles with XGBoost on hand-crafted lexical features.

The script has two stages:

1. Feature extraction (disabled by default, see ``EXTRACT_FEATURES``):
   parse the raw training/validation files, segment every article with
   jieba and dump per-article counts to plain-text "weights" files.
2. Training and prediction (always executed): read the dumped features,
   add a few ratio features, train a multi-class XGBoost model and write
   the predicted label of every validation article to a CSV file.
"""
import sys

import jieba
import jieba.posseg as pseg
import numpy as np
import xgboost as xgb

BASE_DIR = 'D:\\src\\git\\text_classification_AI100'
DATA_DIR = BASE_DIR + '\\data'

# Stage 1 is slow (it runs jieba over every article), so it only needs to be
# enabled once; afterwards the dumped feature files are reused.
EXTRACT_FEATURES = False

# Number of part-of-speech counters stored per article (one per POS index).
NUM_POS_TAGS = 50

# Added to the row index to build the id written to the output CSV.
# NOTE(review): presumably the id of the first validation article - confirm.
OUTPUT_ID_OFFSET = 146421

# Class label -> class index.  The keys must match the label strings found
# in the training file exactly.
LABEL_TO_INDEX = {u'人類作者': 0, u'機器作者': 1, u'自動摘要': 2, u'機器翻譯': 3}
# Explicit inverse mapping, so predictions are translated back to labels
# without relying on the iteration order of the dictionary keys.
INDEX_TO_LABEL = {index: label for label, index in LABEL_TO_INDEX.items()}

# jieba part-of-speech tag -> index of its counter in the feature vector.
POS_TAG_INDEX = {
    'ag': 0, 'a': 1, 'ad': 2, 'an': 3, 'b': 4, 'c': 5, 'dg': 6, 'd': 7,
    'e': 8, 'f': 9, 'g': 10, 'h': 11, 'i': 12, 'j': 13, 'k': 14, 'l': 15,
    'm': 16, 'ng': 17, 'n': 18, 'nr': 19, 'ns': 20, 'nt': 21, 'nz': 22,
    'o': 23, 'p': 24, 'q': 25, 'r': 26, 's': 27, 'tg': 28, 't': 29,
    'u': 30, 'vg': 31, 'v': 32, 'vd': 33, 'vn': 34, 'w': 35, 'x': 36,
    'y': 37, 'z': 38, 'un': 39, 'uj': 40, 'eng': 41, 'ul': 42, 'nrfg': 43,
    'zg': 44, 'nrt': 45, 'uv': 46, 'ud': 47, 'mq': 48, 'df': 49,
}


def _decode_escaped(text):
    """Turn literal ``\\uXXXX`` escape sequences in *text* into characters."""
    return bytes(text, 'latin1').decode('unicode-escape')


def _ratio(numerator, denominator):
    """Return ``numerator / denominator`` as a float, or 0 when dividing by 0."""
    return 0 if denominator == 0 else numerator * 1.0 / denominator


def readtrain(path, posd):
    """Parse the training file and extract features for every article.

    Each line is split by hand into three fields (label, content, id); the
    label and content have their ``\\uXXXX`` escapes decoded.

    Args:
        path: path of the training file, one article per line.
        posd: mapping from lower-case POS tag to counter index.

    Returns:
        A tuple ``(labels, contents, ids, weights)`` of parallel lists, where
        ``weights`` holds the feature vector built by ``getweights``.
    """
    labels = []
    contents = []
    ids = []
    weights = []
    with open(path, 'r') as f:
        for line in f:
            # Drop the first character and the last two characters of the
            # line before splitting it into fields.
            line = line[1:-2]
            fields = line.split('", "')
            label = _decode_escaped(fields[0].split('": "')[1].rstrip(' "'))
            content = _decode_escaped(
                fields[1].split('": "')[1].rstrip(' "\\'))
            article_id = int(fields[2].split(':')[1].strip())
            labels.append(label)
            contents.append(content)
            ids.append(article_id)
            weights.append(getweights(content, posd))
            if article_id % 1000 == 999:
                print('Now %d articles have been processed!' % article_id)
    print('Totally there are %d articles in training set' % len(labels))
    return (labels, contents, ids, weights)


def getweights(content, posd):
    """Build the raw count features of one article.

    Layout of the returned list:
        0      ``sys.getsizeof(content)``
        1      number of characters
        2      number of words produced by jieba
        3      number of words starting with an ASCII letter
        4      number of words starting with an ASCII digit
        5      number of words starting with a character in U+4E00..U+9FFF
        6..55  number of words per POS tag, indexed through *posd*

    Args:
        content: article text.
        posd: mapping from lower-case POS tag to counter index.

    Returns:
        The list of integer counts described above.
    """
    # NOTE: getsizeof is the in-memory size of the str object, not the
    # encoded byte length.  It is kept as is because the dumped feature
    # files were produced with it.
    weights = [sys.getsizeof(content), len(content)]
    wnum = 0      # 2. number of words
    engnum = 0    # 3. number of english words
    digitnum = 0  # 4. number of digital words
    chnum = 0     # 5. number of chinese words
    posn = [0] * NUM_POS_TAGS
    for w in pseg.cut(content):
        wnum += 1
        first = w.word[0]
        if is_chinese(first):
            chnum += 1
        if is_alphabet(first):
            engnum += 1
        if is_number(first):
            digitnum += 1
        flag = w.flag.lower()
        if flag in posd:
            posn[posd[flag]] += 1
        elif flag[:1] in posd:
            # Unknown sub-tag: fall back to the tag's first letter.  The
            # slice (instead of [0]) makes an empty tag a no-op.
            posn[posd[flag[:1]]] += 1
    weights.extend([wnum, engnum, digitnum, chnum])
    return weights + posn  # 6-*: number of pos


def is_chinese(uchar):
    """Return True when *uchar* lies in the range U+4E00..U+9FFF."""
    return '\u4e00' <= uchar <= '\u9fff'


def is_number(uchar):
    """Return True when *uchar* is an ASCII digit (U+0030..U+0039)."""
    return '\u0030' <= uchar <= '\u0039'


def is_alphabet(uchar):
    """Return True when *uchar* is an ASCII letter (A-Z or a-z)."""
    return ('\u0041' <= uchar <= '\u005a') or ('\u0061' <= uchar <= '\u007a')


def readvalidation(path, posd):
    """Parse the validation file and extract features for every article.

    Validation lines carry no label: the first field is the id and the
    second one the content.

    Args:
        path: path of the validation file, one article per line.
        posd: mapping from lower-case POS tag to counter index.

    Returns:
        A tuple ``(ids, contents, weights)`` of parallel lists.
    """
    contents = []
    ids = []
    weights = []
    with open(path, 'r') as f:
        for line in f:
            line = line[1:-2]
            fields = line.split(', "')
            content = _decode_escaped(
                fields[1].split('": "')[1].rstrip(' "\\'))
            article_id = int(fields[0].split('": ')[1].strip())
            contents.append(content)
            ids.append(article_id)
            weights.append(getweights(content, posd))
            if article_id % 1000 == 999:
                print('Now validation number %d articles have been processed!'
                      % article_id)
    print('Totally there are %d articles in validation set' % len(ids))
    return (ids, contents, weights)


def getutferror(contents):
    """Flag the texts that cannot be written as UTF-8.

    Every text is written to a scratch file; a text that raises
    ``UnicodeEncodeError`` is flagged instead of written.

    Args:
        contents: iterable of article texts.

    Returns:
        A list with one entry per text: 0 when it was written, 1 when
        encoding failed.
    """
    utf = []
    with open(BASE_DIR + '\\train.txt', 'w', encoding='utf-8') as f:
        for x in contents:
            try:
                f.write(x)
                utf.append(0)
            except UnicodeEncodeError:
                utf.append(1)
    return utf


def outputweights(path, weights):
    """Write one feature vector per line, every value followed by a space."""
    with open(path, 'w', encoding='utf-8') as f:
        for xweights in weights:
            for yweight in xweights:
                f.write(str(yweight))
                f.write(" ")
            f.write("\n")


def outputlabels(path, labels):
    """Write one label per line."""
    with open(path, 'w', encoding='utf-8') as f:
        for label in labels:
            f.write(str(label))
            f.write("\n")


def readweights(path):
    """Read dumped count features and append the derived ratio features.

    Appended per article, in this order: size per character, characters per
    word, the share of english / digit / chinese words, and the share of
    words tagged n*, v* and a*.  A ratio with a zero denominator becomes 0.

    Args:
        path: file written by ``outputweights``.

    Returns:
        A list of feature vectors (lists of numbers), one per line.
    """
    pos = 6  # index of the first POS counter in a feature vector
    noun_columns = (pos + 17, 24, 25, 26, 27, 28, 49, 51)
    verb_columns = (pos + 31, 38, 39, 40)
    adjective_columns = (pos + 0, 7, 8, 9)

    weights = []
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            # strip() also removes the trailing space left by outputweights.
            weight = [int(w) for w in line.strip().split(' ')]
            if weight[1] == 0:
                # Report the row number of articles without any character.
                print(str(len(weights)))
            nwords = weight[2]
            derived = [
                _ratio(weight[0], weight[1]),  # average character length
                _ratio(weight[1], nwords),     # average word length
                _ratio(weight[3], nwords),     # english word percentage
                _ratio(weight[4], nwords),     # digital word percentage
                _ratio(weight[5], nwords),     # chinese word percentage
                _ratio(sum(weight[i] for i in noun_columns), nwords),
                _ratio(sum(weight[i] for i in verb_columns), nwords),
                _ratio(sum(weight[i] for i in adjective_columns), nwords),
            ]
            weights.append(weight + derived)
    print('Totally there are %d articles in weights set' % len(weights))
    return weights


def readlabels(path):
    """Read one integer class index per line."""
    labels = []
    with open(path, 'r', encoding='utf-8') as f:
        for line in f:
            labels.append(int(line))
    return labels


if EXTRACT_FEATURES:
    # read train data and extract the label and weights
    train = readtrain(DATA_DIR + '\\training.txt', POS_TAG_INDEX)
    outputweights(DATA_DIR + '\\train_weights.txt', train[3])
    outputlabels(DATA_DIR + '\\train_labels.txt',
                 [LABEL_TO_INDEX[label] for label in train[0]])
    print('train data load finished %d' % len(train[0]))

    # read validation data and extract weights
    test = readvalidation(DATA_DIR + '\\validation.txt', POS_TAG_INDEX)
    outputweights(DATA_DIR + '\\validation_weights.txt', test[2])
    print('validation data load finished')

# read weights and labels
train_opinion = np.array(readlabels(DATA_DIR + '\\train_labels.txt'))
train_weights = np.array(readweights(DATA_DIR + '\\train_weights.txt'))
test_weights = np.array(readweights(DATA_DIR + '\\validation_weights.txt'))

# load train and validation weights
dtrain = xgb.DMatrix(train_weights, label=train_opinion)
dtest = xgb.DMatrix(test_weights)  # no labels: this matrix is only predicted

# training parameters
param = {'max_depth': 7, 'eta': 0.5, 'eval_metric': 'merror', 'silent': 1,
         'objective': 'multi:softmax', 'num_class': 4}
evallist = [(dtrain, 'train')]  # optional, only used to monitor training
num_round = 300  # number of boosting rounds
bst = xgb.train(param, dtrain, num_round, evallist)
# No early stopping is configured, so every round is kept and a plain
# predict() already uses the full model.  The former
# ``ntree_limit=bst.best_ntree_limit`` argument is not available in recent
# XGBoost releases.
preds = bst.predict(dtest)

with open(BASE_DIR + '\\XGBOOST_Toutiao_OUTPUT.csv', 'w',
          encoding='utf-8') as f:
    for i, pre in enumerate(preds):
        f.write(str(i + OUTPUT_ID_OFFSET))
        f.write(',')
        f.write(INDEX_TO_LABEL[int(pre)])
        f.write('\n')
作為練習,我後來又試驗了該github庫中的CNN的方法。關於介紹和使用,主要參考這裡。在data_helpers.py裡面加上讀取資料的部分:
# load_AI100_data_and_labels('data/AI100/training.csv')
def load_SMP_EUPT_data_and_lables(path, istraining=True):
    """Load segmented article texts and, when training, one-hot labels.

    Every line of *path* is split by hand into fields; the content field has
    its ``\\uXXXX`` escapes decoded and is segmented with ``jieba_fenci``
    using the stop-word list returned by ``stop_words``.

    Args:
        path: input file, one article per line.
        istraining: when True the first field of a line is read as the label
            and a 4-element one-hot vector is produced for it; when False the
            lines are split with the label-less separator and no labels are
            produced.

    Returns:
        ``[x_text, y]`` where ``x_text`` is the list of segmented texts and
        ``y`` is a numpy array of the one-hot vectors (empty when
        *istraining* is False).
    """
    label_index = {u'人類作者': 0, u'機器作者': 1, u'自動摘要': 2, u'機器翻譯': 3}
    stopwords_list = stop_words()
    separator = '\", \"' if istraining else ', \"'
    x_text = []
    y = []
    with open(path, 'r') as f:
        for count, raw_line in enumerate(f):
            # Drop the first character and the last two before splitting.
            fields = raw_line[1:-2].split(separator)
            escaped_content = fields[1].split('\": \"')[1].rstrip(' \"\\')
            content = bytes(escaped_content, 'latin1').decode('unicode-escape')
            x_text.append(jieba_fenci(content, stopwords_list))
            if istraining:
                escaped_label = fields[0].split('\": \"')[1].rstrip(' \"')
                label = bytes(escaped_label, 'latin1').decode('unicode-escape')
                one_hot = [0] * 4
                one_hot[label_index[label]] = 1
                y.append(np.array(one_hot))
            if count % 10000 == 9999:
                print('Now %d articles have been processed!' % count)
    print('Totally there are %d articles in training set' % len(x_text))
    return [x_text, np.array(y)]
主要是把分詞後的序列給到模型做train,train完之後(./train.py)就會有一個新生成的資料夾來儲存訓練好的模型的結果,然後執行eval來進行預測或者驗證(./eval.py --eval_train --checkpoint_dir="./runs/1459637919/checkpoints/")。這裡提供的是驗證的方式,可以最後直接給出準確率。而下面這個則給的是預測的程式碼。
# Restore the most recent checkpoint of the trained CNN, predict a class for
# every entry of x_test and write the predicted labels to CNN_OUTPUT.csv.
checkpoint_file = tf.train.latest_checkpoint(FLAGS.checkpoint_dir)
graph = tf.Graph()
with graph.as_default():
    session_conf = tf.ConfigProto(
        allow_soft_placement=FLAGS.allow_soft_placement,
        log_device_placement=FLAGS.log_device_placement)
    sess = tf.Session(config=session_conf)
    with sess.as_default():
        # Load the saved meta graph and restore variables
        saver = tf.train.import_meta_graph("{}.meta".format(checkpoint_file))
        saver.restore(sess, checkpoint_file)

        # Get the placeholders from the graph by name
        input_x = graph.get_operation_by_name("input_x").outputs[0]
        # input_y = graph.get_operation_by_name("input_y").outputs[0]
        dropout_keep_prob = graph.get_operation_by_name("dropout_keep_prob").outputs[0]

        # Tensors we want to evaluate
        predictions = graph.get_operation_by_name("output/predictions").outputs[0]

        # Generate batches for one epoch
        batches = data_helpers.batch_iter(list(x_test), FLAGS.batch_size, 1, shuffle=False)

        # Collect the predictions here
        all_predictions = []
        for x_test_batch in batches:
            # Keep probability 1.0 disables dropout while predicting.
            batch_predictions = sess.run(predictions, {input_x: x_test_batch, dropout_keep_prob: 1.0})
            all_predictions = np.concatenate([all_predictions, batch_predictions])

# class label -> class index, and its explicit inverse so that a predicted
# index is translated without relying on the order of the dictionary keys.
label_to_index = {u'人類作者': 0, u'機器作者': 1, u'自動摘要': 2, u'機器翻譯': 3}
index_to_label = {index: label for label, index in label_to_index.items()}
with open('CNN_OUTPUT.csv', 'w', encoding='utf-8') as f:
    for i, pre in enumerate(all_predictions):
        # The output id is the row index shifted by 146421.
        f.write(str(i + 146421))
        f.write(',')
        # Predictions come back as floats after concatenation with the
        # initial empty list, hence the int() conversion.
        f.write(index_to_label[int(pre)])
        f.write('\n')

# # Print accuracy if y_test is defined
# if y_test is not None:
#     correct_predictions = float(sum(all_predictions == y_test))
#     print("Total number of test examples: {}".format(len(y_test)))
#     print("Accuracy: {:g}".format(correct_predictions/float(len(y_test))))
#
# # Save the evaluation to a csv
# predictions_human_readable = np.column_stack((np.array(x_raw), all_predictions))
# out_path = os.path.join(FLAGS.checkpoint_dir, "..", "prediction.csv")
# print("Saving evaluation to {0}".format(out_path))
# with open(out_path, 'w') as f:
#     csv.writer(f).writerows(predictions_human_readable)
我可以執行AI100的程式碼以及資料,但是對於畫像的資料,當我用10000篇文章的時候也可以執行,但是再增加記憶體就吃不消了。可能這是因為我用的是一臺14年的老機器,承受不住那麼多的詞彙量。總之,因為懂的少,所以就是簡單玩了下,也不知道內部機理,希望以後有機會可以深入學習一下吧。