Centos下PHP安裝Kafka擴充套件操作教程
阿新 • • 發佈:2018-11-09
說明:網上有好幾種PHP操作kafka的擴充套件,有kafka-php和php-rdkafka兩種是比較流行的。但其中kafka-php功能較全,但是使用composer安裝,對於內網使用者不能訪問外網,因此,我們選擇php-rdkafka。
php-rdkafka依賴librdkafka,先下載安裝包librdkafka和php-rdkafka
1、新建一個目錄用來存在擴充套件包
mkdir /home/kafka
2、將兩個擴充套件包放到/home/kafka目錄下
3、安裝librdkafka
unzip librdkafka-master.zip #解壓 cd librdkafka-master #進入安裝目錄 ./configure #配置 make && make install
4、安裝php-rdkafka
unzip php-rdkafka-master.zip
cd php-rdkafka-master
/usr/local/php/bin/phpize #安裝php擴充套件
./configure --with-php-config=/usr/local/php/bin/php-config --with-rdkafka #配置
make all -j 5
make install
#裝載擴充套件到配置
vim /usr/local/php/etc/php.ini
新增
extension = rdkafka.so
重啟apache
重啟php-fpm
5、開啟phpinfo出現下圖表示安裝成功
6、測試一下
/usr/local/php/bin/php -c \a
/usr/local/php/etc/php.ini -r "new Rdkafka\Conf();"
不報錯表示成功
7、生產者和消費者類
<?php /** * User: TCF_jingfeng * Date: 18-8-27 * Time: 上午9:09 */ class Kafka { // public $broker_list = 'localhost:9092';//配置kafka,可以用逗號隔開多個kafka // public $topic = 'test'; public $broker_list = '你自己的地址:9092';//配置kafka,可以用逗號隔開多個kafka public $topic='';//你自己的topic public $partition = 0; protected $producer = null; protected $consumer = null; public function __construct() { if (empty($this->broker_list)) { throw new InvalidConfigException("broker not config"); } $rk = new \RdKafka\Producer(); if (empty($rk)) { throw new InvalidConfigException("producer error"); } $rk->setLogLevel(LOG_DEBUG); if (!$rk->addBrokers($this->broker_list)) { throw new InvalidConfigException("producer error"); } $this->producer = $rk; } public function setTopic($mytopic){ $this->topic = $mytopic; } /** * 生產者 * @param array $messages * @return mixed */ public function send($messages = []) { $topic = $this->producer->newTopic($this->topic); return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages)); } /** * 消費者 */ public function consumer($object, $callback) { $conf = new \RdKafka\Conf(); $conf->set('group.id', 0); $conf->set('metadata.broker.list', $this->broker_list); $topicConf = new \RdKafka\TopicConf(); $topicConf->set('auto.offset.reset', 'smallest'); $conf->setDefaultTopicConf($topicConf); $consumer = new \RdKafka\KafkaConsumer($conf); $consumer->subscribe([$this->topic]); echo "waiting for messages.....\n"; while (true) { $message = $consumer->consume(120 * 1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo "message payload...."; $object->$callback($message->payload); break; } sleep(1); } } }
8、生產者程式碼
$producer = new Kafka();
$producer ->setTopic('errorlog');
$producer ->send("hello kafka");