Big Data ETL in Practice (4) ---- Elasticsearch, the search powerhouse
3. Importing local files into AWS Elasticsearch
Modify the domain's access policy to allow your local machine's public IP. This IP changes frequently, so it has to be updated each time before use; a sample policy is sketched below.
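For reference, an IP-restricted access policy looks roughly like this. This is a minimal sketch: the region, account ID, domain name, and IP address are all placeholders to replace with your own.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "AWS": "*" },
      "Action": "es:*",
      "Resource": "arn:aws-cn:es:cn-north-1:111122223333:domain/your-domain/*",
      "Condition": {
        "IpAddress": { "aws:SourceIp": "203.0.113.25" }
      }
    }
  ]
}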
Install Anaconda:
https://www.anaconda.com/download/
Initialize the environment. On Windows 10, open the Anaconda Prompt command line and run:
conda create -n elasticsearch python=3.6
conda activate elasticsearch
pip install elasticsearch
pip install pandas
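Before running the full import, it is worth a quick sanity check that the client can actually reach the domain. A minimal sketch, assuming the placeholder endpoint URL below:

from elasticsearch import Elasticsearch

es = Elasticsearch(["https://yoururl.amazonaws.com.cn"])  # placeholder endpoint
print(es.ping())  # True if the cluster answered
print(es.info())  # cluster name and version details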
The script below (for Windows) picks up every CSV in the current directory and creates one ES index per file:
from elasticsearch import helpers, Elasticsearch
import pandas as pd
from time import time
import win_unicode_console
win_unicode_console.enable()
import os

def file_name(file_dir):
    for root, dirs, files in os.walk(file_dir):
        print(root)   # current directory path
        print(dirs)   # all subdirectories under the current path
        print(files)  # all non-directory files under the current path
        return [item for item in files if '.csv' in item]  # top level only

root_path = os.getcwd() + '\\'
fileslist = file_name(root_path)

# size of the bulk
chunksize = 50000

for file in fileslist:
    t0 = time()
    f = open(root_path + file, 'r', encoding='UTF-8')  # read csv
    # parse the csv with pandas, in chunks
    csvfile = pd.read_csv(f, iterator=True, chunksize=chunksize, low_memory=False)
    # init Elasticsearch
    es = Elasticsearch(["https://yoururl.amazonaws.com.cn"])
    # file.strip('.csv') would mangle names like 'stats.csv'; use splitext instead
    index_name = os.path.splitext(file)[0].lower()
    # init index: drop it if it exists, then recreate
    try:
        es.indices.delete(index_name)
    except:
        pass
    es.indices.create(index_name)
    # start bulk indexing
    print("now indexing %s..." % file)
    for i, df in enumerate(csvfile):
        print(i)
        records = df.where(pd.notnull(df), None).T.to_dict()
        list_records = [records[it] for it in records]
        try:
            # BUG (explained below): parallel_bulk returns a lazy generator,
            # so nothing is actually sent until it is consumed
            helpers.parallel_bulk(es, list_records, index=index_name,
                                  doc_type=index_name, thread_count=8)
        except:
            print("error!, skip records...")
            pass
    print("done in %.3fs" % (time() - t0))
Running the code above revealed a problem: no data was actually written into ES. helpers.parallel_bulk is lazy: it returns a generator, and no request is sent until that generator is consumed. Following the approach in the links below, the code needs to be modified as follows:
Code example:
https://www.programcreek.com/python/example/104891/elasticsearch.helpers.parallel_bulk
Reference thread:
https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498
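The crux in isolation: calling parallel_bulk by itself sends nothing; you have to iterate over its results (or drain the generator) before any documents go out. A minimal sketch, with a placeholder endpoint and throwaway actions:

from collections import deque
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(["https://yoururl.amazonaws.com.cn"])  # placeholder endpoint
actions = [{"_index": "demo", "_type": "demo", "n": i} for i in range(10)]

helpers.parallel_bulk(es, actions)  # indexes NOTHING: the generator is never consumed

for success, info in helpers.parallel_bulk(es, actions):  # consuming it sends the data
    if not success:
        print("A document failed:", info)

deque(helpers.parallel_bulk(es, actions), maxlen=0)  # or drain it, discarding results

The full corrected script: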
from elasticsearch import helpers, Elasticsearch
import pandas as pd
from time import time
from elasticsearch.helpers import BulkIndexError
from elasticsearch.exceptions import TransportError, ConnectionTimeout, ConnectionError
import traceback
import logging
logging.basicConfig(filename='log-for_.log',
                    format='%(asctime)s -%(name)s-%(levelname)s-%(module)s:%(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S %p',
                    level=logging.ERROR)
import win_unicode_console
win_unicode_console.enable()
import os

def file_name(file_dir):
    for root, dirs, files in os.walk(file_dir):
        print(root)   # current directory path
        print(dirs)   # all subdirectories under the current path
        print(files)  # all non-directory files under the current path
        return [item for item in files if '.csv' in item]  # top level only

#NAME = "PV_PROV_LOG"
root_path = os.getcwd() + '\\'
#csv_filename = "%s.csv" % NAME
fileslist = file_name(root_path)

# size of the bulk
chunksize = 1000

for file in fileslist:
    t0 = time()
    # open csv file
    f = open(root_path + file, 'r', encoding='UTF-8')  # read csv
    # parse csv with pandas
    csvfile = pd.read_csv(f, iterator=True, chunksize=chunksize, low_memory=False)
    # init Elasticsearch
    es = Elasticsearch(["..."])
    # file.strip('.csv') would mangle names like 'stats.csv'; use splitext instead
    index_name = os.path.splitext(file)[0].lower()
    # init index: drop it if it exists, then recreate
    try:
        es.indices.delete(index_name)
    except:
        pass
    es.indices.create(index_name)
    # start bulk indexing
    print("now indexing %s..." % file)
    for i, df in enumerate(csvfile):
        print(i)
        records = df.where(pd.notnull(df), None).T.to_dict()
        list_records = [records[it] for it in records]
        try:
            # single-threaded alternative:
            # helpers.bulk(es, list_records, index=index_name, doc_type=index_name)
            # iterating over parallel_bulk consumes the generator,
            # which is what actually sends the documents
            for success, info in helpers.parallel_bulk(es, list_records,
                                                       index=index_name,
                                                       doc_type=index_name,
                                                       thread_count=8):
                if not success:
                    print('A document failed:', info)
        # ConnectionTimeout is the most specific of these exceptions and
        # TransportError the most general, so catch them in that order
        except ConnectionTimeout:
            logging.error("this is ES ConnectionTimeout ERROR \n %s" % str(traceback.format_exc()))
            logging.info('retry bulk es')
        except ConnectionError:
            logging.error("this is ES ConnectionError ERROR \n %s" % str(traceback.format_exc()))
            logging.info('retry bulk es')
        except TransportError:
            logging.error("this is ES TransportError \n %s" % str(traceback.format_exc()))
            logging.info('retry bulk es')
        except BulkIndexError:
            logging.error("this is ES BulkIndexError ERROR \n %s" % str(traceback.format_exc()))
            logging.info('retry bulk es')
        except Exception:
            # catch-all for anything unexpected
            logging.error("exception not match \n %s" % str(traceback.format_exc()))
            logging.error('retry bulk es')
            print("error!, skipping some records")
    print("done in %.3fs" % (time() - t0))