kafka-3python生產者和消費者實用demo
阿新 • • 發佈:2017-05-16
python kafka
程序分為productor.py是發送消息端,consumer為消費消息端,
啟動的時候先啟動product再啟動consumer,畢竟只有發了消息,消費端才有消息可以消費,
productor.py
#!/usr/bin/env python2.7 #_*_coding: utf-8 _*_ from kafka import KafkaProducer kafka_host = ‘192.168.1.200‘ # kafka服務器地址 kafka_port = 9092 # kafka服務器的端口 producer = KafkaProducer(bootstrap_servers=[‘{kafka_host}:{kafka_port}‘.format( kafka_host = kafka_host, kafka_port = kafka_port )]) #簡單for循環10次,發送10條消息 for i in range(1,10): message_string = ‘some message‘.format(i) #調用send方法,發送名字為‘topic1‘的topicid ,發送的消息為message_string response = producer.send(‘topic1‘, message_string.encode(‘utf-8‘)) print response
consumer.py
#!/usr/bin/env python #_*_coding: utf-8 _*_ import json from kafka import * kafka_host = ‘192.168.1.200‘ # kafka服務器地址 kafka_port = 9092 # kafka服務器端口 #消費topic1的topic,並指定group_id(自定義),多個機器或進程想順序消費,可以指定同一個group_id, # 如果想一條消費多次消費,可以換一個group_id,會從頭開始消費 consumer = KafkaConsumer( ‘topic1‘, group_id = ‘my-group‘, bootstrap_servers = [‘{kafka_host}:{kafka_port}‘.format(kafka_host=kafka_host, kafka_port=kafka_port)] ) for message in consumer: #json讀取kafka的消息 content = json.loads(message.value) print content
本文出自 “馬鵬飛——著” 博客,請務必保留此出處http://mapengfei.blog.51cto.com/1552412/1926068
kafka-3python生產者和消費者實用demo