1. 程式人生 > >kafka-3python生產者和消費者實用demo

kafka-3python生產者和消費者實用demo

python kafka

程序分為productor.py是發送消息端,consumer為消費消息端,

啟動的時候先啟動product再啟動consumer,畢竟只有發了消息,消費端才有消息可以消費,

productor.py

#!/usr/bin/env python2.7
#_*_coding: utf-8 _*_
from kafka import KafkaProducer


kafka_host = ‘192.168.1.200‘  # kafka服務器地址
kafka_port = 9092  # kafka服務器的端口


producer = KafkaProducer(bootstrap_servers=[‘{kafka_host}:{kafka_port}‘.format(
    kafka_host = kafka_host,
    kafka_port = kafka_port
)])


#簡單for循環10次,發送10條消息
for i in range(1,10):
    message_string = ‘some message‘.format(i)

    #調用send方法,發送名字為‘topic1‘的topicid ,發送的消息為message_string
    response = producer.send(‘topic1‘, message_string.encode(‘utf-8‘))
    print response


consumer.py

#!/usr/bin/env python
#_*_coding: utf-8 _*_
import json
from kafka import *

kafka_host = ‘192.168.1.200‘  # kafka服務器地址
kafka_port = 9092  # kafka服務器端口


#消費topic1的topic,並指定group_id(自定義),多個機器或進程想順序消費,可以指定同一個group_id,
# 如果想一條消費多次消費,可以換一個group_id,會從頭開始消費
consumer = KafkaConsumer(
    ‘topic1‘,
    group_id = ‘my-group‘,
    bootstrap_servers = [‘{kafka_host}:{kafka_port}‘.format(kafka_host=kafka_host, kafka_port=kafka_port)]
)
for message in consumer:
    #json讀取kafka的消息
    content = json.loads(message.value)
    print content


本文出自 “馬鵬飛——著” 博客,請務必保留此出處http://mapengfei.blog.51cto.com/1552412/1926068

kafka-3python生產者和消費者實用demo