Elasticsearch ik分詞器修改原始碼實現從mysql中定時更新詞庫
阿新 • • 發佈:2019-02-16
下載原始碼匯入eclipse請參考我的上一篇文章ik分詞器安裝
第一步 修改pom檔案
加入mysql驅動jar 如圖所示
第二步 修改Java類
1.在Dictionary.java檔案中新增一個方法
/**
 * Bulk-loads new stop-word entries into the stop-word dictionary.
 *
 * @param words
 *            Collection<String> of stop-word entries; a null collection or
 *            null entries are silently skipped
 */
public void addStopWords(Collection<String> words) {
    if (words == null) {
        return;
    }
    for (String word : words) {
        if (word == null) {
            continue;
        }
        // Insert the trimmed entry into the in-memory stop-word dictionary
        _StopWords.fillSegment(word.trim().toCharArray());
    }
}
2.新建一個包 並新增幾個Java檔案
第一個 StringUtils.java
package org.wltea.analyzer.ext;
/**
 * Minimal string helpers used by the dictionary-reload extension.
 * Mirrors the semantics of commons-lang's isBlank/isNotBlank.
 */
public final class StringUtils {

    /** Utility class — not instantiable. */
    private StringUtils() {
    }

    /**
     * Returns true when the string is null, empty, or whitespace-only.
     *
     * @param str candidate string (may be null)
     * @return true if blank, false otherwise
     */
    public static boolean isBlank(String str) {
        if (str == null) {
            return true;
        }
        for (int i = 0, len = str.length(); i < len; i++) {
            if (!Character.isWhitespace(str.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns true when the string contains at least one non-whitespace character.
     *
     * @param str candidate string (may be null)
     * @return true if not blank, false otherwise
     */
    public static boolean isNotBlank(String str) {
        return !isBlank(str);
    }
}
第二個 DBHelper.java
package org.wltea.analyzer.ext;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
/**
 * JDBC helper that pulls dictionary words (extension / stop / synonym columns)
 * from the configured MySQL table. The connection settings are injected into
 * the static fields from elasticsearch.yml by IkTokenizerFactory before the
 * first refresh runs.
 */
public class DBHelper {
    // Was Loggers.getLogger(DBRunnable.class) — log under the correct class.
    Logger logger = Loggers.getLogger(DBHelper.class);
    // Populated from elasticsearch.yml (dbUrl / dbUser / dbPwd / dbTable).
    public static String url = null;
    public static String dbUser = null;
    public static String dbPwd = null;
    public static String dbTable = null;
    private Connection conn;
    // Per-column timestamp of the last SUCCESSFUL import; used to fetch only
    // rows updated since then.
    public static Map<String, Date> lastImportTimeMap = new HashMap<String, Date>();

    static {
        try {
            // Register the MySQL JDBC driver once per JVM.
            Class.forName("com.mysql.jdbc.Driver");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Opens a connection with the statically configured credentials.
     * Returns null (after logging) when the connection cannot be opened.
     */
    private Connection getConn() throws Exception {
        try {
            conn = DriverManager.getConnection(url, dbUser, dbPwd);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Fetches the non-empty values of column {@code key} from rows that are not
     * soft-deleted, optionally restricted to rows updated since the last
     * successful import.
     *
     * @param key    column name to read (e.g. ext_word / stop_word / synonym);
     *               comes from elasticsearch.yml, never from end users
     * @param flag   when true, only rows newer than the last successful import
     * @param synony optional; when present, values are joined with '\n' instead of ','
     * @return the joined word list; empty string when nothing was fetched
     * @throws Exception kept in the signature for caller compatibility
     */
    public String getKey(String key, boolean flag, boolean... synony) throws Exception {
        StringBuilder data = new StringBuilder();
        Connection connection = getConn();
        if (connection == null) {
            logger.warn("could not open MySQL connection, skipping refresh for {}", key);
            return data.toString();
        }
        // NOTE: dbTable and key are concatenated into the SQL; both come from
        // the node configuration file, not from user input. Do not pass
        // untrusted values into this method.
        StringBuilder sql = new StringBuilder("select * from " + dbTable + " where delete_type=0");
        Date lastImportTime = lastImportTimeMap.get(key);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        if (lastImportTime != null && flag) {
            sql.append(" and update_time > '").append(sdf.format(lastImportTime)).append("'");
        }
        sql.append(" and ").append(key).append(" !=''");
        // Capture the import timestamp BEFORE the query runs so rows inserted
        // during the query are still picked up next round; commit it to the map
        // only AFTER the query succeeds, so a failed refresh is retried in full.
        Date importStartedAt = new Date();
        // If the printed time differs from the wall clock, check that the JVM
        // time zone matches the database server's time zone.
        logger.warn("sql==={}", sql.toString());
        try (Connection c = connection;
                PreparedStatement ps = c.prepareStatement(sql.toString());
                ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                String value = rs.getString(key);
                if (StringUtils.isNotBlank(value)) {
                    // Synonyms are line-separated; everything else comma-separated.
                    data.append(value).append(synony != null && synony.length > 0 ? "\n" : ",");
                }
            }
            lastImportTimeMap.put(key, importStartedAt);
        } catch (Exception e) {
            // Pass the throwable itself so the full stack trace is logged.
            logger.warn("failed to refresh dictionary column " + key, e);
        }
        return data.toString();
    }

    /** Manual smoke test against a locally configured database. */
    public static void main(String[] args) throws Exception {
        DBHelper dbHelper = new DBHelper();
        String extWords = dbHelper.getKey("ext_word", true);
        List<String> extList = Arrays.asList(extWords.split(","));
        System.out.println(extList);
        // System.out.println(getKey("stopword"));
        // System.out.println(getKey("synonym"));
    }
}
第三個 DBRunnable.java
package org.wltea.analyzer.ext;
import java.util.Arrays;
import java.util.List;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.wltea.analyzer.dic.Dictionary;
/**
 * Periodic task that refreshes the IK dictionaries from MySQL: extension
 * words go into the main dictionary, stop words into the stop-word
 * dictionary. Scheduled by IkTokenizerFactory at a fixed rate.
 */
public class DBRunnable implements Runnable {
    Logger logger = Loggers.getLogger(DBRunnable.class);
    // Column names to read from, configured in elasticsearch.yml.
    private final String extField;
    private final String stopField;

    public DBRunnable(String extField, String stopField) {
        super();
        this.extField = extField;
        this.stopField = stopField;
    }

    @Override
    public void run() {
        logger.warn("開始載入詞庫========");
        // Singleton dictionary shared by every IK analyzer on this node.
        Dictionary dic = Dictionary.getSingleton();
        DBHelper dbHelper = new DBHelper();
        try {
            String extWords = dbHelper.getKey(extField, true);
            String stopWords = dbHelper.getKey(stopField, true);
            if (StringUtils.isNotBlank(extWords)) {
                List<String> extList = Arrays.asList(extWords.split(","));
                // Load the extension words into the main dictionary.
                dic.addWords(extList);
                logger.warn("載入擴充套件詞成功========");
                logger.warn("extWords為==={}", extWords);
            }
            if (StringUtils.isNotBlank(stopWords)) {
                List<String> stopList = Arrays.asList(stopWords.split(","));
                // Load the stop words into the stop-word dictionary
                // (the original comment wrongly said "extension words").
                dic.addStopWords(stopList);
                logger.warn("載入停用詞成功========");
                logger.warn("stopWords為==={}", stopWords);
            }
        } catch (Exception e) {
            // Pass the throwable as the last argument (no placeholder) so the
            // full stack trace is logged instead of just e.toString().
            logger.warn("載入擴充套件詞失敗========", e);
        }
    }
}
第三步
複製AnalysisIkPlugin.java檔案
增加一個方法 不然elasticsearch 不能識別配置檔案中自己新增的屬性
/**
 * Declares the custom node-level settings read from elasticsearch.yml so
 * that elasticsearch accepts them instead of rejecting unknown keys.
 *
 * @return the list of settings this plugin registers
 */
@Override
public List<Setting<?>> getSettings() {
    // Plain string settings: identity parser, empty-string default, node scope.
    Setting<String> urlSetting = new Setting<>("dbUrl", "", Function.identity(), Property.NodeScope);
    Setting<String> userSetting = new Setting<>("dbUser", "", Function.identity(), Property.NodeScope);
    Setting<String> passwordSetting = new Setting<>("dbPwd", "", Function.identity(), Property.NodeScope);
    Setting<String> tableSetting = new Setting<>("dbTable", "", Function.identity(), Property.NodeScope);
    Setting<String> extFieldSetting = new Setting<>("extField", "", Function.identity(), Property.NodeScope);
    Setting<String> stopFieldSetting = new Setting<>("stopField", "", Function.identity(), Property.NodeScope);
    // Typed settings with defaults: refresh period and the reload on/off switch.
    Setting<Integer> flushTimeSetting = Setting.intSetting("flushTime", 5, Property.NodeScope);
    Setting<Boolean> autoReloadSetting = Setting.boolSetting("autoReloadDic", false, Property.NodeScope);
    return Arrays.asList(urlSetting, userSetting, passwordSetting, tableSetting,
            extFieldSetting, stopFieldSetting, flushTimeSetting, autoReloadSetting);
}
第四步
備份IkTokenizerFactory.java檔案
修改IkTokenizerFactory.java的構造方法 修改後程式碼如下
/**
 * Builds the IK tokenizer factory and, on first construction with
 * autoReloadDic enabled, wires up the periodic MySQL dictionary refresh.
 */
public IkTokenizerFactory(IndexSettings indexSettings, Environment env, String name, Settings settings) {
super(indexSettings, name, settings);
configuration=new Configuration(env,settings);
// Read the MySQL connection info from elasticsearch.yml (node-level settings).
Settings s = indexSettings.getSettings();
String dbUrl = s.get("dbUrl");
boolean autoReloadDic=s.getAsBoolean("autoReloadDic", false);
// Guard: only start the scheduler once per node — DBHelper.url is still blank
// the first time this constructor runs with a configured dbUrl.
if(autoReloadDic&&StringUtils.isBlank(DBHelper.url)&&StringUtils.isNotBlank(dbUrl)){
String dbUser = s.get("dbUser");
String dbPwd = s.get("dbPwd");
// Refresh interval in seconds, default 60 here.
// NOTE(review): getSettings() registers flushTime with default 5 — confirm
// which default is intended.
Integer flushTime = s.getAsInt("flushTime", 60);
String dbTable = s.get("dbTable");
// Publish the connection settings for DBHelper (also marks init as done).
DBHelper.dbTable=dbTable;
DBHelper.dbUser=dbUser;
DBHelper.dbPwd=dbPwd;
DBHelper.url=dbUrl;
logger.warn("dbUrl=========={}",dbUrl);
String extField = s.get("extField");
String stopField = s.get("stopField");
logger.warn("extField=========={}",extField);
logger.warn("stopField=========={}",stopField);
// NOTE(review): this executor is never shut down, so it leaks on node close —
// consider keeping a reference and stopping it in the plugin's close hook.
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(new DBRunnable(extField,stopField), 0, flushTime, TimeUnit.SECONDS);
}
}
第五步
在Dictionary.java檔案中新增一個方法
/**
 * Bulk-loads new stop-word entries into the stop-word dictionary.
 *
 * @param words
 * Collection<String> of stop-word entries; null collection or null
 * entries are silently skipped
 */
public void addStopWords(Collection<String> words) {
if (words != null) {
for (String word : words) {
if (word != null) {
// Load each trimmed entry into the in-memory stop-word dictionary.
_StopWords.fillSegment(word.trim().toCharArray());
}
}
}
}
以上,ik分詞器修改結束 打包 複製 elasticsearch-analysis-ik-5.5.2.jar 替換掉伺服器上plugins資料夾下ik外掛裡面的同名jar包即可
如果伺服器還沒有安裝ik分詞器外掛,則將下圖所示打好的壓縮包上傳到plugins資料夾下解壓即可
注意:將mysql驅動jar mysql-connector-java-5.1.8.jar放入到解壓好的ik外掛資料夾裡 如圖所示
下面進行測試
第六步 修改elasticsearch配置檔案
[root@model elasticsearch-5.5.2]# vim config/elasticsearch.yml
最下面新增
dbUrl: jdbc:mysql://192.168.254.1/elasticsearch
dbUser: root
dbPwd: whdhz19
dbTable: t_es_ik_dic
extField: ext_word
stopField: stop_word
flushTime: 5
autoReloadDic: true
儲存退出即可
下面進行測試
第七步 mysql建表
-- Dictionary table polled by the IK plugin; rows with delete_type=0 and a
-- non-empty word column are loaded, filtered by update_time since last import.
CREATE TABLE t_es_ik_dic (
id int(11) PRIMARY KEY AUTO_INCREMENT COMMENT '自增id',
ext_word varchar(100) DEFAULT '' COMMENT '擴充套件分詞',
stop_word varchar(100) DEFAULT '' COMMENT '停用詞',
synonym varchar(100) DEFAULT '' COMMENT '同義詞',
dic_status tinyint(4) DEFAULT '0' COMMENT '狀態,0表示未新增,1表示新增',
delete_type tinyint(4) DEFAULT '0' COMMENT '0表示未刪除,1表示刪除',
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間',
update_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間'
)
第八步 啟動
如果命令列列印如圖所示,則表示啟動成功
這時候可以在測試表中新增一些測試資料
如果命令列輸出如圖所示,表示新增擴充套件詞成功
第九步 使用kibana進行測試
分詞效果,如圖所示
第十步 測試停用詞
比如上例,認為 “米” 這個詞沒什麼意義 不用分詞,則在資料庫新增 如圖所示
命令列輸出如圖所示
kibana分詞如圖所示