本文为您介绍MaxCompute Reader支持的数据类型、字段映射和数据源等参数及配置示例。

背景信息

MaxCompute Reader插件实现了从MaxCompute读取数据的功能,有关MaxCompute的详细介绍请参见MaxCompute简介

根据您配置的源头项目、表、分区和表字段等信息,可以通过Tunnel从MaxCompute系统中读取数据。常用的Tunnel命令请参见Tunnel命令操作

MaxCompute Reader支持读取分区表、非分区表,不支持读取虚拟视图。DataWorks不支持对MaxCompute分区表进行字段映射,您需要对分区字段进行单独配置。例如,读取t0表,其分区为pt=1,ds=hangzhou,则您需要在配置中配置该值。表字段既可以依序指定全部列、部分列,也可以调整列顺序、指定常量字段和指定分区列(分区列不是表字段)。
说明
  • MaxCompute Reader不支持数据过滤功能。如果您在数据同步过程中,需要过滤符合条件的数据,请创建新表并写入过滤数据后,同步新表中的数据。
  • MaxCompute Reader不支持同步外部表。

支持的数据类型

MaxCompute Reader针对MaxCompute的类型转换列表,如下所示。
类型分类 数据集成配置类型 数据库数据类型
整数类 LONG BIGINT、INT、TINYINT和SMALLINT
布尔类 BOOLEAN BOOLEAN
日期时间类 DATE DATETIME、TIMESTAMP和DATE
浮点类 DOUBLE FLOAT、DOUBLE和DECIMAL
二进制类 BYTES BINARY
复杂类 STRING ARRAY、MAP和STRUCT

参数说明

参数 描述 是否必选 默认值
datasource 数据源名称。脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。
table 读取数据表的表名称(大小写不敏感)。
partition
读取的数据所在的分区信息。
  • ODPS的分区配置支持linux shell通配符,*表示0个或多个字符,?表示任意一个字符。
  • 默认情况下,读取的分区必须存在,如果分区不存在则运行的任务会报错。如果您希望当分区不存在时任务仍然执行成功,则可以切换至脚本模式执行任务,并在ODPS的Parameter中添加"successOnNoPartition": true配置。
例如,分区表test包含pt=1,ds=hangzhoupt=1,ds=shanghaipt=2,ds=hangzhoupt=2,ds=beijing四个分区,则读取不同分区数据的配置如下:
  • 如果您需要读取pt=1,ds=hangzhou分区的数据,则分区信息的配置为"partition":"pt=1,ds=shanghai”
  • 如果您需要读取pt=1中所有分区的数据,则分区信息的配置为 "partition":"pt=1,ds=*”
  • 如果您需要读取整个test表所有分区的数据,则分区信息的配置为 "partition":"pt=*,ds=*”
此外,您还可以根据实际需求设置分区数据的获取条件:
  • 如果您需要指定最大分区,则可以添加/*query*/ ds=(select MAX(ds) from DataXODPSReaderPPR)配置信息。
  • 如果需要按条件过滤,则可以添加相关条件/*query*/ pt+表达式配置。例如/*query*/ pt>=20170101 and pt<20170110表示获取pt分区中,20170101日期之后(包含20170101日期),至20170110日期之前(不包含20170110日期)的所有数据。
