1. 程式人生 > >Elasticsearch java api操作

Elasticsearch java api操作

iter cti desc 導出 plain request bubuko tty net

版本:

Elasticsearch 6.3.1

pom文件:

 <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.3
.1</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>2.7</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.7
</version> </dependency> <dependency> <groupId>net.sf.json-lib</groupId> <artifactId>json-lib</artifactId> <version>0.9</version> </dependency>

一、構建elasicsearch client工具類

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;

/** * @Author: xiaolaotou * @Date: 2019/4/19 */ /** * 構建elasticsrarch client */ public class ClientUtil { private static TransportClient client; public TransportClient CreateClient() throws Exception { // 先構建client System.out.println("11111111111"); Settings settings=Settings.builder() .put("cluster.name","elasticsearch1") .put("client.transport.ignore_cluster_name", true) //如果集群名不對,也能連接 .build(); //創建Client TransportClient client = new PreBuiltTransportClient(settings) .addTransportAddress( new TransportAddress( InetAddress.getByName( "192.168.200.100"), 9300)); return client; } }

二、測試類

import net.sf.json.JSONObject;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHits;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;


/**
 * @Author: xiaolaotou
 * @Date: 2019/4/19
 * ElasticSearch 6.3.1
 */
public class Test {

    private static TransportClient client;

    static {
        try {
            client = new ClientUtil().CreateClient();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {


        //創建索引
//        createEmployee();
        //根據inde,type,id查詢一個document的data
//        FindIndex();
//        CreateJsonIndex();
        //批量導入
//        BulkCreateIndex();

        //批量導出
//        OutData();
        //創建帶ik分詞的index
//        CreateIndexIkTest();

        //更新索引
//        UpdateIndex();
//        createIndex2();
//        Search();
          get();
    }

    /**
     * 創建索引,普通格式
     *
     * @throws Exception
     */
    public static void createEmployee() throws Exception {
        IndexResponse response = client.prepareIndex("student", "doc", "1")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("name", "jack")
                        .field("age", 27)
                        .field("position", "technique")
                        .field("country", "china")
                        .field("join_date", "2017-01-01")
                        .field("salary", 10000)
                        .endObject())
                .get();
        System.out.println("創建成功!");
    }
技術分享圖片

/**
     * 根據 index ,type,id查詢
     *
     * @throws Exception
     */
    public static void FindIndex() throws Exception {
        GetResponse getResponse = client.prepareGet("student", "doc", "1").get();
        System.out.println(getResponse.getSourceAsString());
    }
技術分享圖片
/**
     * 創建索引,JSON
     *
     * @throws IOException
     */
    public static void CreateJsonIndex() throws IOException {
        JSONObject json = new JSONObject();
        json.put("user", "小明");
        json.put("title", "Java Engineer");
        json.put("desc", "web 開發");
        IndexResponse response = client.prepareIndex("studentjson", "doc", "1")
                .setSource(json, XContentType.JSON)
                .get();
        String _index = response.getIndex();
        System.out.println(_index);
    }
技術分享圖片
/**
     * elasticsearch批量導入
     */
    public static void BulkCreateIndex() {
        BulkRequestBuilder builder = client.prepareBulk();
        for (int i = 0; i < 100000; i++) {
            HashMap<String, Object> map = new HashMap<>();
            map.put("recordtime", "11");
            map.put("area", "22");
            map.put("usertype", "33");
            map.put("count", 44);
            builder.add(client.prepareIndex("bulktest", "1").setSource(map));
            //每10000條提交一次
            if (i % 10000 == 0) {
                builder.execute().actionGet();
                builder = client.prepareBulk();
            }
        }
    }
技術分享圖片
/**
     * 批量導出
     */
    public static void OutData() throws IOException {
        SearchResponse response = client.prepareSearch("bulktest").setTypes("1")
                .setQuery(QueryBuilders.matchAllQuery())
                .setSize(10000).setScroll(new TimeValue(600000))
                .setSearchType(SearchType.DEFAULT).execute().actionGet();
        // setScroll(new TimeValue(600000)) 設置滾動的時間
        String scrollid = response.getScrollId();
        //把導出的結果以JSON的格式寫到文件裏

        //每次返回數據10000條。一直循環查詢知道所有的數據都被查詢出來
        while (true) {
            SearchResponse response2 = client.prepareSearchScroll(scrollid).setScroll(new TimeValue(1000000))
                    .execute().actionGet();
            SearchHits searchHit = response2.getHits();
            //再次查詢不到數據時跳出循環
            if (searchHit.getHits().length == 0) {
                break;
            }
            System.out.println("查詢數量 :" + searchHit.getHits().length);
            for (int i = 0; i < searchHit.getHits().length; i++) {
                String json = searchHit.getHits()[i].getSourceAsString();
                putData(json);
            }
            System.out.println("查詢結束");
        }
    }
技術分享圖片
public static void putData(String json) throws IOException {
        String str = json + "\n";
        //寫入本地文件
        String fileTxt = "D:\\data.txt";
        File file = new File(fileTxt);
        if (!file.getParentFile().exists()) {
            file.getParentFile().mkdirs();
        }
        if (!file.exists()) {
            file.createNewFile();
            FileWriter fw = new FileWriter(file, true);
            BufferedWriter bw = new BufferedWriter(fw);
            System.out.println("寫入完成啦啊");
            bw.write(String.valueOf(str));
            bw.flush();
            bw.close();
            fw.close();
        } else {
            FileWriter fw = new FileWriter(file, true);
            BufferedWriter bw = new BufferedWriter(fw);
            System.out.println("追加寫入完成啦啦");
            bw.write(String.valueOf(str));
            bw.flush();
            bw.close();
            fw.close();
        }
    }


    /**
     * 創建索引,並給某些字段指定ik分詞器,以後向該索引中查詢時,就會用ik分詞
     */
    public static void CreateIndexIkTest() throws Exception {
        //創建映射
        XContentBuilder mapping = XContentFactory.jsonBuilder()
                .startObject()
                .startObject("properties")
                //title:字段名,  type:文本類型       analyzer :分詞器類型
                .startObject("title").field("type", "text").field("analyzer", "ik_smart").endObject()   //該字段添加的內容,查詢時將會使用ik_smart分詞
                .startObject("content").field("type", "text").field("analyzer", "ik_max_word").endObject()
                .endObject()
                .endObject();

        //index:索引名   type:類型名(可以自己定義)
        PutMappingRequest putmap = Requests.putMappingRequest("index").type("type").source(mapping);
        //創建索引
        client.admin().indices().prepareCreate("index").execute().actionGet();
        //為索引添加映射
        client.admin().indices().putMapping(putmap).actionGet();

        //調用下面的方法為創建的索引添加內容
        CreateIndex1();
    }

    //這個方法是為上一步創建的索引中添加內容,包括id,id不能重復
    public static void CreateIndex1() throws IOException {
        IndexResponse response = client.prepareIndex("index", "type", "1") //索引,類型,id
                .setSource(jsonBuilder()
                        .startObject()
                        .field("title", "title")   //字段,值
                        .field("content", "content")
                        .endObject()
                ).get();
    }
技術分享圖片

/**
     * 更新索引
     */
    //更新索引,更新剛才創建的索引,如果id相同將會覆蓋掉剛才的內容
    public static void UpdateIndex() throws Exception {
        //每次添加id應該不同,相當於數據表中的主鍵,相同的話將會進行覆蓋
        UpdateResponse response=client.update(new UpdateRequest("index","type","1")
        .doc(XContentFactory.jsonBuilder()
            .startObject()
                .field("title","中華人民共和國國歌,國歌是最好聽的歌")
                .field("content","中華人民共和國國歌,國歌是最好聽的歌")
                .endObject()
        )).get();
    }

    //再插入一條數據
    public static void createIndex2() throws IOException {
        IndexResponse response = client.prepareIndex("index", "type", "2")
                .setSource(jsonBuilder()
                        .startObject()
                        .field("title", "中華民族是偉大的民族")
                        .field("content", "中華民族是偉大的民族")
                        .endObject()
                ).get();
    }

    /**
     * 下面使用index索引下的2個document進行查詢
     */
    public static  void  Search(){
        SearchResponse response1 = client.prepareSearch( "index")  //指定多個索引
                .setTypes("type")  //指定類型
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setQuery(QueryBuilders.matchQuery("title", "中華人民共和國國歌"))  // Query
//                .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18))     // Filter
                .setFrom(0).setSize(60).setExplain(true)
                .get();
        long totalHits1= response1.getHits().totalHits;  //命中個數
        System.out.println("response1======="+totalHits1);

        SearchResponse response2 = client.prepareSearch( "index")  //指定多個索引
                .setTypes("type")  //指定類型
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setQuery(QueryBuilders.matchQuery("content", "中華人民共和國國歌"))  // Query
//                .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18))     // Filter
                .setFrom(0).setSize(60).setExplain(true)
                .get();
        long totalHits2 = response2.getHits().totalHits;  //命中個數
        System.out.println("response2========="+totalHits2);
    }

    /**
     * GET操作
     */
    public static void get() {
        GetResponse response = client.prepareGet("index", "type", "2").get();
        Map<String, Object> source = response.getSource();
        Set<String> strings = source.keySet();
        Iterator<String> iterator = strings.iterator();
        while (iterator.hasNext()) {
            System.out.println(source.get(iterator.next()));
        }
    }
}

Elasticsearch java api操作