1. 程式人生 > >Elastic search(2)搜尋 包含 scroll 與scan 的使用

Elastic search(2)搜尋 包含 scroll 與scan 的使用

  • 基礎格式

    GET /_search
    {
        "query": YOUR_QUERY_HERE
    }
    
  • 空查詢

    GET /_search
    {
        "query": {
            "match_all": {}
        }
    }
    
  • 查詢子句

    {
        QUERY_NAME: {
            ARGUMENT: VALUE,
            ARGUMENT: VALUE,...
        }
    }
    
    {
        "match": {
            "tweet": "elasticsearch"
            }
    }
    
  • 合併多子句

    • 葉子子句(leaf clauses

      )(比如match子句)用以在將查詢字串與一個欄位(或多欄位)進行比較

    • 複合子句(compound)用以合併其他的子句。例如,bool子句允許你合併其他的合法子句,mustmust_not或者should,如果可能的話:

      {
          "bool": {
              "must":     { "match": { "tweet": "elasticsearch" }},
              "must_not": { "match": { "name":  "mary" }},
              "should":   { "match": { "tweet": "full text" }}
          }
      }
      
  • 查詢與過濾

    • 查詢一般條件為 match 該會計算 相關的sorce
    • 過濾條件為Term 自會根據索引 效能非常高 不在意裡面具體欄位
  • 排序

    GET /_search
    {
        "query" : {
            "filtered" : {
                "filter" : { "term" : { "user_id" : 1 }}
            }
        },
        "sort": [
            { "date":   { "order": "desc" ,"mode":"min"}},
            { "_score": { "order": "desc" }}
        ]}
    
  • 掃描和滾屏

    • scroll

      • 一個滾屏搜尋允許我們做一個初始階段搜尋並且持續批量從Elasticsearch里拉取結果直到沒有結果剩下。這有點像傳統資料庫裡的cursors(遊標)

        滾屏搜尋會及時製作快照。這個快照不會包含任何在初始階段搜尋請求後對index做的修改。它通過將舊的資料檔案儲存在手邊,所以可以保護index的樣子看起來像搜尋開始時的樣子。

    • scan

      • 深度分頁代價最高的部分是對結果的全域性排序,但如果禁用排序,就能以很低的代價獲得全部返回結果。為達成這個目的,可以採用scan(掃描)搜尋模式。掃描模式讓Elasticsearch不排序,只要分片裡還有結果可以返回,就返回一批結果。為了使用scan-and-scroll(掃描和滾屏),需要執行一個搜尋請求,將search_type 設定成scan,並且傳遞一個scroll引數來告訴Elasticsearch滾屏應該持續多長時間。
    • 使用

        #!/usr/bin/env python
        # -*- coding: utf-8 -*-
        
        import json
        import rfc3339
        from datetime import datetime
        import requests
        import time
        
        start_time = rfc3339.rfc3339(datetime.strptime("2018-11-02 12:05:00", "%Y-%m-%d %H:%M:%S"))
        end_time = rfc3339.rfc3339(datetime.strptime("2018-11-02 12:35:00", "%Y-%m-%d %H:%M:%S"))
        print(start_time)
        print(end_time)
        def make_query():
            # query = '"query":{"bool": {"must":{"match":{"message":"百度OCPC渠道 - 100011 - 啟用回撥成功"}}}}'
            q1 = {
                "query": {
                    "bool": {
                        "filter": {
                            "bool": {
                                "must": [{
                                    "range": {
                                        "@timestamp": {
                                            "gte": start_time,
                                            "lte": end_time
                                        }
                                    }
                                },
                                ]
                            }
                        },
                        "must":[{
                            "term":{
                                "act_id":"ad_trace"
                            },
                            "term":{
                                "stat":"297"
                            }}]
                    }
                },
                "size": 1000
            }
            # return query
            return json.dumps(q1)
        
        
        def main(host, tag):
            url = "http://" + host + "/" + tag + "/_search?scroll=10m"
            print(url)
            query_data = make_query()
            print(query_data)
            headers = {'Content-Type': 'application/json'}
            file = open("bak.txt", "w+")
            loop_times = 1
            loop_data_times = 0
            try:
                response = requests.get(url, data=query_data.encode('utf-8'), headers=headers)
                response_data = json.loads(response.text)
                print(response_data)
                total_num = response_data["hits"]["total"]
                datas = response_data["hits"]["hits"]
                scroll_id = response_data["_scroll_id"]
                for d in datas:
                    str_d = json.dumps(d)
                    file.write(str_d)
                    file.write("\n")
                    loop_data_times += 1
                    print(loop_data_times)
                loop = True
                while loop:
                    loop_times += 1
                    url2 = "http://" + host  + "/_search/scroll"
                    query = {
                        "scroll": "10m",
                        "scroll_id": '{scroll_id}'.format(scroll_id=scroll_id)
                    }
                    print("scroll_id ", scroll_id)
                    query_data2 = json.dumps(query)
                    response = requests.get(url2, data=query_data2.encode('utf-8'), headers=headers)
                    print("response.text ", response.text)
                    response_data = json.loads(response.text)
                    scroll_data = response_data["hits"]["hits"]
                    if len(scroll_data) == 0:
                        break
                    for d in scroll_data:
                        str_d = json.dumps(d)
                        file.write(str_d)
                        file.write("\n")
                        loop_data_times += 1
                        print(loop_data_times)
        
                print("total_num ", total_num)
                print("loop_times ", loop_times)
            except Exception as e:
                print(111)
                print(e)
        
        
        if __name__ == "__main__":
            master_host = "XXXX:9200"
            tag_name = "hwsjus_frontend-2018.11.02"
            main(master_host, tag_name)