Log4j写入DataHub

DataHub已经完全兼容Kafka协议,您可以通过Log4j将日志信息写入到DataHub​

相关介绍

Topic类型

Kafka的Topic扩容方式和DataHub的topic扩容方式不同,为了适配Kafka的topic扩容方式,DataHub创建topic时需要将扩容方式选为扩展模式。扩展模式的topic,不再支持分裂/合并操作,而是添加shard的方式,暂不支持减少shard。

Topic命名

Kafka的Topic映射之后为DataHub的project+topic,project和topic以 “.”分割,例如:testproject.test_topic对应到DataHub中Project为test_project,Topic为test_topic,如果含有多个“.”,会以首个“.”分割Project和Topic,多余的“.”和”-“会被替换为““。

Partition

DataHub的每个处于Active状态shard对应Kafka的1个Partition,如果当前Active状态shard为5个,那么就可以视为Kafka有5个Partition,写入数据时,可以指定Partition范围为[0,4],如果不指定,则会由kafka客户端选择Partition。

Tuple Topic

Kafka的数据写入Tuple Topic时,Topic Schema必须为2列或1列,类型必须为STRING,其他情况会写入失败。如果为1列,则只写入value,key的数据将被丢弃,如果为2列,则第1列和第2列分别对应key和value。Tuple Topic写入二进制数据会存在乱码问题,二进制数据建议写入Blob Topic

Blob Topic

Kafka的数据写入Blob Topic时,会把Kafka数据的value写入Blob中,如果Kafka数据的key不为NULL,则会写入DataHub的Attribute,其中key为”kafka_key“,value为Kafka数据的key。

Header

Kafka的Header对应DataHub的Attribute,但是如果Kafka的Header的value为NULL,则会忽略掉对应的header。建议不要使用”kafka_key“作为Header的key。​

Kafka配置参数

C=Consumer, P=Producer, S=Streams

参数 C/P/S 可选配置 是否必须 描述
bootstrap.servers * 参考Kafka域名列表
security.protocol * SASL_SSL 为了保证数据传输的安全性,Kafka写入DataHub默认使用SSL加密传输
sasl.mechanism * PLAIN AK认证方式,仅支持PLAIN
compression.type P LZ4 是否开启压缩传输,目前仅支持LZ4
group.id C project.topic:subId 必须和订阅的topic保持一致,否则无法读取数据
partition.assignment.strategy C org.apache.kafka.clients.consumer.RangeAssignor Kafka默认为RangeAssignor,并且DataHub目前只支持RangeAssignor,请不要修改此配置
session.timeout.ms C/S [60000, 180000] kafka默认为10000, 但是因为DataHub限制最小为60000,所以这里默认会变为60000
heartbeat.interval.ms C/S 建议session.timeout.ms的 2/3 Kafka默认为3000,但是因为session.timeout.ms会被默认修改为60000,所以这里建议显示设置为40000,否则heartbeat请求会过于频繁
application.id S project.topic:subId 必须和订阅的topic保持一致,否则无法读取数据

以上是使用Kafka客户端写入DataHub需要重点关注的参数,对于等客户端相关的参数,行为没有变化,例如:retries,batch.size;对于服务端相关参数不会对服务端行为有改变,例如:无论acks的值为多少,DataHub默认数据完全写入成功之后才会返回。

Kafka域名列表

