1. 程式人生 > >Elasticsearch 之(25)重寫IK分詞器原始碼來基於mysql熱更新詞庫

Elasticsearch 之(25)重寫IK分詞器原始碼來基於mysql熱更新詞庫

熱更新
在上一節《IK分詞器配置檔案講解以及自定義詞庫》自定義詞庫,每次都是在es的擴充套件詞典中,手動新增新詞語,很坑
(1)每次新增完,都要重啟es才能生效,非常麻煩
(2)es是分散式的,可能有數百個節點,你不能每次都一個一個節點上面去修改

我們想要的效果:es不停機,我們直接在外部某個地方新增新的詞語,es就能立即熱載入到這些新詞語

熱更新的方案
(1)修改ik分詞器原始碼,然後手動支援從mysql中每隔一定時間,自動載入新的詞庫
(2)基於ik分詞器原生支援的熱更新方案,部署一個web伺服器,提供一個http介面,通過Last-Modified和ETag兩個http響應頭,來提供詞語的熱更新

這裡採用第一種方案;第二種方案,ik git社群官方都不建議採用,覺得不太穩定

1、下載原始碼
https://github.com/medcl/elasticsearch-analysis-ik/tree/v5.2.0
ik分詞器,是個標準的java maven工程,直接匯入eclipse就可以看到原始碼

2、修改原始碼

Dictionary單例類的初始化方法initial,在這裡需要建立一個我們自定義的執行緒,並且啟動它
/**
 * Initializes the dictionary singleton.
 * <p>
 * IK Analyzer normally loads its dictionaries lazily, when the Dictionary class is first
 * used, which lengthens the first analysis call. Calling this method during application
 * start-up performs that loading eagerly instead.
 * <p>
 * The first call builds the singleton, loads every built-in dictionary, starts the MySQL
 * hot-reload thread and, when {@code cfg.isEnableRemoteDict()} is true, schedules one
 * {@code Monitor} per remote dictionary location. Later calls return the existing
 * instance unchanged.
 *
 * @param cfg plugin configuration used to build the dictionary
 * @return the Dictionary singleton
 */
public static synchronized Dictionary initial(Configuration cfg) {
	// The method is already synchronized, so a plain null check is sufficient;
	// no additional nested locking is required.
	if (singleton != null) {
		return singleton;
	}

	singleton = new Dictionary(cfg);
	singleton.loadMainDict();
	singleton.loadSurnameDict();
	singleton.loadQuantifierDict();
	singleton.loadSuffixDict();
	singleton.loadPrepDict();
	singleton.loadStopWordDict();

	// The reload task loops forever: run it as a named daemon thread so it is easy to
	// spot in thread dumps and can never keep the JVM alive on its own.
	Thread hotReloadThread = new Thread(new HotDictReloadThread(), "ik-hot-dict-reload");
	hotReloadThread.setDaemon(true);
	hotReloadThread.start();

	if (cfg.isEnableRemoteDict()) {
		// Start the monitors: 10 seconds initial delay, then every 60 seconds
		for (String location : singleton.getRemoteExtDictionarys()) {
			pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
		}
		for (String location : singleton.getRemoteExtStopWordDictionarys()) {
			pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
		}
	}

	return singleton;
}
HotDictReloadThread類:就是死迴圈,不斷呼叫Dictionary.getSingleton().reLoadMainDict(),去重新載入詞典
/**
 * Background task that reloads the main dictionary over and over by calling
 * {@code Dictionary.getSingleton().reLoadMainDict()}.
 * <p>
 * The loop ends when the running thread is interrupted. An unchecked exception thrown
 * by a reload is logged and the loop carries on, so one bad cycle does not stop hot
 * reloading for good.
 */
public class HotDictReloadThread implements Runnable {

	private static final Logger logger = ESLoggerFactory.getLogger(HotDictReloadThread.class.getName());

	/**
	 * Lower bound, in milliseconds, for the duration of one reload cycle. It only
	 * matters when a reload returns (or fails) almost immediately; it keeps the loop
	 * from spinning at full speed in that case.
	 */
	private static final long MIN_CYCLE_MILLIS = 100L;

