
Big Data ETL in Practice (4): Elasticsearch, a Search Powerhouse


3. Importing local files into AWS Elasticsearch

Modify the access policy to allow your local machine's public IP. Since this IP changes frequently, you need to update the policy each time before use.

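For reference, an IP-restricted access policy for an Amazon Elasticsearch Service domain generally has the shape sketched below. This is only an illustrative sketch: the domain ARN and the source IP are placeholders that you must replace with your own values.

import json

# Sketch of an IP-restricted access policy for an AWS Elasticsearch domain.
# The ARN and the source IP are placeholders -- substitute your own values.
access_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"AWS": "*"},
            "Action": "es:*",
            "Resource": "arn:aws:es:cn-north-1:123456789012:domain/your-domain/*",
            "Condition": {
                "IpAddress": {"aws:SourceIp": ["203.0.113.10/32"]}  # your current public IP
            }
        }
    ]
}

# Paste the JSON output into the domain's access policy editor in the AWS console.
print(json.dumps(access_policy, indent=2))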

Install Anaconda:
https://www.anaconda.com/download/

Initialize the environment. On Windows 10, open the Anaconda Prompt command line:


conda create -n elasticsearch python=3.6
conda activate elasticsearch
pip install elasticsearch
pip install pandas
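After the packages are installed, a quick sanity check helps confirm the client can actually reach the cluster. The endpoint below is a placeholder for your own AWS Elasticsearch domain URL.

from elasticsearch import Elasticsearch

# Placeholder endpoint -- replace with your own AWS Elasticsearch domain URL.
es = Elasticsearch(["https://yoururl.amazonaws.com.cn"])

# ping() returns True when the cluster answers; info() shows version details.
if es.ping():
    print(es.info())
else:
    print("cannot reach the cluster -- check the access policy and your public IP")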

The script below (run on Windows) picks up every CSV file in the current folder and indexes each one into ES:



from elasticsearch import helpers, Elasticsearch
import pandas as pd
from time import time

import win_unicode_console
win_unicode_console.enable()

import os  
  
def file_name(file_dir):
    for root, dirs, files in os.walk(file_dir):
        print(root)   # current directory path
        print(dirs)   # all subdirectories under the current path
        print(files)  # all non-directory files under the current path
        return [item for item in files if '.csv' in item]

root_path=os.getcwd()+'\\'
fileslist = file_name(root_path)

# size of the bulk
chunksize=50000

for file in fileslist:
    t0=time()
    f = open(root_path+file,'r', encoding='UTF-8') # read csv

    # parse the csv with pandas
    csvfile=pd.read_csv(f, iterator=True, chunksize=chunksize,low_memory=False)

    # init Elasticsearch
    es = Elasticsearch(["https://yoururl.amazonaws.com.cn"])

    # init index
    try :
        es.indices.delete(file.strip('.csv').lower())
    except :
        pass

    es.indices.create(file.strip('.csv').lower())

    # start bulk indexing
    print("now indexing %s..."%(file))

    for i,df in enumerate(csvfile):
        print(i)
        records=df.where(pd.notnull(df), None).T.to_dict()
        list_records=[records[it] for it in records]
        try :
            helpers.parallel_bulk(es, list_records, index=file.strip('.csv').lower(), doc_type=file.strip('.csv').lower(),thread_count=8)
        except :
            print("error!, skip records...")
            pass

    print("done in %.3fs"%(time()-t0))

Running the code above revealed a problem: no data actually made it into ES. helpers.parallel_bulk is lazy (it returns a generator), so nothing is sent until its results are consumed. Following the approach described in the links below, the code needs to be modified as follows:

Code examples:
https://www.programcreek.com/python/example/104891/elasticsearch.helpers.parallel_bulk

Reference thread:
https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498
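The core of the fix is simply consuming the generator that parallel_bulk returns. A minimal sketch of the two usual patterns, assuming es and list_records are already defined as in the script (the index name my_index is only illustrative):

from collections import deque
from elasticsearch import helpers

# Pattern 1: iterate the generator and inspect each per-document result.
for success, info in helpers.parallel_bulk(es, list_records, index="my_index",
                                            doc_type="my_index", thread_count=8):
    if not success:
        print("A document failed:", info)

# Pattern 2: drain the generator without keeping the results.
deque(helpers.parallel_bulk(es, list_records, index="my_index",
                            doc_type="my_index", thread_count=8), maxlen=0)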


from elasticsearch import helpers, Elasticsearch
import pandas as pd
from time import time

from elasticsearch.helpers import BulkIndexError
from elasticsearch.exceptions import TransportError,ConnectionTimeout,ConnectionError
import traceback  
import logging
logging.basicConfig(filename='log-for_.log',
                    format='%(asctime)s -%(name)s-%(levelname)s-%(module)s:%(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S %p',
                    level=logging.ERROR)


import win_unicode_console
win_unicode_console.enable()

import os  
  
def file_name(file_dir):   
    for root, dirs, files in os.walk(file_dir):  
        print(root)   # current directory path
        print(dirs)   # all subdirectories under the current path
        print(files)  # all non-directory files under the current path
        return [item for item in files if '.csv' in item]

#NAME = "PV_PROV_LOG"
root_path=os.getcwd()+'\\'
#csv_filename="%s.csv" % NAME



fileslist = file_name(root_path)

# size of the bulk
chunksize=1000

for file in fileslist:

    t0=time()
    # open csv file
    
    f = open(root_path+file,'r', encoding='UTF-8') # read csv

    # parse csv with pandas
    csvfile=pd.read_csv(f, iterator=True, chunksize=chunksize,low_memory=False) 

    # init ElasticSearch
    es = Elasticsearch(["..."])

    # init index
    try :
        es.indices.delete(file.strip('.csv').lower())
    except :
        pass

    es.indices.create(file.strip('.csv').lower())

    # start bulk indexing 
    print("now indexing %s..."%(file))

    for i,df in enumerate(csvfile): 
        print(i)
        records=df.where(pd.notnull(df), None).T.to_dict()
        list_records=[records[it] for it in records]
        #print(list_records)
        try :
            #helpers.bulk(es, list_records, index=file.strip('.csv').lower(), doc_type=file.strip('.csv').lower())
            for success, info in helpers.parallel_bulk(es, list_records, index=file.strip('.csv').lower(), doc_type=file.strip('.csv').lower(),thread_count=8):
                if not success:
                    print('A document failed:', info)
            #helpers.parallel_bulk(es, list_records, index=file.strip('.csv').lower(), doc_type=file.strip('.csv').lower(),thread_count=8)
        except ConnectionTimeout:
            logging.error("this is ES ConnectionTimeout ERROR \n %s"%str(traceback.format_exc()))
            logging.info('retry bulk es')
        except TransportError:
            logging.error("this is ES TransportERROR \n %s"%str(traceback.format_exc()))
            logging.info('retry bulk es')
        except ConnectionError:
            logging.error("this is ES ConnectionError ERROR \n %s"%str(traceback.format_exc()))
            logging.info('retry bulk es')
            
        except BulkIndexError:
            logging.error("this is ES BulkIndexError ERROR \n %s"%str(traceback.format_exc()))
            logging.info('retry bulk es')
            pass
        except Exception:
            # catch-all for anything the specific handlers above missed
            logging.error("exception not match \n %s"%str(traceback.format_exc()))
            logging.error('retry bulk es')
            pass

    print("done in %.3fs"%(time()-t0))