通过数据同步功能同步SLS至湖仓版(推荐)

更新时间:

SLS同步链路支持从指定时间位点将日志服务LogStore中的数据实时同步至云原生数据仓库 AnalyticDB MySQL 版集群,以满足近实时产出、全量历史归档、弹性分析等需求。本文介绍如何添加SLS数据源,新建SLS同步链路并启动任务,以及数据同步后如何进行数据分析和数据源管理。

前提条件

  • AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版

  • 已在AnalyticDB for MySQL集群中创建Job型资源组。具体操作,请参见新建资源组

  • 已创建AnalyticDB for MySQL集群的数据库账号。

  • 已开通日志服务,并在AnalyticDB for MySQL集群所在地域,创建日志服务Project和Logstore。详细信息,请参见快速入门

注意事项

目前AnalyticDB for MySQL集群中的一张表仅支持同步日志服务中的一个LogStore。

计费说明

通过AnalyticDB for MySQL数据迁移功能迁移数据至OSS会产生以下费用。

使用流程

配置RAM授权

跨账号同步SLS数据到AnalyticDB for MySQL时,您需要在源端创建RAM角色,并为RAM角色精确授权、修改RAM角色的信任策略。如果您仅同步当前账号下的SLS数据,可跳过该步骤,直接新建数据源,详情请参见新建数据源

  1. 创建RAM角色。具体操作,请参见创建阿里云账号的RAM角色

    说明

    配置选择信任的云账号参数时,选择其他云账号,填写AnalyticDB for MySQL集群所属的阿里云账号ID。您可以登录账号中心,在概览页面查看账号ID

  2. 为RAM角色授予AliyunAnalyticDBAccessingLogRolePolicy权限。具体操作,请参见为RAM角色精确授权

  3. 修改RAM角色的信任策略。具体操作,请参见修改RAM角色的信任策略

    {
      "Statement": [
        {
          "Action": "sts:AssumeRole",
          "Effect": "Allow",
          "Principal": {
            "RAM": [
                "acs:ram::<阿里云账号ID>:root"
            ],
            "Service": [
                "<阿里云账号ID>@ads.aliyuncs.com"
            ]
          }
        }
      ],
      "Version": "1"
    }
    说明

    阿里云账号ID为步骤1中填写的阿里云账号ID,配置时无需填写尖括号(<>)。

新建数据源

说明

如果您需要在已有的数据源管理任务中进行数据同步,可跳过该步骤,直接新建同步链路,详情请参见创建同步链路

  1. 登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。

  2. 在左侧导航栏,单击数据接入>数据源管理

  3. 单击右上角新建数据源

  4. 新建数据源页面进行参数配置。参数说明如下表所示:

    参数名称

    参数说明

    数据源类型

    选择数据源类型SLS

    数据源名称

    系统默认按数据源类型与当前时间生成名称,可按需修改。

    数据源描述

    数据源备注描述,例如湖仓应用场景、应用业务限制等。

    部署模式

    目前仅支持阿里云实例。

    SLS Project所在地域

    SLS Project所在地域。

    是否跨阿里云主账号

    SLS数据源支持跨阿里云账号同步SLS数据到AnalyticDB for MySQL

    • 不跨账号:同步当前账号下的SLS数据到AnalyticDB for MySQL

    • 跨账号:同步非当前账号下的SLS数据到AnalyticDB for MySQL。选择跨账号同步数据时,需要填写跨阿里云主账号跨阿里云主账号角色名

      说明
      • 跨阿里云主账号:源端所属的阿里云账号ID。

      • 跨阿里云主账号角色名:源端创建的RAM角色名。即步骤一中创建的RAM角色。

    SLS Project

    源端SLS的Project。

    重要

    SLS Project列表中会展示阿里云账号(主账号)与RAM用户(子账号)下所有的Project。若选择阿里云账号的Project,请确保RAM用户有该Project的权限,否则数据无法同步到中AnalyticDB for MySQL

    SLS Logstore

    源端SLS的Logstore。

  5. 参数配置完成后,单击创建

