文档

MaxCompute数据源

更新时间:

MaxCompute数据源作为数据中枢,为您提供读取和写入数据至MaxCompute的双向通道。

使用限制

说明

DataWorks的MaxCompute数据源可使用Tunnel Endpoint地址访问相应MaxCompute项目的Tunnel服务,从而通过上传、下载等方式同步该项目的数据。

2023年12月11日之后创建的MaxCompute数据源,若数据源所在的DataWorks服务与需要访问的MaxCompute项目不在同一地域,则无法直接通过Tunnel Endpoint地址同步MaxCompute项目的数据。该场景下,您需先购买云企业网,连通DataWorks服务与MaxCompute项目的网络,网络连通后才可跨地域执行数据同步操作。云企业网的介绍及相关操作,请参见云企业网

离线读

  • MaxCompute Reader支持读取分区表、非分区表,不支持读取虚拟视图、不支持同步外部表。

  • 离线读MaxCompute分区表时,不支持直接对分区字段进行字段映射配置,需要在配置数据来源时指定待同步数据的分区信息。

    例如,分区表t0其字段包含id、name两个字段,一级分区为pt,二级分区为ds。读取t0的pt=1,ds=hangzhou分区数据时,您需要在配置数据来源时指定分区值为pt=1,ds=hangzhou,后续字段映射配置时进行id、name字段的映射配置。

  • MaxCompute Reader不支持数据过滤功能。

    如果您在数据同步过程中,需要过滤符合条件的数据,请创建新表并写入过滤数据后,同步新表中的数据。

离线写

当数据有null值时,MaxCompute Writer不支持VARCHAR类型。

实时写

  • 实时数据同步任务仅支持使用独享数据集成资源组。

  • 实时同步节点目前仅支持同步PolarDB、Oracle、MySQL数据源至MaxCompute。

  • 实时数据同步任务暂不支持同步没有主键的表。

  • 当实时同步至MaxCompute默认数据源(一般为odps_first)时,默认使用临时AK进行同步,临时AK超过7天会自动过期,同时,将导致任务运行失败。平台检测到临时AK导致任务失败时会自动重启任务,如果任务配置了该类型的监控报警,您将会收到报警信息。

  • 一键实时同步至MaxCompute任务配置当天仅能查询历史全量数据,增量数据需要等待第二天merge完成后才可在MaxCompute查询。

  • 一键实时同步至MaxCompute任务每天会生成一个全量分区,为避免数据过多占用存储资源,本方案任务自动建立的MaxCompute表,默认生命周期为30天。如果时长不满足您的业务需求,可以在配置同步任务时单击对应的MaxCompute表名修改生命周期。

  • 数据集成使用MaxCompute引擎同步数据通道进行数据上传和下载(同步数据通道SLA详情请参见数据传输服务(上传)场景与工具),请根据MaxCompute引擎同步数据通道SLA评估数据同步业务技术选型。

  • 一键实时同步至MaxCompute,按实例模式同步时,独享数据集成资源组规格最低需要为8C16G。

  • 仅支持与当前工作空间同地域的自建MaxCompute数据源,跨地域的MaxCompute项目在测试数据源服务连通性时可以正常连通,但同步任务执行时,在MaxCompute建表阶段会报引擎不存在的错误。

    说明

    使用自建MaxCompute数据源时,DataWorks项目仍然需要绑定MaxCompute引擎,否则将无法创建MaxCompute SQL节点,导致全量同步标done节点创建失败。

注意事项

如果目标表列没有配置和源端的列映射,则同步任务执行完成后,此列值为空,即使此目标列建表时指定了默认值,列值仍为空。

支持的字段类型

支持MaxCompute的1.0数据类型、2.0数据类型、Hive兼容数据类型。不同数据类型版本支持的字段类型详情如下。

1.0数据类型支持的字段

字段类型

离线读

离线写

实时写

BIGINT

支持

支持

支持

DOUBLE

支持

支持

支持

DECIMAL

支持

支持

支持

STRING

支持

支持

支持

DATETIME

支持

支持

支持

BOOLEAN

支持

支持

支持

ARRAY

支持

支持

支持

MAP

支持

支持

支持

STRUCT

支持

支持

支持

2.0数据类型、Hive兼容数据类型支持的字段

字段类型

离线读(MaxCompute Reader)

离线写(MaxCompute Writer)

实时写

TINYINT

支持

支持

支持

SMALLINT

支持

支持

支持

INT

支持

支持

支持

BIGINT

支持

支持

支持

BINARY

支持

支持

支持

FLOAT

支持

支持

支持

DOUBLE

支持

支持

支持

DECIMAL(pecision,scale)

支持

支持

支持

VARCHAR(n)

支持

支持

支持

CHAR(n)

不支持

支持

支持

STRING

支持

支持

支持

DATE

支持

支持

支持

DATETIME

支持

支持

支持

TIMESTAMP

