MaxCompute数据源作为数据中枢,为您提供读取和写入数据至MaxCompute的双向通道。
使用限制
DataWorks的MaxCompute数据源可使用Tunnel Endpoint地址访问相应MaxCompute项目的Tunnel服务,从而通过上传、下载等方式同步该项目的数据,使用Tunnel服务时的上传与下载会涉及DownloadTable操作。
2023年12月11日之后创建的MaxCompute数据源,若数据源所在的DataWorks服务与需要访问的MaxCompute项目不在同一地域,则无法直接通过Tunnel Endpoint地址同步MaxCompute项目的数据。该场景下,您需先购买云企业网,连通DataWorks服务与MaxCompute项目的网络,网络连通后才可跨地域执行数据同步操作。云企业网的介绍及相关操作,请参见云企业网。
离线读
MaxCompute Reader支持读取分区表、非分区表,不支持读取虚拟视图、不支持同步外部表。
离线读MaxCompute分区表时,不支持直接对分区字段进行字段映射配置,需要在配置数据来源时指定待同步数据的分区信息。
例如,分区表t0的字段包含id、name两个字段,一级分区为pt,二级分区为ds。读取t0的pt=1,ds=hangzhou分区数据时,您需要在配置数据来源时指定分区值为pt=1,ds=hangzhou,后续字段映射配置时进行id、name字段的映射配置。
MaxCompute Reader支持使用WHERE进行数据过滤。
离线写
当数据有null值时,MaxCompute Writer不支持VARCHAR类型。
如果写入的表为
DeltaTable
,请将同步完成才可见设为是,否则在并发大于1的场景下,任务将会报错。
实时写
实时数据同步任务仅支持使用独享数据集成资源组。
实时同步节点目前仅支持同步PolarDB、Oracle、MySQL数据源至MaxCompute。
实时数据同步任务暂不支持同步没有主键的表。
当实时同步至MaxCompute默认数据源(一般为
odps_first
)时,默认使用临时AK进行同步,临时AK超过7天会自动过期,同时,将导致任务运行失败。平台检测到临时AK导致任务失败时会自动重启任务,如果任务配置了该类型的监控报警,您将会收到报警信息。一键实时同步至MaxCompute任务配置当天仅能查询历史全量数据,增量数据需要等待第二天merge完成后才可在MaxCompute查询。
一键实时同步至MaxCompute任务每天会生成一个全量分区,为避免数据过多占用存储资源,本方案任务自动建立的MaxCompute表,默认生命周期为30天。如果时长不满足您的业务需求,可以在配置同步任务时单击对应的MaxCompute表名修改生命周期。
数据集成使用MaxCompute引擎同步数据通道进行数据上传和下载(同步数据通道SLA详情请参见数据传输服务(上传)场景与工具),请根据MaxCompute引擎同步数据通道SLA评估数据同步业务技术选型。
一键实时同步至MaxCompute,按实例模式同步时,独享数据集成资源组规格最低需要为8C16G。
仅支持与当前工作空间同地域的自建MaxCompute数据源,跨地域的MaxCompute项目在测试数据源服务连通性时可以正常连通,但同步任务执行时,在MaxCompute建表阶段会报引擎不存在的错误。
说明使用自建MaxCompute数据源时,DataWorks项目仍然需要绑定MaxCompute引擎,否则将无法创建MaxCompute SQL节点,导致全量同步标done节点创建失败。
注意事项
如果目标表列没有配置和源端的列映射,则同步任务执行完成后,此列值为空,即使此目标列建表时指定了默认值,列值仍为空。
支持的字段类型
支持MaxCompute的1.0数据类型、2.0数据类型、Hive兼容数据类型。不同数据类型版本支持的字段类型详情如下。
1.0数据类型支持的字段
字段类型 | 离线读 | 离线写 | 实时写 |
BIGINT | 支持 | 支持 | 支持 |
DOUBLE | 支持 | 支持 | 支持 |
DECIMAL | 支持 | 支持 | 支持 |
STRING | 支持 | 支持 | 支持 |
DATETIME | 支持 | 支持 | 支持 |
BOOLEAN | 支持 | 支持 | 支持 |
ARRAY | 支持 | 支持 | 支持 |
MAP | 支持 | 支持 | 支持 |
STRUCT | 支持 | 支持 | 支持 |
2.0数据类型、Hive兼容数据类型支持的字段
字段类型 | 离线读(MaxCompute Reader) | 离线写(MaxCompute Writer) | 实时写 |
TINYINT | 支持 | 支持 | 支持 |
SMALLINT | 支持 | 支持 | 支持 |
INT | 支持 | 支持 | 支持 |
BIGINT | 支持 | 支持 | 支持 |
BINARY | 支持 | 支持 | 支持 |
FLOAT | 支持 | 支持 | 支持 |
DOUBLE | 支持 | 支持 | 支持 |
DECIMAL(pecision,scale) | 支持 | 支持 | 支持 |
VARCHAR(n) | 支持 | 支持 | 支持 |
CHAR(n) | 不支持 | 支持 | 支持 |
STRING | 支持 | 支持 | 支持 |
DATE | 支持 | 支持 | 支持 |
DATETIME | 支持 | 支持 | 支持 |
TIMESTAMP | 支持 | 支持 | 支持 |
BOOLEAN | 支持 | 支持 | 支持 |
ARRAY | 支持 | 支持 | 支持 |
MAP | 支持 | 支持 | 支持 |
STRUCT | 支持 | 支持 | 支持 |
数据类型转换说明
MaxCompute Reader针对MaxCompute的类型转换列表,如下所示。
类型分类 | 数据集成配置类型 | 数据库数据类型 |
整数类 | LONG | BIGINT、INT、TINYINT和SMALLINT |
布尔类 | BOOLEAN | BOOLEAN |
日期时间类 | DATE | DATETIME、TIMESTAMP和DATE |
浮点类 | DOUBLE | FLOAT、DOUBLE和DECIMAL |
二进制类 | BYTES | BINARY |
复杂类 | STRING | ARRAY、MAP和STRUCT |
如果数据转换失败,或数据写出至目的端数据源失败,则将数据作为脏数据,您可以配合脏数据限制阈值使用。
数据同步前准备:MaxCompute环境准备
读取或写入MaxCompute表数据时,您可以根据需要选择是否开启相关属性。
连接MaxCompute并开启项目级配置
登录MaxCompute客户端,详情请参见使用本地客户端(odpscmd)连接。
开启MaxCompute项目级相关配置:请确认是否已拥有对应的操作权限,您可使用Project Owner账号执行相关操作,关于MaxCompute权限说明,详情请参见角色规划。
开启acid属性
您可以使用Project Owner账号在客户端执行以下命令开启acid属性,关于MaxCompute ACID语义说明,详情请参见ACID语义。
setproject odps.sql.acid.table.enable=true;
(可选)开启2.0数据类型
如果需要使用MaxCompute数据2.0类型中的timestamp类型,您需要使用Project Owner账号在客户端执行以下命令开启数据2.0。
setproject odps.sql.type.system.odps2=true;
(可选)创建账号
工作空间绑定MaxCompute引擎时,默认将在DataWorks生成一个MaxCompute数据源,在当前工作空间可使用该默认引擎数据源进行数据同步,若您需要在其他空间同步当前工作空间的MaxCompute数据源,您需要创建Accesskey ID和Accesskey Secret,以便在其他工作空间创建数据源并使用该数据源时,可基于您的身份访问该引擎数据。
创建个人Accesskey ID和Accesskey Secret,操作详情请参见准备阿里云账号。
创建MaxCompute数据源,详情请参见配置MaxCompute数据源。
数据同步任务开发:MaxCompute同步流程引导
MaxCompute数据同步任务的配置入口和通用配置流程指导可参见下文的配置指导,详细的配置参数解释可在配置界面查看对应参数的文案提示。
创建MaxCompute数据源
进行数据同步任务开发前,您需要在DataWorks上将MaxCompute项目创建为MaxCompute数据源。创建MaxCompute数据源的详情操作,请参见创建MaxCompute数据源。
标准模式的工作空间支持数据源隔离功能,您可以分别添加并隔离开发环境和生产环境的数据源,以保护您的数据安全。详情请参见数据源开发和生产环境隔离。
若工作空间中名为odps_first的MaxCompute数据源非人为在数据源界面创建,则该数据源为数据源改版前,您当前工作空间绑定的第一个MaxCompute引擎默认创建的数据源。进行数据同步时,如选择此数据源,则表示您要将数据读取或写入至该MaxCompute引擎项目中。
您可在数据源配置页面,查看数据源使用的MaxCompute项目名称,确认数据最终读取或写入至哪一个MaxCompute项目。详情请参见管理数据源。
单表离线同步任务配置指导
操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。
脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。
单表实时同步任务配置指导
操作流程请参见配置单表增量数据实时同步、DataStudio侧实时同步任务配置。
整库离线、整库(实时)全增量、整库(实时)分库分表等整库级别同步配置指导
操作流程请参见数据集成侧同步任务配置。
常见问题
更多其他数据集成常见问题请参见数据集成常见问题。
附录:脚本Demo与参数说明
附录:离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。
MaxCompute Reader脚本Demo
实际运行时,请删除下述代码中的注释。
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"odps",//插件名。
"parameter":{
"partition":[],//读取数据所在的分区。
"isCompress":false,//是否压缩。
"datasource":"",//数据源。
"column":[//源头表的列信息。
"id"
],
"where": "",//使用WHERE数据过滤时,填写具体WHERE子句内容。
"enableWhere":false,//是否使用WHERE进行数据过滤。
"table":""//表名。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
如果您需要指定MaxCompute的Tunnel Endpoint,可以通过脚本模式手动配置数据源。将上述示例中的"datasource":"",
替换为数据源的具体参数,示例如下。
"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"*****",
MaxCompute Reader脚本参数
参数 | 描述 | 是否必选 | 默认值 |
datasource | 数据源名称。脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 |
table | 读取数据表的表名称(大小写不敏感)。 | 是 | 无 |
partition | 读取的数据所在的分区信息。
例如,分区表test包含pt=1,ds=hangzhou、pt=1,ds=shanghai、pt=2,ds=hangzhou、pt=2,ds=beijing四个分区,则读取不同分区数据的配置如下:
此外,您还可以根据实际需求设置分区数据的获取条件:
说明
| 如果表为分区表,则必填。如果表为非分区表,则不能填写。 | 无 |
column | 读取MaxCompute源头表的列信息。例如表test的字段为id、name和age:
| 是 | 无 |
MaxCompute Writer脚本Demo
脚本配置样例如下。
{
"type":"job",
"version":"2.0",//版本号。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"odps",//插件名。
"parameter":{
"partition":"",//分区信息。
"truncate":true,//清理规则。
"compress":false,//是否压缩。
"datasource":"odps_first",//数据源名。
"column": [//源端列名。
"id",
"name",
"age",
"sex",
"salary",
"interest"
],
"table":""//表名。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数,表示脏数据的最大容忍条数。
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
如果您需要指定MaxCompute的Tunnel Endpoint,可以通过脚本模式手动配置数据源:将上述示例中的"datasource":"",
替换为数据源的具体参数,示例如下。
"accessId":"<yourAccessKeyId>",
"accessKey":"<yourAccessKeySecret>",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"**********",
MaxCompute Writer脚本参数
参数 | 描述 | 是否必选 | 默认值 |
datasource | 数据源名称,脚本模式支持添加数据源,该配置项填写的内容必须与添加的数据源名称保持一致。 | 是 | 无 |
table | 写入的数据表的表名称(大小写不敏感),不支持填写多张表。 | 是 | 无 |
partition | 需要写入数据表的分区信息,必须指定到最后一级分区。例如把数据写入一个三级分区表,必须配置到最后一级分区,例如
| 如果表为分区表,则必填。如果表为非分区表,则不能填写。 | 无 |
column | 需要导入的字段列表。当导入全部字段时,可以配置为
| 是 | 无 |
truncate | 通过配置 因为利用MaxCompute SQL进行数据清理工作,SQL无法保证原子性,所以truncate选项不是原子操作。当多个任务同时向一个Table或Partition清理分区时,可能出现并发时序问题,请务必注意。 针对该类问题,建议您尽量不要多个作业DDL同时操作同一个分区,或者在多个并发作业启动前,提前创建分区。 | 是 | 无 |