1. 程式人生 > >Centos下PHP安裝Kafka擴充套件操作教程

Centos下PHP安裝Kafka擴充套件操作教程

說明:網上有好幾種PHP操作kafka的擴充套件,有kafka-php和php-rdkafka兩種是比較流行的。但其中kafka-php功能較全,但是使用composer安裝,對於內網使用者不能訪問外網,因此,我們選擇php-rdkafka。

php-rdkafka依賴librdkafka,先下載安裝包librdkafkaphp-rdkafka

1、新建一個目錄用來存在擴充套件包

mkdir /home/kafka

2、將兩個擴充套件包放到/home/kafka目錄下

3、安裝librdkafka

unzip librdkafka-master.zip #解壓
cd librdkafka-master    #進入安裝目錄
./configure    #配置
make && make install

4、安裝php-rdkafka

unzip php-rdkafka-master.zip
cd php-rdkafka-master
/usr/local/php/bin/phpize    #安裝php擴充套件
./configure --with-php-config=/usr/local/php/bin/php-config --with-rdkafka    #配置
make all -j 5
make install

#裝載擴充套件到配置
vim /usr/local/php/etc/php.ini 

新增
extension = rdkafka.so

重啟apache
重啟php-fpm

5、開啟phpinfo出現下圖表示安裝成功

6、測試一下

/usr/local/php/bin/php -c \a

/usr/local/php/etc/php.ini -r "new Rdkafka\Conf();"
不報錯表示成功

7、生產者和消費者類

<?php
/**
 * User: TCF_jingfeng
 * Date: 18-8-27
 * Time: 上午9:09
 */

class Kafka
{
//    public $broker_list = 'localhost:9092';//配置kafka,可以用逗號隔開多個kafka
//    public $topic = 'test';

    public $broker_list = '你自己的地址:9092';//配置kafka,可以用逗號隔開多個kafka
    public $topic='';//你自己的topic
    public $partition = 0;

    protected $producer = null;
    protected $consumer = null;

    public function __construct()
    {
        if (empty($this->broker_list)) {
            throw new InvalidConfigException("broker not config");
        }
        $rk = new \RdKafka\Producer();
        if (empty($rk)) {
            throw new InvalidConfigException("producer error");
        }
        $rk->setLogLevel(LOG_DEBUG);
        if (!$rk->addBrokers($this->broker_list)) {
            throw new InvalidConfigException("producer error");
        }
        $this->producer = $rk;
    }

    public function setTopic($mytopic){
        $this->topic = $mytopic;
    }
    /**
     * 生產者
     * @param array $messages
     * @return mixed
     */
    public function send($messages = [])
    {
        $topic = $this->producer->newTopic($this->topic);
        return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
    }

    /**
     * 消費者
     */
    public function consumer($object, $callback)
    {
        $conf = new \RdKafka\Conf();
        $conf->set('group.id', 0);
        $conf->set('metadata.broker.list', $this->broker_list);

        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('auto.offset.reset', 'smallest');

        $conf->setDefaultTopicConf($topicConf);

        $consumer = new \RdKafka\KafkaConsumer($conf);

        $consumer->subscribe([$this->topic]);

        echo "waiting for messages.....\n";
        while (true) {
            $message = $consumer->consume(120 * 1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    echo "message payload....";
                    $object->$callback($message->payload);
                    break;
            }
            sleep(1);
        }
    }
}

8、生產者程式碼

  $producer = new Kafka();
  $producer ->setTopic('errorlog');
  $producer ->send("hello kafka");