1. 程式人生 > >Elasticsearch 批量匯入資料

Elasticsearch 批量匯入資料

前言

 可以非常方便地進行資料的多維分析,所以大資料分析領域也經常會見到它的身影,生產環境中絕大部分新產生的資料可以通過應用直接匯入,但是歷史或初始資料可能會需要單獨處理,這種情況下可能遇到需要匯入大量資料的情況

這裡簡單分享一下批量匯入資料的操作方法與相關基礎,還有可能會碰到的問題,詳細內容可以參考 官方文件

Tip: 當前的最新版本為 Elasticsearch 2.2.0

概要

bulk API

ES提供了一個叫 bulk 的 API 來進行批量操作

它用來在一個API呼叫中進行大量的索引更新或刪除操作,這極大的提升了操作效率

形式

API

API 可以是 /_bulk, /{index}/_bulk, 或 {index}/{type}/_bulk

 這三種形式,當索引或型別已經指定後,資料檔案中如不明確指定或申明的內容,就會預設使用API中的值

API 以是 /_bulk 結尾的,並且跟上如下形式的 JSON 資料

資料內容格式

action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n

Note: 最後的一行也必須以 \n 結尾

可用方法

可用的操作有 index, create, delete 和 update :

  • index
     和 create 得在操作與元資料(action_and_meta_data)之後另起一行然後接上內容(必須遵循這樣的格式 ,後面會演示不這麼做導致操作失敗的示例)
  • delete 只用接上元資料就可以了,不必接上內容(原因自不用說,定位到文件就OK了)
  • update 得接上要變更的區域性資料,也得另起一行

文字指定

由於是批量操作,所以不太會直接使用命令列的方式手動指定,更多的是使用檔案,如果使用文字檔案,則得遵循如下格式

curl -s -XPOST localhost:9200/_bulk --data-binary "@requests"

Tip: requests 是檔名 , -s 是靜默模式,不產生輸出,也可以使用 > /dev/null

 替代

匯入資料

嘗試不按要求索引資料

[root@es-bulk tmp]# curl localhost:9200/stuff_orders/order_list/903713?pretty
{
  "_index" : "stuff_orders",
  "_type" : "order_list",
  "_id" : "903713",
  "found" : false
}
[root@es-bulk tmp]# cat test.json 
{"index":{"_index":"stuff_orders","_type":"order_list","_id":903713}}{"real_name":"劉備","user_id":48430,"address_province":"上海","address_city":"浦東新區","address_district":null,"address_street":"上海市浦東新區廣蘭路1弄2號345室","price":30.0,"carriage":6.0,"state":"canceled","created_at":"2013-10-24T09:09:28.000Z","payed_at":null,"goods":["營養早餐:火腿麥滿分"],"position":[121.53,31.22],"weight":70.0,"height":172.0,"sex_type":"female","birthday":"1988-01-01"}
[root@es-bulk tmp]# curl -XPOST 'localhost:9200/stuff_orders/_bulk?pretty' --data-binary @test.json
{
  "error" : {
    "root_cause" : [ {
      "type" : "action_request_validation_exception",
      "reason" : "Validation Failed: 1: no requests added;"
    } ],
    "type" : "action_request_validation_exception",
    "reason" : "Validation Failed: 1: no requests added;"
  },
  "status" : 400
}
[root@es-bulk tmp]# curl localhost:9200/stuff_orders/order_list/903713?pretty
{
  "_index" : "stuff_orders",
  "_type" : "order_list",
  "_id" : "903713",
  "found" : false
}
[root@es-bulk tmp]#

產生了報錯,並且資料也的確沒有加成功,原因是在校驗操作請求(action_and_meta_data)時,由於不符合規範,所以報異常

正確匯入方法

解決辦法是將格式糾正過來,加上換行

[root@es-bulk tmp]# vim test.json 
[root@es-bulk tmp]# cat test.json 
{"index":{"_index":"stuff_orders","_type":"order_list","_id":903713}}
{"real_name":"劉備","user_id":48430,"address_province":"上海","address_city":"浦東新區","address_district":null,"address_street":"上海市浦東新區廣蘭路1弄2號345室","price":30.0,"carriage":6.0,"state":"canceled","created_at":"2013-10-24T09:09:28.000Z","payed_at":null,"goods":["營養早餐:火腿麥滿分"],"position":[121.53,31.22],"weight":70.0,"height":172.0,"sex_type":"female","birthday":"1988-01-01"}
[root@es-bulk tmp]# curl -XPOST 'localhost:9200/stuff_orders/_bulk?pretty' --data-binary @test.json
{
  "took" : 36,
  "errors" : false,
  "items" : [ {
    "index" : {
      "_index" : "stuff_orders",
      "_type" : "order_list",
      "_id" : "903713",
      "_version" : 1,
      "_shards" : {
        "total" : 2,
        "successful" : 1,
        "failed" : 0
      },
      "status" : 201
    }
  } ]
}
[root@es-bulk tmp]# curl localhost:9200/stuff_orders/order_list/903713?pretty
{
  "_index" : "stuff_orders",
  "_type" : "order_list",
  "_id" : "903713",
  "_version" : 1,
  "found" : true,
  "_source":{"real_name":"劉備","user_id":48430,"address_province":"上海","address_city":"浦東新區","address_district":null,"address_street":"上海市浦東新區廣蘭路1弄2號345室","price":30.0,"carriage":6.0,"state":"canceled","created_at":"2013-10-24T09:09:28.000Z","payed_at":null,"goods":["營養早餐:火腿麥滿分"],"position":[121.53,31.22],"weight":70.0,"height":172.0,"sex_type":"female","birthday":"1988-01-01"}
}
[root@es-bulk tmp]# 

