1. 程式人生 > >使用Python-elasticsearch-bulk批量快速向elasticsearch插入資料

使用Python-elasticsearch-bulk批量快速向elasticsearch插入資料

最近遇到一個批量向elasticsearch插入資料低效率的問題,在網上找到如下解決方案:

https://segmentfault.com/q/1010000005027014

對,就是使用elasticsearch內建的bulk API進行批量的插入操作。
同樣,python elasticsearch lib也提供了bulk API的功能,因此便有如下程式碼:

from elasticsearch import Elasticsearch
from elasticsearch import helpers
import pymysql
import time

# 連線ES
es =
Elasticsearch( ['127.0.0.1'], port=9200 ) # 連線MySQL print("Connect to mysql...") mysql_db = "test" m_conn = pymysql.connect(host='127.0.0.1, port=3306, user='root', passwd='root', db=mysql_db, charset='utf8') m_cursor = m_conn.cursor() try: num_id = 0 while True: s = time.time() # 查詢資料
sql = "select name,age,area from testTable LIMIT {}, 100000".format(num_id*100000) # 這裡假設查詢出來的結果為 張三 26 北京 m_cursor.execute(sql) query_results = m_cursor.fetchall() if not query_results: print("MySQL查詢結果為空 num_id=<{}>".format(num_id)) break
else: actions = [] for line in query_results: # 拼接插入資料結構 action = { "_index": "company_base_info_2", "_type": "company_info", "_source": { "name": line[0], "age": line[1], "area": line[2], } } # 形成一個長度與查詢結果數量相等的列表 actions.append(action) # 批量插入 a = helpers.bulk(es, actions) e = time.time() print("{} {}s".format(a, e-s)) num_id += 1 finally: m_cursor.close() m_conn.close() print("MySQL connection close...")

程式碼的關鍵在於構造action結構,放入列表中,給helpers.bulk(es, actions)傳參,呼叫方法真的是很簡單了。