	@Override
	public void run() {
		while (!Thread.currentThread().isInterrupted()) {
			long startedAt = System.nanoTime();
			try {
				logger.info("[==========]reload hot dict from mysql......");
				Dictionary.getSingleton().reLoadMainDict();
			} catch (RuntimeException e) {
				// Keep the reload thread alive; the next cycle will try again.
				logger.error("[==========]reload hot dict from mysql failed", e);
			}

			long elapsedMillis = (System.nanoTime() - startedAt) / 1000000L;
			if (elapsedMillis < MIN_CYCLE_MILLIS) {
				try {
					Thread.sleep(MIN_CYCLE_MILLIS - elapsedMillis);
				} catch (InterruptedException e) {
					// Restore the flag so the loop condition ends the task.
					Thread.currentThread().interrupt();
				}
			}
		}
	}

}
Dictionary類:更新詞典 this.loadMySQLExtDict()
/**
 * Loads the main dictionary and then the extension dictionaries.
 * <p>
 * A fresh main dictionary is created and filled with every non-blank line of the main
 * dictionary file (read as UTF-8). Afterwards the local extension dictionary, the
 * remote extension dictionary and the MySQL hot-update words are loaded, in that order.
 * <p>
 * A missing or unreadable main dictionary file is logged and does not prevent the
 * extension dictionaries from being loaded.
 */
private void loadMainDict() {
	// Create a new main dictionary instance
	_MainDict = new DictSegment((char) 0);

	// Locate the main dictionary file
	Path file = PathUtils.get(getDictRoot(), Dictionary.PATH_DIC_MAIN);

	// try-with-resources closes both the reader and the underlying stream, and the
	// reader is never built on top of a null stream when the file is missing.
	try (InputStream is = new FileInputStream(file.toFile());
			BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"), 512)) {
		String theWord;
		while ((theWord = br.readLine()) != null) {
			String trimmed = theWord.trim();
			if (!trimmed.isEmpty()) {
				_MainDict.fillSegment(trimmed.toCharArray());
			}
		}
	} catch (FileNotFoundException e) {
		logger.error(e.getMessage(), e);
	} catch (IOException e) {
		logger.error("ik-analyzer", e);
	}

	// Load the local extension dictionary
	this.loadExtDict();
	// Load the remote custom dictionary
	this.loadRemoteExtDict();
	// Load the hot-update words from MySQL
	this.loadMySQLExtDict();
}

/**
 * Loads hot-update words from MySQL into the main dictionary.
 * <p>
 * Connection settings and the query are read from {@code jdbc-reload.properties} in the
 * dictionary root ({@code jdbc.url}, {@code jdbc.user}, {@code jdbc.password},
 * {@code jdbc.reload.sql}). Every non-blank value of the {@code word} column is added to
 * the main dictionary.
 * <p>
 * Before returning, the method always sleeps for {@code jdbc.reload.interval}
 * milliseconds (1000 ms when the setting is missing or invalid), including after a
 * failure, so that a caller looping over reloads does not hammer an unreachable
 * database. Failures are logged and never propagated.
 */
private void loadMySQLExtDict() {
	// Pause applied before returning; replaced by jdbc.reload.interval once it is read.
	long pauseMillis = 1000L;

	try {
		Path file = PathUtils.get(getDictRoot(), "jdbc-reload.properties");
		// Close the properties file on every call instead of leaking a handle per reload.
		try (FileInputStream in = new FileInputStream(file.toFile())) {
			prop.load(in);
		}

		String interval = prop.getProperty("jdbc.reload.interval");
		if (interval != null) {
			try {
				// Thread.sleep rejects negative values, so clamp at zero.
				pauseMillis = Math.max(0L, Long.parseLong(interval.trim()));
			} catch (NumberFormatException e) {
				logger.error("[==========]invalid jdbc.reload.interval: " + interval, e);
			}
		}

		logger.info("[==========]jdbc-reload.properties");
		for (Object key : prop.keySet()) {
			String name = String.valueOf(key);
			// Never write the database password to the log.
			String value = "jdbc.password".equals(name) ? "******" : prop.getProperty(name);
			logger.info("[==========]" + name + "=" + value);
		}

		String sql = prop.getProperty("jdbc.reload.sql");
		logger.info("[==========]query hot dict from mysql, " + sql + "......");

		try (Connection conn = DriverManager.getConnection(
					prop.getProperty("jdbc.url"),
					prop.getProperty("jdbc.user"),
					prop.getProperty("jdbc.password"));
				Statement stmt = conn.createStatement();
				ResultSet rs = stmt.executeQuery(sql)) {
			while (rs.next()) {
				String theWord = rs.getString("word");
				// A NULL or blank row must not abort the whole load.
				if (theWord == null || theWord.trim().isEmpty()) {
					continue;
				}
				logger.info("[==========]hot word from mysql: " + theWord);
				_MainDict.fillSegment(theWord.trim().toCharArray());
			}
		}
	} catch (Exception e) {
		logger.error("erorr", e);
	} finally {
		try {
			Thread.sleep(pauseMillis);
		} catch (InterruptedException e) {
			// Preserve the interruption for the calling thread.
			Thread.currentThread().interrupt();
		}
	}
}