支持

支持

支持

BOOLEAN

支持

支持

支持

ARRAY

支持

支持

支持

MAP

支持

支持

支持

STRUCT

支持

支持

支持

数据类型转换说明

MaxCompute Reader针对MaxCompute的类型转换列表,如下所示。

类型分类

数据集成配置类型

数据库数据类型

整数类

LONG

BIGINT、INT、TINYINT和SMALLINT

布尔类

BOOLEAN

BOOLEAN

日期时间类

DATE

DATETIME、TIMESTAMP和DATE

浮点类

DOUBLE

FLOAT、DOUBLE和DECIMAL

二进制类

BYTES

BINARY

复杂类

STRING

ARRAY、MAP和STRUCT

重要

如果数据转换失败,或数据写出至目的端数据源失败,则将数据作为脏数据,您可以配合脏数据限制阈值使用。

数据同步前准备:MaxCompute环境准备

读取或写入MaxCompute表数据时,您可以根据需要选择是否开启相关属性。

连接MaxCompute并开启项目级配置

  • 登录MaxCompute客户端,详情请参见使用本地客户端(odpscmd)连接

  • 开启MaxCompute项目级相关配置:请确认是否已拥有对应的操作权限,您可使用Project Owner账号执行相关操作,关于MaxCompute权限说明,详情请参见角色规划

开启acid属性

您可以使用Project Owner账号在客户端执行以下命令开启acid属性,关于MaxCompute ACID语义说明,详情请参见ACID语义

setproject odps.sql.acid.table.enable=true;

(可选)开启2.0数据类型

如果需要使用MaxCompute数据2.0类型中的timestamp类型,您需要使用Project Owner账号在客户端执行以下命令开启数据2.0。

setproject odps.sql.type.system.odps2=true;

数据同步任务开发:MaxCompute同步流程引导

MaxCompute数据同步任务的配置入口和通用配置流程指导可参见下文的配置指导,详细的配置参数解释可在配置界面查看对应参数的文案提示。

创建MaxCompute数据源

进行数据同步任务开发前,您需要在DataWorks上将MaxCompute项目创建为MaxCompute数据源。创建MaxCompute数据源的详情操作,请参见创建MaxCompute数据源

说明
  • 标准模式的工作空间支持数据源隔离功能,您可以分别添加并隔离开发环境和生产环境的数据源,以保护您的数据安全。详情请参见数据源开发和生产环境隔离

  • 若工作空间中名为odps_first的MaxCompute数据源非人为在数据源界面创建,则该数据源为数据源改版前,您当前工作空间绑定的第一个MaxCompute引擎默认创建的数据源。进行数据同步时,如选择此数据源,则表示您要将数据读取或写入至该MaxCompute引擎项目中。

    您可在数据源配置页面,查看数据源使用的MaxCompute项目名称,确认数据最终读取或写入至哪一个MaxCompute项目。详情请参见管理数据源

单表离线同步任务配置指导

单表实时同步任务配置指导

操作流程请参见配置单表增量数据实时同步DataStudio侧实时同步任务配置

整库级别同步任务配置指导

整库离线、整库(实时)全增量、整库(实时)分库分表等整库级别同步任务的配置操作,请参见数据集成侧同步任务配置

常见问题

更多其他数据集成常见问题请参见数据集成常见问题

附录:脚本Demo与参数说明

附录:离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。

MaxCompute Reader脚本Demo

重要

实际运行时,请删除下述代码中的注释。

