本文中含有需要您注意的重要提示信息,忽略该信息可能对您的业务造成影响,请务必仔细阅读。
本文介绍如何将Kafka数据导入到日志服务,实现数据的查询分析、加工等操作。
前提条件
- 已有可用的Kafka集群。 
- 已创建Project和Logstore。具体操作,请参见创建项目Project和创建Logstore。 
版本说明
目前,只支持Kafka 2.0.0及以上版本。
创建数据导入配置
- 登录日志服务控制台。 
- 在接入数据区域的数据导入页签中,单击Kafka-数据导入。 
- 选择目标Project和Logstore,单击下一步。
- 设置导入配置。 - 在导入设置步骤中,配置如下参数。 - 参数 - 说明 - 任务名称 - SLS任务的唯一名称。 - 显示名称 - 任务显示名称。 - 任务描述 - 导入任务的描述。 - 服务地址 - Kafka bootstrap Servers地址。多个服务地址之间使用半角逗号(,)分隔。 - 如果是阿里云云消息队列 Kafka 版,需输入接入点的IP地址或域名。 
- 如果是阿里云ECS上自建的Kafka集群,需输入ECS实例的IP地址。 
- 如果是其他的Kafka集群,需输入Kafka Broker的公网IP地址或域名。 
 - Topic列表 - Kafka主题。多个主题之间使用半角逗号(,)分隔。 - 消费组 - 如果您使用的是阿里云云消息队列 Kafka 版,且未开启自由使用Group功能,则需要选择对应的消费组。创建消费组的具体操作,请参见创建消费组。 - 起始位置 - 开始导入数据的位置。 - 最早:从现有的第一条Kafka数据开始导入。 
- 最晚:从最新生成的Kafka数据开始导入。 
 - 数据格式 - 待导入数据的格式。 - 极简模式:如果待导入的数据为单行格式,您可以选择极简模式。 
- JSON字符串:如果待导入的数据为JSON,您可以选择JSON字符串。导入任务会将数据解析为键值对格式,只解析到第一层。 
 - 解析数组元素 - 打开解析数组元素开关后,对于JSON数组格式的数据,系统会按其数组元素拆分为多条数据后进行导入。 - 编码格式 - 待导入数据的编码格式(即字符集),目前支持UTF-8和GBK。 - VPC实例ID - 如果Kafka集群是VPC环境下的阿里云云消息队列 Kafka 版或阿里云ECS上自建的Kafka集群,您可以通过设置VPC实例ID,实现日志服务通过阿里云内网读取Kafka集群的数据。 - 通过阿里云内网读取数据,具备更好的安全性和网络稳定性。 重要- Kafka集群需允许被IP网段100.104.0.0/16访问。 - 时间配置 - 时间字段 - 设置为Kafka数据中代表时间的列名,用于指定数据导入日志服务时的时间。 - 提取时间正则 - 如果您选择的数据格式为极简模式,您需要设置正则表达式提取Kafka数据中的时间。 - 例如,数据内容为 - message with time 2022-08-08 14:20:20,则您可以设置提取时间正则为- \d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d。- 时间字段格式 - 指定时间格式,用于解析时间字段的值。 - 支持Java SimpleDateFormat语法的时间格式,例如yyyy-MM-dd HH:mm:ss。时间格式的语法详情请参见Class SimpleDateFormat。常见的时间格式请参见时间格式。 
