Kafka实时入湖

本文为您介绍如何在数据湖构建(Data Lake Formation,简称DLF)中使用Kafka实时入湖任务将数据同步到数据湖中。

重要

数据入湖功能已经停止更新,如果您有数据入湖需求请参考:数据入湖停止更新公告

前提条件

已开通数据湖构建服务,尚未开通请点击开通数据湖构建

操作步骤

  1. 登录数据湖构建控制台,选择数据入湖 > 入湖任务管理

  2. 单击新建入湖任务,选择“Kafka实时入湖”并单击下一步。

    新建Kafka实时入湖

  3. 配置数据源。

    • 数据源连接:Kafka数据源需要提前在数据源管理中创建,目前支持阿里云消息队列Kafka与EMR Kafka集群。

    • 订阅Topic:Kafka Topic的名称,一个入湖任务仅支持订阅一个Topic,不支持同时订阅多个Topic。

    Kafka实时入湖-配置数据源

  4. 数据预处理。目前Kafka入湖任务中需要通过定义预处理算子的方式,对Kafka Topic中的消息进行解析、过滤等处理。详情请参见数据预处理

    Kafka实时入湖-数据预处理

  5. 配置目标数据湖信息。配置项包括目标数据库、目标数据表名称、存储格式、数据湖存储位置等。

    Kafka实时入湖-配置目标数据湖信息

  6. 配置任务信息。配置项包括任务实例名称、RAM角色、最大资源使用量等。

    Kafka实时入湖-配置任务信息

数据预处理

Kafka实时入湖提供了数据预处理功能对Kafka数据在入湖前进行处理,目前需要用户自定义预处理算子实现。

Kafka默认字段列表

在使用数据预处理功能之前,我们需要清楚Kafka入湖过程中目标schema包含哪些字段,字段列表如下。

字段

类型

说明

key

binary

Kafka消息key

value

binary

Kafka消息体

topic

string

Kafka topic

partition

int

Kafka分区值

offset

long

Kafka消息偏移量

timestamp

timestamp

时间戳

timestampType

int

时间戳类型

数据预处理含义

数据预处理是Kafka实时入湖提供的用于对入湖数据预先处理的功能。数据预处理支持使用Spark SQL函数定义预处理算子,目前支持map与filter两种算子。定义预处理算子需要注意以下几点:

  • 第一个预处理算子需要基于Kafka入湖的schema来编写,请参考如上字段列表。

  • 每一个预处理算子的处理逻辑可以看作一个SQL的子查询。算子按照定义顺序执行,后面算子需要基于前面算子的输出字段来编写SQL函数。

  • map算子类似SQL的select操作,由若干个SQL表达式逗号隔开组成,每个表达式必须包含通过as指定表达式别名。filter算子类似SQL的where语句,对前面的算子输出的字段进行过滤。

数据预处理示例

您可以参考以下示例编写自己的预处理算子。

  • 提取Kafka消息体与时间戳字段。通过定义一个map算子实现,表达式参考如下。

    cast(value as string) as content, from_unixtime(cast(timestamp as bigint), 'yyyy-MM-dd') as dt
  • 展开标准JSON格式日志数据。通过定义一个map算子实现,表达式参考如下。

    get_json_object(cast(value as string), '$.id') as id, get_json_object(cast(value as string), '$.eventType') as eventType, get_json_object(cast(value as string), '$.bizCode') as bizCode
  • 过滤ID字段值大于1000的数据。通过一个filter算子实现,表达式参考如下。

    id > 1000