在DataWorks开放平台启用消息订阅后,您可通过Kafka SDK来订阅已配置的DataWorks的事件消息。本文将为您介绍在获取Kafka相关配置信息后,如何基于kafka SDK消费消息。
前提条件
已开启消息订阅功能并完成相关配置,详情请参见开启消息订阅:DataWorks端(基于Kafka)。
已获取Kafka相关配置信息,详情请参见查看Kafka配置信息。
使用限制
DataWorks的OpenEvent仅支持使用云消息队列 Kafka 版的SSL接入点PLAIN机制进行收发消息。
DataWorks的OpenEvent仅支持2.2.0版本的云消息队列 Kafka 版,建议客户端和服务端版本保持一致。
开发指引
DataWorks使用的
云消息队列 Kafka 版可以支持不同语言SDK开发消息收发程序,支持的SDK语言包括:java、python、C++、Go、PHP、Ruby、Node.js和C#,不同SDK的消息开发指引请参考下表。
开发消息订阅时,Topic、Consumer Group、用户及SSL接入点等配置信息可以在DataWorks控制台的OpenEvent页面获取。
配置依赖时,如果需要配置Kafka版本信息,请配置为2.2.0。
开发消息订阅时,无需配置发送消息,其他准备配置和订阅消息,请根据各SDK要求进行操作。
SDK类型 | 网络环境 | 协议 | 端口 | 参考文档 |
Java | 公网、VPC | SASL_SSL | 9093 | |
python | 公网、VPC | SASL_SSL | 9093 | |
C++ | 公网、VPC | SASL_SSL | 9093 | |
Go | 公网、VPC | SASL_SSL | 9093 | |
PHP | 公网、VPC | SASL_SSL | 9093 | |
Ruby | 公网、VPC | SASL_SSL | 9093 | |
Node.js | 公网、VPC | SASL_SSL | 9093 | |
C# | 公网、VPC | SASL_SSL | 9093 |
事件消息解析
DataWorks不同的事件类型发送的消息格式不同,您可以参考附录:消息格式了解不同事件消息的格式详情。
开放消息订阅代码示例
本示例以Java语言编写,您可以参考文档依据实际情况进行开发。
添加Pom依赖,Kafka依赖
<!-- 添加KAFKA依赖--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> </exclusions> </dependency> <!-- 添加API依赖--> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.1</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-dataworks-public</artifactId> <version>3.4.14</version> </dependency>
创建JAAS配置文件kafka_client_jaas.conf。
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="xxxx"; };
创建
云消息队列 Kafka 版配置文件kafka.properties。
##SSL接入点,通过控制台获取。 bootstrap.servers=xxxx ##Topic,通过控制台创建。 topic=xxxx ##Group,通过控制台创建。 group.id=xxxx ##SSL根证书。 ssl.truststore.location=/xxxx/kafka.client.truststore.jks ##JAAS配置文件。 java.security.auth.login.config=/xxxx/kafka_client_jaas.conf
您可以在管控台查看group.id、topic、SSL接入点信息、用户名和密码,如查看Kafka配置信息所示。bootstrap.servers来自管控台上给出的SSL接入点信息,目前只能使用这一种接入方式,请以管控台上的实际信息为准。
创建OpenEvent订阅代码示例。
package com.aliyun.openservices.kafka.ons; import java.util.ArrayList; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; public class KafkaConsumerDemo { public static void main(String args[]) { //设置sasl文件的路径 JavaKafkaConfigurer.configureSasl(); //加载kafka.properties Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties(); Properties props = new Properties(); //设置接入点,请通过控制台获取对应Topic的接入点 props.put("bootstrap.servers", kafkaProperties.getProperty("bootstrap.servers")); //设置SSL根证书的路径,请记得将XXX修改为自己的路径 //与sasl路径类似,该文件也不能被打包到jar中 props.put("ssl.truststore.location", kafkaProperties.getProperty("ssl.truststore.location")); //根证书store的密码,保持不变 props.put("ssl.truststore.password", "KafkaOnsClient"); //接入协议,目前支持使用SASL_SSL协议接入 props.put("security.protocol", "SASL_SSL"); //SASL鉴权方式,保持不变 props.put("sasl.mechanism", "PLAIN"); //两次poll之间的最大允许间隔 //可更加实际拉去数据和客户的版本等设置此值,默认30s props.put("session.timeout.ms", 30000); //设置单次拉取的量,走公网访问时,该参数会有较大影响 props.put("max.partition.fetch.bytes", 32000); props.put("fetch.max.bytes", 32000); //每次poll的最大数量 //注意该值不要改得太大,如果poll太多数据,而不能在下次poll之前消费完,则会触发一次负载均衡,产生卡顿 props.put("max.poll.records", 30); //消息的反序列化方式 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //当前消费实例所属的消费组,请在控制台申请之后填写 //属于同一个组的消费实例,会负载消费消息 props.put("group.id", kafkaProperties.getProperty("group.id")); //hostname校验改成空 props.put("ssl.endpoint.identification.algorithm", ""); //构造消息对象,也即生成一个消费实例 KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props); //设置消费组订阅的Topic,可以订阅多个 //如果GROUP_ID_CONFIG是一样,则订阅的Topic也建议设置成一样 List<String> subscribedTopics = new ArrayList<String>(); //如果需要订阅多个Topic,则在这里add进去即可 //每个Topic需要先在控制台进行创建 subscribedTopics.add(kafkaProperties.getProperty("topic")); consumer.subscribe(subscribedTopics); } }