本文介绍如何使用Kafka数据源进行数据分析或者交互式开发。

建表语法

CREATE TABLE tbName[(columnName dataType [,columnName dataType]*)]
USING kafka
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);

配置参数说明

参数 描述 是否必选
subscribe 关联的Kafka Topic名称。
kafka.bootstrap.servers Kafka集群连接地址。

Kafka开启SASL认证

  1. 开通SASL权限,具体操作请参见SASL用户授权
    重要 Spark2 Kafka DataSource中不支持设置Kafka的group.id,streaming在消费Kafka数据的时候,每个Streaming作业会自动生成以"spark-kafka-source"为前缀的group.id。因此需要在Kafka服务端开通对该Group的权限,具体如下截图,否则会报类似错误org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-58002817-a288-4b72-bc60-0b109506f683-1465208736-driver-2
    group
  2. 在Streaming SQL中添加以下参数来配置安全认证方式。
    参数 描述
    kafka.bootstrap.servers 参数值格式为xxx:9094

    例如,alikafka-pre-cn-002-1-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-2-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-3-vpc.alikafka.aliyuncs.com:9094。

    kafka.sasl.mechanism 固定值为PLAIN。
    kafka.security.protocol 固定值为SASL_PLAINTEXT。
    kafka.sasl.jaas.config 参数值格式为org.apache.kafka.common.security.plain.PlainLoginModule required username="xx" password="xx";

    其中,username和password是在Kafka服务端开通sasl用户填写的用户名和密码。

  3. 建表示例。
    CREATE TEMPORARY TABLE example
     USING kafka
     OPTIONS(
      `subscribe` 'topic',
      `serialization.format` '1',
      `kafka.sasl.mechanism` 'PLAIN',
      `kafka.security.protocol` 'SASL_PLAINTEXT',
      `kafka.sasl.jaas.config` 'org.apache.kafka.common.security.plain.PlainLoginModule required username="test" password="test";',
      `kafka.bootstrap.servers` 'alikafka-pre-cn-002-1-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-2-vpc.alikafka.aliyuncs.com:9094,alikafka-pre-cn-002-3-vpc.alikafka.aliyuncs.com:9094',
     );

Kafka offset消费延迟监控

由于Streaming SQL底层使用Structured Streaming SQL消费Kafka作业,默认情况下不会提交当前消费的offset到Kafka Server上,详情请参见Structured Streaming + Kafka Integration Guide

Streaming作业运行之后,会自动在/mnt/disk1/log/spark-streaming下生成prometheus支持的*.prom文件,每个applicationid生成一个,文件名称格式是application_name -applicationId,Streaming作业完成或者执行kill -15命令的时候会自动删除该文件。您可以通过以下指标监控当前作业消费的进度延迟情况。

指标分类 指标名称 指标说明
数据量 num_input_rows 每个batch输入数据量。
input_rows_per_second 每个batch输入速率。
处理速率 processed_rows_per_second 每个batch处理的行数。
计算延迟指标 durationMs_triggerExecution 每个batch总的处理时长。
durationMs_setOffsetRange 每个batch要计算的offset范围处理时长。
durationMs_wal_commit 每个batch写入offset到checkpoint日志处理时长。
durationMs_getEndOffset 每个batch获取endoffset的处理时长。
durationMs_query_planning 每个batch增量查询计划处理时间。
durationMs_get_batch 每个batch获取source端数据处理时间。
durationMs_add_batch 每个batch写入sink端处理时间。
offset延迟指标 lag 每个batch当前streaming的已经消费的最新endoffset和kafka当前的latest offset做差值,指标粒度是topic+partition。
streaming_endoffset 当前streaming的消费的最新endoffset。
kafka_endoffset kafka当前的latest offset。