1. 程式人生 > >ElasticSearch增刪改查之python sort、scroll、scan

ElasticSearch增刪改查之python sort、scroll、scan

1、用python操作elasticsearch有兩個庫可以呼叫

# ElasticSearch不支援scroll(分頁查詢)查詢
from pyelasticsearch import ElasticSearch
# Elasticsearch支援scroll查詢,一般建議使用這個庫
from elasticsearch import helpers,Elasticsearch

""" 注意:以上兩個庫各自在查詢或更新傳遞的引數是不同的 """
# ElasticSearch查詢使用方式
ES = ElasticSearch(URL)
res = ES.search(
            query,
            index=index,
            size=size
        )

# Elasticsearch查詢使用方式
ES = ElasticSearch(URL)
res = ES.search(
            body=query,
            index=index,
            size=size
        )
  • ES中的高效能的部分大部分在helpers中實現
  1. 如果要批量查詢大量的資料,建議使用helpers.scan,search查詢最大隻能返回10000條資料
  2. 是有效能限制的

2、Elasticsearch中search scroll使用

  • scroll的優勢:支援分頁查詢,自動排序,並把查詢結果返回
  • scroll使用方式:每次查詢獲取下一次查詢需要使用的scroll_id,查詢時傳遞引數scroll='2m',後臺ES即可以將查詢的結果儲存2分鐘
  • 查詢時常用技巧
    1、將必須包含欄位新增到 must
    2、將必須不包含欄位新增到 must_not
    3、單一條件匹配選用 term,多個單一條件任何一個匹配選用 terms

    4、from 指定從結果資料中的第多少條開始返回,from的最大值超不過2000,所以在使用大資料查詢基本使用不上
    5、size 指定結果資料中共返回多少條資料
# 在使用時一定要注意Elasticsearch與ElasticSearch還是有一定的區別的,傳遞引數不一樣
from elasticsearch import Elasticsearch

ES_SEARCH_HOSTURL = 'http://domain:9000/'
ES = Elasticsearch(ES_SEARCH_HOSTURL)

query = {
        "query": {
            "bool": {
                "must": [],
                "must_not": []
            }
        }
    }

# index可以為索引的列表或者單個索引,如果是索引的列表,則使用search時不能傳遞doc_type,也就是如果同時查詢多個索引,不能指定文件的型別
def scroll_search(index, query, size, page):
    """ 使用scroll查詢ES,實現分頁查詢

    :param index: type of list or str
    :param query: type of dict,查詢條件
    :param size: type of int(1-100),指定返回資料中每頁的資料條數
    :param page: type of int(>0),指定返回第幾頁資料
    :return: 查詢結果總數和某頁的資料
    """

    try:
        res = ES.search(
            index=index,
            scroll='5m',            # 查詢一次資料在ES中快取5分鐘再銷燬
            size=size,
            body=query,
            sort="modified:desc",      # sort增加排序功能,多個欄位排序可以以逗號隔開
            # sort="modified:desc,_score:desc",  # 指定某個欄位按照升序或者降序排列,modifie為資料欄位
            # sort="_doc",        # ES會計算一個最優的排序方案
            # search_type='scan',   # 如果不關注排序的話,可以增加該欄位,查詢速度十分高效,效能比較好
        )
    except Exception as e:
        raise e
    else:
        sid = res['_scroll_id']           # 獲得查詢下一條資料的scroll_id
        total = res['hits']['total']      # 獲取查詢結果中總資料的條數
        hits = res["hits"]["hits"]        # 首次查詢返回第一頁的結果資料
        results = [hit["_source"] for hit in hits]

        first_page = 1
        while page > first_page:
            try:
                res = ES.scroll(scroll_id=sid, scroll='2m')
            except Exception as e:
                raise e
            else:
                sid = res['_scroll_id']
                hits = res["hits"]["hits"]
                results = [hit["_source"] for hit in hits]
                first_page += 1

    return total, results

# terms使用,其中categories為list型別,含義為categories中任何一個滿足條件即可
temp = {"terms": {"categories": categories}}
query["query"]["bool"]["must"].append(temp)

3、Elasticsearch中update區域性更新

""" 功能:從多個索引中查詢需要更新的對應資料的id,再更新此資料 """
from elasticsearch import Elasticsearch

ES_SEARCH_HOSTURL = 'http://domain:9000/'
ES = Elasticsearch(ES_SEARCH_HOSTURL)

indexs = [index1, index2]

query = {
        "query": {
            "bool": {
                "must": []
            }
        }
    }

for index in indexs:
	try:
		res = ES.search(body=query, index=index, doc_type='info')
	except Exception as e:
		print(e)
		# logger.error("Request search_indicator function error. Error: %s" % e)
		message = "Internal server error"
		results = data_formatter(message=message)
		return Response(results, status=500)
	else:
		if res["hits"]["total"] > 0:
			hits = res["hits"]["hits"][0]
			update_id = hits["_id"]

			try:
                                # 注意,如果ES為pyelasticsearch的物件,則需要更新的引數傳遞形式應該為doc= {"revoked": revoked}
				ES.update(index=index, doc_type='indicator_info', id=update_id, body={"doc": {"revoked": revoked}})
			except Exception as e:
				print(e)
				message = "Update {} failed".format(id)
				results = data_formatter(message=message)
				return Response(results, status=500)
			else:
				results = data_formatter()
				return Response(results, status=200)