1. 程式人生 > >Elasticsearch+Mongo億級別數據導入及查詢實踐

Elasticsearch+Mongo億級別數據導入及查詢實踐

參數配置 doc 時間 col lin 時區 start sta bulk

數據方案:
  • 在Elasticsearch中通過code及time字段查詢對應doc的mongo_id字段獲得mongodb中的主鍵_id
  • 通過獲得id再進入mongodb進行查詢
1,數據情況:
  • 全部為股票及指數的分鐘K線數據(股票代碼區分度較高)
  • Elasticsearch及mongodb都未分片且未優化參數配置
  • mongodb數據量:

    技術分享圖片

  • Elasticsearch數據量:

    技術分享圖片

2,將數據從mongo源庫導入Elasticsearch

import time
from pymongo import MongoClient
from elasticsearch import
Elasticsearch from elasticsearch.helpers import bulk es = Elasticsearch() conn = MongoClient(127.0.0.1, 27017) db = conn.kline_db my_set = db.min_kline x = 1 tmp = [] #此處有個坑mongo查詢時由於數據量比較大時間較長需要設置遊標不過期:no_cursor_timeout=True for i in my_set.find(no_cursor_timeout=True): x+=1 #每次插入100000條 if
x%100000 == 99999: #es批量插入 success, _ = bulk(es, tmp, index=test_2, raise_on_error=True) print(Performed %d actions % success) tmp = [] if i[market] == sz: market = 0 else: market = 1 #此處有個秒數時間類型及時區轉換 tmp.append({"_index":test_2,"_type
": kline,_source:{code:i[code],market:market, time:time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(i[kline_time]/1000 - 8*60*60)) ,mongo_id:str(i[_id])}}) #將最後剩余在tmp中的數據插入 if len(tmp)>0: success, _ = bulk(es, tmp, index=test_2, raise_on_error=True) print(Performed %d actions % success)

3,Elasticsearch+mongo查詢時間統計

import time
from pymongo import MongoClient
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from bson.objectid import ObjectId

#es連接
es = Elasticsearch()

#mongo連接
conn = MongoClient(127.0.0.1, 27017)
db = conn.kline_db  #連接kline_db數據庫,沒有則自動創建
my_set = db.min_kline

tmp = []

#計算運行時間裝飾器
def cal_run_time(func):
    def wrapper(*args,**kwargs):
        start_time = time.time()
        res = func(*args,**kwargs)
        end_time = time.time()
        print(str(func) +---run time--- %s % str(end_time-start_time))
        return res
    return wrapper

@cal_run_time
def query_in_mongo(tmp_list):
    k_list = []
    kline_data = my_set.find({_id:{$in:tmp_list}})
    for k in kline_data:
        k_list.append(k)
    return k_list

@cal_run_time
def query_in_es():
    #bool多條件查詢 must相當於and
    body = {
        "query": {
            "bool": {
                "must": [{
                    "range": {#範圍查詢
                        "time": {
                            "gte": 2017-01-10 00:00:00,  # >=
                            "lte": 2017-04-12 00:00:00  # <=
                        }
                    }
                },
                    {"terms": {# == 或  in:terms 精確查詢
                        "code": [000002,000001]
                    }
                    }
                ]
            }

        }
    }

    #根據body條件記性查詢
    scanResp = scan(es, body, scroll="10m", index="test_2",doc_type="kline", timeout="10m")

    #解析結果字典並放入tmp列表中
    for resp in scanResp:
        tmp.append(ObjectId(resp[_source][mongo_id]))

    print(len(tmp))

    #--------------此處有個坑,直接使用search方法查詢到的結果集中最多只有10條記錄----------------
    # zz = es.search(index="test_2", doc_type="kline", body=body)
    # print(zz[‘hits‘][‘total‘])
    # for resp in zz[‘hits‘][‘hits‘]:
    #     tmp.append(ObjectId(resp[‘_source‘][‘mongo_id‘]))

query_in_es()

query_in_mongo(tmp)

運行結果如下:

第一行:查詢的doc個數:28320

第二行:es查詢所用時間:0.36s

第三行:mongo使用_id查詢所用時間 :0.34s

技術分享圖片

從結果來看對於3億多數據的查詢Elasticsearch的速度還是相當不錯的

※Elasticsearch主要的優勢在於可以進行分詞模糊查詢,所以股票K線並不是完全適應此場景。

※Elasticsearch+Mongo這個架構主要針對:使用mongo存儲海量數據,且這張表更新頻繁。

技術分享圖片 技術分享圖片 技術分享圖片 技術分享圖片 技術分享圖片 技術分享圖片 技術分享圖片 技術分享圖片 技術分享圖片

Elasticsearch+Mongo億級別數據導入及查詢實踐