1. 程式人生 > >Elasticsearch JDBC的使用-MySQL 資料來源匯入和增量索引、更新

Elasticsearch JDBC的使用-MySQL 資料來源匯入和增量索引、更新

在使用 Elasticsearch 的時候,經常會涉及到要將其它資料來源的資料匯入到 Elasticsearch 中,今天就來介紹一下關於 Elasticsearch 從 MySQL 匯入資料和增量索引的實現
這裡要用到一個 Elasticsearch 的外掛 elasticsearch-jdbc

需要的資源和版本
Elasticsearch 版本:2.2.0 CSDN下載
elasticsearch-jdbc 版本 : 2.2 CSDN下載

一、安裝 jdbc

jdbc 的壓縮包我已經放在了 /usr/local/src/ 目錄下,可以去它的 GitHub地址 獲取對應版本的壓縮包

cd /usr/local
/src/ unzip ./elasticsearch-jdbc-2.2.0.0-dist.zip cp -r ./elasticsearch-jdbc-2.2.0.0 /usr/local/elasticsearch-2.2.0/jdbc2.2

這樣就可以使用啦,jdbc 還提供了一些常用的例子,在 【ES安裝目錄/jdbc2.2/bin/ 】這個資料夾下,改一改就可以用,都是bash 檔案,記得加執行許可權哦

二、使用jdbc

我們先在 MySQL中建立一個用於測試的資料表 article ,並新增幾條資料
(注意, update_time 欄位我加了ON UPDATE CURRENT_TIMESTAMP,資料發生改變就會更新此欄位)

DROP TABLE IF EXISTS `article`;
CREATE TABLE `article` (
  `id` mediumint(8) unsigned NOT NULL AUTO_INCREMENT,
  `subject` varchar(150) NOT NULL,
  `author` varchar(15) DEFAULT NULL,
  `create_time` timestamp NULL DEFAULT NULL,
  `update_time` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP
, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
# 資料 INSERT INTO `article` VALUES ('1', '"閨蜜"崔順實被韓檢方傳喚 韓總統府促徹查真相', 'jam', '2016-10-31 17:49:21', '2016-10-31 17:50:21'); INSERT INTO `article` VALUES ('2', '韓舉行"護國訓練" 青瓦臺:決不許國家安全出問題', 'jam00', '2016-10-31 17:50:39', '2016-10-31 17:50:51'); INSERT INTO `article` VALUES ('3', '媒體稱FBI已經取得搜查令 檢視希拉里電郵', 'tomi', '2016-10-31 17:51:03', '2016-10-31 17:51:08'); INSERT INTO `article` VALUES ('4', '村上春樹獲安徒生獎 演講中談及歐洲排外問題', 'jason', '2016-10-31 17:51:38', '2016-10-31 17:51:41'); INSERT INTO `article` VALUES ('5', '希拉里團隊炮轟FBI 參院民主黨領袖批其“違法”', 'tommy', '2016-10-31 17:52:07', '2016-10-31 17:52:09');

1、資料來源匯入

首先執行全部資料匯入(注:ES 使用的是預設配置)
我們寫一個名叫 mysql-article.sh 的bash指令碼,並放在 /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh 下面,指令碼內容如下(內容註釋會在後面給出)

#執行
/usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh 
#檔案內容如下
#!/bin/sh
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
bin=${DIR}/../bin
lib=${DIR}/../lib

echo '
{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "root",
        "password" : "123456",
        "sql" : "select *, id as _id from article",
        "index" : "jdbctest",
        "type" : "article",
        "index_settings" : {
            "analysis" : {
                "analyzer" : {
                    "ik" : {
                        "tokenizer" : "ik"
                    }
                }
            }
        },
        "type_mapping": {
            "article" : {
                "properties" : {
                    "id" : {
                        "type" : "integer",
                        "index" : "not_analyzed"
                    },
                    "subject" : {
                        "type" : "string",
                        "analyzer" : "ik"
                    },
                    "author" : {
                        "type" : "string",
                        "analyzer" : "ik"
                    },
                    "create_time" : {
                        "type" : "date"
                    },
                    "update_time" : {
                        "type" : "date"
                    }
                }
            }
        }
    }
}
' | java \
    -cp "${lib}/*" \
    -Dlog4j.configurationFile=${bin}/log4j2.xml \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter

執行後會自動建立 jdbctest 索引(若不存在) ,article 型別 和幾個對應的欄位,這裡因為有中文,我使用了 ik 分詞器(如何使用?
若執行失敗,請檢視日誌檔案,jdbc 的日誌存放在 /usr/local/elasticsearch-2.2.0/logs/jdbc.log
檢視是否匯入成功

curl -XGET 'http://localhost:9200/jdbctest/article/_search?pretty'
#返回
{
  "took" : 33,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 1.0,
    "hits" : [ {
      "_index" : "jdbctest",
      "_type" : "article",
      "_id" : "5",
      "_score" : 1.0,
      "_source" : {
        "id" : 5,
        "subject" : "希拉里團隊炮轟FBI 參院民主黨領袖批其“違法”",
        "author" : "tommy",
        "create_time" : "2016-10-31T17:52:07.000+08:00",
        "update_time" : "2016-10-31T17:52:09.000+08:00"
      }
    }, {
      "_index" : "jdbctest",
      "_type" : "article",
      "_id" : "2",
      "_score" : 1.0,
      "_source" : {
        "id" : 2,
        "subject" : "韓舉行"護國訓練" 青瓦臺:決不許國家安全出問題",
        "author" : "jam00",
        "create_time" : "2016-10-31T17:50:39.000+08:00",
        "update_time" : "2016-10-31T17:50:51.000+08:00"
      }
    }, {
      "_index" : "jdbctest",
      "_type" : "article",
      "_id" : "4",
      "_score" : 1.0,
      "_source" : {
        "id" : 4,
        "subject" : "村上春樹獲安徒生獎 演講中談及歐洲排外問題",
        "author" : "jason",
        "create_time" : "2016-10-31T17:51:38.000+08:00",
        "update_time" : "2016-10-31T17:51:41.000+08:00"
      }
    }, {
      "_index" : "jdbctest",
      "_type" : "article",
      "_id" : "1",
      "_score" : 1.0,
      "_source" : {
        "id" : 1,
        "subject" : ""閨蜜"崔順實被韓檢方傳喚 韓總統府促徹查真相",
        "author" : "jam",
        "create_time" : "2016-10-31T17:49:21.000+08:00",
        "update_time" : "2016-10-31T17:50:21.000+08:00"
      }
    }, {
      "_index" : "jdbctest",
      "_type" : "article",
      "_id" : "3",
      "_score" : 1.0,
      "_source" : {
        "id" : 3,
        "subject" : "媒體稱FBI已經取得搜查令 檢視希拉里電郵",
        "author" : "tomi",
        "create_time" : "2016-10-31T17:51:03.000+08:00",
        "update_time" : "2016-10-31T17:51:08.000+08:00"
      }
    } ]
  }
}

內容已成功匯入到 Elasticsearch 中

2、增量索引、更新

如果我們對資料做了更改或是有新資料加入,若再執行全部匯入,就有點得不償失了
這裡我們就要用到jdbc 的兩個屬性 statefile(狀態檔案) 和 schedule(計劃任務時間),並且 sql 語句也要改成動態的
改動如下

"statefile" : "statefile-article.json",
"schedule" : "0 0-59 0-23 ? * *",
"sql" : [
    {
        "statement" : "select *, id as _id from article where update_time > ?",
        "parameter" : [ "$metrics.lastexecutionstart" ]
    }
],

改動後的完整檔案 mysql-article.sh

#!/bin/sh
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
bin=${DIR}/../bin
lib=${DIR}/../lib

echo '
{
    "type" : "jdbc",
    "jdbc" : {
        "url" : "jdbc:mysql://localhost:3306/test",
        "user" : "root",
        "password" : "123456",
        "statefile" : "statefile-article.json",
        "schedule" : "0 0-59 0-23 ? * *",
        "sql" : [
            {
                "statement" : "select *, id as _id from article where update_time > ?",
                "parameter" : [ "$metrics.lastexecutionstart" ]
            }
        ],
        "index" : "jdbctest",
        "type" : "article",
        "index_settings" : {
            "analysis" : {
            "analyzer" : {
                "ik" : {
                    "tokenizer" : "ik"
                }
            }
        }
        },
        "type_mapping": {
            "article" : {
                "properties" : {
                    "id" : {
                        "type" : "integer",
                        "index" : "not_analyzed"
                    },
                    "subject" : {
                        "type" : "string",
                        "analyzer" : "ik"
                    },
                    "author" : {
                        "type" : "string",
                        "analyzer" : "ik"
                    },
                    "create_time" : {
                        "type" : "date"
                    },
                    "update_time" : {
                        "type" : "date"
                    }
                }
            }
        }
    }
}
' | java \
    -cp "${lib}/*" \
    -Dlog4j.configurationFile=${bin}/log4j2.xml \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter

執行該檔案 :/usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh
可以看到 命令列端 被佔用,一直在執行,並且在 mysql-article.sh 的同級目錄下生成了一個 statefile-article.json 的檔案,sql 語句中需要的資料 lastexecutionstart 就儲存在該檔案中
現在我們來改動一下MySQL 中的資料,增加一條資料,並修改一條 id 等於 5 的資料

INSERT INTO article() VALUES(NULL,'測試JDBC','jam00','2016-11-01 13:34:15','2016-11-01 13:34:15');
UPDATE article SET `subject`='測試JDBC-改動' WHERE id=5;

最多等一分鐘,再看看ES 中的資料

curl -XGET 'http://localhost:9200/jdbctest/article/_search?pretty' -d '{
    "sort": { 
        "id": { "order": "desc" }
    }
}'
# 返回 
...
"hits" : [ {
      "_index" : "jdbctest",
      "_type" : "article",
      "_id" : "6",
      "_score" : null,
      "_source" : {
        "id" : 6,
        "subject" : "測試JDBC",
        "author" : "jam00",
        "create_time" : "2016-11-01T13:34:15.000+08:00",
        "update_time" : "2016-11-01T13:34:15.000+08:00"
      },
      "sort" : [ 6 ]
    }, {
      "_index" : "jdbctest",
      "_type" : "article",
      "_id" : "5",
      "_score" : null,
      "_source" : {
        "id" : 5,
        "subject" : "測試JDBC-改動",
        "author" : "tommy",
        "create_time" : "2016-10-31T17:52:07.000+08:00",
        "update_time" : "2016-11-01T13:35:41.000+08:00"
      },
      "sort" : [ 5 ]
    }
...

測試成功。
為了讓 mysql-article.sh 後臺執行,我們可以使用 nohup 命令

nohup /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh &

當我們想停止執行的時候。

ps aux |grep jdbc2.2
#返回
root     26118  0.0  0.1 106092  1212 pts/0    S    14:03   0:00 /bin/sh /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/mysql-article.sh
root     26123 11.0  4.4 1079192 44932 pts/0   Sl   14:03   0:00 java -cp /usr/local/elasticsearch-2.2.0/jdbc2.2/bin/../lib/* -Dlog4j.configurationFile=/usr/local/elasticsearch-2.2.0/jdbc2.2/bin/../bin/log4j2.xml org.xbib.tools.Runner org.xbib.tools.JDBCImporter

# 使用 kill 命令關閉程序, 26123 就是上面一句返回的程序號,不用殺掉 26118 ,殺掉26123 這個程序,26118 程序會自動關閉
kill -9 26123 

至此,MySQL 資料來源的 增量索引和更新就完成了。

3、bash 檔案釋義

增量索引的bash檔案註釋如下,更多詳細配置請查閱官方文件

#!/bin/sh
# 當前指令碼的絕對路徑
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
bin=${DIR}/../bin
lib=${DIR}/../lib

echo '
{
    "type" : "jdbc",
    "jdbc" : {
        # 連結 mysql 的 test 資料庫
        "url" : "jdbc:mysql://localhost:3306/test",
        # mysql 使用者
        "user" : "root",
        # mysql 密碼
        "password" : "123456",
        # 計劃任務狀態檔案
        "statefile" : "statefile-article.json",
        # 計劃任務時間 這裡是每分鐘執行一次
        "schedule" : "0 0-59 0-23 ? * *",
        # 執行匯入的sql 語句
        "sql" : [
            {
                "statement" : "select *, id as _id from article where update_time > ?",
                "parameter" : [ "$metrics.lastexecutionstart" ]
            }
        ],
        # 索引名稱 jdbctest
        "index" : "jdbctest",
        # 型別名稱 article
        "type" : "article",
        # 型別設定
        "index_settings" : {
            "analysis" : {
            "analyzer" : {
                "ik" : {
                        # 涉及到中文使用ik 分詞
                    "tokenizer" : "ik"
                }
            }
        }
        },
        # 型別中的欄位對映
        "type_mapping": {
            # 型別名稱
            "article" : {
                "properties" : {
                    # 對應的欄位
                    "id" : {
                        # 欄位型別
                        "type" : "integer",
                        # 當成一個準確的值進行索引(全匹配)
                        "index" : "not_analyzed"
                    },
                    "subject" : {
                        "type" : "string",
                        "analyzer" : "ik"
                    },
                    "author" : {
                        "type" : "string",
                        "analyzer" : "ik"
                    },
                    "create_time" : {
                        "type" : "date"
                    },
                    "update_time" : {
                        "type" : "date"
                    }
                }
            }
        }
    }
}
' | java \
    -cp "${lib}/*" \
    -Dlog4j.configurationFile=${bin}/log4j2.xml \
    org.xbib.tools.Runner \
    org.xbib.tools.JDBCImporter

這裡選幾個屬性來介紹一下
url:資料庫連結串,所以把這個連結串改成其它資料來源,這個指令碼也可以使用(前提是那個資料來源中有對應的 article 表)
statefile :計劃任務狀態檔名稱。它長這樣:

{
  "type" : "jdbc",
  "jdbc" : {
    "index_settings" : {
      "analysis" : {
        "analyzer" : {
          "ik" : {
            "tokenizer" : "ik"
          }
        }
      }
    },
    "index" : "jdbctest",
    "schedule" : "0 0-59 0-23 ? * *",
    "sql" : [ {
      "statement" : "select *, id as _id from article where update_time > ?",
      "parameter" : [ "$metrics.lastexecutionstart" ]
    } ],
    "metrics" : {
      "lastexecutionend" : "2016-11-01T06:01:01.441Z",
      "lastexecutionstart" : "2016-11-01T06:01:01.125Z",
      "counter" : "23"
    },
    "type" : "article",
    "statefile" : "statefile-article.json",
    "user" : "root",
    "password" : "123456",
    "url" : "jdbc:mysql://localhost:3306/test",
    "type_mapping" : {
      "article" : {
        "properties" : {
          "create_time" : {
            "type" : "date"
          },
          "id" : {
            "type" : "integer",
            "index" : "not_analyzed"
          },
          "author" : {
            "type" : "string",
            "analyzer" : "ik"
          },
          "update_time" : {
            "type" : "date"
          },
          "subject" : {
            "type" : "string",
            "analyzer" : "ik"
          }
        }
      }
    }
  }
}

其實 jdbc 每次執行的就是這個檔案,執行完成後就覆蓋此檔案,改變的只是 metrics 屬性內的時間,而 lastexecutionstart 這個時間就是我們下面 sql 語句要用到的最後更新時間
schedule : 計劃任務時間表。表示多久執行一次更新。下面有幾個例子
0 0-59 0-23 ? * *:每分鐘執行一次
0 0/5 0-23 ? * * :每五分鐘執行一次;當分鐘等於 0,5,10,15…55的時候執行
我還是貼一個官方的欄位描述

欄位名稱 允許的值 允許的特殊字元
Seconds 0-59 , - * /
Minutes 0-59 , - * /
Hours 0-23 , - * /
Day-of-month 1-31 , - * ? / L W
Month 1-12 or JAN-DEC , - * /
Day-of-Week 1-7 or SUN-SAT , - * ? / L #
Year (Optional) empty, 1970-2199 , - * /

詳細註釋請點選檢視

sql:支援兩種方式,一種是直接寫sql語句,一種是有條件的sql語句。一般我們會在sql語句中使用”field as _id “這樣的方式來指定這條資料在ES 中的唯一標識(field欄位為唯一標識)
parameter 屬性中的可選的動態引數有

$now - the current timestamp
$state - the state, one of: BEFORE_FETCH, FETCH, AFTER_FETCH, IDLE, EXCEPTION
$metrics.counter - a counter
$lastrowcount - number of rows from last statement
$lastexceptiondate - SQL timestamp of last exception
$lastexception - full stack trace of last exception
$metrics.lastexecutionstart - SQL timestamp of the time when last execution started
$metrics.lastexecutionend - SQL timestamp of the time when last execution ended
$metrics.totalrows - total number of rows fetched
$metrics.totalbytes - total number of bytes fetched
$metrics.failed - total number of failed SQL executions
$metrics.succeeded - total number of succeeded SQL executions

在上面例子中的 sql

select *, id as _id from article where update_time > ?

表示獲取更新時間(update_time)大於 最後執行時間($metrics.lastexecutionstart)的所有資料

其它如 index、type_mapping 之類的屬性就不一一介紹了,很容易理解

趕快動手試一下吧!