通过数据同步功能同步Kafka数据(推荐)
云原生数据仓库 AnalyticDB MySQL 版支持新建Kafka同步链路,通过同步链路从指定时间位点,实时同步Kafka中的数据入湖,以满足近实时产出、全量历史归档、弹性分析等需求。本文主要介绍如何添加Kafka数据源,新建Kafka同步链路并启动任务,以及数据同步后如何进行数据分析和数据源管理。
前提条件
- AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版。 
- 已创建数据库账号。 - 如果是通过阿里云账号访问,只需创建高权限账号。 
- 如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。 
 
- 已创建云消息队列 Kafka 版(简称Kafka)实例,且与AnalyticDB for MySQL集群部署在相同地域。 
- 已创建Kafka Topic,并发送消息。详情请参见消息队列Kafka版快速入门操作流程。 
注意事项
- 仅支持同步JSON格式的Kafka数据。 
- Kafka中创建的Topic数据超过一定的时间会被自动清理,如果Topic数据过期,同时数据同步任务失败,重新启动同步任务时读取不到被清理掉的数据,会有丢失数据的风险。因此请适当调大Topic数据的生命周期,并在数据同步任务失败时及时联系技术支持。 
- 获取Kafka样例数据在大于8 KB的情况下,Kafka API会将数据进行截断,导致解析样例数据为JSON格式时失败,从而无法自动生成字段映射信息。 
- kafka源端表结构发生变化时,不会触发DDL自动变更,即变更不会同步至AnalyticDB for MySQL。 
- 数据入湖后,需执行Commit操作使写入的数据可见。为避免Commit操作间隔过小影响作业运行稳定性和读写性能,AnalyticDB for MySQL数据同步功能Commit操作间隔默认为5分钟。因此,当您首次创建并启动数据同步任务时,至少需等待5分钟,才可以查看第一批写入的数据。 
计费说明
通过AnalyticDB for MySQL数据迁移功能迁移数据至OSS会产生以下费用。
- AnalyticDB for MySQL的ACU弹性资源费用,计费项详情,请参见湖仓版计费项和企业版、基础版计费项。 
- OSS的存储费用、GET类请求次数以及PUT类和其他请求次数的费用。计费项详情,请参见计费概述。 
使用流程
新建数据源
如果您已添加Kafka数据源,可跳过该步骤,直接新建同步链路,详情请参见新建同步链路。
- 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,然后单击目标集群ID。 
- 在左侧导航栏,单击数据接入>数据源管理。 
- 单击右上角新建数据源。 
- 在新建数据源页面进行参数配置。参数说明如下表所示: - 参数名称 - 参数说明 - 数据源类型 - 选择数据源类型Kafka。 - 数据源名称 - 系统默认按数据源类型与当前时间生成名称,可按需修改。 - 数据源描述 - 数据源备注描述,例如湖仓应用场景、应用业务限制等。 - 部署模式 - 目前仅支持阿里云实例。 - Kafka实例 - Kafka实例ID。 - 登录云消息队列 Kafka 版控制台,在实例列表页面查看实例ID。 - Kafka Topic - 在Kafka中创建的Topic名称。 - 登录云消息队列 Kafka 版控制台,在目标实例的Topic 管理页面查看Topic名称。 - 消息数据格式 - Kafka消息数据格式,目前仅支持JSON。 
- 参数配置完成后,单击创建。 
新建同步链路
- 在左侧导航栏,单击SLS/Kafka数据同步。 
- 在右上角,单击新建同步链路。 
- 在新建同步链路页面,进行数据源的数据源及目标端配置、目标库表配置及同步配置。 - 数据源及目标端配置的参数说明如下: - 参数名称 - 参数说明 - 数据链路名称 - 数据链路名称。系统默认按数据源类型与当前时间生成名称,可按需修改。 - 数据源 - 选择已有的Kafka数据源,也可新建数据源。 - 目标端类型 - 支持如下选项: - 数据湖-用户OSS。 
- 数据湖-ADB湖存储(推荐)。 重要- 选择数据湖-ADB湖存储时,需开启湖存储功能。 
 - ADB湖存储 - AnalyticDB for MySQL湖数据所在湖存储名称。 - 在下拉列表中选择目标湖存储,若无已创建的湖存储,可单击下拉列表中的自动创建,自动创建湖存储。 重要- 当目标端类型选择数据湖-ADB湖存储时,填写该参数。 - OSS路径 - AnalyticDB for MySQL湖数据在OSS中的存储路径。 重要- 当目标端类型选择数据湖-用户OSS时,填写该参数。 
