1. 程式人生 > >Elasticsearch —— bulk批量匯入資料

Elasticsearch —— bulk批量匯入資料

在使用Elasticsearch的時候,一定會遇到這種場景——希望批量的匯入資料,而不是一條一條的手動匯入。那麼此時,就一定會需要bulk命令!
更多內容參考我整理的Elk教程

bulk批量匯入

批量匯入可以合併多個操作,比如index,delete,update,create等等。也可以幫助從一個索引匯入到另一個索引。

語法大致如下;

action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n

需要注意的是,每一條資料都由兩行構成(delete除外),其他的命令比如index和create都是由元資訊行和資料行組成,update比較特殊它的資料行可能是doc也可能是upsert或者script,如果不瞭解的朋友可以參考前面的update的翻譯。

注意,每一行都是通過\n回車符來判斷結束,因此如果你自己定義了json,千萬不要使用回車符。不然_bulk命令會報錯的!

一個小例子

比如我們現在有這樣一個檔案,data.json:

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }

它的第一行定義了_index,_type,_id等資訊;第二行定義了欄位的資訊。

然後執行命令:

curl -XPOST localhost:9200/_bulk --data-binary @data.json

就可以看到已經匯入進去資料了。

對於其他的index,delete,create,update等操作也可以參考下面的格式:

{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "type1", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "type1", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field2" : "value2"} }

在Url中設定預設的index和type

如果在路徑中設定了index或者type,那麼在JSON中就不需要設定了。如果在JSON中設定,會覆蓋掉路徑中的配置。

比如上面的例子中,檔案中定義了索引為test,型別為type1;而我們在路徑中定義了預設的選項,索引為test333,型別為type333。執行命令後,發現檔案中的配置會覆蓋掉路徑中的配置。這樣也提供了統一的預設配置以及個性化的特殊配置的需求。

其他

由於bulk是一次性提交很多的命令,它會把這些資料都發送到一個節點,然後這個節點解析元資料(index或者type或者id之類的),然後分發給其他的節點的分片,進行操作。

由於很多命令執行後,統一的返回結果,因此資料量可能會比較大。這個時候如果使用的是chunk編碼的方式,分段進行傳輸,可能會造成一定的延遲。因此還是對條件在客戶端進行一定的緩衝,雖然bulk提供了批處理的方法,但是也不能給太大的壓力!

最後要說一點的是,Bulk中的操作執行成功與否是不影響其他的操作的。而且也沒有具體的引數統計,一次bulk操作,有多少成功多少失敗。

擴充套件:在Logstash中,傳輸的機制其實就是bulk,只是他使用了Buffer,如果是伺服器造成的訪問延遲可能會採取重傳,其他的失敗就只丟棄了....