数据摄入YAML作业开发(公测中)

实时计算Flink版基于Flink CDC,通过开发YAML作业的方式有效地实现了将数据从源端同步到目标端的数据摄入工作。本文将为您介绍数据摄入YAML作业开发的操作步骤。

背景信息

数据摄入模块整合了Flink CDC连接器,相对于CDAS和CTAS,它通过YAML配置的方式可以轻松定义复杂的ETL流程,并自动转化为Flink运算逻辑。除支持整库同步、单表同步、分库分表同步、新增表同步、表结构变更和自定义计算列同步等能力,还支持ETL处理、Where条件过滤、列裁剪和计算列,极大地简化了数据集成过程,有效提升了数据集成的效率和可靠性。

YAML优势

在实时计算Flink版中,您可以选择开发数据摄入YAML作业、SQL作业或自行开发DataStream作业完成数据同步工作。下面介绍一下数据摄入YAML作业相比于其他两种开发方式的优势。

YAML vs SQL

数据摄入YAML作业和SQL作业在数据传递过程中使用不同的数据类型:

  • SQL传递RowData,YAML传递DataChangeEvent和SchemaChangeEvent。SQL的每个RowData都有自己的变更类型,主要有4种类型:insert(+I),update before(-U),update after(+U)和delete(-D)。

  • YAML使用SchemaChangeEvent传递Schema变更信息,例如创建表,添加列、清空表等,DataChangeEvent用来传递数据变更,主要是insert,update和delete,update消息中同时包含了update before和update after的内容,这使得您能够写入原始变更数据到目标端。

数据摄入YAML作业相比SQL作业的优势如下:

数据摄入YAML

SQL

自动识别Schema,支持整库同步

需要人工写Create Table和Insert语句

支持多策略的Schema变更

不支持Schema变更

支持原始Changelog同步

破坏原始Changelog结构

支持读写多个表

读写单个表

相对于CTAS或CDAS语句,YAML作业功能也更为强大,可以支持:

  • 上游表结构变更立即同步,不用等新数据写入触发。

  • 支持原始Changelog同步,Update消息不拆分。

  • 同步更多类型的Schema变更,例如Truncate Table和Drop Table等变更。

  • 支持指定表的映射关系,灵活定义目标端表名。

  • 支持灵活的Schema Evolution行为,用户可配置。

  • 支持WHERE条件过滤数据。

  • 支持裁剪字段。

YAML vs DataStream

数据摄入YAML作业相比DataStream作业的优势如下:

数据摄入YAML

DataStream

为各级别用户设计,不只是专家

需要熟悉Java和分布式系统

隐藏底层细节,便于开发

需要熟悉Flink框架

YAML格式容易理解和学习

需要了解Maven等工具管理相关依赖

已有作业方便复用

难以复用已有代码

使用限制

  • 仅实时计算引擎VVR 8.0.9及以上版本支持数据摄入YAML作业。

  • 仅支持从一个源端流向一个目标端。从多个数据源读取或写入多个目标端时需编写多个YAML作业。

  • 暂不支持将YAML作业部署到Session集群。

数据摄入连接器

当前支持作为数据摄入源端和目标端的连接器如下表所示。

说明

欢迎您通过工单、钉钉等渠道反馈感兴趣的上下游存储,未来计划适配更多上下游以更好满足您的需要。

连接器

支持类型

Source

Sink

消息队列Kafka

实时数仓Hologres

×

MySQL

说明

支持连接RDS MySQL版、PolarDB MySQL版及自建MySQL。

×

Upsert Kafka

×

Print

×

StarRocks

×

流式数据湖仓Paimon

×

操作步骤

  1. 登录实时计算管理控制台

  2. 单击目标工作空间操作列的控制台

  3. 在左侧导航栏选择数据开发 > 数据摄入

  4. 单击新建,选择空白的数据摄入草稿,单击下一步

    您也可以直接选择目标数据同步模板(MySQL到Starrocks数据同步、MySQL到Paimon数据同步或MySQL到Hologres数据同步)快速配置YAML作业开发信息。

  5. 填写作业名称存储位置和选择引擎版本后,单击确定

  6. 配置YAML作业开发信息。

    # 必填
    source:
      # 数据源类型
      type: <替换为您源端连接器类型>
      # 数据源配置信息,配置项详情请参见对应连接器文档。
      ...
    
    # 必填
    sink:
      # 目标类型
      type: <替换为您目标端连接器类型>
      # 数据目标配置信息,配置项详情请参见对应连接器文档。
      ...
    
    # 可选
    transform:
      # 转换规则,针对flink_test.customers表
      - source-table: flink_test.customers
        # 投影配置,指定要同步的列,并进行数据转换
        projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name
        # 过滤条件,只同步id大于10的数据
        filter: id > 10
        # 描述信息,用于解释转换规则
        description: append calculated columns based on source table
    
    # 可选
    route:
      # 路由规则,指定源表和目标表之间的对应关系
      - source-table: flink_test.customers
        sink-table: db.customers_o
        # 描述信息,用于解释路由规则
        description: sync customers table
      - source-table: flink_test.customers_suffix
        sink-table: db.customers_s
        # 描述信息,用于解释路由规则
        description: sync customers_suffix table
    
    #可选
    pipeline:
      # 任务名称
      name: MySQL to Hologres Pipeline

    涉及的代码块说明详情如下。

    是否必填

    代码模块

    说明

    必填

    source(数据源端)

    数据管道的起点,Flink CDC将从数据源中捕获变更数据。

    说明
    • 目前仅支持MySQL作为数据源,具体的配置项详情请参见MySQL

    • 您可以使用变量对敏感信息进行设置,详情请参见变量管理

    sink(数据目标端)

    数据管道的终点,Flink CDC将捕获的数据变更传输到这些目标系统中。

    说明
    • 目前支持的目标端系统请参见数据摄入连接器,目标端配置项详情请参见对应连接器文档。

    • 您可以使用变量对敏感信息进行设置,详情请参见变量管理

    可选

    pipeline

    (数据管道)

    定义整个数据通道作业的一些基础配置,例如pipeline名称等。

    transform(数据转换)

    填写数据转化规则。转换是指对流经Flink管道的数据进行操作的过程。支持ETL处理、Where条件过滤,列裁剪和计算列。

    当Flink CDC捕获的原始变更数据需要经过转换以适应特定的下游系统时,可以通过transform实现。

    route(路由)

    如果未配置该模块,则代表整库或目标表同步。

    在某些情况下,捕获的变更数据可能需要根据特定规则被发送到不同的目的地。路由机制允许您灵活指定上下游的映射关系,将数据发送到不同的数据目标端。

    各模块语法结构和配置项说明详情,请参见数据摄入开发参考

    以将MySQL中app_db数据库下的所有表同步到Hologres的某个数据库为例,代码示例如下。

    source:
      type: mysql
      hostname: <hostname>
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: app_db.\.*
      server-id: 5400-5404
    
    sink:
      type: hologres
      name: Hologres Sink
      endpoint: <endpoint>
      dbname: <database-name>
      username: ${secret_values.holousername}
      password: ${secret_values.holopassword}
    
    pipeline:
      name: Sync MySQL Database to Hologres
  7. (可选)单击深度检查

    您可以进行语法检测、网络连通性和访问权限检查。

相关文档