- 支持epoch格式,可选值为epoch、epochMillis、epochMacro或epochNano。 
 - 时间字段时区 - 选择时间字段对应的时区。 - 当时间格式是epoch时,不需要设置时区。 - 默认时间来源 - 当没有提供时间提取信息或者时间提取失败时,使用您所设置的时间来源,包括系统当前时间和kafka消息时间戳。 - 高级配置 - 日志上下文 - 打开日志上下文开关后,支持日志服务的上下文查询功能。您可以查看目标数据在原始Kafka partition中的前若干条(上文)或后若干条(下文)数据。 - 通信协议 - 通过公网导入时,建议通过加密连接、用户认证的方式进行导入,即在此处定义连接Kafka集群的通信协议信息,配置示例如下所示。 - protocol字段的可选值为plaintext、ssl、sasl_plaintext或sasl_ssl。建议设置为sasl_ssl,此协议需要连接加密和用户认证。 - 设置protocol为sasl_plaintext或sasl_ssl时,需设置sasl节点。其中,mechanism字段可以为PLAIN、SCRAM-SHA-256或SCRAM-SHA-512,表示用户名/密码身份验证机制。 - { "protocol":"sasl_plaintext", "sasl":{ "mechanism":"PLAIN", "username":"xxx", "password":"yyy" } }- 私网域名解析 - 部署在阿里云ECS上的Kafka Broker之间采用内部域名通信时,您需要在此处指定每个Broker对应的ECS域名和IP地址。配置示例如下所示。 - { "hostname#1":"192.168.XX.XX", "hostname#2":"192.168.XX.XX", "hostname#3":"192.168.XX.XX" }
- 单击预览,预览导入结果。 
- 确认无误后,单击下一步。 
 
- 创建索引和预览数据,然后单击下一步。日志服务默认开启全文索引。您也可以根据采集到的日志,手动创建字段索引,或者单击自动生成索引,日志服务将自动生成字段索引。更多信息,请参见创建索引。 重要- 如果需要查询日志中的所有字段,建议使用全文索引。如果只需查询部分字段、建议使用字段索引,减少索引流量。如果需要对字段进行分析(SELECT语句),必须创建字段索引。 
- 单击查询日志,进入查询和分析页面,确认是否成功导入Kafka数据。 - 等待1分钟左右,如果有目标Kafka数据导入,则说明导入成功。 
查看导入配置
创建导入配置成功后,您可以在控制台中查看已创建的导入配置及生成的统计报表。
- 单击目标Project。 
- 选择目标日志库下的,单击配置名称。 
- 在导入配置概览页面,查看导入配置的基本信息和统计报表。  
相关操作
在导入配置概览页面,您还可以进行如下操作。
- 修改配置 - 单击修改配置,修改导入配置的相关配置。具体配置,请参见创建数据导入配置。 
- 删除配置 - 单击删除配置,删除该导入配置。 警告- 删除后不可恢复,请谨慎操作。 
- 停止任务 - 单击停止,停止该导入任务。 
常见问题
| 问题 | 可能原因 | 解决方法 | 
| 预览时出现Kafka Broker连接错误(Broker transport failure)。 | 
 | 
 | 
| 预览时出现超时错误(preview request timed out)。 | 待导入的Kafka Topic中没有数据。 | 如果待导入的Kafka Topic中没有数据,请在写入数据后,再重试预览。 | 
| 数据存在乱码。 | 编码格式配置不符合预期。 | 根据Kafka真实的编码格式更新导入配置。 如果需要修复已有的乱码数据,请创建新的Logstore和导入配置。 | 
| 日志服务中显示的数据时间和数据本身的时间不一致。 | 设置导入配置时,没有指定日志时间字段或者设置时间格式、时区有误。 | 设置指定的日志时间字段以及正确的时间格式和时区。更多信息,请参见创建数据导入配置。 | 
| 导入数据后,无法查询和分析数据。 | 
 | |
| 导入的数据条目数量少于预期。 | 存在大于3 MB的Kafka数据,您可以通过数据处理流量观测仪表盘确认。 | 缩小单条Kafka消息的大小。 | 
| 数据导入时存在明显的延迟 | 
 | 
 | 
错误处理机制
| 限制项 | 说明 | 
| 网络连接错误 | 导入任务会定期重试,即网络连接恢复后,导入任务会自动从之前中断的Offset位置继续消费数据。 | 
| Kafka Topic不存在 | 当目标Kafka Topic不存在时,导入任务会跳过该Topic,且不影响其他正常的Topic的数据导入。 当不存在的Topic被重建后,导入任务会正常消费该Topic中的数据(存在约10分钟的延迟)。 |