- 展示的Bucket是与AnalyticDB for MySQL集群同地域的所有Bucket,您可以任意选择其中一个。请谨慎规划存储路径,创建后不允许修改。 
- 建议选择一个空目录,且不能与其他任务的OSS路径有相互前缀关系,防止数据覆盖。例如,两个数据同步任务的OSS路径分别为 - oss://testBucketName/test/sls1/和- oss://testBucketName/test/,OSS路径有相互前缀关系,数据同步过程中会有数据覆盖。
 - 存储格式 - 数据存储格式。支持如下选项: - PAIMON。 重要- 仅目标端类型为数据湖-用户OSS时,支持该格式。 
- ICEBERG。 
 
- 目标库表配置参数说明如下: - 参数名称 - 参数说明 - 库名 - 同步到AnalyticDB for MySQL的数据库名称。如果不存在同名数据库,将新建库;如果已存在同名数据库,数据会同步到已存在的数据库中。库名命名规则,详见使用限制。 重要- 在数据源及目标端配置中,若存储格式为PAIMON,已有数据库需满足以下条件,否则数据同步任务会失败: - 必须是外部数据库,即建库语句必须为 - CREATE EXTERNAL DATABASE<数据库名>。
- 建库语句DBPROPERTIES参数中必须有 - catalog属性,且- catalog值必须为- paimon。
- 建库语句DBPROPERTIES参数必须有 - adb.paimon.warehouse属性。例如:- adb.paimon.warehouse=oss://testBucketName/aps/data。
- 建库语句DBPROPERTIES参数必须有 - LOCATION属性,且必须在数据库名称后面加- .db,否则XIHE查询会失败。例如:- LOCATION=oss://testBucketName/aps/data/kafka_paimon_external_db.db/。- LOCATION配置的OSS路径,Bucket目录必须真实存在,否则建库会失败。
 - 表名 - 同步到AnalyticDB for MySQL的表名称。如果库中不存在同名表,将新建表;如果库中已存在同名表,数据同步会失败。表名命名规则,详见使用限制。 - 样例数据 - 自动从Kafka Topic中获取的最新数据作为样例数据。 说明- Kafka Topic中的数据需为JSON格式,若存在其他格式的数据,数据同步时会报错。 - JSON解析层级 - 设置JSON的嵌套解析层数,取值说明: - 0:不做解析。 
- 1(默认值):解析一层。 
- 2:解析两层。 
- 3:解析三层。 
- 4:解析四层。 
 - JSON的嵌套解析策略,请参见JSON解析层级和Schema字段推断示例。 - Schema字段映射 - 展示样例数据经过JSON解析后的Schema信息。可在此调整目标字段名,类型或按需增删字段等。 - 分区键设置 - 为目标表设置分区键。建议按日志时间或者业务逻辑配置分区,以保证入湖与查询性能。如不设置,则目标表默认没有分区。 - 目标端分区键的格式处理方法分为:时间格式化和指定分区字段。 - 按日期时间分区,分区字段名请选择一个日期时间字段。格式处理方法选择时间格式化,选择源端字段格式和目标分区格式。AnalyticDB for MySQL会按源端字段格式识别分区字段的值,并将其转换为目标分区格式进行分区。例如,源字段为gmt_created,值为1711358834,源端字段格式为秒级精度时间戳,目标分区格式为yyyyMMdd,则会按20240325进行分区。 
- 按字段值分区,格式处理方法请选择指定分区字段。 
 
- 同步配置的参数说明如下: - 参数名称 - 参数说明 - 增量同步起始消费位点 - 同步任务启动时会从选择的时间点开始消费Kafka数据。取值说明: - 最早位点(begin_cursor):自动从Kafka数据中最开始的时间点消费数据。 
- 最近位点(end_cursor):自动从Kafka数据中最近的时间点消费数据。 
- 自定义点位:您可以选择任意一个时间点,系统则会从Kafka中第一条大于等于该时间点的数据开始消费。 
 - Job型资源组 - 指定任务运行的Job型资源组。 - 增量同步所需ACU数 - 指定任务运行的Job型资源组ACU数。最小ACU数为2,最大ACU数为Job型资源组可用计算最大资源数。建议多指定一些ACU数,可以提升入湖性能及任务稳定性。 说明- 创建数据同步任务时,使用Job型资源组中的弹性资源。数据同步任务会长期占用资源,因此系统会从资源组中扣除该任务占用的资源。例如,Job型资源组的计算最大资源为48 ACU,已创建了一个8 ACU的同步任务,在该资源组中创建另一个同步任务时,可选的最大ACU数为40。 - 高级配置 - 高级配置可以让您对同步任务进行个性化的配置。如需进行个性化配置,请联系技术支持。 
 
