接入DataWorks数据集成

当您需要在DataWorks中同步PolarDB-X 2.0的数据时,直接使用通用的MySQL类型数据源可能会引发并行读取效率低下、数据库资源过度消耗甚至任务异常等问题。为确保数据同步任务的高效与稳定,建议您使用专为PolarDB-X 2.0优化的专属数据源类型。本文将指导您如何配置和使用PolarDB-X 2.0数据源,并介绍其独有的优化参数。

功能简介

DataWorks中的PolarDB-X 2.0数据源是为PolarDB-X分布式架构量身定制的。与通用MySQL数据源不同,它能够识别PolarDB-X的分区表拓扑,从而实现按分区进行并行数据读取。这种优化的读取方式可以最大化利用数据库资源,显著提升数据同步性能,并避免因不合理的扫描方式对线上业务造成冲击。

适用范围

仅适用于PolarDB-X 2.0实例。

说明

如您使用的是PolarDB-X 1.0实例,请选择DRDS(PolarDB-X 1.0)数据源

操作指南

步骤一:配置账号权限

执行数据同步任务的数据库账号需要具备以下权限。

  • 离线同步:

    • 离线读(Reader):账号需要对目标库拥有SELECT权限。

      说明

      由于Reader在执行前会通过SHOW TOPOLOGY FROM <table_name>指令获取表的拓扑信息,因此需确保该账号具备目标库的读权限。

    • 离线写(Writer):账号需要对目标表拥有INSERTDELETEUPDATE权限。

  • 实时同步(整库实时):

    • 高权限账号:默认可以读取Binlog,用于实时同步。

    • 普通账号:需由高权限账号为其授予SELECTREPLICATION SLAVEREPLICATION CLIENT权限。

步骤二:创建数据源

DataWorks控制台新增数据源时,需要选择PolarDB-X 2.0。其他详细参数与配置,请参见创建数据源

  1. 进入管理中心页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的更多 > 管理中心,在下拉框中选择对应工作空间后单击进入管理中心

  2. 单击左侧导航栏的数据源 > 数据源列表,进入数据源页面。

  3. 单击页面左上角的新增数据源

image

步骤三:(可选)使用自定义连接地址实现负载隔离

如果大量DataWorks任务对某些CN节点造成负载压力,您可以通过自定义连接地址将DataWorks的读写流量路由到专用的CN节点,实现负载隔离。

  1. 创建自定义连接地址

    1. 前往PolarDB分布式版控制台,找到并进入目标实例。

    2. 在左侧导航栏中,单击配置与管理 > 连接管理

    3. 进入自定义地址页签,单击创建自定义连接,配置相关参数。

    说明
    • 若您的实例是三可用区实例时,自带Standby CN,并且Standby CN不承担业务流量,特别适合供DataWorks使用。如下图,可在实例详情页内计算节点列表内找到未激活的节点,即为Standby节点。image

    • 若您的实例不是三可用区实例时,您可以:

      • 变更可用区,将单可用区变更为三可用区,三可用区实例价格和单可用区实例一致。

      • 创建多个自定义连接地,供业务和DataWorks使用。

  2. DataWorks中使用:创建或修改DataWorks数据源时,配置方式选择使用连接串模式,并将上一步创建的自定义连接地址填入主机地址/IP中即可。image

步骤四:数据同步任务开发

因部分高级参数需要在脚本模式下进行配置,建议您通过脚本模式配置数据同步任务。详细步骤与说明,请参考通过脚本模式配置离线同步任务

配置数据读取(Reader)

以下是Reader的核心参数说明,部分高级参数需要在脚本模式下配置。

参数名

描述

是否必选

默认值

table

需要同步的表名称,仅支持单表。

column

需要同步的列名集合,以JSON数组格式定义。

  • 使用所有列配置,例如[*]

  • 支持列裁剪,可以挑选部分列进行同步。

  • 支持列换序,列可以不按照表Schema信息顺序进行同步。

  • 支持配置为任意可以出现在SQL SELECT中的表达式。

  • 需显示指定同步的列集合,不允许为空。

较复杂示例如下:

["c1+c2", 
"FROM_UNIXTIME(c3)", 
"CAST(c4 AS SIGNED)", 
"c5", 
"c5",
"c5",
"IFNULL(c6,0)",
"IF(c7>10,c7,c8)",
]

where

数据过滤条件,将直接拼接到SELECT语句的WHERE子句中。请谨慎使用,确保该语句有高效的执行计划。例如:gmt_create>='2025-09-01'

说明

该过滤条件并非在DataWorks层进行过滤,而是直接拼到SQL中。例如,上文的例子等价于执行SQL:SELECT * FROM tbl WHERE gmt_create>='2025-09-01'。您需要注意执行该语句是否存在性能问题:

  • 如果gmt_create上不存在索引,则该语句会导致全表扫描。

  • 即使gmt_create上存在索引,也可能出现数据库选择索引错误、回表代价过大等问题。

不推荐在生产库通过该参数指定过滤条件,如要使用,需谨慎评估该SQL的性能。

onePartitionOneTask

核心优化参数。当读取的表为分区表时,建议将该参数设置为true。在此情况下,DataWorks将按分区为单位读取数据,每次读取的SQL语句仅包含对单个分区的查询,从而实现不同读取线程之间的相对均匀分配。

说明

使用onePartitionOneTask参数时,单个分片的默认读取超时时间为1小时。若同步大分区数据耗时超过此限制,会导致任务失败。

false

splitPk

该参数已废弃,不建议使用。

示例脚本

