1. 程式人生 > >python消費kafka資料批量插入到es

python消費kafka資料批量插入到es

1、es的批量插入

這是為了方便後期配置的更改,把配置資訊放在logging.conf中
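用elasticsearch來實現批量操作,先安裝依賴包,sudo pip install Elasticsearch2(注意:elasticsearch2 這個套件的匯入名稱是 elasticsearch2;若要配合下方的 from elasticsearch import Elasticsearch,請改為安裝 sudo pip install "elasticsearch>=2.0.0,<3.0.0")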

import logging
import logging.config

from elasticsearch import Elasticsearch
class ImportEsData:
    """Small wrapper around an Elasticsearch client that indexes documents.

    Logging is configured from ``logging.conf`` when the class body executes,
    so that file must be readable from the working directory.
    """

    logging.config.fileConfig("logging.conf")
    logger = logging.getLogger("msg")

    def __init__(self, hosts, index, type):
        """Create the Elasticsearch client and remember the target index/type.

        Args:
            hosts: Comma-separated host list; leading/trailing commas are
                stripped before splitting.
            index: Name of the index documents are written to.
            type: Document type documents are written with (the name shadows
                the builtin but is kept so existing callers keep working).
        """
        # NOTE(review): elasticsearch-py appears to interpret ``timeout`` in
        # seconds, so 5000 may have been meant as milliseconds -- confirm.
        self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
        self.index = index
        self.type = type

    def set_date(self, data):
        """Index ``data`` as one document in the configured index and type.

        Args:
            data: The document body handed to ``Elasticsearch.index``.
        """
        # Call shape for reference:
        # es.index(index="test-index", doc_type="test-type", id=42,
        #          body={"any": "data", "timestamp": datetime.now()})
        # The document type must be the configured type, not the index name.
        self.es.index(index=self.index, doc_type=self.type, body=data)

2、使用pykafka消費kafka

1.因為kafka是0.8,pykafka不支援zk,只能用get_simple_consumer來實現
2.為了實現多個應用同時消費而且不重消費,所以一個應用消費一個partition
3.為了確保消費資料量在不足10000這個批量值時,也能在一定的時間範圍內插入到es中,這裡設定了consumer_timeout_ms這個超時等待時間,超時後會退出等待消費的阻塞。
4.退出等待消費的阻塞後,for迴圈隨之結束,導致無法再繼續消費資料,因此在遍歷self.consumer的外層加入了一個while True死迴圈

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import datetime
import logging
import logging.config

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from pykafka import KafkaClient

from ConfigUtil import ConfigUtil


class KafkaPython:
    """Consume one Kafka partition with pykafka and bulk-index it into ES.

    Messages are buffered in memory and written to Elasticsearch either when
    the buffer reaches the batch size or when the consumer times out waiting
    for new messages.
    """

    logging.config.fileConfig("logging.conf")
    logger = logging.getLogger("msg")
    logger_data = logging.getLogger("data")

    # Number of buffered messages that triggers a bulk insert.
    DEFAULT_BATCH_SIZE = 10000

    def __init__(self):
        """Read the Kafka and Elasticsearch settings through ``ConfigUtil``."""
        config = ConfigUtil()
        self.server = config.get("kafka", "kafka_server")
        self.topic = config.get("kafka", "topic")
        self.group = config.get("kafka", "group")
        self.partition_id = int(config.get("kafka", "partition"))
        self.consumer_timeout_ms = int(config.get("kafka", "consumer_timeout_ms"))
        self.consumer = None
        self.hosts = config.get("es", "hosts")
        self.index_name = config.get("es", "index_name")
        self.type_name = config.get("es", "type_name")

    def getConnect(self):
        """Create a simple consumer bound to the single configured partition.

        Raises:
            ValueError: If the topic has no partition with the configured id.
        """
        client = KafkaClient(self.server)
        topic = client.topics[self.topic]
        partition = topic.partitions.get(self.partition_id)
        if partition is None:
            # Fail here with a clear message instead of handing {None} to
            # pykafka and crashing somewhere inside the consumer.
            raise ValueError(
                "partition %s does not exist in topic %s"
                % (self.partition_id, self.topic)
            )
        # NOTE(review): auto_commit_enable=True is combined with the explicit
        # commit_offsets() calls made after each bulk insert -- confirm that
        # background commits of not-yet-indexed messages are acceptable.
        self.consumer = topic.get_simple_consumer(
            consumer_group=self.group,
            auto_commit_enable=True,
            consumer_timeout_ms=self.consumer_timeout_ms,
            partitions={partition},
        )
        self.starttime = datetime.datetime.now()

    def beginConsumer(self, batch_size=DEFAULT_BATCH_SIZE):
        """Consume messages forever and bulk-insert them into Elasticsearch.

        Args:
            batch_size: Number of buffered messages that triggers a bulk
                insert. Defaults to 10000.

        An error raised while handling a message (including a failed bulk
        insert of a full batch) is logged and consumption continues; in that
        case the buffer is kept. An error raised by the timeout flush
        propagates to the caller.
        """
        print("beginConsumer kafka-python")
        importer = ImportEsData(self.hosts, self.index_name, self.type_name)
        actions = []
        while True:
            print(self._elapsed_seconds())
            # Iteration ends once the consumer has waited consumer_timeout_ms
            # without receiving a message; the enclosing loop resumes it.
            for message in self.consumer:
                if message is None:
                    continue
                try:
                    actions.append({
                        "_index": self.index_name,
                        "_type": self.type_name,
                        "_source": message.value,
                    })
                    if len(actions) >= batch_size:
                        self._flush(importer, actions)
                        print(self._elapsed_seconds())
                except Exception as e:
                    print(e)
                    self.logger.error(e)
                    # Lazy %-formatting: string concatenation here could raise
                    # a second error inside the handler for non-str values.
                    self.logger.error(
                        "%s,%s,%s\n",
                        message.partition.id, message.offset, message.value,
                    )
            if actions:
                # Timed out below the batch size: flush what has been buffered.
                self.logger.info("等待時間超過,consumer_timeout_ms,把集合資料插入es")
                self._flush(importer, actions)

    def _elapsed_seconds(self):
        """Return the ``seconds`` field of the time elapsed since getConnect()."""
        return (datetime.datetime.now() - self.starttime).seconds

    def _flush(self, importer, actions):
        """Bulk-insert ``actions``, empty the list in place, commit offsets."""
        importer.set_date(actions)
        # Empty the buffer before committing so that a failing commit does not
        # leave the already-inserted batch queued for a second insert.
        del actions[:]
        self.consumer.commit_offsets()

    def disConnect(self):
        """Stop the consumer if one has been created."""
        if self.consumer is not None:
            # pykafka's SimpleConsumer is shut down with stop(); it does not
            # provide a close() method.
            self.consumer.stop()


class ImportEsData:
    """Small wrapper around an Elasticsearch client for bulk inserts.

    Logging is configured from ``logging.conf`` when the class body executes.
    """

    logging.config.fileConfig("logging.conf")
    logger = logging.getLogger("msg")

    def __init__(self, hosts, index, type):
        """Create the Elasticsearch client and remember the target index/type.

        Args:
            hosts: Comma-separated host list; leading/trailing commas are
                stripped before splitting.
            index: Default index name passed to the bulk helper.
            type: Document type (the name shadows the builtin but is kept so
                existing callers keep working).
        """
        # NOTE(review): elasticsearch-py appears to interpret ``timeout`` in
        # seconds, so 5000 may have been meant as milliseconds -- confirm.
        self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
        self.index = index
        self.type = type

    def set_date(self, data):
        """Bulk-insert the actions in ``data`` and log the helper's result.

        Args:
            data: Iterable of bulk actions.

        With ``raise_on_error=True`` the bulk helper raises when a document
        fails to be indexed, so the caller sees the failure.
        """
        success = bulk(self.es, data, index=self.index, raise_on_error=True)
        self.logger.info(success)

3.執行

def main():
    """Build the consumer, connect it to Kafka and start consuming."""
    consumer_app = KafkaPython()
    consumer_app.getConnect()
    consumer_app.beginConsumer()
    # disConnect() is deliberately not called here, as in the original script.


if __name__ == '__main__':
    main()

注:簡單地寫了一個從kafka中讀取資料到一個list裡,當資料量達到一個閾值時,再批量插入到es的外掛
現在還在批量的壓測中。。。
歡迎一起討論