MaxCompute

The MaxCompute data source is a data hub that provides a bidirectional channel for reading from and writing to MaxCompute.

Features

Note

The MaxCompute data source in DataWorks uses a tunnel endpoint to access the tunnel service of a MaxCompute project. This lets you synchronize data by uploading it to or downloading it from the project.

For MaxCompute data sources created after December 11, 2023, if the DataWorks service and the target MaxCompute project are in different regions, you cannot directly synchronize data by using a tunnel endpoint. You must first purchase a Cloud Enterprise Network (CEN) instance to establish a network connection. Cross-region data synchronization is possible only after the connection is established. For more information about CEN and its operations, see Cloud Enterprise Network (CEN).

Batch read

  • MaxCompute Reader supports reading data from partitioned and non-partitioned tables, but not from virtual views or external tables.

  • When you perform a batch read from a MaxCompute partitioned table, you cannot directly configure field mapping for partition key columns. To synchronize partition key values, you can add a custom field, manually enter the partition name, and then configure the field mapping.

  • To synchronize data from the partition that corresponds to the scheduling time, set the partition value by using a scheduling parameter.

    For example, a partitioned table named t0 contains the id and name columns. The level-1 partition key is pt and the level-2 partition key is ds. To read data from the partition where pt=<Business date> and ds=hangzhou, you must specify the partition values as pt=${scheduling parameter} and ds=hangzhou when you configure the source. Then, you can configure the field mapping for the id and name columns.

  • You can write partition key columns to a destination table by adding them as custom fields.

  • MaxCompute Reader supports data filtering by using a WHERE clause.
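The partition and custom-field rules above can be sketched as a small helper that assembles the reader settings for the t0 example. This is a minimal illustration rather than Data Integration code: the function name build_reader_parameter is hypothetical, ${bizdate} stands for whichever scheduling parameter you define, and expressing the custom fields as quoted constant columns in script mode is an assumption.

```python
def build_reader_parameter(table, columns, partition_values):
    """Assemble a reader 'parameter' dict for one partition.

    partition_values maps partition key -> value, in partition-level order.
    Each partition value is also appended as a quoted constant column so it
    can be mapped to a destination column (an assumption, see the lead-in).
    """
    if not columns:
        raise ValueError("column cannot be empty")
    partition = ",".join(f"{key}={value}" for key, value in partition_values.items())
    constants = [f"'{value}'" for value in partition_values.values()]
    return {
        "table": table,
        "partition": [partition],
        "column": list(columns) + constants,
    }


parameter = build_reader_parameter(
    "t0",
    ["id", "name"],
    {"pt": "${bizdate}", "ds": "hangzhou"},
)
print(parameter["partition"])  # ['pt=${bizdate},ds=hangzhou']
print(parameter["column"])
```

The scheduling parameter is left as a literal placeholder here; it would be resolved by the scheduler at run time, not by this helper.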

Batch write

  • MaxCompute Writer does not support the VARCHAR data type if the source data contains null values.

  • If the destination table is a DeltaTable, expand Advanced Configuration and set Visible After Synchronization to Yes. Otherwise, the task reports an error if the concurrency is greater than 1.

  • Synchronizing data from a source to a MaxCompute external table is not supported.

  • If a column in the destination table is not mapped to a source column, Data Integration sets its value to null after synchronization, even if a default value was specified when the table was created.
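The last rule is easy to overlook, so here is a short simulation of it. The helper fill_unmapped_columns is hypothetical and models only the behavior described above (unmapped destination columns receive null regardless of any table default); it does not call MaxCompute.

```python
def fill_unmapped_columns(table_columns, mapped_values):
    """Return one destination row; unmapped columns become None (null)."""
    unknown = set(mapped_values) - set(table_columns)
    if unknown:
        raise KeyError(f"columns not in destination table: {sorted(unknown)}")
    return {column: mapped_values.get(column) for column in table_columns}


# Suppose the destination table declares a default for "status". The column
# is not mapped, so the synchronized row still carries null for it.
row = fill_unmapped_columns(["id", "name", "status"], {"id": 1, "name": "alice"})
print(row)  # {'id': 1, 'name': 'alice', 'status': None}
```

If a default value matters downstream, one option is to map a constant source column to that destination column instead of leaving it unmapped.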

Real-time write

  • Real-time data synchronization tasks support serverless resource groups.

  • Real-time data synchronization tasks do not support tables without a primary key.

  • Synchronizing data from a source to a MaxCompute external table is not supported.

  • When data is synchronized in real time to the default MaxCompute data source (usually odps_first), a temporary AccessKey is used by default. This temporary AccessKey automatically expires after seven days, causing the task to fail. When the platform detects that a task failed due to the temporary AccessKey, it automatically restarts the task. If you have configured this type of monitoring alert for the task, you will receive an alert.

  • On the same day you configure a one-click real-time synchronization task to MaxCompute, you can query only the historical full data. You can query incremental data in MaxCompute only after it is merged the next day.

  • A one-click real-time synchronization task to MaxCompute generates a full data partition each day. To prevent excessive storage consumption, the automatically created MaxCompute table has a default 30-day lifecycle. If this duration is unsuitable, you can modify the lifecycle by clicking the MaxCompute table name during task configuration.

  • Data Integration uses the data synchronization channels of the MaxCompute engine for data uploads and downloads. For information about the service level agreement (SLA) for these channels, see Data transfer service (upload) scenarios and tools. Evaluate your data synchronization methods based on the SLA of these channels.

  • When you use one-click real-time synchronization to MaxCompute in instance mode, the exclusive resource group for Data Integration must have at least 8 vCPUs and 16 GiB of memory.

  • Data Integration supports only user-created MaxCompute data sources in the same region as the current workspace. Although a connection test for a data source in a different region may succeed, the synchronization task will fail during table creation and report an "engine does not exist" error.

  • When MaxCompute is the destination for whole-database synchronization, it supports only the whole-database full and incremental task and the real-time incremental stream mode for regular tables. If the destination table is a DeltaTable, both real-time synchronization and the whole-database full and incremental task are supported.

    Note

    When you use a user-created MaxCompute data source, you must still associate a MaxCompute engine with your DataWorks project. Otherwise, MaxCompute SQL nodes cannot be created, which prevents the 'done' marker node for full synchronization from being created.

Supported data types

MaxCompute supports 1.0, 2.0, and Hive-compatible data types. The following sections describe the supported data types for each version.

MaxCompute 1.0 data types

| Type | Batch read | Batch write | Real-time write |
| --- | --- | --- | --- |
| BIGINT | Supported | Supported | Supported |
| DOUBLE | Supported | Supported | Supported |
| DECIMAL | Supported | Supported | Supported |
| STRING | Supported | Supported | Supported |
| DATETIME | Supported | Supported | Supported |
| BOOLEAN | Supported | Supported | Supported |
| ARRAY | Supported | Supported | Supported |
| MAP | Supported | Supported | Supported |
| STRUCT | Supported | Supported | Supported |

MaxCompute 2.0 and Hive-compatible data types

| Type | Batch read (MaxCompute Reader) | Batch write (MaxCompute Writer) | Real-time write |
| --- | --- | --- | --- |
| TINYINT | Supported | Supported | Supported |
| SMALLINT | Supported | Supported | Supported |
| INT | Supported | Supported | Supported |
| BIGINT | Supported | Supported | Supported |
| BINARY | Supported | Supported | Supported |
| FLOAT | Supported | Supported | Supported |
| DOUBLE | Supported | Supported | Supported |
| DECIMAL(precision,scale) | Supported | Supported | Supported |
| VARCHAR(n) | Supported | Supported | Supported |
| CHAR(n) | Not supported | Supported | Supported |
| STRING | Supported | Supported | Supported |
| DATE | Supported | Supported | Supported |
| DATETIME | Supported | Supported | Supported |
| TIMESTAMP | Supported | Supported | Supported |
| BOOLEAN | Supported | Supported | Supported |
| ARRAY | Supported | Supported | Supported |
| MAP | Supported | Supported | Supported |
| STRUCT | Supported | Supported | Supported |

Data type conversion

The following table describes the data type conversion rules for MaxCompute Reader.

| Category | Data Integration type | MaxCompute data type |
| --- | --- | --- |
| Integer | LONG | BIGINT, INT, TINYINT, and SMALLINT |
| Boolean | BOOLEAN | BOOLEAN |
| Date and time | DATE | DATETIME, TIMESTAMP, and DATE |
| Floating point | DOUBLE | FLOAT, DOUBLE, and DECIMAL |
| Binary | BYTES | BINARY |
| Complex | STRING | ARRAY, MAP, and STRUCT |

Important

If data type conversion or writing data to the destination data source fails, the data is treated as dirty data. You can use the tolerance threshold for dirty data to control how the system handles these records.
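The conversion rules for MaxCompute Reader can be expressed as a lookup table, which is handy when you check a source schema before configuring a task. The sketch below only transcribes the rules listed in this section; the names READER_TYPE_MAP and data_integration_type are hypothetical, and types that the rules do not list return None rather than a guessed mapping.

```python
import re

# MaxCompute type -> Data Integration type, transcribed from the rules above.
READER_TYPE_MAP = {
    "BIGINT": "LONG", "INT": "LONG", "TINYINT": "LONG", "SMALLINT": "LONG",
    "BOOLEAN": "BOOLEAN",
    "DATETIME": "DATE", "TIMESTAMP": "DATE", "DATE": "DATE",
    "FLOAT": "DOUBLE", "DOUBLE": "DOUBLE", "DECIMAL": "DOUBLE",
    "BINARY": "BYTES",
    "ARRAY": "STRING", "MAP": "STRING", "STRUCT": "STRING",
}


def data_integration_type(maxcompute_type):
    """Map a MaxCompute column type to its Data Integration type.

    Type arguments such as DECIMAL(10,2) or ARRAY<INT> are ignored.
    Returns None for types that the rules above do not list.
    """
    base = re.split(r"[(<]", maxcompute_type.strip(), maxsplit=1)[0]
    return READER_TYPE_MAP.get(base.strip().upper())


for column_type in ["decimal(10,2)", "ARRAY<INT>", "timestamp"]:
    print(column_type, "->", data_integration_type(column_type))
```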

Prerequisites

When reading from or writing to a MaxCompute table, you can enable specific properties as needed.

Connect to MaxCompute and enable project-level configurations

  • Log on to the MaxCompute client. For more information, see Connect using the local client (odpscmd).

  • Enable the required project-level configurations in MaxCompute. These operations require project owner permissions. For more information about MaxCompute permissions, see Role planning.

Enable ACID semantics

To enable ACID semantics, run the following command on the client as a project owner. For more information about MaxCompute ACID semantics, see ACID semantics.

setproject odps.sql.acid.table.enable=true;

(Optional) Enable V2.0 data types

To use the TIMESTAMP data type, run the following command on the client as a project owner to enable MaxCompute V2.0 data types.

setproject odps.sql.type.system.odps2=true;

(Optional) Account authorization

When you bind a MaxCompute compute engine to a workspace, a MaxCompute data source is automatically created in DataWorks. You can use this data source for data synchronization within the current workspace. To synchronize data from this MaxCompute data source in another workspace, ensure the access account for the data source in the other workspace has the required permissions to access this MaxCompute compute engine. For information about cross-account authorization, see Cross-account authorization (MaxCompute and Hologres).

Create a MaxCompute data source

Before you develop data synchronization tasks, you must add your MaxCompute project as a MaxCompute data source in DataWorks. For detailed instructions, see Bind a MaxCompute compute resource.

Note
  • Workspaces in standard mode support data source isolation. You can add and isolate data sources in your development and production environments to enhance data security. For more information, see Isolate data sources in development and production environments.

  • The odps_first data source is a default data source if it appears in your workspace without being manually created. It was automatically created by the first MaxCompute engine bound to your workspace before the data source feature was upgraded. When you perform data synchronization, selecting this data source means that data is read from or written to that MaxCompute engine project.

    On the data source's configuration page, you can view the name of the MaxCompute project it uses to confirm which project the data is read from or written to. For more information, see Data source management.

Data synchronization tasks

For the entry point and the procedure for configuring a synchronization task, see the following configuration guides.

Single-table batch synchronization task

Single-table real-time synchronization task

See Configure a single-table real-time synchronization task.

Full-database synchronization task

See Configure a full-database batch synchronization task, Configure a full-database real-time synchronization task, and Configure a full-database full and incremental synchronization task.

FAQ

For more FAQs on Data Integration, see Data Integration.

Appendix: Script demos and parameters

Configure a batch synchronization task by using the code editor

To configure a batch synchronization task in the code editor, set the parameters in the script according to the unified script format. For more information, see Use the Code Editor. The following sections describe the data source parameters that you must set in the script.

Reader script demo

Important

Before you run the code, remove the comments.

{
    "type":"job",
    "version":"2.0",
    "steps":[
        {
            "stepType":"odps",// The plugin name.
            "parameter":{
                "partition":[],// The partitions to read data from.
                "isCompress":false,// Specifies whether to compress data.
                "datasource":"",// The data source.
                "column":[// The columns in the source table.
                    "id"
                ],
                "where": "",// The WHERE clause for data filtering.
                "enableWhere":false,// Specifies whether to use a WHERE clause for data filtering.
                "table":""// The table name.
            },
            "name":"Reader",
            "category":"reader"
        },
        { 
            "stepType":"stream",
            "parameter":{
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The error record count.
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. If this parameter is set to true, the mbps parameter takes effect.
            "concurrent":1, // The job concurrency.
            "mbps":"12"// The throttling rate. 1 mbps = 1 MB/s.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}
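Because the demo above carries // comments, it is not valid JSON until they are removed. The following sketch shows one way to strip them locally before pasting the script. It is an illustration, not a DataWorks tool: strip_line_comments is a hypothetical helper, and it assumes that every string literal starts and ends on the same line. It skips // sequences inside strings so that URLs such as a tunnel endpoint are left intact.

```python
import json


def strip_line_comments(text):
    """Remove // comments that appear outside JSON string literals."""
    cleaned_lines = []
    for line in text.splitlines():
        in_string = False
        escaped = False
        cut = len(line)
        for i, ch in enumerate(line):
            if in_string:
                if escaped:
                    escaped = False
                elif ch == "\\":
                    escaped = True
                elif ch == '"':
                    in_string = False
            elif ch == '"':
                in_string = True
            elif ch == "/" and line[i:i + 2] == "//":
                cut = i  # comment starts here; drop the rest of the line
                break
        cleaned_lines.append(line[:cut].rstrip())
    return "\n".join(cleaned_lines)


sample = '''{
    "stepType":"odps",// The plugin name.
    "parameter":{
        "tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com", // A URL that must survive.
        "column":[// The columns in the source table.
            "id"
        ]
    }
}'''

config = json.loads(strip_line_comments(sample))
print(config["parameter"]["tunnelServer"])
```

Parsing the result with json.loads is a quick check that no comment was missed.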

To specify a MaxCompute Tunnel endpoint, you can manually configure the data source in script mode. Replace the "datasource":"", line in the preceding sample with your data source parameters. For example:

"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api", 
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com", 
"project":"*****", 

Reader script parameters

Parameter

Description

Required

Default

datasource

The name of the data source. In script mode, this value must match the name of the added data source.

Yes

None

table

The name of the source table. The name is not case-sensitive.

Yes

None

partition

The partition to read data from.

  • You can use Linux shell wildcards to specify partitions. The asterisk (*) matches zero or more characters and the question mark (?) matches a single character.

  • By default, the job fails if a specified partition does not exist. To allow the job to succeed even if the partition is missing, you can add "successOnNoPartition": true to the ODPS parameter section in script mode.

For example, consider a partitioned table named test with four partitions: pt=1,ds=hangzhou, pt=1,ds=shanghai, pt=2,ds=hangzhou, and pt=2,ds=beijing. The following examples show how to configure this parameter:

  • To read data from the pt=1,ds=hangzhou partition, set this parameter to "partition":"pt=1,ds=hangzhou".

  • To read data from all partitions where pt=1, set this parameter to "partition":"pt=1,ds=*".

  • To read data from all partitions in the test table, set this parameter to "partition":"pt=*,ds=*".

You can also specify conditions to select partitions:

  • To select the latest partition, add /*query*/ ds=(select MAX(ds) from DataXODPSReaderPPR).

  • To filter partitions based on a condition, add /*query*/ followed by a condition on the partition key. For example, /*query*/ pt>=20170101 and pt<20170110 selects all data from partitions where the pt value is between January 1, 2017 (inclusive) and January 10, 2017 (exclusive).

Note

The content after /*query*/ is treated as a WHERE clause.

Required only for partitioned tables.

None

column

The columns to read from the MaxCompute source table. For example, a table named test contains the id, name, and age columns:

  • To read the id, name, and age columns in order, set this parameter to "column":["id","name","age"] or "column":["*"].

    Note

    We recommend against using * to specify all columns. If you use *, the system reads all columns in their default order. Changes to the order, type, or number of columns in the source table can cause a data mismatch with the destination table, which can cause incorrect results or job failure.

  • To read the name and id columns in order, set this parameter to "column":["name","id"].

  • You can add a constant column to match the column structure of a destination table. For example, to read the age column, the name column, a constant date 1988-08-08 08:08:08, and the id column, set this parameter to "column":["age","name","'1988-08-08 08:08:08'","id"]. Enclose the constant value in single quotation marks (').

    The system identifies a field as a constant column if it starts and ends with a single quotation mark ('). The system uses the value inside the marks, removing the surrounding ' characters.

    Note
    • You can use MaxCompute functions in the column parameter only when data filtering is enabled (enableWhere is true and where is not empty).

    • You must specify the columns to synchronize. This parameter cannot be empty.

Yes

None

enableWhere

Specifies whether to use a WHERE clause for data filtering.

No

false

where

The WHERE clause to use for data filtering.

No

None
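Two of the reader rules above, wildcard partition selection and constant-column detection, can be tried out locally. The sketch below uses Python's fnmatch module to approximate shell-style matching against the full partition string of the test table; this is an assumption about how the patterns behave, and the reader's actual matching may differ in edge cases. The helper names match_partitions and constant_value are hypothetical.

```python
from fnmatch import fnmatchcase

# The four partitions of the example table named test.
PARTITIONS = [
    "pt=1,ds=hangzhou",
    "pt=1,ds=shanghai",
    "pt=2,ds=hangzhou",
    "pt=2,ds=beijing",
]


def match_partitions(pattern, partitions):
    """Return the partitions whose full spec matches a shell-style pattern."""
    return [p for p in partitions if fnmatchcase(p, pattern)]


def constant_value(column_entry):
    """Return the literal of a constant column, or None for a real column.

    An entry counts as a constant when it starts and ends with a single
    quotation mark; the surrounding marks are removed.
    """
    if len(column_entry) >= 2 and column_entry[0] == "'" and column_entry[-1] == "'":
        return column_entry[1:-1]
    return None


print(match_partitions("pt=1,ds=*", PARTITIONS))
print(constant_value("'1988-08-08 08:08:08'"))
```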

Writer script demo

The following code is a sample configuration.

{
    "type":"job",
    "version":"2.0",// The version number.
    "steps":[
        {
            "stepType":"stream",
            "parameter":{},
            "name":"Reader",
            "category":"reader"
        },
        {
            "stepType":"odps",// The plugin name.
            "parameter":{
                "partition":"",// The destination partition.
                "truncate":true,// The cleanup rule.
                "isCompress":false,// Specifies whether to compress data.
                "datasource":"odps_first",// The data source name.
                "column":[// The destination column names.
                    "id",
                    "name",
                    "age",
                    "sex",
                    "salary",
                    "interest"
                ],
                "table":""// The destination table name.
            },
            "name":"Writer",
            "category":"writer"
        }
    ],
    "setting":{
        "errorLimit":{
            "record":"0"// The error record count, which is the maximum number of dirty data records allowed.
        },
        "speed":{
            "throttle":true,// Specifies whether to enable throttling. If this parameter is set to true, the mbps parameter takes effect.
            "concurrent":1, // The job concurrency.
            "mbps":"12"// The throttling rate. 1 mbps = 1 MB/s.
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

To specify a MaxCompute Tunnel endpoint, you can manually configure the data source in script mode. Replace the "datasource":"odps_first", line in the preceding sample with your data source parameters. The following code provides an example.

"accessId":"<yourAccessKeyId>",
"accessKey":"<yourAccessKeySecret>",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"**********",

Writer script parameters

Parameter

Description

Required

Default

datasource

The name of the data source. In script mode, this value must match the name of the added data source.

Yes

None

table

The name of the destination table. The name is not case-sensitive. You can specify only one table.

Yes

None

partition

The partition to write data to. You must specify the lowest-level partition. For example, to write to a table with three partition levels, you must specify the full partition path, such as pt=20150101, type=1, biz=2.

  • For a non-partitioned table, do not configure this parameter. Data is written directly to the table.

  • MaxCompute Writer cannot route data to different partitions, so you must write to a single, lowest-level partition.

Required only for partitioned tables.

None

column

Specifies the destination columns for the data. To write data to all columns, set this parameter to "column": ["*"]. To write data to a subset of columns, specify their names, for example, "column": ["id","name"].

  • MaxCompute Writer supports column filtering and reordering. For example, if a table has columns a, b, and c, and you want to synchronize only columns c and b, you can set this parameter to "column": ["c","b"]. During the synchronization, column a is automatically filled with null values.

  • You must specify the columns to synchronize. This parameter cannot be empty.

Yes

None

truncate

Setting "truncate": true makes the write operation idempotent. If a job fails and is rerun, MaxCompute Writer first cleans up the data from the previous run and then writes the new data, which keeps the data consistent across reruns.

MaxCompute Writer uses MaxCompute SQL for the data cleanup. Because that SQL cleanup is not atomic, the truncate option is not atomic either. Consequently, concurrent jobs that clean up the same table or partition can run into timing issues.

To prevent such issues, avoid running DDL operations from multiple jobs on the same partition simultaneously. Alternatively, you can create the partitions before starting the concurrent jobs.

Yes

None

emptyAsNull

Specifies whether to convert an empty string to a null value before writing.

No

false

consistencyCommit

Controls data visibility.

  • true: Data becomes visible only after the synchronization job completes successfully. However, the job fails if the data volume exceeds 1 TB. This is because MaxCompute has a limit of 300,000 blocks per synchronization.

  • false: Data being written to MaxCompute can be queried before the job is complete. This means that downstream applications that use this table may read incomplete data during the synchronization process.

No

false
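Several of the writer rules above (a non-empty column list, a required truncate setting, and a partition value that names every level down to the lowest) can be checked before a script is submitted. The sketch below is a hypothetical pre-flight check, not part of Data Integration: validate_writer_parameter and its messages are illustrative, and the caller is assumed to supply the destination table's partition keys.

```python
def validate_writer_parameter(parameter, partition_keys):
    """Return a list of problems found in a writer 'parameter' dict.

    partition_keys lists the destination table's partition keys from the
    highest to the lowest level; pass an empty list for a non-partitioned
    table.
    """
    problems = []
    if not parameter.get("table"):
        problems.append("table is required")
    if not parameter.get("column"):
        problems.append("column cannot be empty")
    if "truncate" not in parameter:
        problems.append("truncate is required")

    spec = parameter.get("partition", "")
    keys = [part.split("=", 1)[0].strip() for part in spec.split(",") if part.strip()]
    if partition_keys and keys != list(partition_keys):
        problems.append(
            "partition must name every level down to the lowest: "
            + ",".join(partition_keys)
        )
    if not partition_keys and keys:
        problems.append("partition must not be set for a non-partitioned table")
    return problems


good = {
    "table": "orders",
    "column": ["id", "name"],
    "truncate": True,
    "partition": "pt=20150101, type=1, biz=2",
}
print(validate_writer_parameter(good, ["pt", "type", "biz"]))  # []
```

A check like this catches a partially specified partition such as pt=20150101 on a three-level table before the job starts.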