本文介绍如何使用PHP SDK通过接入点接入消息队列Kafka版并收发消息。

环境准备

安装C++依赖库

  1. 执行以下命令切换到yum源配置目录/etc/yum.repos.d/
    cd /etc/yum.repos.d/
  2. 创建yum源配置文件confluent.repo
    [Confluent.dist]
    name=Confluent repository (dist)
    baseurl=https://packages.confluent.io/rpm/5.1/7
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
    
    [Confluent]
    name=Confluent repository
    baseurl=https://packages.confluent.io/rpm/5.1
    gpgcheck=1
    gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
    enabled=1
  3. 执行以下命令安装C++依赖库。
    yum install librdkafka-devel

安装PHP依赖库

  1. 执行以下命令安装PHP依赖库。
    pecl install rdkafka
  2. 在PHP的初始化文件php.ini中添加以下一行语句以开启扩展。
    extension=rdkafka.so

准备配置

  1. 可选:下载SSL根证书。如果是SSL接入点,需下载该证书。
  2. 访问Aliware-kafka-demos,单击download,下载Demo工程到本地并解压。
  3. 在解压的Demo工程找到kafka-php-demo文件夹,根据接入点类型打开对应的文件夹,配置setting.php文件。
    <?php
    
    return [
        'sasl_plain_username' => 'xxx',
        'sasl_plain_password' => 'xxx',
        'bootstrap_servers' => "xxx:xx,xxx:xx",
        'topic_name' => 'xxx',
        'consumer_id' => 'xxx'
    ];
    参数 描述
    sasl_plain_username SASL用户名。如果是默认接入点,则无此配置项。
    说明
    • 如果实例未开启ACL,您可以在消息队列Kafka版控制台实例详情页面的配置信息区域获取默认的用户名密码
    • 如果实例已开启ACL,请确保要使用的SASL用户已被授予向消息队列Kafka版实例收发消息的权限。具体操作,请参见SASL用户授权
    sasl_plain_password SASL用户名密码。如果是默认接入点,则无此配置项。
    bootstrap_servers SSL接入点。您可在消息队列Kafka版控制台实例详情页面的接入点信息区域获取。
    topic_name Topic名称。您可在消息队列Kafka版控制台Topic 管理页面获取。
    consumer_id Group名称。您可在消息队列Kafka版控制台Group 管理页面获取。
  4. 配置完成后,将配置文件所在文件夹下的全部文件(如果是SSL接入点实例,包含证书SSL根证书文件)上传至服务器PHP安装目录下。

发送消息

执行以下命令发送消息。

php kafka-producer.php
消息程序kafka-producer.php示例代码如下:
说明 示例代码为SSL接入点的代码。如果是默认接入点,无SASL相关代码,即删除如下代码中包含sasl.ssl.相关的代码即可。
<?php

$setting = require __DIR__ . '/setting.php';

$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');
$conf->set('message.send.max.retries', 5);
$rk = new RdKafka\Producer($conf);
# if want to debug, set log level to LOG_DEBUG
$rk->setLogLevel(LOG_INFO);
$rk->addBrokers($setting['bootstrap_servers']);
$topic = $rk->newTopic($setting['topic_name']);
$a = $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message hello kafka");
$rk->poll(0);
while ($rk->getOutQLen() > 0) {
    $rk->poll(50);
}
echo "send succ" . PHP_EOL;

代码示例详情,请参见php-rdkafka

订阅消息

执行如下命令订阅消息。

php kafka-consumer.php
消息程序kafka-consumer.php示例代码如下:
说明 示例代码为SSL接入点的代码。如果是默认接入点,无SASL相关代码,即删除如下代码中包含sasl.ssl.相关的代码即可。
<?php
$setting = require __DIR__ . '/setting.php';
$conf = new RdKafka\Conf();
$conf->set('sasl.mechanisms', 'PLAIN');
$conf->set('api.version.request', 'true');
$conf->set('sasl.username', $setting['sasl_plain_username']);
$conf->set('sasl.password', $setting['sasl_plain_password']);
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('ssl.ca.location', __DIR__ . '/ca-cert.pem');

$conf->set('session.timeout.ms', 10000);

$conf->set('request.timeout.ms', 305000);

$conf->set('group.id', $setting['consumer_id']);

$conf->set('metadata.broker.list', $setting['bootstrap_servers']);

$topicConf = new RdKafka\TopicConf();

$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

$consumer->subscribe([$setting['topic_name']]);

echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";

while (true) {
    $message = $consumer->consume(30 * 1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

?>

代码示例详情,请参见php-rdkafka