在DataWorks开放平台启用消息订阅后,您可通过Kafka SDK来订阅已配置的DataWorks的事件消息。本文将为您介绍在获取Kafka相关配置信息后,如何基于kafka SDK消费消息。

前提条件

使用限制

  • DataWorks的OpenEvent仅支持使用消息队列Kafka版的SSL接入点PLAIN机制进行收发消息。
  • DataWorks的OpenEvent仅支持2.2.0版本的消息队列Kafka版,建议客户端和服务端版本保持一致。

开发指引

DataWorks使用的消息队列Kafka版可以支持不同语言SDK开发消息收发程序,支持的SDK语言包括:java、python、C++、Go、PHP、Ruby、Node.js和C#,不同SDK的消息开发指引请参考下表。

注意
  • 开发消息订阅时,Topic、Consumer Group、用户及SSL接入点等配置信息可以在DataWorks控制台OpenEvent页面获取。
  • 配置依赖时,如果需要配置Kafka版本信息,请配置为2.2.0。
  • 开发消息订阅时,无需配置发送消息,其他准备配置和订阅消息,请根据各SDK要求进行操作。
SDK类型 网络环境 协议 端口 参考文档 Demo
Java 公网、VPC SASL_SSL 9093 SSL接入点PLAIN机制收发消息 SASL_SSL/PLAIN
python 公网、VPC SASL_SSL 9093 Python SDK收发消息 SASL_SSL/PLAIN
C++ 公网、VPC SASL_SSL 9093 C++ SDK收发消息 SASL_SSL/PLAIN
Go 公网、VPC SASL_SSL 9093 Go SDK收发消息 SASL_PLAIN
PHP 公网、VPC SASL_SSL 9093 PHP SDK收发消息 SASL_SSL/PLAIN
Ruby 公网、VPC SASL_SSL 9093 Ruby SDK收发消息 SASL_SSL/PLAIN
Node.js 公网、VPC SASL_SSL 9093 Node.js SDK收发消息 SASL_PLAIN
C# 公网、VPC SASL_SSL 9093 C# SDK收发消息

事件消息解析

DataWorks不同的事件类型发送的消息格式不同,您可以参考附录:消息格式了解不同事件消息的格式详情。

开放消息订阅代码示例

本示例以Java语言编写,您可以参考文档依据实际情况进行开发。

  1. 添加Pom依赖,Kafka依赖
     <!-- 添加KAFKA依赖-->
    <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.2.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-api</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    <!-- 添加API依赖-->
    <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>aliyun-java-sdk-core</artifactId>
      <version>4.5.1</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun</groupId>
      <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
      <version>3.4.14</version>
    </dependency>
  2. 创建JAAS配置文件kafka_client_jaas.conf。
    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="xxxx"
      password="xxxx";
    };  
  3. 创建消息队列Kafka版配置文件kafka.properties。
    ##SSL接入点,通过控制台获取。
    bootstrap.servers=xxxx
    ##Topic,通过控制台创建。
    topic=xxxx
    ##Group,通过控制台创建。
    group.id=xxxx
    ##SSL根证书。
    ssl.truststore.location=/xxxx/kafka.client.truststore.jks
    ##JAAS配置文件。
    java.security.auth.login.config=/xxxx/kafka_client_jaas.conf    

    您可以在管控台查看group.id、topic、SSL接入点信息、用户名和密码,如查看Kafka配置信息所示。bootstrap.servers来自管控台上给出的SSL接入点信息,目前只能使用这一种接入方式,请以管控台上的实际信息为准。

  4. 创建OpenEvent订阅代码示例。
    package com.aliyun.openservices.kafka.ons;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.config.SslConfigs;
    
    public class KafkaConsumerDemo {
    
        public static void main(String args[]) {
            //设置sasl文件的路径
            JavaKafkaConfigurer.configureSasl();
    
            //加载kafka.properties
            Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
    
            Properties props = new Properties();
            //设置接入点,请通过控制台获取对应Topic的接入点
            props.put("bootstrap.servers", kafkaProperties.getProperty("bootstrap.servers"));
            //设置SSL根证书的路径,请记得将XXX修改为自己的路径
            //与sasl路径类似,该文件也不能被打包到jar中
            props.put("ssl.truststore.location", kafkaProperties.getProperty("ssl.truststore.location"));
            //根证书store的密码,保持不变
            props.put("ssl.truststore.password", "KafkaOnsClient");
            //接入协议,目前支持使用SASL_SSL协议接入
            props.put("security.protocol", "SASL_SSL");
            //SASL鉴权方式,保持不变
            props.put("sasl.mechanism", "PLAIN");
            //两次poll之间的最大允许间隔
            //可更加实际拉去数据和客户的版本等设置此值,默认30s
            props.put("session.timeout.ms", 30000);
            //设置单次拉取的量,走公网访问时,该参数会有较大影响
            props.put("max.partition.fetch.bytes", 32000);
            props.put("fetch.max.bytes", 32000);
            //每次poll的最大数量
            //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿
            props.put("max.poll.records", 30);
            //消息的反序列化方式
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            //当前消费实例所属的消费组,请在控制台申请之后填写
            //属于同一个组的消费实例,会负载消费消息
            props.put("group.id", kafkaProperties.getProperty("group.id"));
            //hostname校验改成空
            props.put("ssl.endpoint.identification.algorithm", "");
    
            //构造消息对象,也即生成一个消费实例
            KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
            //设置消费组订阅的Topic,可以订阅多个
            //如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样
            List<String> subscribedTopics = new ArrayList<String>();
            //如果需要订阅多个Topic,则在这里add进去即可
            //每个Topic需要先在控制台进行创建
            subscribedTopics.add(kafkaProperties.getProperty("topic"));
            consumer.subscribe(subscribedTopics);
    
        }
    }