Tip: 當資料量極大時,這樣一個個改肯定不方便,這時可以使用sed指令碼,能很方便的進行批量修改

[[email protected] summary]# sed -ir  's/[}][}][{]/\}\}\n\{/' jjjj.json 
[[email protected] summary]# less jjjj.json

其實就是匹配到合適的地方加上一個換行

記憶體不足

基本上只要遵循前面的操作方式,理想情況下都會很順利地將資料匯入ES,但是實現環境中,總會有各種意外,我就遇到了其中一種:記憶體不足

[root@es-bulk tmp]# time curl -XPOST 'localhost:9200/stuff_orders/_bulk?pretty' --data-binary @es_data.json > /dev/null
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
 38  265M    0     0   38  102M      0  43.8M  0:00:06  0:00:02  0:00:04 43.9M
curl: (56) Failure when receiving data from the peer

real	0m5.351s
user	0m0.161s
sys	0m0.919s
[root@es-bulk tmp]#

當時百思不得其解,已經反覆確認了資料格式無誤,並且隨機選取其中一些進行匯入測試也沒發現問題,但只要整體一導就出問題,而且每次都一樣

[root@es-bulk tmp]# free -m 
             total       used       free     shared    buffers     cached
Mem:          3949       3548        400          0          1        196
-/+ buffers/cache:       3349        599
Swap:         3951        237       3714
[root@es-bulk tmp]#

系統記憶體明明還有多餘,但是再看到JAVA記憶體時,就隱約感覺到了原因

[root@es-bulk tmp]# ps faux | grep elas
root     14479  0.0  0.0 103252   816 pts/1    S+   16:05   0:00          \_ grep elas
495      19045  0.2 25.6 3646816 1036220 ?     Sl   Mar07  25:45 /usr/bin/java -Xms256m -Xmx1g -Djava.awt.headless=true -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError -XX:+DisableExplicitGC -Dfile.encoding=UTF-8 -Djna.nosys=true -Des.path.home=/usr/share/elasticsearch -cp /usr/share/elasticsearch/lib/elasticsearch-2.1.1.jar:/usr/share/elasticsearch/lib/* org.elasticsearch.bootstrap.Elasticsearch start -p /var/run/elasticsearch/elasticsearch.pid -d -Des.default.path.home=/usr/share/elasticsearch -Des.default.path.logs=/var/log/elasticsearch -Des.default.path.data=/var/lib/elasticsearch -Des.default.path.conf=/etc/elasticsearch
[root@es-bulk tmp]#

ES和lucene是使用的JAVA,JAVA的記憶體分配大小決定了它們的發揮空間,這裡的初始記憶體為 256M ,這也是大多數情況下的預設配置,但是應對當前的實際資料大小 265M 時就不夠了,雖然官方說會盡量減小使用buffer,但實測下來,系統應該會是首先儘量使用記憶體,通過匯入記憶體的方式來起到顯著加速的效果,但是記憶體不夠時,就直接報錯退出了

解決記憶體不足有兩種思路:

  • 1.調整 Xms 和 Xmx 引數,使其適應業務需求,然後重啟服務使之生效
  • 2.將原來的資料切小,分批匯入

第一種方式,要求停應用和業務,在某些情況下是不具備條件的(得統一協調時間視窗),那麼就嘗試使用第二種方式,好在text文件的切分也可以使用sed快速完成

[root@es-bulk tmp]# sed  -rn '1,250000p' es_data.json  > es_data1.json
[root@es-bulk tmp]# sed  -rn '250001,500000p' es_data.json  > es_data2.json
[root@es-bulk tmp]# sed  -rn '500001,750000p' es_data.json  > es_data3.json
[root@es-bulk tmp]# sed  -rn '750001,943210p' es_data.json  > es_data4.json
[root@es-bulk tmp]# 
[root@es-bulk tmp]# du -sh es_data*.json
71M	es_data1.json
68M	es_data2.json
71M	es_data3.json
58M	es_data4.json
266M	es_data.json
[root@es-bulk tmp]#
[root@es-bulk tmp]# tail es_data1.json
...
...
[root@es-bulk tmp]# tail es_data2.json
...
...

再依次進行匯入,就發現沒問題了

[root@es-bulk tmp]# time curl -XPOST 'localhost:9200/stuff_orders/_bulk?pretty' --data-binary @es_data1.json > /dev/null
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  101M  100 30.6M  100 70.3M   981k  2253k  0:00:31  0:00:31 --:--:--     0

real	0m33.308s
user	0m0.100s
sys	0m0.390s
[root@es-bulk tmp]#

命令彙總

  • curl -XPOST 'localhost:9200/stuff_orders/_bulk?pretty' --data-binary @test.json
  • curl localhost:9200/stuff_orders/order_list/903713?pretty
  • sed -ir 's/[}][}][{]/\}\}\n\{/' jjjj.json
  • less jjjj.json
  • time curl -XPOST 'localhost:9200/stuff_orders/_bulk?pretty' --data-binary @es_data.json > /dev/null
  • free -m
  • ps faux | grep elas
  • sed -rn '1,250000p' es_data.json > es_data1.json
  • sed -rn '250001,500000p' es_data.json > es_data2.json
  • sed -rn '500001,750000p' es_data.json > es_data3.json
  • sed -rn '750001,943210p' es_data.json > es_data4.json
  • du -sh es_data*.json
  • tail es_data1.json
  • time curl -XPOST 'localhost:9200/stuff_orders/_bulk?pretty' --data-binary @es_data1.json > /dev/null
原文地址 http://soft.dog/2016/03/15/elasticsearch-bulk/