使用Python-elasticsearch-bulk批量快速向elasticsearch插入資料
阿新 • • 發佈:2018-11-05
最近遇到一個批量向elasticsearch插入資料低效率的問題,在網上找到如下解決方案:
對,就是使用elasticsearch內建的bulk API進行批量的插入操作。
同樣,python elasticsearch lib也提供了bulk API的功能,因此便有如下程式碼:
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import pymysql
import time
# 連線ES
es = Elasticsearch(
['127.0.0.1'],
port=9200
)
# 連線MySQL
print("Connect to mysql...")
mysql_db = "test"
m_conn = pymysql.connect(host='127.0.0.1, port=3306, user='root', passwd='root', db=mysql_db, charset='utf8')
m_cursor = m_conn.cursor()
try:
num_id = 0
while True:
s = time.time()
# 查詢資料
sql = "select name,age,area from testTable LIMIT {}, 100000".format(num_id*100000)
# 這裡假設查詢出來的結果為 張三 26 北京
m_cursor.execute(sql)
query_results = m_cursor.fetchall()
if not query_results:
print("MySQL查詢結果為空 num_id=<{}>".format(num_id))
break
else:
actions = []
for line in query_results:
# 拼接插入資料結構
action = {
"_index": "company_base_info_2",
"_type": "company_info",
"_source": {
"name": line[0],
"age": line[1],
"area": line[2],
}
}
# 形成一個長度與查詢結果數量相等的列表
actions.append(action)
# 批量插入
a = helpers.bulk(es, actions)
e = time.time()
print("{} {}s".format(a, e-s))
num_id += 1
finally:
m_cursor.close()
m_conn.close()
print("MySQL connection close...")
程式碼的關鍵在於構造action結構,放入列表中,給helpers.bulk(es, actions)傳參,呼叫方法真的是很簡單了。