Elasticsearch Java API (5): Bulk Indexing

This post looks at a convenient way to index multiple documents at once. The Bulk API lets a single request carry a batch of create, delete, and update operations. (Older Elasticsearch releases also shipped a separate bulk UDP API; because UDP cannot guarantee that data reaches the server without loss, that API has since been removed, and everything below goes through the regular HTTP _bulk endpoint and the Java client.)

1. Bulk API

When using the bulk command, the REST endpoint ends in _bulk and the batched operations are written into a JSON file. The official documentation gives the following format:

action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n

In other words, every operation consists of two lines, and each line must end with a newline. The first line describes the action and its metadata; the second line is the optional document source. As an example, let's insert two documents and delete one in the same request. Create a file named bulkAdd.json with the following content:

{ "create" : { "_index" : "blog", "_type" : "article", "_id" : "3" }}
{ "title":"title1","posttime":"2016-07-02","content":"內容一" }

{ "create" : { "_index" : "blog", "_type" : "article", "_id" : "4" }}
{ "title":"title2","posttime":"2016-07-03","content":"內容2" }

{ "delete":{"_index" : "blog", "_type" : "article", "_id" : "1" }}

Run it:

$ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkAdd.json
{
  "took" : 11,
  "errors" : false,
  "items" : [ {
    "create" : {
      "_index" : "blog",
      "_type" : "article",
      "_id" : "13",
      "_version" : 1,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "failed" : 0
      },
      "status" : 201
    }
  } ]
}

Note: the file must end with a trailing newline; otherwise the actions cannot be recognized and the request fails:

$ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkAdd.json 
{
  "error" : {
    "root_cause" : [ {
      "type" : "action_request_validation_exception",
      "reason" : "Validation Failed: 1: no requests added;"
    } ],
    "type" : "action_request_validation_exception",
    "reason" : "Validation Failed: 1: no requests added;"
  },
  "status" : 400
}
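
The same batch can also be submitted from the Java API without writing a JSON file. Below is a minimal sketch, assuming the same local node on port 9300 and the cluster name "bropen" used later in this post; the class name and document bodies are purely illustrative. Note that prepareIndex issues an "index" action, which overwrites an existing document, whereas the REST example above used "create":

import java.net.InetAddress;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class BulkApiSketch {

    public static void main(String[] args) throws Exception {

        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "bropen").build();
        Client client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("127.0.0.1"), 9300));

        // one bulk request carrying two index actions and one delete,
        // mirroring the bulkAdd.json example above
        BulkRequestBuilder bulk = client.prepareBulk();
        bulk.add(client.prepareIndex("blog", "article", "3")
                .setSource("{\"title\":\"title1\",\"posttime\":\"2016-07-02\",\"content\":\"content 1\"}"));
        bulk.add(client.prepareIndex("blog", "article", "4")
                .setSource("{\"title\":\"title2\",\"posttime\":\"2016-07-03\",\"content\":\"content 2\"}"));
        bulk.add(client.prepareDelete("blog", "article", "1"));

        BulkResponse response = bulk.execute().actionGet();
        if (response.hasFailures()) {
            // individual operations can fail even when the request itself succeeds
            System.out.println(response.buildFailureMessage());
        }
        client.close();
    }
}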

2. Bulk Export

The following example exports the documents of an index to a file as JSON, one document per line. The cluster name is "bropen", the index is "blog", and the type is "article". Create files/bulk.txt under the project root; the documents are written into bulk.txt:

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHits;

public class ElasticSearchBulkOut {

    public static void main(String[] args) {

        try {
            // cluster.name must match the value configured in elasticsearch.yml
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "bropen").build();

            Client client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName("127.0.0.1"), 9300));

            // match_all query; note that without setSize() or scrolling,
            // only the first 10 hits are returned by default
            QueryBuilder qb = QueryBuilders.matchAllQuery();
            SearchResponse response = client.prepareSearch("blog")
                    .setTypes("article").setQuery(qb)
                    .execute().actionGet();
            SearchHits resultHits = response.getHits();

            File article = new File("files/bulk.txt");
            FileWriter fw = new FileWriter(article);
            BufferedWriter bfw = new BufferedWriter(fw);

            if (resultHits.getHits().length == 0) {
                System.out.println("Found 0 documents!");

            } else {
                // write each hit's _source as one JSON line
                for (int i = 0; i < resultHits.getHits().length; i++) {
                    String jsonStr = resultHits.getHits()[i]
                            .getSourceAsString();
                    System.out.println(jsonStr);
                    bfw.write(jsonStr);
                    bfw.write("\n");
                }
            }
            bfw.close();
            fw.close();
            client.close();

        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}
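
One caveat: a plain search like the one above returns only the first 10 hits by default, so a larger index would be exported only partially. A minimal sketch of a scroll-based variant, assuming the same client and the bfw writer from the example (the 60-second keep-alive and the page size of 100 are arbitrary choices; TimeValue comes from org.elasticsearch.common.unit and SearchHit from org.elasticsearch.search):

SearchResponse scrollResp = client.prepareSearch("blog").setTypes("article")
        .setQuery(QueryBuilders.matchAllQuery())
        .setScroll(TimeValue.timeValueSeconds(60)) // keep the scroll context alive between rounds
        .setSize(100)                              // hits per shard per round
        .execute().actionGet();

while (scrollResp.getHits().getHits().length > 0) {
    // write the current page, then ask for the next one with the scroll id
    for (SearchHit hit : scrollResp.getHits().getHits()) {
        bfw.write(hit.getSourceAsString());
        bfw.write("\n");
    }
    scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
            .setScroll(TimeValue.timeValueSeconds(60))
            .execute().actionGet();
}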


3. Bulk Import

Next, read the bulk.txt file we just exported line by line and bulk-import it. First call client.prepareBulk() to create a BulkRequestBuilder, then add requests to it with its add method. Implementation:

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class ElasticSearchBulkIn {

    public static void main(String[] args) {

        try {

            // cluster.name must match the value configured in elasticsearch.yml
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "bropen").build();

            Client client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName("127.0.0.1"), 9300));

            File article = new File("files/bulk.txt");
            FileReader fr = new FileReader(article);
            BufferedReader bfr = new BufferedReader(fr);
            String line = null;
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            int count = 0;
            while ((line = bfr.readLine()) != null) {
                // each line of bulk.txt is one JSON document
                bulkRequest.add(client.prepareIndex("test", "article").setSource(line));
                count++;
                // flush every 10 documents and start a fresh bulk request
                if (count % 10 == 0) {
                    bulkRequest.execute().actionGet();
                    bulkRequest = client.prepareBulk();
                }
            }
            // send whatever is left over from the last batch
            if (bulkRequest.numberOfActions() > 0) {
                bulkRequest.execute().actionGet();
            }

            bfr.close();
            fr.close();
            client.close();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}
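
Each execute().actionGet() above returns a BulkResponse, and individual items inside a bulk request can fail even when the request as a whole comes back successfully. A small addition worth considering (it needs an import of org.elasticsearch.action.bulk.BulkResponse) is to inspect the response after every flush:

BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
    // buildFailureMessage() concatenates the error message of every failed item
    System.out.println(bulkResponse.buildFailureMessage());
}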
