Elasticsearch 兩個叢集之間資料匯入匯出
阿新 • 發佈:2018-12-07
直接上程式碼。方法一:以 scroll API 分批讀取來源叢集,再交給 BulkProcessor 批次寫入目標叢集。
官網地址
package net.abc; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.entity.ContentType; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.util.EntityUtils; import org.elasticsearch.action.bulk.*; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.transport.client.PreBuiltTransportClient; import java.io.IOException; import java.net.InetAddress; import java.util.Collections; /** */ public class Es2OtherEs { public static void main( String[] args ) { RestClient restClient =null; TransportClient destClient =null; BulkProcessor bulkProcessor=null; try{ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "pass1")); restClient= RestClient.builder(new HttpHost("host1", 9200)) .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { return 
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } }) .build(); BasicCredentialsProvider destCredentialsProvider = new BasicCredentialsProvider(); destCredentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials("","")); HttpEntity entity = new NStringEntity("{\n" + " \"query\": {\n" + " \"match_all\": {}\n" + " },\n" + " \"size\": 1000\n" + "}", ContentType.APPLICATION_JSON); long start = System.currentTimeMillis(); Response searchResponse = restClient.performRequest("POST", "/so_blog/_search?scroll=1m", Collections.<String, String>emptyMap(),entity); searchResponse.getEntity(); long interval = System.currentTimeMillis() -start; System.out.println("查詢用時:" + interval + "毫秒"); JSONObject mapTypes = JSON.parseObject(EntityUtils.toString(searchResponse.getEntity())); String scroll_id = mapTypes.getString("_scroll_id"); Settings settings = Settings.builder() .put("cluster.name", "dm-test") .put("client.transport.sniff", true) .build(); destClient = new PreBuiltTransportClient(settings); destClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300)); bulkProcessor=BulkProcessor.builder(destClient, new BulkProcessor.Listener() { public void beforeBulk(long l, BulkRequest bulkRequest) { } public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) { } public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) { } }).setBulkActions(5000) //<5> .setBulkSize(new ByteSizeValue(500, ByteSizeUnit.MB)) //<6> .setFlushInterval(TimeValue.timeValueSeconds(5)) //<7> .setConcurrentRequests(1) //<8> .setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) .build(); while (scroll_id != null){ JSONObject searchHits = JSON.parseObject(mapTypes.get("hits").toString()); JSONArray hits = JSON.parseArray(searchHits.get("hits").toString()); if (hits.size()<1){ break; } // BulkRequestBuilder bulkRequest = destClient.prepareBulk(); for(int i= 0 ; 
i< hits.size(); i++){ JSONObject source = hits.getJSONObject(i); JSONObject _source= source.getJSONObject("_source"); IndexRequest indexRequest = new IndexRequest("so_blog_20181130", "blog","blog_" + _source.getString("id")); JSONObject jsonObject=new JSONObject(); jsonObject.put("baidu_words",_source.getString("baidu_words")); jsonObject.put("blogid",_source.getString("blogid")); jsonObject.put("bury",_source.getString("bury")); jsonObject.put("channelid",_source.getString("channelid")); jsonObject.put("commentauth",_source.getString("commentauth")); jsonObject.put("commentcount",_source.getString("commentcount")); jsonObject.put("created_at",_source.getString("created_at")); jsonObject.put("description",_source.getString("description")); jsonObject.put("digg",_source.getString("digg")); jsonObject.put("edit_time",_source.getString("edit_time")); jsonObject.put("filename",_source.getString("filename")); jsonObject.put("id",_source.getString("id")); jsonObject.put("ip",_source.getString("ip")); jsonObject.put("istop",_source.getString("istop")); jsonObject.put("level",_source.getString("level")); jsonObject.put("note",_source.getString("note")); jsonObject.put("outlinkcount",_source.getString("outlinkcount")); jsonObject.put("status",_source.getString("status")); jsonObject.put("title",_source.getString("title")); jsonObject.put("type",_source.getString("type")); jsonObject.put("typecopy",_source.getString("typecopy")); jsonObject.put("user_name",_source.getString("user_name")); jsonObject.put("viewcount",_source.getString("viewcount")); indexRequest.source(jsonObject.toString(), XContentType.JSON); bulkProcessor.add(indexRequest); // HttpEntity httpEntity=new StringEntity(jsonObject.toString(), ContentType.APPLICATION_JSON); // Response response = destClient.performRequest("PUT", "/so_blog_20181131/blog/" + "blog_" + _source.getString("id"), Collections.<String, String>emptyMap(), httpEntity); // System.out.println(response); } // bulkProcessor.close(); // 
BulkResponse bulkItemResponses = bulkRequest.execute().actionGet(); // if(bulkItemResponses.hasFailures()){ // System.out.println("hasFail:"+bulkItemResponses.hasFailures()+"\t status:"+bulkItemResponses.status()+"\t message:"+bulkItemResponses.buildFailureMessage()+"\t"); // }else{ // System.out.println("寫入資料條數"+(count++)); // } Thread.sleep(10); // BulkResponse bulk = restHighLevelClient.bulk(bulkRequest); entity = new NStringEntity("{\n" + " \"scroll\" : \"1m\", \n" + " \"scroll_id\": \"" + scroll_id +"\" \n" + "}", ContentType.APPLICATION_JSON); // System.out.println(scroll_id); searchResponse = restClient.performRequest("POST", "/_search/scroll", Collections.<String, String>emptyMap(),entity); mapTypes = JSON.parseObject(EntityUtils.toString(searchResponse.getEntity())); scroll_id = mapTypes.getString("_scroll_id"); } }catch (Exception e){ e.printStackTrace(); }finally { try { if(restClient!=null){ restClient.close(); } if(destClient!=null){ destClient.close(); } if(bulkProcessor!=null){ bulkProcessor.close(); } } catch (IOException e) { e.printStackTrace(); } } } }
方法二:不使用 BulkProcessor,每取得一批 scroll 結果就組成一個 bulk 請求,同步寫入目標叢集。
package net.csdn; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import net.csdn.search.common.EsClientFactory; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.entity.ContentType; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.util.EntityUtils; import org.elasticsearch.action.bulk.*; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.transport.client.PreBuiltTransportClient; import scala.actors.threadpool.TimeUnit; import java.io.IOException; import java.net.InetAddress; import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** */ public class Es2OtherEs { public static void main( String[] args ) { RestClient restClient =null; TransportClient destClient =null; BulkProcessor bulkProcessor=null; try{ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "p1")); restClient= RestClient.builder(new 
HttpHost("host1", 9200)) .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } }) .build(); BasicCredentialsProvider destCredentialsProvider = new BasicCredentialsProvider(); destCredentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials("","")); HttpEntity entity = new NStringEntity("{\n" + " \"query\": {\n" + " \"match_all\": {}\n" + " },\n" + // " \"sort\":[\"_doc\"],\n" + " \"size\":1000\n" + "}", ContentType.APPLICATION_JSON); long start = System.currentTimeMillis(); Response searchResponse = restClient.performRequest("POST", "/so_ask_20181204/_search?scroll=1m", Collections.<String, String>emptyMap(),entity); searchResponse.getEntity(); long interval = System.currentTimeMillis() -start; System.out.println("查詢用時:" + interval + "毫秒"); JSONObject mapTypes = JSON.parseObject(EntityUtils.toString(searchResponse.getEntity())); String scroll_id = mapTypes.getString("_scroll_id"); Settings settings = Settings.builder() .put("cluster.name", "dm-test") .put("client.transport.sniff", true) .build(); destClient = new PreBuiltTransportClient(settings); destClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300)); int count=1; BulkRequestBuilder bulkRequest=destClient.prepareBulk(); while (true){ System.out.println("while次數:"+(count++)); JSONObject searchHits = JSON.parseObject(mapTypes.get("hits").toString()); JSONArray hits = JSON.parseArray(searchHits.get("hits").toString()); System.out.println("hitSize:"+hits.size()+"scrollId:"+scroll_id); if (hits.size()<1){ break; } for(int i= 0 ; i< hits.size(); i++){ JSONObject source = hits.getJSONObject(i); JSONObject _source= source.getJSONObject("_source"); IndexRequest req = destClient.prepareIndex().setIndex("so_ask_20181204").setType("ask") 
.setId("ask_"+_source.getString("id")).setSource(_source).request(); System.out.println("計數:"+i); bulkRequest.add(req); } BulkResponse bulkItemResponses = bulkRequest.execute().actionGet(); if(bulkItemResponses.hasFailures()){ System.out.println(bulkItemResponses.buildFailureMessage()); } Thread.sleep(10); entity = new NStringEntity("{\n" + " \"scroll\" : \"1m\", \n" + " \"scroll_id\": \"" + scroll_id +"\" \n" + "}", ContentType.APPLICATION_JSON); searchResponse = restClient.performRequest("POST", "/_search/scroll", Collections.<String, String>emptyMap(),entity); mapTypes = JSON.parseObject(EntityUtils.toString(searchResponse.getEntity())); scroll_id = mapTypes.getString("_scroll_id"); System.out.println("scroll_id:"+scroll_id); } }catch (Exception e){ e.printStackTrace(); }finally { try { if(restClient!=null){ restClient.close(); } if(destClient!=null){ destClient.close(); } if(bulkProcessor!=null){ bulkProcessor.close(); } } catch (IOException e) { e.printStackTrace(); } } } }