{
  "stepType": "polardbx20",
  "parameter": {
    "checkSlave": "false",
    "slaveDelayLimit": "300",
    "datasource": "ms_polardbx",
    "envType": 1,
    "column": ["c1+c2", 
      "FROM_UNIXTIME(c3)", 
      "CAST(c4 AS SIGNED)", 
      "c5", 
      "c5",
      "c5",
      "IFNULL(c6,0)",
      "IF(c7>10,c7,c8)",
      ],
    "tableComment": "",
    "onePartitionOneTask":true    // 当读取的表为分区表时推荐设置该参数为true
  },
  "name": "Reader",
  "category": "reader"
}

配置数据写入(Writer)

以下是Writer的核心参数说明。

参数名

描述

是否必选

默认值

table

目标表的名称。

writeMode

选择导入模式,支持以下模式:

  • insert into:当主键/唯一性索引冲突时会写不进去冲突的行,以脏数据的形式体现。

  • insert ignore:常用模式。当主键/唯一性索引冲突时会忽略,保留目标端原有的记录。

  • replace into:当主键/唯一性索引冲突时,会先删除原有行,再插入新行。即新行会替换原有行的所有字段。

  • upsert:常用模式。使用INSERT INTO ... ON DUPLICATE KEY UPDATE ... 语句,当主键/唯一性索引冲突时,默认更新所有字段,可通过updateColumn指定更新部分列:

    • 默认更新所有字段:即新行会替换原有行的所有字段,此时行为与repalce into类似。

    • 更新部分列:指定updateColumn,只更新部分列,例如:

      "writeMode":"upsert",
      "updateColumn":["c1","c2"]

      此时生成的SQL语句为INSERT INTO ... ON DUPLICATE KEY UPDATE c1=VALUES(c1), c2=VALUES(c2)

  • upsert complex:高级用法。当主键/唯一性索引冲突时,执行updateColumn中定义的复杂更新逻辑。

insert into

updateColumn

配合writeMode中的upsertupsert complex模式使用,指定更新的列或更新逻辑。

column

需要写入数据的目标表列名,字段之间用英文逗号分隔,例如"column": ["id", "name", "age"]。使用["*"]表示所有列。

preSql

执行数据同步任务之前,需率先执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句。

postSql

执行数据同步任务之后执行的SQL语句,目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句。

batchSize

批量写入的记录数量。增大此值可减少数据同步系统与PolarDB-X 2.0之间的网络交互次数,从而提升整体吞吐量。然而,如果设置过大,将会增加内存消耗,并可能导致内存溢出。

256

enableRetry

是否开启自动重试(如死锁、锁等待超时)。当表有主键或唯一键时,建议设置为true,开启自动重试。

说明

当表中不包含主键或唯一键时,重试可能导致数据出现重复。

false

retryableException

enableRetry搭配使用,除了默认的异常列表,还可以添加一些为包含的异常,通常无需修改。

["Lock wait timeout exceeded", "Deadlock found when trying to get lock", "Query execution was interrupted"]

maxRetryTimes

enableRetry搭配使用,重试的次数,通常无需修改。

100

enableReConnect

是否在连接中断(如网络闪断、主备切换)后自动重连并重试。当表有主键或唯一键时,建议设置为true,开启自动重试。

说明

当表中不包含主键或唯一键时,重试可能导致数据出现重复。

false

示例脚本

{
  "stepType": "polardbx20",
  "parameter": {
    "postSql": [],
    "datasource": "ms_polardbx",
    "envType": 1,
    "column": [
      "id",
      "c1",
      "c2",
      "c3",
      "c4"
    ],
    "tableComment": "",
    "writeMode": "insert ignore",
    "batchSize": 256,
    "table": "tbl",
    "preSql": [],
    "enableRetry": true,	//如果数据带有主键或者唯一键,建议设置为true
    "enableReConnect": true	//如果数据带有主键或者唯一键,建议设置为true
  },
  "name": "Writer",
  "category": "writer"
}

常见问题

任务执行超过8小时后报错,是什么原因?

这很可能是因为任务运行时间超过了PolarDB-X 2.0默认的事务最大时长(8小时)。您可以使用高权限账号执行以下命令调大该参数(单位:秒):

说明

该问题触发时,任务执行时间通常恰好等于8小时,但也可能远大于8小时,因此当任务执行时间超过8小时,可以优先怀疑触发了此问题。

SET GLOBAL max_trx_duration = 100000;

读取单个大分片时报错 XResult stream fetch result timeout,如何处理?

该错误表示单个分片的读取时间超过了默认的1小时。您可以使用高权限账号执行以下命令调大Socket超时时间(单位:毫秒):

SET GLOBAL socket_timeout = 36000000;

任务报错Application was streaming results when the connection failed,如何处理?

这是一个通用性连接错误,请按以下步骤检查和优化配置:

  • 确认DataWorks数据源类型是否为PolarDB-X 2.0

  • Reader配置中,为分区表设置onePartitionOneTask=true

  • 适当调大max_trx_durationsocket_timeout参数。

如何判断一个SQL查询是否来自DataWorks?

DataWorks的读取SQL通常是慢SQL(拖数据的SQL是慢SQL很正常,无需担忧),如果使用的是PolarDB-X 2.0数据源,则SQL语句前通常会包含类似 /*+TDDL:SCAN(...)*/ 的优化器提示(Hint),用于指定扫描的物理分片。例如:

/*+TDDL:SCAN(TABLE="mocktable",REAL_TABLE=("t1_00"),NODE="GROUP_00")*/SELECT ...

文中提到的某些参数(如onePartitionOneTask)在配置界面上找不到?

部分高级参数或优化参数仅在脚本模式下可见。您可以在DataWorks的节点编辑页面上方的工具栏中切换到脚本模式进行配置。image