{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"odps",//插件名。
            "parameter":{
                "partition":[],//读取数据所在的分区。
                "isCompress":false,//是否压缩。
                "datasource":"",//数据源。
                "column":[//源头表的列信息。
                    "id"
                ],
                "emptyAsNull":true,
                "table":""//表名。
            },
            "name":"Reader",
            "category":"reader"
        },
        { 
            "stepType":"stream",
            "parameter":{
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数。
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

如果您需要指定MaxCompute的Tunnel Endpoint,可以通过脚本模式手动配置数据源。将上述示例中的"datasource":"",替换为数据源的具体参数,示例如下。

"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api", 
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com", 
"project":"*****", 

MaxCompute Reader脚本参数

参数

描述

是否必选

默认值

datasource

数据源名称。脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。

table

读取数据表的表名称(大小写不敏感)。

partition

读取的数据所在的分区信息。

  • ODPS的分区配置支持linux Shell通配符,*表示0个或多个字符,?表示任意一个字符。

  • 默认情况下,读取的分区必须存在,如果分区不存在则运行的任务会报错。如果您希望当分区不存在时任务仍然执行成功,则可以切换至脚本模式执行任务,并在ODPS的Parameter中添加"successOnNoPartition": true配置。

例如,分区表test包含pt=1,ds=hangzhoupt=1,ds=shanghaipt=2,ds=hangzhoupt=2,ds=beijing四个分区,则读取不同分区数据的配置如下:

  • 如果您需要读取pt=1,ds=hangzhou分区的数据,则分区信息的配置为"partition":"pt=1,ds=hangzhou”

  • 如果您需要读取pt=1中所有分区的数据,则分区信息的配置为 "partition":"pt=1,ds=*”

  • 如果您需要读取整个test表所有分区的数据,则分区信息的配置为 "partition":"pt=*,ds=*”

此外,您还可以根据实际需求设置分区数据的获取条件:

  • 如果您需要指定最大分区,则可以添加/*query*/ ds=(select MAX(ds) from DataXODPSReaderPPR)配置信息。

  • 如果需要按条件过滤,则可以添加相关条件/*query*/ pt+表达式配置。例如/*query*/ pt>=20170101 and pt<20170110表示获取pt分区中,20170101日期之后(包含20170101日期),至20170110日期之前(不包含20170110日期)的所有数据。

说明

/*query*/表示将其后填写的内容识别为一个where条件。

如果表为分区表,则必填。如果表为非分区表,则不能填写。

column

读取MaxCompute源头表的列信息。例如表test的字段为idnameage

  • 如果您需要依次读取idnameage,则应该配置为"column":["id","name","age"]或者配置为"column":["*"]

    说明

    不推荐您配置抽取字段为(*),因为它表示依次读取表的每个字段。如果您的表字段顺序调整、类型变更或者个数增减,您的任务会存在源头表列和目的表列不能对齐的风险,则直接导致您的任务运行结果不正确甚至运行失败。

  • 如果您想依次读取nameid,则应该配置为"column":["name","id"]

  • 如果您想在源头抽取的字段中添加常量字段(以适配目标表的字段顺序)。例如,您想抽取的每一行数据值为age列对应的值,name列对应的值,常量日期值1988-08-08 08:08:08,id列对应的值,则您应该配置为"column":["age","name","'1988-08-08 08:08:08'","id"],即常量列首尾用符号'包住即可。

    内部实现上识别常量是通过检查您配置的每一个字段,如果发现有字段首尾都有',则认为其是常量字段,其实际值为去除'之后的值。

    说明
    • MaxCompute Reader抽取数据表不是通过MaxCompute的Select SQL语句,所以不能在字段上指定函数。

    • column必须显示指定同步的列集合,不允许为空。

MaxCompute Writer脚本Demo

脚本配置样例如下。

{
    "type":"job",
    "version":"2.0",//版本号。
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"odps",//插件名。
            "parameter":{
                "partition":"",//分区信息。
                "truncate":true,//清理规则。
                "compress":false,//是否压缩。
                "datasource":"odps_first",//数据源名。
            "column": [//源端列名。
                "id",
                "name",
                "age",
                "sex",
                "salary",
                "interest"
                ],
                "emptyAsNull":false,//空字符串是否作为null。
                "table":""//表名。
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数,表示脏数据的最大容忍条数。
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

如果您需要指定MaxCompute的Tunnel Endpoint,可以通过脚本模式手动配置数据源:将上述示例中的"datasource":"",替换为数据源的具体参数,示例如下。

"accessId":"<yourAccessKeyId>",
 "accessKey":"<yourAccessKeySecret>",
 "endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
 "odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api", 
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com", 
"project":"**********", 

MaxCompute Writer脚本参数

参数

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。

table

写入的数据表的表名称(大小写不敏感),不支持填写多张表。

partition

需要写入数据表的分区信息,必须指定到最后一级分区。例如把数据写入一个三级分区表,必须配置到最后一级分区,例如pt=20150101, type=1, biz=2

  • 对于非分区表,该值务必不要填写,表示直接导入至目标表。

  • MaxCompute Writer不支持数据路由写入,对于分区表请务必保证写入数据到最后一级分区。

如果表为分区表,则必填。如果表为非分区表,则不能填写。

column

需要导入的字段列表。当导入全部字段时,可以配置为"column": ["*"]。当需要插入部分MaxCompute列,则填写部分列,例如"column": ["id","name"]

  • MaxCompute Writer支持列筛选、列换序。例如一张表中有a、b和c三个字段,您只同步c和b两个字段,则可以配置为"column": ["c","b"],在导入过程中,字段a自动补空,设置为null。

  • column必须显示指定同步的列集合,不允许为空。

truncate

通过配置"truncate": "true"保证写入的幂等性。即当出现写入失败再次运行时,MaxCompute Writer将清理前述数据,并导入新数据,可以保证每次重跑之后的数据都保持一致 。

因为利用MaxCompute SQL进行数据清理工作,SQL无法保证原子性,所以truncate选项不是原子操作。当多个任务同时向一个TablePartition清理分区时,可能出现并发时序问题,请务必注意。

针对该类问题,建议您尽量不要多个作业DDL同时操作同一个分区,或者在多个并发作业启动前,提前创建分区。

  • 本页导读 (1)
文档反馈