本文档主要介绍了基于DLA和DTS同步RDS数据来构建实时数据湖的方法。

背景信息

实时数据湖方案是新一代低成本、低延时的入库建仓方案,支持CRUD的大规模分析数据集。实时数据湖方案架构如下图所示:实时数据湖

前提条件

操作步骤

  1. 创建Spark集群环境。
    1. 登录Data Lake Analytics管理控制台,新建虚拟集群。具体操作请参见创建虚拟集群
    2. 配置DLA虚拟集群与RDS、DTS所在的VPC网络,确保可以通过DLA的Spark引擎去消费VPC网络数据。具体操作请参见配置数据源网络
  2. 创建全量同步作业。
    1. 在左侧导航栏单击Serverless Spark > 作业管理
    2. 作业编辑页面,单击创建作业创建一个Spark全量同步作业,全量同步RDS的数据。具体操作请参见创建和执行Spark作业
      Spark作业
    3. 全量同步作业创建成功后,在作业列表中选中对应的作业,参考以下示例配置Spark作业。
      {
          "args": [
                  "oss://<你的oss路径>/dla-lakehouse-batch-jdbc-config.properties"
          ],
        "file": "oss://<你的oss路径>/datalake-batch-1.0.0-shaded.jar",
        "name": "dla-lakehouse-batch", //名称,可以自定义
        "className": "com.aliyun.dla.batch.jdbc.SparkHudiJdbc",
        "conf": {
          "spark.driver.resourceSpec": "large", //规格有small(1cu)、medium(2cu)、large(4cu)/xlarge(8cu)/2xlarge(16cu),每个cu是1Core4GB
          "spark.executor.resourceSpec": "medium",  //同上
          "spark.executor.instances": 5,  //具体executor的个数;driver只有一个
          "spark.dla.eni.enable": "true",  //通过ENI打通客户的数据VPC
          "spark.dla.eni.vswitch.id": "vsw-xxx",  //用户数据VPC的vswitchId
          "spark.dla.eni.security.group.id": "sg-xxx", //用户数据VPC的安全组id
          "spark.dla.job.log.oss.uri": "oss://<你的oss路径>/spark-logs/", //为了存spark日志
          "spark.sql.hive.metastore.version": "dla", //表示自动创建dla的元信息,后续可以直接用dla来分析
          "spark.dla.connectors": "oss"    //支持读写用户的oss
        }
      }
      说明 datalake-batch-1.0.0-shaded.jar文件的下载地址
      dla-lakehouse-batch-jdbc-config.properties的模板文件内容请参考以下示例:
      ### jdbc源库配置
      ## 源库用户名
      dla.datalake.batch.jdbc.username=<JDBC_USERNAME>
      ## 源库密码
      dla.datalake.batch.jdbc.password=<JDBC_PASSWORD>
      ## 源库jdbc url链接
      dla.datalake.batch.jdbc.url=<JDBC_URL>
      ## 源库名(英文逗号分隔)
      dla.datalake.batch.jdbc.db.name=DBNAME
      
      ## 同步数据到OSS的根目录,构建对应的lakehouse
      dla.datalake.hoodie.target.base.path=oss://lakehouse/
      
      ## jdbc同步模式,填写固定database即可
      dla.datalake.batch.jdbc.sync.mode=database
    4. Spark作业配置完成后,单击执行全量同步作业。
  3. 创建增量同步作业。
    全量同步作业执行完成后,参考步骤2创建一个Spark增量同步作业,并参考以下示例配置Spark作业来持续增量同步DTS数据。
    {
        "args": [
                   "oss://<你的oss路径>/dla-lakehouse-streaming-dts-config.properties"
        ],
        "file": "oss://<你的oss路径>/datalake-streaming-1.0.0-shaded.jar",
        "name": "dla-lakehouse-streaming",  //name可以自定义
        "className": "com.aliyun.dla.streaming.dts.SparkHudiDts",
        "conf": {
              "spark.driver.resourceSpec": "large", //规格有small(1cu)、medium(2cu)、large(4cu)/xlarge(8cu)/2xlarge(16cu),每个cu是1Core4GB
                "spark.executor.resourceSpec": "medium",  //同上
                "spark.executor.instances": 20,  //具体executor的个数;driver只有一个
                "spark.dla.eni.enable": "true",  //通过ENI打通客户的数据VPC
                "spark.dla.eni.vswitch.id": "vsw-xxx",  //用户数据VPC的vswitchId
                "spark.dla.eni.security.group.id": "sg-xxx", //用户数据VPC的安全组id
                "spark.dla.job.log.oss.uri": "oss://<你的oss路径>/spark-logs/", //存储spark日志的oss路径
                "spark.sql.hive.metastore.version": "dla", //表示自动创建dla的元信息,后续可以直接用dla来分析
            "spark.dla.connectors": "oss"    //支持读写用户的oss
        }
    }
    说明 datalake-streaming-1.0.0-shaded.jar文件的下载地址
    dla-lakehouse-streaming-dts-config.properties的模板文件内容请参考以下示例:
    ### dts config
    ## dts订阅用户名
    dla.datalake.streaming.dts.username=DTS_USERNAME
    ## dts订阅密码
    dla.datalake.streaming.dts.password=DTS_PASSWORD
    ## dts消费位点(可选latest,earliest 或自定义offset),默认为latest
    dla.datalake.streaming.dts.offset=latest
    ## 按照指定时间位点开始消费,单位为秒(1608523200对应2020/12/21 12:00:00),需设置为增量同步的开始时间
    dla.datalake.streaming.dts.offset.by.timestamp=1608523200
    ## dts的消费组id(请在dts控制台上查看该配置)
    dla.datalake.streaming.dts.group.id=dtsxxxxx
    ## dts服务端地址(请在dts控制台上查看该配置)
    dla.datalake.streaming.dts.bootstrap.server=dts-xxx-vpc.aliyuncs.com:18003
    ## dts的订阅topic(请在dts控制台上查看该配置)
    dla.datalake.streaming.dts.subscribe.topic=cn_hangzhou_xxx
    
    ## checkpoint地址,用于保证数据不丢失
    dla.datalake.streaming.dts.checkpoint.location=oss://<你的oss路径>/checkpoint/
    
    ## 同步数据到OSS的根目录,需与全量同步路径设置一致
    dla.datalake.hoodie.target.base.path=oss://lakehouse/

