本文介绍如何使用Ruby SDK通过接入点接入云消息队列 Kafka 版并收发消息。

环境准备

您已安装Ruby。更多信息,请参见安装Ruby

安装Ruby依赖库

执行以下命令安装Ruby依赖库。
gem install ruby-kafka -v 0.6.8

准备配置

  1. 可选:下载SSL根证书。如果是SSL接入点,需下载该证书。
  2. 访问Aliware-kafka-demos,单击download,下载Demo工程到本地并解压。
  3. 在解压的Demo工程找到kafka-ruby-demo文件夹,根据接入点类型打开对应的文件夹,配置producer.ruby文件和consumer.ruby文件。
    表 1. 配置项说明
    参数描述
    brokersSSL接入点。您可在云消息队列 Kafka 版控制台实例详情页面的接入点信息区域获取。
    topicTopic名称。您可在云消息队列 Kafka 版控制台Topic 管理页面获取。
    usernameSASL用户名。如果是默认接入点,则无此配置项。
    说明
    • 如果实例未开启ACL,您可以在云消息队列 Kafka 版控制台实例详情页面的配置信息区域获取默认的用户名密码
    • 如果实例已开启ACL,请确保要使用的SASL用户已被授予向云消息队列 Kafka 版实例收发消息的权限。具体操作,请参见SASL用户授权
    passwordSASL用户名密码。如果是默认接入点,则无此配置项。
    consumerGroupGroup名称。您可在云消息队列 Kafka 版控制台Group 管理页面获取。
  4. 配置完成后,将配置文件所在文件夹下的全部文件(如果是SSL接入点实例,包含证书SSL根证书文件),上传至服务器Ruby安装目录下。

发送消息

执行以下命令发送消息。

ruby producer.ruby

关于代码中配置项说明,请参见配置项说明

消息程序producer.ruby代码示例如下:
说明 示例代码为SSL接入点的代码。您需要根据实际接入点类型,删除或者修改配置项,其余代码请根据加粗代码注释修改。
# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new($stdout)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO

brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"

kafka = Kafka.new(
    seed_brokers: brokers,
    client_id: "sasl-producer",     #如果是默认接入点,取值需修改为“simple-producer”。
    logger: logger,
    # put "./cert.pem" to anywhere this can read
    #如果是默认接入点,删除以下三行代码。
    ssl_ca_cert: File.read('./cert.pem'),    
    sasl_plain_username: username,
    sasl_plain_password: password,
    )

producer = kafka.producer

begin
    $stdin.each_with_index do |line, index|

    producer.produce(line, topic: topic)

    producer.deliver_messages
end

ensure

    producer.deliver_messages

    producer.shutdown
end

订阅消息

执行以下命令消费消息。

ruby consumer.ruby

消息程序consumer.ruby示例代码如下:

关于代码中配置项说明,请参见配置项说明
说明 示例代码为SSL接入点的代码。您需要根据实际接入点类型,删除或者修改配置项,其余代码请根据加粗代码注释修改。
# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))

require "kafka"

logger = Logger.new(STDOUT)
#logger.level = Logger::DEBUG
logger.level = Logger::INFO

brokers = "xxx:xx,xxx:xx"
topic = "xxx"
username = "xxx"
password = "xxx"
consumerGroup = "xxx"

kafka = Kafka.new(
        seed_brokers: brokers,
        client_id: "sasl-consumer",    #如果是默认接入点,取值需修改为“test”
        socket_timeout: 20,
        logger: logger,
        # put "./cert.pem" to anywhere this can read
        #如果是默认接入点,删除以下三行代码。
        ssl_ca_cert: File.read('./cert.pem'),
        sasl_plain_username: username,
        sasl_plain_password: password,
        )

consumer = kafka.consumer(group_id: consumerGroup)
consumer.subscribe(topic, start_from_beginning: false)

trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }

begin
    consumer.each_message(max_bytes: 64 * 1024) do |message|
    logger.info("Get message: #{message.value}")
    end
rescue Kafka::ProcessingError => e
    warn "Got error: #{e.cause}"
    consumer.pause(e.topic, e.partition, timeout: 20)

    retry
end