本文为您介绍DM Reader支持的数据类型、字段映射和数据源等参数及配置示例。

注意 DM Reader仅支持使用新增和使用独享数据集成资源组,不支持使用默认资源组和自定义资源组

背景信息

DM Reader插件从DM读取数据。在底层实现上,DM Reader通过JDBC连接远程DM数据库,并执行相应的SQL语句,从DM库中读取数据。

DM Reader通过JDBC连接器连接至远程的DM数据库,并根据您配置的信息生成查询SQL语句,发送至远程DM数据库,执行该SQL并返回结果。然后使用数据同步自定义的数据类型拼装为抽象的数据集,传递给下游Writer处理:
  • 对于您配置的tablecolumnwhere等信息,DM Reader将其拼接为SQL语句发送至DM数据库。
  • 对于您配置的querySql信息,DM直接将其发送至DM数据库。

DM Reader支持大部分通用的关系数据库数据类型,例如数字、字符等。但也存在部分类型没有支持的情况,请注意检查您的数据类型,根据具体的数据库进行选择。

类型转换列表

DM Reader针对DM类型的转换列表,如下所示。
类型分类 DM数据类型
整数类 INT、TINYINT、SMALLINT和BIGINT
浮点类 REAL、FLOAT、DOUBLE、NUMBER和DECIMAL
字符串类 CHAR、VARCHAR、LONGVARCHAR和TEXT
日期时间类 DATE、DATETIME、TIMESTAMP和TIME
布尔型 BIT
二进制类 BINARY、VARBINARY和BLOB

参数说明

参数 描述 是否必选 默认值
datasource 输入DM数据源名称,配置数据源详情请参见配置DM数据源
table 所选取的需要同步的表。
column 所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息,默认使用所有列配置,例如[ * ]:
  • 支持列裁剪,即列可以挑选部分列进行导出。
  • 支持列换序,即列可以不按照表schema信息顺序进行导出。
  • 支持常量配置,您需要按照JSON格式["id","1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]
    • id为普通列名。
    • 1为整型数字常量。
    • 'bazhen.csy'为字符串常量。
    • null为空指针。
    • to_char(a + 1)为函数表达式。
    • 2.3为浮点数。
    • true为布尔值。
  • column必须显示您指定同步的列集合,不允许为空 。
splitPk DM Reader进行数据抽取时,如果指定splitPk,表示您希望使用splitPk代表的字段进行数据分片。数据同步系统会启动并发任务进行数据同步,以提高数据同步的效能:
  • 推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,切分出来的分片也不容易出现数据热点。
  • 目前splitPk仅支持整型数据切分,不支持浮点、字符串和日期等其他类型。如果您指定其他非支持类型,DM Reader将报错。
  • 如果不填写splitPk,将视作您不对单表进行切分,DM Reader使用单通道同步全量数据。
where 筛选条件,DM Reader根据指定的columntablewhere条件拼接SQL,并根据该SQL进行数据抽取。例如在做测试时,可以将where条件指定为limit 10。
在实际业务场景中,通常会选择当天的数据进行同步,可以将where条件指定为gmt_create>$bizdate
  • where条件可以有效地进行业务增量同步。
  • where条件不配置或为空时,则视作全表同步数据。
querySql 在部分业务场景中,where配置项不足以描述所筛选的条件,您可以通过该配置型来自定义筛选SQL。当您配置该项后,数据同步系统会忽略columntable等配置,直接使用该配置项的内容对数据进行筛选。

例如,需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id 。当您配置querySql时,DM Reader直接忽略columntablewhere条件的配置。

fetchSize 该配置项定义了插件和数据库服务器端每次批量数据获取条数,该值决定了数据同步系统和服务器端的网络交互次数,能够提升数据抽取性能。
说明 fetchSize值过大(>2048)可能造成数据同步进程OOM。
1,024

向导开发介绍

打开新建的数据同步节点,即可进行同步任务的配置,详情请参见通过向导模式配置离线同步任务

您需要在数据同步任务的编辑页面进行以下配置:
  1. 选择数据源。
    配置同步任务的数据来源数据源
    参数 描述
    数据源 即上述参数说明中的datasource,通常输入您配置的数据源名称。
    Schema 所选取的需要同步的表所在的Schema。
    即上述参数说明中的table
    数据过滤 您将要同步数据的筛选条件,暂时不支持limit关键字过滤。SQL语法与选择的数据源一致。
    切分键 您可以将源数据表中某一列作为切分键,建议使用主键或有索引的列作为切分键,仅支持类型为整型的字段。
    读取数据时,根据配置的字段进行数据分片,实现并发读取,可以提升数据同步效率。
    说明 切分键与数据同步中的选择来源有关,配置数据来源时才显示切分键配置项。
  2. 字段映射,即上述参数说明中的column
    左侧的源头表字段和右侧的目标表字段为一一对应关系。单击添加一行可以增加单个字段。鼠标放至需要删除的字段上,即可单击删除图标进行删除 。字段映射
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
    自动排版 可以根据相应的规律自动排版。
    手动编辑源表字段 请手动编辑字段,一行表示一个字段,首尾空行会被采用,其它空行会被忽略。
    添加一行 添加一行的功能如下所示:
    • 可以输入常量,输入的值需要使用英文单引号。例如,'abc''123'等。
    • 可以配合调度参数使用。例如,${bizdate}等。
    • 可以输入关系数据库支持的函数。例如,now()count(1)等。
    • 如果您输入的值无法解析,则类型显示为未识别
  3. 通道控制。通道控制
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组概述新增和使用独享数据集成资源组

脚本开发介绍

配置一个从DM数据库同步抽取数据作业,使用脚本模式开发的详情请参见通过脚本模式配置离线同步任务
{
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
                  "concurrent":1 //作业并发数。
                  "mbps":"12",//限流

        }
    },
    "steps": [
        {
            "category": "reader",
            "name": "Reader",
            "parameter": {
                "datasource": "dm_datasource",
                "table": "table",
                "column": [
                    "*"
                ],
                "preSql": [
                    "delete from XXX;"
                ],
                "fetchSize": 2048
            },
            "stepType": "dm"
        },
        {
            "category": "writer",
            "name": "Writer",
            "parameter": {},
            "stepType": "stream"
        }
    ],
    "type": "job",
    "version": "2.0"
}