Elasticsearch+Mongo億級別數據導入及查詢實踐
阿新 • • 發佈:2018-01-28
參數配置 doc 時間 col lin 時區 start sta bulk 數據方案:
- 在Elasticsearch中通過code及time字段查詢對應doc的mongo_id字段獲得mongodb中的主鍵_id
- 通過獲得id再進入mongodb進行查詢
- 全部為股票及指數的分鐘K線數據(股票代碼區分度較高)
- Elasticsearch及mongodb都未分片且未優化參數配置
- mongodb數據量:
- Elasticsearch數據量:
2,將數據從mongo源庫導入Elasticsearch
import time from pymongo import MongoClient from elasticsearch importElasticsearch from elasticsearch.helpers import bulk es = Elasticsearch() conn = MongoClient(‘127.0.0.1‘, 27017) db = conn.kline_db my_set = db.min_kline x = 1 tmp = [] #此處有個坑mongo查詢時由於數據量比較大時間較長需要設置遊標不過期:no_cursor_timeout=True for i in my_set.find(no_cursor_timeout=True): x+=1 #每次插入100000條 ifx%100000 == 99999: #es批量插入 success, _ = bulk(es, tmp, index=‘test_2‘, raise_on_error=True) print(‘Performed %d actions‘ % success) tmp = [] if i[‘market‘] == ‘sz‘: market = 0 else: market = 1 #此處有個秒數時間類型及時區轉換 tmp.append({"_index":‘test_2‘,"_type": ‘kline‘,‘_source‘:{‘code‘:i[‘code‘],‘market‘:market, ‘time‘:time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(i[‘kline_time‘]/1000 - 8*60*60)) ,‘mongo_id‘:str(i[‘_id‘])}}) #將最後剩余在tmp中的數據插入 if len(tmp)>0: success, _ = bulk(es, tmp, index=‘test_2‘, raise_on_error=True) print(‘Performed %d actions‘ % success)
3,Elasticsearch+mongo查詢時間統計
import time from pymongo import MongoClient from elasticsearch import Elasticsearch from elasticsearch.helpers import scan from bson.objectid import ObjectId #es連接 es = Elasticsearch() #mongo連接 conn = MongoClient(‘127.0.0.1‘, 27017) db = conn.kline_db #連接kline_db數據庫,沒有則自動創建 my_set = db.min_kline tmp = [] #計算運行時間裝飾器 def cal_run_time(func): def wrapper(*args,**kwargs): start_time = time.time() res = func(*args,**kwargs) end_time = time.time() print(str(func) +‘---run time--- %s‘ % str(end_time-start_time)) return res return wrapper @cal_run_time def query_in_mongo(tmp_list): k_list = [] kline_data = my_set.find({‘_id‘:{‘$in‘:tmp_list}}) for k in kline_data: k_list.append(k) return k_list @cal_run_time def query_in_es(): #bool多條件查詢 must相當於and body = { "query": { "bool": { "must": [{ "range": {#範圍查詢 "time": { "gte": ‘2017-01-10 00:00:00‘, # >= "lte": ‘2017-04-12 00:00:00‘ # <= } } }, {"terms": {# == 或 in:terms 精確查詢 "code": [‘000002‘,‘000001‘] } } ] } } } #根據body條件記性查詢 scanResp = scan(es, body, scroll="10m", index="test_2",doc_type="kline", timeout="10m") #解析結果字典並放入tmp列表中 for resp in scanResp: tmp.append(ObjectId(resp[‘_source‘][‘mongo_id‘])) print(len(tmp)) #--------------此處有個坑,直接使用search方法查詢到的結果集中最多只有10條記錄---------------- # zz = es.search(index="test_2", doc_type="kline", body=body) # print(zz[‘hits‘][‘total‘]) # for resp in zz[‘hits‘][‘hits‘]: # tmp.append(ObjectId(resp[‘_source‘][‘mongo_id‘])) query_in_es() query_in_mongo(tmp)
運行結果如下:
第一行:查詢的doc個數:28320
第二行:es查詢所用時間:0.36s
第三行:mongo使用_id查詢所用時間 :0.34s
從結果來看對於3億多數據的查詢Elasticsearch的速度還是相當不錯的
※Elasticsearch主要的優勢在於可以進行分詞模糊查詢,所以股票K線並不是完全適應此場景。
※Elasticsearch+Mongo這個架構主要針對:使用mongo存儲海量數據,且這張表更新頻繁。
Elasticsearch+Mongo億級別數據導入及查詢實踐