说明 /*query*/表示将其后填写的内容识别为一个where条件。
如果表为分区表,则必填。如果表为非分区表,则不能填写。
column 读取MaxCompute源头表的列信息。例如表test的字段为idnameage
  • 如果您需要依次读取idnameage,则应该配置为"column":["id","name","age"]或者配置为"column":["*"]
    说明 不推荐您配置抽取字段为(*),因为它表示依次读取表的每个字段。如果您的表字段顺序调整、类型变更或者个数增减,您的任务会存在源头表列和目的表列不能对齐的风险,则直接导致您的任务运行结果不正确甚至运行失败。
  • 如果您想依次读取nameid,则应该配置为"coulumn":["name","id"]
  • 如果您想在源头抽取的字段中添加常量字段(以适配目标表的字段顺序)。例如,您想抽取的每一行数据值为age列对应的值,name列对应的值,常量日期值1988-08-08 08:08:08,id列对应的值,则您应该配置为"column":["age","name","'1988-08-08 08:08:08'","id"],即常量列首尾用符号'包住即可。
    内部实现上识别常量是通过检查您配置的每一个字段,如果发现有字段首尾都有',则认为其是常量字段,其实际值为去除'之后的值。
    说明
    • MaxCompute Reader抽取数据表不是通过MaxCompute的Select SQL语句,所以不能在字段上指定函数。
    • column必须显示指定同步的列集合,不允许为空。

向导开发介绍

打开新建的数据同步节点,即可进行同步任务的配置,详情请参见通过向导模式配置任务

您需要在数据同步任务的编辑页面进行以下配置:
  1. 选择数据源。
    配置同步任务的数据来源数据去向选择数据源
    参数 描述
    数据源 即上述参数说明中的datasource,通常填写您配置的数据源名称。
    开发项目名 默认不可以修改。
    说明 仅标准模式的工作空间会显示该配置。
    生产项目名 默认不可以修改。
    即上述参数说明中的table,选择需要同步的表。
    分区信息 如果您每日增量数据限定在对应日期的分区中,可以使用分区进行每日增量。例如,配置分区pt的值为${bizdate}
    说明 DataWorks不支持对MaxCompute分区表进行字段映射,您需要单独配置分区字段。
    说明 如果是指定所有的列,可以在column配置,例如"column": [""]partition支持配置多个分区和通配符的配置方法:
    • "partition":"pt=20140501/ds=*"代表ds中的所有的分区。
    • "partition":"pt=top?"中的问号(?)代表前面的字符是否存在,指pt=top和pt=to两个分区。
    您可以输入需要同步的分区列,例如MaxCompute的分区为pt=${bdp.system.bizdate},您可以直接添加分区名称pt至源表字段中(可能会有未识别的标志,直接忽视进行下一步):
    • 如果需要同步所有的分区,配置分区值为pt=*
    • 如果需要同步某个分区,可以直接选择您要同步的时间值。
  2. 字段映射,即上述参数说明中的column
    左侧的源头表字段和右侧的目标表字段为一一对应的关系。单击添加一行可以增加单个字段,鼠标放至需要删除的字段上,即可单击删除图标进行删除。字段映射
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
    自动排版 可以根据相应的规律自动排版。
    手动编辑源表字段 请手动编辑字段,一行表示一个字段,首尾空行会被采用,其它空行会被忽略。
    添加一行 单击添加一行,您可以输入以下类型的字段:
    • 可以输入常量,输入的值需要使用英文单引号,如'abc’'123’等。
    • 可以配合调度参数使用,例如${bizdate}等。
    • 如果您输入的值无法解析,则类型显示为未识别。
  3. 通道控制。通道配置
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发介绍

配置一个从MaxCompute抽取数据到本地的作业,使用脚本开发的详情请参见通过脚本模式配置任务
注意 实际运行时,请删除下述代码中的注释。
{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"odps",//插件名。
            "parameter":{
                "partition":[],//读取数据所在的分区。
                "isCompress":false,//是否压缩。
                "datasource":"",//数据源。
                "column":[//源头表的列信息。
                    "id"
                ],
                "emptyAsNull":true,
                "table":""//表名。
            },
            "name":"Reader",
            "category":"reader"
        },
        { 
            "stepType":"stream",
            "parameter":{
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为flase时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}
如果您需要指定MaxCompute的Tunnel Endpoint,可以通过脚本模式手动配置数据源。将上述示例中的"datasource":"",替换为数据源的具体参数,示例如下。
"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api", 
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com", 
"project":"*****",