The MaxCompute data source is a data hub that provides a bidirectional channel for reading from and writing to MaxCompute.
Features
The MaxCompute data source in DataWorks uses a tunnel endpoint to access the tunnel service of a MaxCompute project. This lets you synchronize data by uploading it to or downloading it from the project.
For MaxCompute data sources created after December 11, 2023, if the DataWorks service and the target MaxCompute project are in different regions, you cannot directly synchronize data by using a tunnel endpoint. You must first purchase a Cloud Enterprise Network (CEN) instance to establish a network connection. Cross-region data synchronization is possible only after the connection is established. For more information about CEN and its operations, see Cloud Enterprise Network (CEN).
Batch read
MaxCompute Reader supports reading data from partitioned and non-partitioned tables, but not from virtual views or external tables.
When you perform a batch read from a MaxCompute partitioned table, you cannot directly configure field mapping for partition key columns. To synchronize partition key values, you can add a custom field, manually enter the partition name, and then configure the field mapping.
Use this method to synchronize data from a partition that corresponds to the scheduling time.
For example, a partitioned table named t0 contains the id and name columns. The level-1 partition key is pt and the level-2 partition key is ds. To read data from the partition where pt=<Business date> and ds=hangzhou, you must specify the partition values as pt=${scheduling parameter} and ds=hangzhou when you configure the source. Then, you can configure the field mapping for the id and name columns.
You can write partition key columns to a destination table by adding them as custom fields.
MaxCompute Reader supports data filtering by using a WHERE clause.
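As a rough illustration of the partition and filter settings described above, the following Python sketch assembles the parameter object of a MaxCompute Reader step for table t0. The key names are taken from the reader script parameters in the appendix. The helper name build_reader_parameter, the data source name my_odps_source, the ${bizdate} placeholder, and the filter id > 100 are assumptions made for illustration only.

```python
import json


def build_reader_parameter(datasource, table, columns, partitions, where=None):
    """Assemble the "parameter" object of a MaxCompute Reader step (illustrative helper)."""
    return {
        "datasource": datasource,
        "table": table,
        # Only regular columns are mapped here; partition key columns are
        # selected through "partition", not through "column".
        "column": list(columns),
        "partition": list(partitions),
        "isCompress": False,
        # The WHERE clause only takes effect when enableWhere is true.
        "enableWhere": where is not None,
        "where": where or "",
    }


reader_parameter = build_reader_parameter(
    datasource="my_odps_source",               # hypothetical data source name
    table="t0",
    columns=["id", "name"],
    partitions=["pt=${bizdate},ds=hangzhou"],  # ${bizdate} stands in for the scheduling parameter
    where="id > 100",                          # hypothetical filter condition
)
print(json.dumps(reader_parameter, indent=4))
```

The printed object can be pasted into the parameter field of a reader step in the code editor after the placeholder values are replaced with your own.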
Batch write
MaxCompute Writer does not support the VARCHAR data type if the source data contains null values.
If the destination table is a DeltaTable, expand Advanced Configuration and set Visible After Synchronization to Yes. Otherwise, the task reports an error if the concurrency is greater than 1.
Synchronizing data from a source to a MaxCompute external table is not supported.
If a column in the destination table is not mapped to a source column, Data Integration sets its value to null after synchronization, even if a default value was specified when the table was created.
Real-time write
Real-time data synchronization tasks support serverless resource groups.
Real-time data synchronization tasks do not support tables without a primary key.
Synchronizing data from a source to a MaxCompute external table is not supported.
When data is synchronized in real time to the default MaxCompute data source (usually odps_first), a temporary AccessKey is used by default. This temporary AccessKey automatically expires after seven days, causing the task to fail. When the platform detects that a task failed due to the temporary AccessKey, it automatically restarts the task. If you have configured this type of monitoring alert for the task, you will receive an alert.
On the same day you configure a one-click real-time synchronization task to MaxCompute, you can query only the historical full data. You can query incremental data in MaxCompute only after it is merged the next day.
A one-click real-time synchronization task to MaxCompute generates a full data partition each day. To prevent excessive storage consumption, the automatically created MaxCompute table has a default 30-day lifecycle. If this duration is unsuitable, you can modify the lifecycle by clicking the MaxCompute table name during task configuration.
Data Integration uses the data synchronization channels of the MaxCompute engine for data uploads and downloads. For information about the service level agreement (SLA) for these channels, see Data transfer service (upload) scenarios and tools. Evaluate your data synchronization methods based on the SLA of these channels.
When using one-click real-time synchronization to MaxCompute in instance mode, the exclusive resource group for Data Integration requires at least 8 vCPUs and 16 GiB of memory.
Data Integration supports only user-created MaxCompute data sources in the same region as the current workspace. Although a connection test for a data source in a different region may succeed, the synchronization task will fail during table creation and report an "engine does not exist" error.
When MaxCompute is the destination for whole-database synchronization, it supports only the whole-database full and incremental task and the real-time incremental stream mode for regular tables. If the destination table is a DeltaTable, both real-time synchronization and the whole-database full and incremental task are supported.
Note: When you use a user-created MaxCompute data source, you must still associate a MaxCompute engine with your DataWorks project. Otherwise, MaxCompute SQL nodes cannot be created, which prevents the 'done' marker node for full synchronization from being created.
Supported data types
MaxCompute supports 1.0, 2.0, and Hive-compatible data types. The following sections describe the supported data types for each version.
MaxCompute 1.0 data types
Type | Batch read | Batch write | Real-time write |
BIGINT | Supported | Supported | Supported |
DOUBLE | Supported | Supported | Supported |
DECIMAL | Supported | Supported | Supported |
STRING | Supported | Supported | Supported |
DATETIME | Supported | Supported | Supported |
BOOLEAN | Supported | Supported | Supported |
ARRAY | Supported | Supported | Supported |
MAP | Supported | Supported | Supported |
STRUCT | Supported | Supported | Supported |
MaxCompute 2.0 and Hive-compatible data types
Type | Batch read (MaxCompute Reader) | Batch write (MaxCompute Writer) | Real-time write |
TINYINT | Supported | Supported | Supported |
SMALLINT | Supported | Supported | Supported |
INT | Supported | Supported | Supported |
BIGINT | Supported | Supported | Supported |
BINARY | Supported | Supported | Supported |
FLOAT | Supported | Supported | Supported |
DOUBLE | Supported | Supported | Supported |
DECIMAL(precision,scale) | Supported | Supported | Supported |
VARCHAR(n) | Supported | Supported | Supported |
CHAR(n) | Not supported | Supported | Supported |
STRING | Supported | Supported | Supported |
DATE | Supported | Supported | Supported |
DATETIME | Supported | Supported | Supported |
TIMESTAMP | Supported | Supported | Supported |
BOOLEAN | Supported | Supported | Supported |
ARRAY | Supported | Supported | Supported |
MAP | Supported | Supported | Supported |
STRUCT | Supported | Supported | Supported |
Data type conversion
The following table describes the data type conversion rules for MaxCompute Reader.
Category | Data Integration type | MaxCompute data type |
Integer | LONG | BIGINT, INT, TINYINT, and SMALLINT |
Boolean | BOOLEAN | BOOLEAN |
Date and time | DATE | DATETIME, TIMESTAMP, and DATE |
Floating point | DOUBLE | FLOAT, DOUBLE, and DECIMAL |
Binary | BYTES | BINARY |
Complex | STRING | ARRAY, MAP, and STRUCT |
If data type conversion or writing data to the destination data source fails, the data is treated as dirty data. You can use the tolerance threshold for dirty data to control how the system handles these records.
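The conversion table above can be expressed as a simple lookup. The following Python sketch copies the rows of that table into a dictionary and resolves a column type to its Data Integration type; the names READER_TYPE_MAP and data_integration_type are hypothetical and not part of any DataWorks API. Types that the table does not list, such as STRING, return None rather than a guessed mapping.

```python
# MaxCompute type -> Data Integration type, copied from the conversion table above.
READER_TYPE_MAP = {
    "BIGINT": "LONG", "INT": "LONG", "TINYINT": "LONG", "SMALLINT": "LONG",
    "BOOLEAN": "BOOLEAN",
    "DATETIME": "DATE", "TIMESTAMP": "DATE", "DATE": "DATE",
    "FLOAT": "DOUBLE", "DOUBLE": "DOUBLE", "DECIMAL": "DOUBLE",
    "BINARY": "BYTES",
    "ARRAY": "STRING", "MAP": "STRING", "STRUCT": "STRING",
}


def data_integration_type(maxcompute_type):
    """Return the Data Integration type for a MaxCompute column type, or None if unlisted."""
    # Drop type arguments such as DECIMAL(10,2) or ARRAY<STRING> before the lookup.
    base = maxcompute_type.split("(", 1)[0].split("<", 1)[0].strip().upper()
    return READER_TYPE_MAP.get(base)


for column_type in ["decimal(10,2)", "array<string>", "TIMESTAMP", "STRING"]:
    print(column_type, "->", data_integration_type(column_type))
```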
Prerequisites
When reading from or writing to a MaxCompute table, you can enable specific properties as needed.
Connect to MaxCompute and enable project-level configurations
Log on to the MaxCompute client. For more information, see Connect using the local client (odpscmd).
Enable the required project-level configurations in MaxCompute. These operations require project owner permissions. For more information about MaxCompute permissions, see Role planning.
Enable ACID semantics
To enable ACID semantics, run the following command on the client as a project owner. For more information about MaxCompute ACID semantics, see ACID semantics.
setproject odps.sql.acid.table.enable=true;
(Optional) Enable V2.0 data types
To use the TIMESTAMP data type, run the following command on the client as a project owner to enable MaxCompute V2.0 data types.
setproject odps.sql.type.system.odps2=true;
(Optional) Account authorization
When you bind a MaxCompute compute engine to a workspace, a MaxCompute data source is automatically created in DataWorks. You can use this data source for data synchronization within the current workspace. To synchronize data from this MaxCompute data source in another workspace, ensure the access account for the data source in the other workspace has the required permissions to access this MaxCompute compute engine. For information about cross-account authorization, see Cross-account authorization (MaxCompute and Hologres).
Create a MaxCompute data source
Before you develop data synchronization tasks, you must add your MaxCompute project as a MaxCompute data source in DataWorks. For detailed instructions, see Bind a MaxCompute compute resource.
Workspaces in standard mode support data source isolation. You can add and isolate data sources in your development and production environments to enhance data security. For more information, see Isolate data sources in development and production environments.
The odps_first data source is a default data source if it appears in your workspace without being manually created. It was automatically created by the first MaxCompute engine bound to your workspace before the data source feature was upgraded. When you perform data synchronization, selecting this data source means that data is read from or written to that MaxCompute engine project.
On the data source's configuration page, you can view the name of the MaxCompute project it uses to confirm which project the data is read from or written to. For more information, see Data source management.
Data synchronization tasks
For information about the entry point for and the procedure of configuring a synchronization task, see the following configuration guides.
Single-table batch synchronization task
See Configure batch synchronization in the codeless UI and Use the code editor.
For a full list of parameters and a sample script for the code editor, see Appendix: Script demos and parameters.
Single-table real-time synchronization task
See Configure a single-table real-time synchronization task.
Full-database synchronization task
See Configure a full-database batch synchronization task, Configure a full-database real-time synchronization task, and Configure a full-database full and incremental synchronization task.
FAQ
Notes on using "Add a row" or "Add a field" for field mapping in MaxCompute (ODPS) tables
How to synchronize a partition field in a MaxCompute (ODPS) table
How to synchronize data from multiple partitions in a MaxCompute (ODPS) table
How to perform column filtering, reordering, and null padding in MaxCompute
For more FAQs on Data Integration, see Data Integration.
Appendix: Script demos and parameters
Configure a batch synchronization task by using the code editor
To configure a batch synchronization task in the code editor, set the parameters in the script according to the unified script format. For more information, see Use the code editor. The following sections describe the data source parameters that you configure in the script.
Reader script demo
Before you run the code, remove the comments.
{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"odps",// The plugin name.
            "parameter":{
                "partition":[],// The partitions to read data from.
                "isCompress":false,// Specifies whether to compress data.
                "datasource":"",// The data source.
                "column":[// The columns in the source table.
                    "id"
                ],
                "where":"",// The WHERE clause for data filtering.
                "enableWhere":false,// Specifies whether to use a WHERE clause for data filtering.
                "table":""// The table name.
            },
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"stream",
            "parameter":{
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The error record count.
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. If this parameter is set to true, the mbps parameter takes effect.
            "concurrent":1,// The job concurrency.
            "mbps":"12"// The throttling rate. 1 mbps = 1 MB/s.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}
To specify a MaxCompute Tunnel endpoint, you can manually configure the data source in script mode. Replace the "datasource":"", line in the preceding sample with your data source parameters. For example:
"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"*****",
Reader script parameters
Parameter | Description | Required | Default |
datasource | The name of the data source. In script mode, this value must match the name of the added data source. | Yes | None |
table | The name of the source table. The name is not case-sensitive. | Yes | None |
partition | The partition to read data from.
For example, consider a partitioned table named test with four partitions: pt=1,ds=hangzhou, pt=1,ds=shanghai, pt=2,ds=hangzhou, and pt=2,ds=beijing. The following examples show how to configure this parameter:
You can also specify conditions to select partitions:
Note The content after | Required only for partitioned tables. | None |
column | The columns to read from the MaxCompute source table. For example, a table named test contains the id, name, and age columns:
| Yes | None |
enableWhere | Specifies whether to use a WHERE clause for data filtering. | No | false |
where | The WHERE clause to use for data filtering. | No | None |
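The reader script demo asks you to remove the comments before you run it. The following Python sketch shows one possible way to do that offline and to check that the result is valid JSON. It is not a DataWorks tool: the function name strip_line_comments and the shortened sample are assumptions for illustration. The scan tracks string literals so that the // inside endpoint URLs such as http://... is left untouched.

```python
import json


def strip_line_comments(script):
    """Remove // comments that sit outside JSON string literals."""
    out = []
    in_string = False
    i = 0
    while i < len(script):
        ch = script[i]
        if in_string:
            out.append(ch)
            if ch == "\\" and i + 1 < len(script):
                out.append(script[i + 1])  # keep the escaped character as-is
                i += 1
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
            out.append(ch)
        elif script.startswith("//", i):
            # Skip to the end of the line; the newline itself is kept.
            while i < len(script) and script[i] != "\n":
                i += 1
            continue
        else:
            out.append(ch)
        i += 1
    return "".join(out)


# Shortened, hypothetical excerpt in the style of the reader script demo.
sample = '''{
    "stepType":"odps",// The plugin name.
    "parameter":{
        "endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
        "table":""// The table name.
    }
}'''

step = json.loads(strip_line_comments(sample))
print(step["parameter"]["endpoint"])
```

A plain search-and-replace on // would also truncate the endpoint URLs, which is why the sketch walks the text character by character.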
Writer script demo
The following code is a sample configuration.
{
    "type":"job",
    "version":"2.0",// The version number.
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"odps",// The plugin name.
            "parameter":{
                "partition":"",// The destination partition.
                "truncate":true,// The cleanup rule.
                "isCompress":false,// Specifies whether to compress data.
                "datasource":"odps_first",// The data source name.
                "column":[// The destination column names.
                    "id",
                    "name",
                    "age",
                    "sex",
                    "salary",
                    "interest"
                ],
                "table":""// The destination table name.
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The error record count, which is the maximum number of dirty data records allowed.
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. If this parameter is set to true, the mbps parameter takes effect.
            "concurrent":1,// The job concurrency.
            "mbps":"12"// The throttling rate. 1 mbps = 1 MB/s.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}
To specify a MaxCompute Tunnel endpoint, you can manually configure the data source in script mode. Replace the "datasource":"odps_first", line in the preceding sample with your data source parameters. The following code provides an example.
"accessId":"<yourAccessKeyId>",
"accessKey":"<yourAccessKeySecret>",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"**********",
Writer script parameters
Parameter | Description | Required | Default |
datasource | The name of the data source. In script mode, this value must match the name of the added data source. | Yes | None |
table | The name of the destination table. The name is not case-sensitive. You can specify only one table. | Yes | None |
partition | The partition to write data to. You must specify the lowest-level partition. For example, to write to a table with three partition levels, you must specify the full partition path, such as
| Required only for partitioned tables. | None |
column | Specifies the destination columns for the data. To write data to all columns, set this parameter to
| Yes | None |
truncate | Setting MaxCompute Writer uses MaxCompute SQL for the data cleanup. Because this is not an atomic operation, the truncate option is not atomic. Consequently, running concurrent cleanup jobs on the same table or partition can cause timing issues. To prevent such issues, avoid running DDL operations from multiple jobs on the same partition simultaneously. Alternatively, you can create the partitions before starting the concurrent jobs. | Yes | None |
emptyAsNull | Specifies whether to convert an empty string to a null value before writing. | No | false |
consistencyCommit | Controls data visibility.
| No | false |
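To show how the writer parameters fit together, the following Python sketch builds the parameter object of a MaxCompute Writer step and refuses to proceed when a partition level has no value, reflecting the rule that the lowest-level partition must be specified. The helper build_writer_parameter, the table t0 with partition keys pt and ds, and the ${bizdate} placeholder are assumptions for illustration; only the key names come from the table above.

```python
import json


def build_writer_parameter(datasource, table, columns, partition_keys=(),
                           partition_values=None, truncate=True, empty_as_null=False):
    """Assemble the "parameter" object of a MaxCompute Writer step (illustrative helper)."""
    partition_values = partition_values or {}
    # Every partition level, down to the lowest one, needs a value.
    missing = [key for key in partition_keys if key not in partition_values]
    if missing:
        raise ValueError("missing value for partition key(s): " + ", ".join(missing))
    partition = ",".join(f"{key}={partition_values[key]}" for key in partition_keys)
    return {
        "datasource": datasource,
        "table": table,
        "column": list(columns),
        "partition": partition,       # stays "" for a non-partitioned table
        "truncate": truncate,
        "isCompress": False,
        "emptyAsNull": empty_as_null,
    }


writer_parameter = build_writer_parameter(
    datasource="odps_first",
    table="t0",                       # hypothetical destination table
    columns=["id", "name"],
    partition_keys=["pt", "ds"],
    partition_values={"pt": "${bizdate}", "ds": "hangzhou"},
)
print(json.dumps(writer_parameter, indent=4))
```

Calling the helper with partition_keys=["pt", "ds"] but a value for pt only raises a ValueError instead of producing a partial partition path.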