
【kafka】Problems using celery together with kafka


Background: a small application that dispatches tasks with celery; each task produces some data to kafka.

Problem: with the confluent_kafka module, producing messages works fine when kafka is used standalone, but once the producing code is wrapped in a celery task, no new messages ever reach the topic.

Workaround: after switching to the pykafka module, the problem disappeared.

I'm puzzled: am I calling confluent_kafka the wrong way? Why does it stop working as soon as it is wrapped in celery?
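One way to narrow this down (a hedged diagnostic sketch, not from the original post; the broker address and topic name are assumptions): confluent_kafka's produce() only enqueues the message in a local buffer, and a background librdkafka thread delivers it later, so produce() can appear to succeed even when nothing reaches the broker. Attaching a delivery callback and checking the return value of flush() from inside the celery task shows whether messages are stuck in the client:

from confluent_kafka import Producer
import json

def delivery_report(err, msg):
    # Invoked from poll()/flush() once per message; err is None on success.
    if err is not None:
        print 'delivery failed: %s' % err
    else:
        print 'delivered to %s [%d]' % (msg.topic(), msg.partition())

def check_producer():
    p = Producer({'bootstrap.servers': 'localhost:9092'})  # assumed broker address
    p.produce('test_result3', json.dumps({"type": "test"}), callback=delivery_report)
    remaining = p.flush(10)  # returns how many messages are still undelivered
    print '%d message(s) still stuck in the local queue' % remaining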

The pykafka code that works:

tasks.py

from celery import Celery
from pykafka import KafkaClient
import json


app = Celery('tasks', backend='amqp', broker='amqp://xxx:[email protected]/xxxhost')


@app.task
def produce():
    client = KafkaClient(hosts="localhost:9092")
    print client.topics
    topic = client.topics['test_result']
    with topic.get_sync_producer() as producer:
        for i in range(3):
            data = {"info": {"ip": "1.2.3.4", "port": i}, "type": "test", "task_id": "test_celery_kafka"}
            print 'Producing message: %s' % data
            producer.produce(json.dumps(data))
        print "finish produce"
        producer.stop()  # redundant: the with block already stops the producer on exit
        print "stop"

run_worker.py

from tasks import produce

for i in range(1000):
    result = produce.delay()
    print result.status
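A possibly relevant detail, worth noting: in this working version the KafkaClient and the producer are created inside the task body, so each celery worker process opens its own Kafka connection only after it has been forked; nothing Kafka-related is shared between the parent and the workers.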

The confluent_kafka code that fails to produce data:

tasks.py

from celery import Celery
from kafka_producer import p
import json


app = Celery('tasks', backend='amqp', broker='amqp://xxx:[email protected]/xxxhost')

@app.task
def produce():
    for i in range(3000):
        data = {"info": {"ip": "1.2.3.4"}, "type": "test", "task_id": "test_celery_kafka"}
        print 'Producing message: %s' % data
        p.produce('test_result3', json.dumps(data))
    print "finish produce"
    p.flush()
    print "finish flush"

run_worker.py

from tasks import produce
result = produce.delay()
print result.status
print result.ready()
print result.get()
print result.status
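A plausible explanation for the difference (an assumption on my part, not confirmed in the original post): `from kafka_producer import p` constructs the confluent_kafka Producer once, at import time, in the parent process; celery's default prefork pool then forks the worker processes, and librdkafka's background delivery threads do not survive the fork. In the children, produce() keeps filling an internal queue that no thread ever drains, which would match "standalone works, under celery it does not". A minimal sketch of a fix under that assumption: build the Producer lazily, once per worker process (the broker address is again assumed):

from celery import Celery
from confluent_kafka import Producer
import json

app = Celery('tasks', backend='amqp', broker='amqp://xxx:[email protected]/xxxhost')

_producer = None  # one Producer per worker process, created after the fork

def get_producer():
    global _producer
    if _producer is None:
        _producer = Producer({'bootstrap.servers': 'localhost:9092'})  # assumed broker
    return _producer

@app.task
def produce():
    p = get_producer()
    for i in range(3000):
        data = {"info": {"ip": "1.2.3.4"}, "type": "test", "task_id": "test_celery_kafka"}
        p.produce('test_result3', json.dumps(data))
        p.poll(0)  # serve delivery callbacks so the internal queue keeps draining
    p.flush()
    print "finish flush"

celery also exposes a worker_process_init signal that can be used to set up per-process resources when each worker starts, which achieves the same thing more explicitly.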
