ElasticSearch-jdbc從Mysql資料庫匯入和更新資料
阿新 • • 發佈:2019-02-15
環境:
需要的資源和版本:
- git客戶端
- Elasticsearch 版本 2.2.0 安裝教程
- elasticsearch-jdbc 版本 2.2
- ik分詞器 版本 1.8
(下載後解壓將ik資料夾放在elasticsearch-2.2.0\plugins目下,重啟elasticsearch即可)
一、安裝 jdbc
下載完後解壓,示例如下:
提示:如果匯入的資料量很大,先修改elasticsearch的記憶體
開啟目錄D:\elasticsearch-2.2.0\bin
修改elasticsearch.in.bat和elasticsearch.in.sh檔案(好像只改前面一個就可以)
if "%ES_MIN_MEM%" == "" (
set ES_MIN_MEM=4g
)
if "%ES_MAX_MEM%" == "" (
set ES_MAX_MEM=10g
)
這是設定後的,實際情況根據自己電腦配置。
二、使用jdbc
1.配置環境變數
變數名:JDBC_IMPORTER_HOME
變數值:D:\elasticsearch-2.2.0\elasticsearch-jdbc-2.2.0.0
2.在mysql資料庫里加時間戳(只匯入資料不更新的可以不加)
名:updateupdate_time 型別:timestamp 預設:CURRENT_TIMESTAMP
3.資料匯入
進入D:\elasticsearch-2.2.0\elasticsearch-jdbc-2.2.0.0\bin目錄
建立 mysql-import.sh 的base指令碼,內容如下(使用時把註釋去掉):
#!/bin/sh
bin=$JDBC_IMPORTER_HOME/bin
lib=$JDBC_IMPORTER_HOME/lib
echo '
{
"type" : "jdbc",
"jdbc" : {
"url" : "jdbc:mysql://localhost:3306/test", #資料庫名
"user" : "root" , #資料庫使用者名稱
"password" : "root", #資料庫密碼
"sql" : "select *, id as _id from user", #sql插入語句
"index" : "user", #索引名
"type" : "user", #type名
"index_settings" : {
"analysis" : {
"analyzer" : {
"ik" : {
"tokenizer" : "ik"
}
}
}
},
"type_mapping": {
"article" : {
"properties" : {
"id" : {
"type" : "integer",
"index" : "not_analyzed"
},
"subject" : {
"type" : "string",
"analyzer" : "ik"
},
"author" : {
"type" : "string",
"analyzer" : "ik"
},
"create_time" : {
"type" : "date"
},
"update_time" : {
"type" : "date"
}
}
}
}
}
}
' | java \
-cp "${lib}/*" \
-Dlog4j.configurationFile=${bin}/log4j2.xml \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporter
不使用ik分詞器時:
#!/bin/sh
bin=$JDBC_IMPORTER_HOME/bin
lib=$JDBC_IMPORTER_HOME/lib
echo '{
"type" : "jdbc",
"jdbc": {
"elasticsearch.autodiscover":true,
"elasticsearch.cluster":"elasticsearch", #叢集名
"url":"jdbc:mysql://127.0.0.1:3306/test", #資料庫名
"user":"root", #資料庫使用者名稱
"password":"root", #資料庫密碼
"sql":"select *, id as _id from user", #sql插入語句
"elasticsearch" : {
"host" : "localhost",
"port" : 9300
},
"index" : "user", #索引名
"type" : "user" #type名
}
}'| java \
-cp "${lib}/*" \
-Dlog4j.configurationFile=${bin}/log4j2.xml \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporter
直接點選import.sh執行指令碼即可。
4.增量索引、更新
進入D:\elasticsearch-2.2.0\elasticsearch-jdbc-2.2.0.0\bin目錄
建立 mysql-update.sh 的base指令碼,內容如下(使用時把註釋去掉):
#!/bin/sh
bin=$JDBC_IMPORTER_HOME/bin
lib=$JDBC_IMPORTER_HOME/lib
echo '
{
"type" : "jdbc",
"jdbc" : {
"url" : "jdbc:mysql://localhost:3306/test",
"user" : "root",
"password" : "root",
"statefile" : "user.json",
"schedule" : "0 0-59 0-23 ? * *",
"sql" : [
{
"statement" : "select * from user where update_time > ?",
"parameter" : [ "$metrics.lastexecutionstart" ]
}
],
"index" : "user",
"type" : "user",
"index_settings" : {
"analysis" : {
"analyzer" : {
"ik" : {
"tokenizer" : "ik"
}
}
}
},
"type_mapping": {
"article" : {
"properties" : {
"id" : {
"type" : "integer",
"index" : "not_analyzed"
},
"subject" : {
"type" : "string",
"analyzer" : "ik"
},
"author" : {
"type" : "string",
"analyzer" : "ik"
},
"create_time" : {
"type" : "date"
},
"update_time" : {
"type" : "date"
}
}
}
}
}
}
' | java \
-cp "${lib}/*" \
-Dlog4j.configurationFile=${bin}/log4j2.xml \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporter
執行mysql-update.sh指令碼
可以看到 命令列端 被佔用,一直在執行,並且在 mysql-update.sh 的同級目錄下生成了一個user .json 的檔案,sql 語句中需要的資料 lastexecutionstart 就儲存在該檔案中 。