创建同步链路

  1. 在左侧导航栏,单击SLS/Kafka数据同步

  2. 在右上角,单击新建同步链路

  3. 新建同步链路页面,进行数据源的数据源及目标端配置目标库表配置同步配置

    • 数据源及目标端配置的参数说明如下:

      参数名称

      参数说明

      数据链路名称

      数据链路名称。系统默认按数据源类型与当前时间生成名称,可按需修改。

      数据源

      选择已有的SLS数据源,也可新建数据源。

      目标端类型

      目前仅支持数据湖-OSS存储

      OSS路径

      AnalyticDB for MySQL湖仓数据在OSS中的存储路径。

      重要
      • 展示的Bucket是与AnalyticDB for MySQL集群同地域的所有Bucket,您可以任意选择其中一个。请谨慎规划存储路径,创建后不允许修改。

      • 建议选择一个空目录,且不能与其他任务的OSS路径有相互前缀关系,防止数据覆盖。例如,两个数据同步任务的OSS路径分别为oss://adb_demo/test/sls1/和oss://adb_demo/test/,OSS路径有相互前缀关系,数据同步过程中会有数据覆盖。

    • 目标库表配置的参数说明如下:

      参数名称

      参数说明

      库名

      同步到AnalyticDB for MySQL的数据库名称。如果不存在同名数据库,将新建库;如果已存在同名数据库,数据会同步到已存在的数据库中。库名命名规则,详见使用限制

      表名

      同步到AnalyticDB for MySQL的表名称。如果库中不存在同名表,将新建表;如果库中已存在同名表,数据同步会失败。表名命名规则,详见使用限制

      Schema字段映射

      默认会从日志服务的投递任务配置中获取字段,如LogStore没有配置投递任务,会默认根据最近的日志数据获取字段。

      • 支持的数据类型:BOOLEAN、INT、BIGINT、FLOAT、DOUBLE、STRING。

      • 支持同步SLS保留字段,详情请参见保留字段

      重要
      • 暂不支持修改目标端字段名。

      • 若任务启动运行过(包含启动运行中和已启动运行完成),不支持修改已有列信息,但支持添加新列。若任务仅创建但未启动运行,则可正常修改。

      分区键设置

      为目标表设置分区键。建议按日志时间或者业务逻辑配置分区,以保证入湖与查询性能。如不设置,则目标表默认没有分区。

      目标端分区键的格式处理方法分为:时间格式化和指定分区字段。

      • 按日期时间分区,分区字段名请选择一个日期时间字段。格式处理方法选择时间格式化,选择源端字段格式和目标分区格式。AnalyticDB for MySQL会按源端字段格式识别分区字段的值,并将其转换为目标分区格式进行分区。例如,源字段为gmt_created,值为1711358834,源端字段格式为秒级精度时间戳,目标分区格式为yyyyMMdd,则会按20240325进行分区。

      • 按字段值分区,格式处理方法请选择指定分区字段。

    • 同步配置的参数说明如下:

      参数名称

      参数说明

      增量同步起始消费位点

      同步任务启动时会从选择的时间点开始消费SLS数据。取值说明:

      • 最早位点(begin_cursor):自动从SLS数据中最开始的时间点消费数据。

      • 最近位点(end_cursor):自动从SLS数据中最近的时间点获取数据。

      • 自定义点位:您可以选择任意一个时间点,系统则会从SLS中第一条大于等于该时间点的数据开始消费。

      Job型资源组

      指定任务运行的Job型资源组。

      增量同步所需ACU数

      指定任务运行的Job型资源组ACU数。最小ACU数为2,最大ACU数为Job型资源组可用计算最大资源数。建议多指定一些ACU数,可以提升入湖性能及任务稳定性。

      说明

      创建数据同步任务时,使用Job型资源组中的弹性资源。数据同步任务会长期占用资源,因此系统会从资源组中扣除该任务占用的资源。例如,Job型资源组的计算最大资源为48 ACU,已创建了一个8 ACU的同步任务,在该资源组中创建另一个同步任务时,可选的最大ACU数为40。

      高级配置

      高级配置可以让您对同步任务进行个性化的配置。如需进行个性化配置,请联系技术支持。

  4. 上述参数配置完成,单击提交

启动数据同步任务

  1. SLS/Kafka数据同步页面,选择创建成功的数据同步任务,在操作列单击启动

  2. 单击右上角查询,状态变为正在启动即数据同步任务启动成功。

数据分析

同步任务成功后,您可以通过Spark Jar开发对同步到AnalyticDB MySQL的数据进行分析。Spark开发的相关操作,请参见Spark开发编辑器Spark离线应用开发

  1. 在左侧导航栏,单击作业开发 > Spark Jar 开发

  2. 在默认模板中输入示例语句,并单击立即执行

    -- Here is just an example of SparkSQL. Modify the content and run your spark program.
    
    conf spark.driver.resourceSpec=medium;
    conf spark.executor.instances=2;
    conf spark.executor.resourceSpec=medium;
    conf spark.app.name=Spark SQL Test;
    conf spark.adb.connectors=oss;
    
    -- Here are your sql statements
    show tables from lakehouse20220413156_adbTest;
  3. 可选:应用列表页签中,单击操作列的日志,查看Spark SQL运行的日志。

管理数据源

数据源管理页面,您可以在操作列执行以下操作。

操作按钮

说明

新建链路

快捷跳转到创建此数据源下的数据同步或数据迁移任务。

查看

查看数据源的详细配置。

编辑

编辑数据源属性,如更新数据源名称、描述等。

删除

删除当前数据源。

说明

当数据源下存在数据同步或数据迁移任务时,此数据源无法直接删除,需先在SLS/Kafka数据同步页面,单击目标同步任务操作列的删除,删除数据同步或数据迁移任务。