dla-lakehouse-streaming-dts-config.properties文件的全量配置项说明

下面的配置项,可选项请根据您的业务实际情况进行添加,必填项则必须在dla-lakehouse-streaming-dts-config.properties文件中进行配置。

DTS配置
配置项 是否必填 说明
dla.datalake.streaming.dts.username 订阅DTS的用户名,可登录数据传输控制台 在订阅任务中的订阅配置菜单查看。
dla.datalake.streaming.dts.password 订阅DTS的密码,可登录数据传输控制台 在订阅任务中的订阅配置菜单查看。
dla.datalake.streaming.dts.offset 消费位点,默认为latest。
dla.datalake.streaming.dts.group.id 消费组ID,可登录数据传输控制台 在订阅任务中的数据消费菜单查看。
dla.datalake.streaming.dts.bootstrap.server 订阅DTS服务器地址,可登录数据传输控制台 在订阅任务中的订阅配置菜单查看。
dla.datalake.streaming.dts.max.offsets.per.trigger 每次同步处理多少条数据,默认10000。
dla.datalake.streaming.dts.subscribe.topic 订阅topic,可登录数据传输控制台 在订阅任务中的数据消费菜单查看。
dla.datalake.streaming.dts.processing.time.interval 每次同步的时间间隔,默认3s。
dla.datalake.streaming.dts.checkpoint.location checkpoint路径,默认为/tmp/sparkstreaming/checkpoint/
dla.datalake.streaming.dts.db.tables 过滤dts中指定库表,格式为db1:table1;db2:table2。
dla.datalake.streaming.dts.concurrent.table.write.enable 是否开启多张表并发写入,默认为true。
dla.datalake.streaming.dts.concurrent.table.write.thread.pool.size 多张表并发写入线程池大小,默认为10。
DLA配置
配置项 是否必填 说明
dla.datalake.meta.sync.enable 是否开启自动同步至DLA,默认为true(开启)。
dla.datalake.meta.username sync.enable为true时必填 同步到DLA JDBC用户名。
dla.datalake.meta.password sync.enable为true时必填 同步到DLA JDBC密码。
dla.datalake.meta.jdbc.url sync.enable为true时必填 同步到DLA JDBC链接。
dla.datalake.meta.db.name 同步至DLA的库名。如果配置了此参数,则所有表同步至该库中;否则从dts/jdbc/dfs中解析库名。
dla.datalake.meta.table.name 同步至DLA的表名。sync.mode为table时,如果配置了此参数,则所有数据同步到该表中;否则自动解析。
HUDI配置
配置项 是否必填 说明
dla.datalake.hoodie.target.base.path 同步DTS至OSS的根目录,默认为/tmp/dla-streaming-datalake/
dla.datalake.hoodie.compact.inline 写入时是否开启内联compaction(COPY_ON_WRITE模式不生效),默认为true。
dla.datalake.hoodie.compact.inline.max.delta.commits 写入多少个delta提交时进行compaction,默认为10。
dla.datalake.hoodie.table.type hudi表类型,默认为MERGE_ON_READ。
dla.datalake.hoodie.insert.shuffle.parallelism insert并发度,默认为3。
dla.datalake.hoodie.upsert.shuffle.parallelism upssert并发度,默认为3。
dla.datalake.hoodie.enable.timeline.server 是否开启timeline,默认为false(关闭)。
dla.datalake.hoodie.save.mode 保存模式。全量同步时默认为Override。
dla.datalake.hoodie.table.name Hudi表名。
dla.datalake.hoodie.datasource.write.operation 写入类型。全量同步时默认为bulk_insert。
dla.datalake.hoodie.bulkinsert.shuffle.parallelism 全量同步时默认为bulk_insert时并发度,默认为1。
dla.datalake.hoodie.partition.field 分区字段,默认为空字符串,即不进行分区。
dla.datalake.hoodie.precombine.field 预聚合字段。在同步DFS sync.mode为table时必填,其他情况选填。
dla.datalake.hoodie.datasource.write.recordkey.field 主键字段。在同步DFS sync.mode为table时必填,其他情况选填。
dla.datalake.hoodie.key.generator.class 键生成类路径,默认为org.apache.hudi.keygen.ComplexKeyGenerator。
dla.datalake.hoodie.dla.sync.partition.fields 同步至DLA的分区字段,默认为空字符串。
dla.datalake.hoodie..dla.sync.partition.extractor.class 同步至DLA的分区字段解析器,默认为org.apache.hudi.hive.NonPartitionedExtractor。
说明 Hudi表实际的路径为{dla.datalake.hoodie.target.base.path}/{解析的库名}/{解析的表名}
其他系统配置
配置项 是否必填 说明
dla.datalake.system.convert.all.types.to.string 是否将所有类型转化为String类型,默认为false。
dla.datalake.system.convert.decimal.to.string 是否将decimal类型转化为String类型,默认为true。
dla.datalake.system.convert.decimal.to.double 是否将decimal类型转化为Double类型,默认为false。当配置为true时,请将 dla.datalake.system.convert.decimal.to.string 配置为false。
dla.datalake.system.decimal.columns.definition decimal类型定义,格式为:表名:列名1,precision,scale;列名2,precision,scale。如:tableName1:c1,10,2;c2,5,2#tableName2:c,4,2。
dla.datalake.system.convert.int.to.long 是否将int类型转化为long类型定义,默认为true。
说明 请确保您在配置全量同步和增量同步作业时保持相同的配置,否则会导致类型不匹配而报错。