- 上述参数配置完成后,单击提交。 
启动数据同步任务
- 在SLS/Kafka数据同步页面,选择创建成功的数据同步任务,在操作列单击启动。 
- 单击右上角查询,状态变为正在启动即数据同步任务启动成功。 
数据分析
同步任务成功后,您可以通过Spark Jar开发对同步到AnalyticDB for MySQL的数据进行分析。Spark开发的相关操作,请参见Spark开发编辑器和Spark离线应用开发。
- 在左侧导航栏,单击。 
- 在默认模板中输入示例语句,并单击立即执行。 - -- Here is just an example of SparkSQL. Modify the content and run your spark program. conf spark.driver.resourceSpec=medium; conf spark.executor.instances=2; conf spark.executor.resourceSpec=medium; conf spark.app.name=Spark SQL Test; conf spark.adb.connectors=oss; -- Here are your sql statements show tables from lakehouse20220413156_adbTest;
- 可选:在应用列表页签中,单击操作列的日志,查看Spark SQL运行的日志。 
管理数据源
在数据源管理页面,您可以在操作列执行以下操作。
| 操作按钮 | 说明 | 
| 新建链路 | 快捷跳转到创建此数据源下的数据同步或数据迁移任务。 | 
| 查看 | 查看数据源的详细配置。 | 
| 编辑 | 编辑数据源属性,如更新数据源名称、描述等。 | 
| 删除 | 删除当前数据源。 说明  当数据源下存在数据同步或数据迁移任务时,此数据源无法直接删除,需先在SLS/Kafka数据同步页面,单击目标同步任务操作列的删除,删除数据同步或数据迁移任务。 | 
JSON解析层级和Schema字段推断示例
解析层级指按相应层数解析出JSON中的字段。如果用户向Kafka发送的JSON数据如下。
{
  "name" : "zhangle",
  "age" : 18,
  "device" : {
    "os" : {
        "test":lag,
        "member":{
             "fa":zhangsan,
             "mo":limei
       }
     },
    "brand" : "none",
    "version" : "11.4.2"
  }
}JSON数据解析后,对应0~4层的效果如下。
0层解析
不做任何解析,直接输出原始JSON数据。
| JSON字段 | 值 | 目标字段名 | 
| __value__ | { "name" : "zhangle","age" : 18, "device" : { "os" : { "test":lag,"member":{ "fa":zhangsan,"mo":limei }},"brand": "none","version" : "11.4.2" }} | __value__ | 
1层解析
解析JSON的第一层字段。
| JSON字段 | 值 | 目标字段名 | 
| name | zhangle | name | 
| age | 18 | age | 
| device | { "os" : { "test":lag,"member":{ "fa":zhangsan,"mo":limei }},"brand": "none","version" : "11.4.2" } | device | 
2层解析
解析JSON的第二层字段。如果字段没有嵌套则直接输出,例如name和age字段直接输出。如果字段中有嵌套,则输出其子层级字段,例如device字段有嵌套,因此输出其子层级device.os、device.brand和device.version。
由于目标字段名不支持“.”,因此会自动替换为“_”。
| JSON字段 | 值 | 目标字段名 | 
| name | zhangle | name | 
| age | 18 | age | 
| device.os | { "test":lag,"member":{ "fa":zhangsan,"mo":limei }} | device_os | 
| device.brand | none | device_brand | 
| device.version | 11.4.2 | device_version | 
3层解析
| JSON字段 | 值 | 目标字段名 | 
| name | zhangle | name | 
| age | 18 | age | 
| device.os.test | lag | device_os_test | 
| device.os.member | { "fa":zhangsan,"mo":limei } | device_os_member | 
| device.brand | none | device_brand | 
| device.version | 11.4.2 | device_version | 
4层解析
| JSON字段 | 值 | 目标字段名 | 
| name | zhangle | name | 
| age | 18 | age | 
| device.os.test | lag | device_os_test | 
| device.os.member.fa | zhangsan | device_os_member_fa | 
| device.os.member.mo | lime | device_os_member_mo | 
| device.brand | none | device_brand | 
| device.version | 11.4.2 | device_version |