地区 Region 外网Endpoint 经典网络ECS Endpoint VPC ECS Endpoint
华东1(杭州) cn-hangzhou dh-cn-hangzhou.aliyuncs.com:9092 dh-cn-hangzhou.aliyun-inc.com:9093 dh-cn-hangzhou-int-vpc.aliyuncs.com:9094
华东2(上海) cn-shanghai dh-cn-shanghai.aliyuncs.com:9092 dh-cn-shanghai.aliyun-inc.com:9093 dh-cn-shanghai-int-vpc.aliyuncs.com:9094
华北2(北京) cn-beijing dh-cn-beijing.aliyuncs.com:9092 dh-cn-beijing.aliyun-inc.com:9093 dh-cn-beijing-int-vpc.aliyuncs.com:9094
华南1(深圳) cn-shenzhen dh-cn-shenzhen.aliyuncs.com:9092 dh-cn-shenzhen.aliyun-inc.com:9093 dh-cn-shenzhen-int-vpc.aliyuncs.com:9094
华北3(张家口) cn-zhangjiakou dh-cn-zhangjiakou.aliyuncs.com:9092 dh-cn-zhangjiakou.aliyun-inc.com:9093 dh-cn-zhangjiakou-int-vpc.aliyuncs.com:9094
亚太东南1(新加坡) ap-southeast-1 dh-ap-southeast-1.aliyuncs.com:9092 dh-ap-southeast-1.aliyun-inc.com:9093 dh-ap-southeast-1-int-vpc.aliyuncs.com:9094
亚太东南3(吉隆坡) ap-southeast-3 dh-ap-southeast-3.aliyuncs.com:9092 dh-ap-southeast-3.aliyun-inc.com:9093 dh-ap-southeast-3-int-vpc.aliyuncs.com:9094
亚太南部1(孟买) ap-south-1 dh-ap-south-1.aliyuncs.com:9092 dh-ap-south-1.aliyun-inc.com:9093 dh-ap-south-1-int-vpc.aliyuncs.com:9094
欧洲中部1(法兰克福) eu-central-1 dh-eu-central-1.aliyuncs.com:9092 dh-eu-central-1.aliyun-inc.com:9093 dh-eu-central-1-int-vpc.aliyuncs.com:9094
上海金融云 cn-shanghai-finance-1 dh-cn-shanghai-finance-1.aliyuncs.com:9092 dh-cn-shanghai-finance-1.aliyun-inc.com:9093 dh-cn-shanghai-finance-1-int-vpc.aliyuncs.com:9094

示例

1.生成kafka_client_producer_jaas.conf文件

创建文件kafka_client_producer_jaas.conf,保存到任意路径,文件内容如下。

  1. KafkaClient {
  2. org.apache.kafka.common.security.plain.PlainLoginModule required
  3. username="accessId"
  4. password="accessKey";
  5. };

2.引入maven依赖

Kafka-client版本至少大于等于0.10.0.0,推荐2.4.0

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>2.4.0</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.kafka</groupId>
  8. <artifactId>kafka-log4j-appender</artifactId>
  9. <version>2.4.0</version>
  10. </dependency>

3.日志生成程序示例

  1. import org.apache.log4j.Logger;
  2. public class TestLog4j2Kafka {
  3. private static Logger logger = Logger.getLogger(TestLog4j2Kafka.class);
  4. public static void main(String[] args){
  5. for(int i = 0;i <= 1000; i++) {
  6. logger.info("This is Message [" + i + "] from log4j producer .. ");
  7. }
  8. }
  9. }

4.log4j.properties文件配置示例

注意事项:Topic必须开启Shard扩展模式,否则无法写入

  1. log4j.rootLogger=info,console,KAFKA
  2. ## 不设置可能会出现死锁,具体详情请查看kafka文档说明:https://issues.apache.org/jira/browse/KAFKA-6415
  3. log4j.logger.org.apache.kafka.clients.Metadata = WARN
  4. ## appender KAFKA
  5. log4j.appender.KAFKA=org.apache.kafka.log4jappender.KafkaLog4jAppender
  6. ## 写入Topic名称,以projecttest_suyang,topictest_log4j为例
  7. log4j.appender.KAFKA.topic=test_suyang.test_log4j
  8. ## kafka域名,以华东1region为例
  9. log4j.appender.KAFKA.brokerList=dh-cn-hangzhou.aliyuncs.com:9092
  10. log4j.appender.KAFKA.compressionType=lz4
  11. log4j.appender.KAFKA.syncSend=true
  12. log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
  13. log4j.appender.KAFKA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n
  14. log4j.appender.KAFKA.saslKerberosServiceName =KAFKA
  15. log4j.appender.KAFKA.securityProtocol =SASL_SSL
  16. ## kafka_client_producer_jaas.conf文件路径
  17. log4j.appender.KAFKA.clientJaasConfPath=D:\\test\\kafka_client_producer_jaas.conf
  18. log4j.appender.KAFKA.saslMechanism=PLAIN
  19. ## appender console
  20. log4j.appender.console=org.apache.log4j.ConsoleAppender
  21. log4j.appender.console.target=System.err
  22. log4j.appender.console.layout=org.apache.log4j.PatternLayout
  23. log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n

5.启动示例日志程序,抽样查看数据

12