1. 程式人生 > >ElasticSearch 資料增刪改實現

ElasticSearch 資料增刪改實現

前言

Restful API 實現

建立索引

 建立索引

    curl -XPOST 'localhost:9200/customer?pretty'

這裡寫圖片描述

插入資料

 單條插入-指定id

curl -XPOST 'localhost:9200/customer/external/1?pretty' -d' {"name": "John Doe" }'

 單條插入-不指定id

curl -XPOST 'localhost:9200/customer/external?pretty' -d' {"name": "Jane Doe" }'

 批量插入:

curl -XPOST
'localhost:9200/bank/account/_bulk?pretty' --data-binary “@accounts.json"

刪除資料

 刪除資料:下面的語句將執行刪除Customer中ID為2的資料

curl -XDELETE 'localhost:9200/customer/external/2?pretty'

 根據查詢條件刪除(PS:這條本人沒試過,我用的還是2.4版本,這是參照官網資料的5.4版本寫的)

curl -XPOST 'localhost:9200/customer/external/_delete_by_query?pretty' -d '{
    "
query": { "match": { "name": "John" } } }'

 刪除全部

{
    "query": {
        "match_all": {}
    }
}

更新資料

 更新文件: 修改id=1的name屬性,並直接增加屬性和屬性值

curl -XPOST 'localhost:9200/customer/external/1/_update?pretty' -d ' {
    "doc": {
        "name": "xyd",
        "age": 20
    }
}'

 更新索引–指令碼方式

curl -XPOST 'localhost:9200/customer/external/1/_update?pretty' -d' {
    "script": "ctx._source.age += 5"
}'

Python API 實現

說明

  以下程式碼實現是:單條增加、根據_id刪除、根據_id更新、批量增加等介面。除錯的時候建議一個一個功能執行。

程式碼

# -*- coding: utf-8 -*-

from elasticsearch.helpers import bulk
import elasticsearch


class ElasticSearchClient(object):
    @staticmethod
    def get_es_servers():
        es_servers = [{
            "host": "localhost",
            "port": "9200"
        }]
        es_client = elasticsearch.Elasticsearch(hosts=es_servers)
        return es_client


class LoadElasticSearch(object):
    def __init__(self):
        self.index = "hz"
        self.doc_type = "xyd"
        self.es_client = ElasticSearchClient.get_es_servers()
        self.set_mapping()

    def set_mapping(self):
        """
        設定mapping
        """
        mapping = {
            self.doc_type: {
                "properties": {
                    "document_id": {
                        "type": "integer"
                    },
                    "title": {
                        "type": "string"
                    },
                    "content": {
                        "type": "string"
                    }
                }
            }
        }

        if not self.es_client.indices.exists(index=self.index):
            # 建立Index和mapping
            self.es_client.indices.create(index=self.index, body=mapping, ignore=400)
            self.es_client.indices.put_mapping(index=self.index, doc_type=self.doc_type, body=mapping)

    def add_date(self, row_obj):
        """
        單條插入ES
        """
        _id = row_obj.get("_id", 1)
        row_obj.pop("_id")
        self.es_client.index(index=self.index, doc_type=self.doc_type, body=row_obj, id=_id)

    def add_date_bulk(self, row_obj_list):
        """
        批量插入ES
        """
        load_data = []
        i = 1
        bulk_num = 2000  # 2000條為一批
        for row_obj in row_obj_list:
            action = {
                "_index": self.index,
                "_type": self.doc_type,
                "_id": row_obj.get('_id', 'None'),
                "_source": {
                    'document_id': row_obj.get('document_id', None),
                    'title': row_obj.get('title', None),
                    'content': row_obj.get('content', None),
                }
            }
            load_data.append(action)
            i += 1
            # 批量處理
            if len(load_data) == bulk_num:
                print '插入', i / bulk_num, '批資料'
                print len(load_data)
                success, failed = bulk(self.es_client, load_data, index=self.index, raise_on_error=True)
                del load_data[0:len(load_data)]
                print success, failed

        if len(load_data) > 0:
            success, failed = bulk(self.es_client, load_data, index=self.index, raise_on_error=True)
            del load_data[0:len(load_data)]
            print success, failed

    def update_by_id(self, row_obj):
        """
        根據給定的_id,更新ES文件
        :return:
        """
        _id = row_obj.get("_id", 1)
        row_obj.pop("_id")
        self.es_client.update(index=self.index, doc_type=self.doc_type, body={"doc": row_obj}, id=_id)

    def delete_by_id(self, _id):
        """
        根據給定的id,刪除文件
        :return:
        """
        self.es_client.delete(index=self.index, doc_type=self.doc_type, id=_id)

if __name__ == '__main__':
    write_obj = {
        "_id": 1,
        "document_id": 1,
        "title": u"Hbase 測試資料",
        "content": u"Hbase 日常運維,這是個假資料監控Hbase執行狀況。通常IO增加時io wait也會增加,現在FMS的機器正常情況......",
    }

    load_es = LoadElasticSearch()

    # 插入單條資料測試
    load_es.add_date(write_obj)

    # 根據id更新測試
    # write_obj["title"] = u"更新標題"
    # load_es.update_by_id(write_obj)

    # 根據id刪除測試
    # load_es.delete_by_id(1)

    # 批量插入資料測試
    # row_obj_list = []
    # for i in range(2, 2200):
    #     temp_obj = write_obj.copy()
    #     temp_obj["_id"] = i
    #     temp_obj["document_id"] = i
    #     row_obj_list.append(temp_obj)
    # load_es.add_date_bulk(row_obj_list)

結果顯示

單條增加:
這裡寫圖片描述
單條修改:
這裡寫圖片描述
單條刪除:

批量增加:
這裡寫圖片描述