本文介绍如何在公网环境下使用Ruby SDK接入消息队列Kafka版的SSL接入点并使用PLAIN机制收发消息。

前提条件

您已安装Ruby。详情请参见安装Ruby

安装Ruby依赖库

执行以下命令安装Ruby依赖库。
gem install ruby-kafka -v 0.6.8

准备配置

发送消息

  1. 创建发送消息程序producer.rb
    # frozen_string_literal: true
    
    $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
    
    require "kafka"
    
    logger = Logger.new($stdout)
    #logger.level = Logger::DEBUG
    logger.level = Logger::INFO
    
    brokers = "xxx:xx,xxx:xx"
    topic = "xxx"
    username = "xxx"
    password = "xxx"
    
    kafka = Kafka.new(
        seed_brokers: brokers,
        client_id: "sasl-producer",
        logger: logger,
        # put "./cert.pem" to anywhere this can read
        ssl_ca_cert: File.read('./cert.pem'),
        sasl_plain_username: username,
        sasl_plain_password: password,
        )
    
    producer = kafka.producer
    
    begin
        $stdin.each_with_index do |line, index|
    
        producer.produce(line, topic: topic)
    
        producer.deliver_messages
    end
    
    ensure
    
        producer.deliver_messages
    
        producer.shutdown
    end
    参数 描述
    brokers SSL接入点。您可在消息队列Kafka版控制台的实例详情页面的基本信息区域获取。
    topic Topic名称。您可在消息队列Kafka版控制台的Topic管理页面获取。
    username 用户名。
    • 如果实例未开启ACL,您可以在消息队列Kafka版控制台的实例详情页面获取默认用户的用户名。
    • 如果实例已开启ACL,请确保要使用的SASL用户为PLAIN类型且已授权收发消息的权限。详情请参见SASL用户授权
    password 密码。
    • 如果实例未开启ACL,您可以在消息队列Kafka版控制台的实例详情页面获取默认用户的密码。
    • 如果实例已开启ACL,请确保要使用的SASL用户为PLAIN类型且已授权收发消息的权限。详情请参见SASL用户授权
  2. 执行以下命令发送消息。
    ruby producer.rb

订阅消息

  1. 创建订阅消息程序consumer.rb
    # frozen_string_literal: true
    
    $LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
    
    require "kafka"
    
    logger = Logger.new(STDOUT)
    #logger.level = Logger::DEBUG
    logger.level = Logger::INFO
    
    brokers = "xxx:xx,xxx:xx"
    topic = "xxx"
    username = "xxx"
    password = "xxx"
    consumerGroup = "xxx"
    
    kafka = Kafka.new(
            seed_brokers: brokers,
            client_id: "sasl-consumer",
            socket_timeout: 20,
            logger: logger,
            # put "./cert.pem" to anywhere this can read
            ssl_ca_cert: File.read('./cert.pem'),
            sasl_plain_username: username,
            sasl_plain_password: password,
            )
    
    consumer = kafka.consumer(group_id: consumerGroup)
    consumer.subscribe(topic, start_from_beginning: false)
    
    trap("TERM") { consumer.stop }
    trap("INT") { consumer.stop }
    
    begin
        consumer.each_message(max_bytes: 64 * 1024) do |message|
        logger.info("Get message: #{message.value}")
        end
    rescue Kafka::ProcessingError => e
        warn "Got error: #{e.cause}"
        consumer.pause(e.topic, e.partition, timeout: 20)
    
        retry
    end
    参数 描述
    brokers SSL接入点。您可在消息队列Kafka版控制台的实例详情页面的基本信息区域获取。
    topic Topic名称。您可在消息队列Kafka版控制台的Topic管理页面获取。
    username 用户名。
    • 如果实例未开启ACL,您可以在消息队列Kafka版控制台的实例详情页面获取默认用户的用户名。
    • 如果实例已开启ACL,请确保要使用的SASL用户为PLAIN类型且已授权收发消息的权限。详情请参见SASL用户授权
    password 密码。
    • 如果实例未开启ACL,您可以在消息队列Kafka版控制台的实例详情页面获取默认用户的密码。
    • 如果实例已开启ACL,请确保要使用的SASL用户为PLAIN类型且已授权收发消息的权限。详情请参见SASL用户授权
    consumerGroup Consumer Group名称。您可在消息队列Kafka版控制台的Consumer Group管理页面获取。
  2. 执行以下命令消费消息。
    ruby consumer.rb