1. 程式人生 > >ElasticSearch-jdbc從Mysql資料庫匯入和更新資料

ElasticSearch-jdbc從Mysql資料庫匯入和更新資料

環境

需要的資源和版本:
- git客戶端
- Elasticsearch 版本 2.2.0 安裝教程
- elasticsearch-jdbc 版本 2.2
- ik分詞器 版本 1.8
(下載後解壓將ik資料夾放在elasticsearch-2.2.0\plugins目下,重啟elasticsearch即可)
這裡寫圖片描述

一、安裝 jdbc

下載完後解壓,示例如下:

這裡寫圖片描述

提示:如果匯入的資料量很大,先修改elasticsearch的記憶體
開啟目錄D:\elasticsearch-2.2.0\bin
修改elasticsearch.in.bat和elasticsearch.in.sh檔案(好像只改前面一個就可以)

if "%ES_MIN_MEM%" == "" (
set ES_MIN_MEM=4g
)

if "%ES_MAX_MEM%" == "" (
set ES_MAX_MEM=10g
)

這是設定後的,實際情況根據自己電腦配置。

二、使用jdbc

1.配置環境變數

變數名:JDBC_IMPORTER_HOME
變數值:D:\elasticsearch-2.2.0\elasticsearch-jdbc-2.2.0.0

這裡寫圖片描述

2.在mysql資料庫里加時間戳(只匯入資料不更新的可以不加)

名:updateupdate_time
型別:timestamp
預設:CURRENT_TIMESTAMP

這裡寫圖片描述

3.資料匯入

進入D:\elasticsearch-2.2.0\elasticsearch-jdbc-2.2.0.0\bin目錄
建立 mysql-import.sh 的base指令碼,內容如下(使用時把註釋去掉):

#!/bin/sh
bin=$JDBC_IMPORTER_HOME/bin
lib=$JDBC_IMPORTER_HOME/lib

echo '
{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test", #資料庫名
        "user" : "root"
, #資料庫使用者名稱 "password" : "root", #資料庫密碼 "sql" : "select *, id as _id from user", #sql插入語句 "index" : "user", #索引名 "type" : "user", #type名 "index_settings" : { "analysis" : { "analyzer" : { "ik" : { "tokenizer" : "ik" } } } }, "type_mapping": { "article" : { "properties" : { "id" : { "type" : "integer", "index" : "not_analyzed" }, "subject" : { "type" : "string", "analyzer" : "ik" }, "author" : { "type" : "string", "analyzer" : "ik" }, "create_time" : { "type" : "date" }, "update_time" : { "type" : "date" } } } } } } ' | java \ -cp "${lib}/*" \ -Dlog4j.configurationFile=${bin}/log4j2.xml \ org.xbib.tools.Runner \ org.xbib.tools.JDBCImporter

不使用ik分詞器時:

#!/bin/sh
bin=$JDBC_IMPORTER_HOME/bin
lib=$JDBC_IMPORTER_HOME/lib
echo '{
"type" : "jdbc",
"jdbc": {
"elasticsearch.autodiscover":true,
"elasticsearch.cluster":"elasticsearch",    #叢集名 
"url":"jdbc:mysql://127.0.0.1:3306/test",   #資料庫名 
"user":"root",                              #資料庫使用者名稱
"password":"root",                          #資料庫密碼
"sql":"select *, id as _id from user",      #sql插入語句
"elasticsearch" : {
  "host" : "localhost",
  "port" : 9300
},
"index" : "user",                           #索引名
"type" : "user"                             #type名
}
}'| java \
  -cp "${lib}/*" \
  -Dlog4j.configurationFile=${bin}/log4j2.xml \
  org.xbib.tools.Runner \
  org.xbib.tools.JDBCImporter

直接點選import.sh執行指令碼即可。

4.增量索引、更新

進入D:\elasticsearch-2.2.0\elasticsearch-jdbc-2.2.0.0\bin目錄
建立 mysql-update.sh 的base指令碼,內容如下(使用時把註釋去掉):

#!/bin/sh
bin=$JDBC_IMPORTER_HOME/bin
lib=$JDBC_IMPORTER_HOME/lib

echo '
{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "root",
        "password" : "root",
        "statefile" : "user.json",
        "schedule" : "0 0-59 0-23 ? * *",
        "sql" : [
            {
                "statement" : "select * from user where update_time > ?",
                "parameter" : [ "$metrics.lastexecutionstart" ]
            }
        ],
        "index" : "user",
        "type" : "user",
        "index_settings" : {
            "analysis" : {
            "analyzer" : {
                "ik" : {
                    "tokenizer" : "ik"
                }
            }
        }
        },
        "type_mapping": {
            "article" : {
                "properties" : {
                    "id" : {
                        "type" : "integer",
                        "index" : "not_analyzed"
                    },
                    "subject" : {
                        "type" : "string",
                        "analyzer" : "ik"
                    },
                    "author" : {
                        "type" : "string",
                        "analyzer" : "ik"
                    },
                    "create_time" : {
                        "type" : "date"
                    },
                    "update_time" : {
                        "type" : "date"
                    }
                }
            }
        }
    }
}
' | java \
    -cp "${lib}/*" \
    -Dlog4j.configurationFile=${bin}/log4j2.xml \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter

執行mysql-update.sh指令碼
可以看到 命令列端 被佔用,一直在執行,並且在 mysql-update.sh 的同級目錄下生成了一個user .json 的檔案,sql 語句中需要的資料 lastexecutionstart 就儲存在該檔案中 。