Dictionary類:更新停用詞 this.loadMySQLStopwordDict();
/**
 * Loads hot-update stopwords from MySQL into the stopword dictionary.
 * <p>
 * Connection settings and the query are read from {@code jdbc-reload.properties} in the
 * dictionary root ({@code jdbc.url}, {@code jdbc.user}, {@code jdbc.password},
 * {@code jdbc.reload.stopword.sql}). Every non-blank value of the {@code word} column
 * is added to the stopword dictionary.
 * <p>
 * Before returning, the method always sleeps for {@code jdbc.reload.interval}
 * milliseconds (1000 ms when the setting is missing or invalid), including after a
 * failure, so that a caller looping over reloads does not hammer an unreachable
 * database. Failures are logged and never propagated.
 */
private void loadMySQLStopwordDict() {
	// Pause applied before returning; replaced by jdbc.reload.interval once it is read.
	long pauseMillis = 1000L;

	try {
		Path file = PathUtils.get(getDictRoot(), "jdbc-reload.properties");
		// Close the properties file on every call instead of leaking a handle per reload.
		try (FileInputStream in = new FileInputStream(file.toFile())) {
			prop.load(in);
		}

		String interval = prop.getProperty("jdbc.reload.interval");
		if (interval != null) {
			try {
				// Thread.sleep rejects negative values, so clamp at zero.
				pauseMillis = Math.max(0L, Long.parseLong(interval.trim()));
			} catch (NumberFormatException e) {
				logger.error("[==========]invalid jdbc.reload.interval: " + interval, e);
			}
		}

		logger.info("[==========]jdbc-reload.properties");
		for (Object key : prop.keySet()) {
			String name = String.valueOf(key);
			// Never write the database password to the log.
			String value = "jdbc.password".equals(name) ? "******" : prop.getProperty(name);
			logger.info("[==========]" + name + "=" + value);
		}

		String sql = prop.getProperty("jdbc.reload.stopword.sql");
		logger.info("[==========]query hot stopword dict from mysql, " + sql + "......");

		try (Connection conn = DriverManager.getConnection(
					prop.getProperty("jdbc.url"),
					prop.getProperty("jdbc.user"),
					prop.getProperty("jdbc.password"));
				Statement stmt = conn.createStatement();
				ResultSet rs = stmt.executeQuery(sql)) {
			while (rs.next()) {
				String theWord = rs.getString("word");
				// A NULL or blank row must not abort the whole load.
				if (theWord == null || theWord.trim().isEmpty()) {
					continue;
				}
				logger.info("[==========]hot stopword from mysql: " + theWord);
				_StopWords.fillSegment(theWord.trim().toCharArray());
			}
		}
	} catch (Exception e) {
		logger.error("erorr", e);
	} finally {
		try {
			Thread.sleep(pauseMillis);
		} catch (InterruptedException e) {
			// Preserve the interruption for the calling thread.
			Thread.currentThread().interrupt();
		}
	}
}
配置
jdbc.url=jdbc:mysql://localhost:3306/test?serverTimezone=GMT
jdbc.user=root
jdbc.password=root
jdbc.reload.sql=select word from hot_words
jdbc.reload.stopword.sql=select stopword as word from hot_stopwords
jdbc.reload.interval=1000

3、mvn package打包程式碼

target\releases\elasticsearch-analysis-ik-5.2.0.zip


4、解壓縮ik壓縮包

將mysql驅動jar,放入ik的目錄下


5、重啟es



6、在mysql中新增詞庫與停用詞



7、kibana分詞驗證

GET /my_index/_analyze
{
  "text": "一人飲酒醉",
  "analyzer": "ik_max_word"
}

{
  "tokens": [
    {
      "token": "一人飲酒醉",
      "start_offset": 0,
      "end_offset": 5,
      "type": "CN_WORD",
      "position": 0
    },
    {
      "token": "一人",
      "start_offset": 0,
      "end_offset": 2,
      "type": "CN_WORD",
      "position": 1
    },
    {
      "token": "一",
      "start_offset": 0,
      "end_offset": 1,
      "type": "TYPE_CNUM",
      "position": 2
    },
    {
      "token": "人",
      "start_offset": 1,
      "end_offset": 2,
      "type": "COUNT",
      "position": 3
    },
    {
      "token": "飲酒",
      "start_offset": 2,
      "end_offset": 4,
      "type": "CN_WORD",
      "position": 4
    },
    {
      "token": "飲",
      "start_offset": 2,
      "end_offset": 3,
      "type": "CN_WORD",
      "position": 5
    },
    {
      "token": "酒醉",
      "start_offset": 3,
      "end_offset": 5,
      "type": "CN_WORD",
      "position": 6
    },
    {
      "token": "酒",
      "start_offset": 3,
      "end_offset": 4,
      "type": "CN_WORD",
      "position": 7
    },
    {
      "token": "醉",
      "start_offset": 4,
      "end_offset": 5,
      "type": "CN_WORD",
      "position": 8
    }
  ]
}