程式人生 > Elasticsearch 兩個叢集之間資料匯入匯出

Elasticsearch 兩個叢集之間資料匯入匯出

方法一:直接上程式碼(以 scroll 分批讀取來源叢集,再透過 BulkProcessor 批次寫入目標叢集)
官網地址

參考地址

package net.abc;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;

/**
 */
public class Es2OtherEs
{
    public static void main( String[] args )
    {
        RestClient restClient =null;
        TransportClient destClient =null;
        BulkProcessor bulkProcessor=null;
        try{
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials("elastic", "pass1"));

            restClient= RestClient.builder(new HttpHost("host1", 9200))
                    .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                            return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                        }
                    })
                    .build();

            BasicCredentialsProvider destCredentialsProvider = new BasicCredentialsProvider();

            destCredentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials("",""));
            HttpEntity entity = new NStringEntity("{\n" +
                    "  \"query\": {\n" +
                    "    \"match_all\": {}\n" +
                    "  },\n" +
                    "  \"size\": 1000\n" +
                    "}", ContentType.APPLICATION_JSON);
            long start = System.currentTimeMillis();
            Response searchResponse = restClient.performRequest("POST", "/so_blog/_search?scroll=1m",
                    Collections.<String, String>emptyMap(),entity);
            searchResponse.getEntity();
            long interval = System.currentTimeMillis() -start;
            System.out.println("查詢用時:" + interval  + "毫秒");
            JSONObject mapTypes = JSON.parseObject(EntityUtils.toString(searchResponse.getEntity()));

            String scroll_id =  mapTypes.getString("_scroll_id");
            Settings settings = Settings.builder()
                    .put("cluster.name", "dm-test")
            .put("client.transport.sniff", true)
                    .build();
            destClient = new PreBuiltTransportClient(settings);
            destClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
             bulkProcessor=BulkProcessor.builder(destClient, new BulkProcessor.Listener() {
                public void beforeBulk(long l, BulkRequest bulkRequest) {
                }

                public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {

                }

                public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {

                }
            }).setBulkActions(5000) //<5>
                    .setBulkSize(new ByteSizeValue(500, ByteSizeUnit.MB)) //<6>
                    .setFlushInterval(TimeValue.timeValueSeconds(5)) //<7>
                    .setConcurrentRequests(1) //<8>
                    .setBackoffPolicy(
                            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                    .build();
            while (scroll_id != null){
                JSONObject searchHits = JSON.parseObject(mapTypes.get("hits").toString());

                JSONArray hits = JSON.parseArray(searchHits.get("hits").toString());
                if (hits.size()<1){
                    break;
                }
//                BulkRequestBuilder bulkRequest = destClient.prepareBulk();

                for(int i= 0 ; i< hits.size(); i++){
                    JSONObject source =  hits.getJSONObject(i);
                    JSONObject _source=  source.getJSONObject("_source");
                    IndexRequest indexRequest = new IndexRequest("so_blog_20181130", "blog","blog_" + _source.getString("id"));
                    JSONObject jsonObject=new JSONObject();
                    jsonObject.put("baidu_words",_source.getString("baidu_words"));
                    jsonObject.put("blogid",_source.getString("blogid"));
                    jsonObject.put("bury",_source.getString("bury"));
                    jsonObject.put("channelid",_source.getString("channelid"));
                    jsonObject.put("commentauth",_source.getString("commentauth"));
                    jsonObject.put("commentcount",_source.getString("commentcount"));
                    jsonObject.put("created_at",_source.getString("created_at"));
                    jsonObject.put("description",_source.getString("description"));
                    jsonObject.put("digg",_source.getString("digg"));
                    jsonObject.put("edit_time",_source.getString("edit_time"));
                    jsonObject.put("filename",_source.getString("filename"));
                    jsonObject.put("id",_source.getString("id"));
                    jsonObject.put("ip",_source.getString("ip"));
                    jsonObject.put("istop",_source.getString("istop"));
                    jsonObject.put("level",_source.getString("level"));
                    jsonObject.put("note",_source.getString("note"));
                    jsonObject.put("outlinkcount",_source.getString("outlinkcount"));
                    jsonObject.put("status",_source.getString("status"));
                    jsonObject.put("title",_source.getString("title"));
                    jsonObject.put("type",_source.getString("type"));
                    jsonObject.put("typecopy",_source.getString("typecopy"));
                    jsonObject.put("user_name",_source.getString("user_name"));
                    jsonObject.put("viewcount",_source.getString("viewcount"));
                    indexRequest.source(jsonObject.toString(), XContentType.JSON);
                    bulkProcessor.add(indexRequest);
//                    HttpEntity httpEntity=new StringEntity(jsonObject.toString(), ContentType.APPLICATION_JSON);
//                    Response response = destClient.performRequest("PUT", "/so_blog_20181131/blog/" + "blog_" + _source.getString("id"), Collections.<String, String>emptyMap(), httpEntity);
//                    System.out.println(response);
                }

//                bulkProcessor.close();
//                BulkResponse bulkItemResponses = bulkRequest.execute().actionGet();
//                if(bulkItemResponses.hasFailures()){
//                    System.out.println("hasFail:"+bulkItemResponses.hasFailures()+"\t status:"+bulkItemResponses.status()+"\t message:"+bulkItemResponses.buildFailureMessage()+"\t");
//                }else{
//                    System.out.println("寫入資料條數"+(count++));
//                }
                Thread.sleep(10);
//                BulkResponse bulk = restHighLevelClient.bulk(bulkRequest);
                entity = new NStringEntity("{\n" +
                        "    \"scroll\" : \"1m\", \n" +
                        "    \"scroll_id\": \"" + scroll_id +"\" \n" +
                        "}", ContentType.APPLICATION_JSON);

//                System.out.println(scroll_id);
                searchResponse = restClient.performRequest("POST", "/_search/scroll",
                        Collections.<String, String>emptyMap(),entity);
                mapTypes = JSON.parseObject(EntityUtils.toString(searchResponse.getEntity()));
                scroll_id =  mapTypes.getString("_scroll_id");
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                if(restClient!=null){
                    restClient.close();
                }
                if(destClient!=null){
                    destClient.close();
                }
                if(bulkProcessor!=null){
                    bulkProcessor.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

}

方法二:改用 BulkRequestBuilder,每取回一批 scroll 結果就同步寫入目標叢集一次

package net.csdn;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import net.csdn.search.common.EsClientFactory;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import scala.actors.threadpool.TimeUnit;

import java.io.IOException;
import java.net.InetAddress;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 */
public class Es2OtherEs
{
    public static void main( String[] args )
    {
        RestClient restClient =null;
        TransportClient destClient =null;
        BulkProcessor bulkProcessor=null;
        try{
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials("elastic", "p1"));

            restClient= RestClient.builder(new HttpHost("host1", 9200))
                    .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                            return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                        }
                    })
                    .build();

            BasicCredentialsProvider destCredentialsProvider = new BasicCredentialsProvider();

            destCredentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials("",""));
            HttpEntity entity = new NStringEntity("{\n" +
                    "  \"query\": {\n" +
                    "    \"match_all\": {}\n" +
                    "  },\n" +
//                    "  \"sort\":[\"_doc\"],\n" +
                    "  \"size\":1000\n" +
                    "}", ContentType.APPLICATION_JSON);
            long start = System.currentTimeMillis();
            Response searchResponse = restClient.performRequest("POST", "/so_ask_20181204/_search?scroll=1m",
                    Collections.<String, String>emptyMap(),entity);
            searchResponse.getEntity();
            long interval = System.currentTimeMillis() -start;
            System.out.println("查詢用時:" + interval  + "毫秒");
            JSONObject mapTypes = JSON.parseObject(EntityUtils.toString(searchResponse.getEntity()));

            String scroll_id =  mapTypes.getString("_scroll_id");
            Settings settings = Settings.builder()
                    .put("cluster.name", "dm-test")
            .put("client.transport.sniff", true)
                    .build();
            destClient = new PreBuiltTransportClient(settings);
            destClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
             int count=1;
            BulkRequestBuilder bulkRequest=destClient.prepareBulk();
            while (true){
                System.out.println("while次數:"+(count++));
                JSONObject searchHits = JSON.parseObject(mapTypes.get("hits").toString());

                JSONArray hits = JSON.parseArray(searchHits.get("hits").toString());
                System.out.println("hitSize:"+hits.size()+"scrollId:"+scroll_id);
                if (hits.size()<1){
                    break;
                }
                for(int i= 0 ; i< hits.size(); i++){
                    JSONObject source =  hits.getJSONObject(i);
                    JSONObject _source=  source.getJSONObject("_source");
                    IndexRequest req = destClient.prepareIndex().setIndex("so_ask_20181204").setType("ask")
                            .setId("ask_"+_source.getString("id")).setSource(_source).request();
                    System.out.println("計數:"+i);
                    bulkRequest.add(req);
                }
                BulkResponse bulkItemResponses = bulkRequest.execute().actionGet();
                if(bulkItemResponses.hasFailures()){
                    System.out.println(bulkItemResponses.buildFailureMessage());
                }
                Thread.sleep(10);
                entity = new NStringEntity("{\n" +
                        "    \"scroll\" : \"1m\", \n" +
                        "    \"scroll_id\": \"" + scroll_id +"\" \n" +
                        "}", ContentType.APPLICATION_JSON);
                searchResponse = restClient.performRequest("POST", "/_search/scroll",
                        Collections.<String, String>emptyMap(),entity);
                mapTypes = JSON.parseObject(EntityUtils.toString(searchResponse.getEntity()));
                scroll_id =  mapTypes.getString("_scroll_id");
                System.out.println("scroll_id:"+scroll_id);
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            try {
                if(restClient!=null){
                    restClient.close();
                }
                if(destClient!=null){
                    destClient.close();
                }
                if(bulkProcessor!=null){
                    bulkProcessor.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

}