PolarDB data source

更新时间:
复制 MD 格式

The PolarDB data source allows you to read data from and write data to PolarDB. You can configure a synchronization task using the codeless UI or script mode.

Limitations

Batch read and write

You can read data from views.

Real-time read

If you use a PolarDB for MySQL cluster as the source, you must enable the binary log. PolarDB for MySQL is a cloud-native database that is fully compatible with MySQL. By default, PolarDB for MySQL uses high-level physical logs instead of the binary log. To better integrate with the MySQL ecosystem, PolarDB allows you to enable the binary log.

Supported data types

Batch read

The following table lists the data type mappings for PolarDB Reader.

Category

PolarDB data type

Integer

INT, TINYINT, SMALLINT, MEDIUMINT, and BIGINT

Floating-point

FLOAT, DOUBLE, and DECIMAL

String

VARCHAR, CHAR, TINYTEXT, TEXT, MEDIUMTEXT, and LONGTEXT

Date and time

DATE, DATETIME, TIMESTAMP, TIME, and YEAR

Boolean

BIT and BOOL

Binary

TINYBLOB, MEDIUMBLOB, BLOB, LONGBLOB, and VARBINARY

Note
  • Data types that are not listed in the table are not supported.

  • The PolarDB Reader plug-in treats TINYINT(1) as an integer.

Batch write

Similar to PolarDB Reader, PolarDB Writer supports most, but not all, PolarDB data types. Ensure that your data types are supported.

The following table lists the data type mappings for PolarDB Writer.

Category

PolarDB data type

Integer

INT, TINYINT, SMALLINT, MEDIUMINT, BIGINT, and YEAR

Floating-point

FLOAT, DOUBLE, and DECIMAL

String

VARCHAR, CHAR, TINYTEXT, TEXT, MEDIUMTEXT, and LONGTEXT

Date and time

DATE, DATETIME, TIMESTAMP, and TIME

Boolean

BOOL

Binary

TINYBLOB, MEDIUMBLOB, BLOB, LONGBLOB, and VARBINARY

Prerequisites

Configure an IP address whitelist

Add the CIDR block of the VPC that contains your Serverless resource group or Exclusive Resource Group for Data Integration to the IP address whitelist of your PolarDB cluster. For more information, see Set a cluster whitelist.

Create an account and grant permissions

Create an account and grant it the required permissions.

Create a database account for data synchronization. The account must have the SELECT, REPLICATION SLAVE, REPLICATION CLIENT permissions on the database.

  1. Create an account.

    For more information, see Create and manage a database account.

  2. Grant permissions.

    Run the following command to grant the required permissions to the account. Alternatively, you can grant the SUPER permission.

    -- CREATE USER 'sync_account'@'%' IDENTIFIED BY 'your_password';
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'sync_account'@'%';

Enable binary logging

For more information, see Enable binary logging.

Add a data source

Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data source management. You can view parameter descriptions in the DataWorks console to understand the meanings of the parameters when you add a data source.

Configure a synchronization task

For information about the entry point for and the procedure of configuring a synchronization task, see the following configuration guides.

Configure a single-table batch task

Configure real-time sync for a table or database

For more information, see Configure a real-time synchronization task (legacy).

Configure full-database batch read and incremental real-time sync

For more information, see Configure a real-time full-database synchronization task.

FAQ

Why are errors repeatedly reported when a real-time synchronization task runs to synchronize data from Oracle, PolarDB, or MySQL?

Appendix: Script demo and parameters

Configure a batch synchronization task by using the code editor

If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Use the Code Editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.

Reader script demo

The following code provides a script example for reading data from a single table in a database. For more information about the parameters, see the parameter description section.

{
    "type": "job",
    "steps": [
        {
            "parameter": {
                "datasource": "test_005",                // The name of the data source.
                "column": [                              // The source column names.
                    "id",
                    "name",
                    "age",
                    "sex",
                    "salary",
                    "interest"
                ],
                "where": "id=1001",                      // The filter condition.
                "splitPk": "id",                         // The sharding key.
                "table": "PolarDB_person",               // The source table name.
              	"useReadonly": "false"                   // Specifies whether to read data from a secondary database.
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {}
    ],
    "version": "2.0",                                // The version number.
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {                              // The error tolerance.
            "record": ""
        },
        "speed": {
            "concurrent": 6,                         // The concurrency level.
            "throttle": true,                        // If throttle is set to false, the mbps parameter does not take effect and throttling is disabled. If throttle is set to true, throttling is enabled.
      "mbps":"12"                              // The throttling rate, in MB/s.
        }
    }
}

Reader script parameters

Parameter

Description

Required

Default

datasource

The name of the data source. In script mode, the value of this parameter must be the same as the name of the added data source.

Yes

None

table

The name of the source table from which you want to synchronize data.

Yes

None

useReadonly

Specifies whether to read data from a secondary database. If you set this parameter to true, data is read from the secondary database. If you omit this parameter, the default value false is used, and data is read from the primary database.

No

false

column

The columns in the source table that you want to synchronize. The value must be a JSON array. This parameter is required and cannot be empty. Example: ["*"].

  • You can select a subset of columns to export.

  • You can reorder the columns to be exported regardless of the table schema.

  • You can use constants. You must follow the SQL syntax. Example: ["id", "table","1","'mingya.wmy'","'null'", "to_char(a+1)","2.3","true"].

    • id: a regular column name.

    • table: a column name that is also a reserved keyword.

    • 1: an integer constant.

    • 'mingya.wmy': a string constant. Note that the string constant must be enclosed in single quotation marks.

    • 'null': a string constant.

    • to_char(a+1): a function that calculates the length of a string.

    • 2.3: a floating-point number.

    • true: a Boolean value.

  • The column parameter is required and cannot be empty. You must specify the columns that you want to synchronize.

Yes

None

splitPk

The sharding key. You can specify a column for the splitPk parameter to shard data, which enables concurrent processing and improves data synchronization efficiency.

  • We recommend that you use the primary key of the table as the value of the splitPk parameter. A primary key is usually evenly distributed, which prevents data hotspots in the created shards.

  • The splitPk parameter supports only integer-based data sharding. String, floating-point, date, and other data types are not supported. If you specify a column of an unsupported data type, data sharding is disabled and data is synchronized in a single thread.

  • If you do not specify the splitPk parameter or set the parameter to null, a single thread is used for data synchronization.

No

None

splitFactor

The sharding factor. This parameter specifies the number of shards. If you configure a concurrency level, the data is sharded into concurrency × splitFactor shards. For example, if the concurrency is 5 and splitFactor is 5, the data is split into 25 shards and processed by 5 concurrent threads.

Note

We recommend that you set this parameter to a value from 1 to 100. An excessively large value may cause an out-of-memory (OOM) error.

No

5

where

The filter condition. For example, to synchronize only the current day's data, set the where parameter to gmt_create>$bizdate.

  • The where condition allows you to efficiently perform incremental synchronization. If you do not specify a where clause, all data is synchronized.

  • We recommend that you do not specify LIMIT 10 as the where condition because this does not comply with the constraints of a WHERE clause.

No

None

querySql (Advanced mode, not available in the codeless UI)

In some scenarios, the where parameter is insufficient for describing the filtering conditions. You can use this parameter to define a custom SQL query. When this parameter is configured, the data synchronization system ignores the column, table, and where parameters and directly uses the content of this parameter to filter the data. For example, to synchronize data after a multi-table join, use select a,b from table_a join table_b on table_a.id = table_b.id. When you configure querySql, PolarDB Reader directly ignores the column, table, and where parameters. The querySql parameter has a higher priority than the table, column, where, and splitPk parameters. The datasource is used to parse information such as the username and password.

No

None

Writer script demo

The following code provides a sample script configuration. For more information about the parameters, see the parameter description section.

{
    "type": "job",
    "steps": [
        {
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "parameter": {
                "postSql": [],                        // The SQL statement to be executed after the synchronization task is complete.
                "datasource": "test_005",             // The name of the data source.
                "column": [                           // The destination column names.
                    "id",
                    "name",
                    "age",
                    "sex",
                    "salary",
                    "interest"
                ],
                "writeMode": "insert",                // The write mode.
                "batchSize": 256,                     // The number of records to submit in each batch.
                "table": "PolarDB_person_copy",       // The destination table name.
                "preSql": []                          // The SQL statement to be executed before the synchronization task starts.
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "version": "2.0",                             // The version number.
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    },
    "setting": {
        "errorLimit": {                           // The error tolerance.
            "record": ""
        },
        "speed": {
            "throttle":true,                      // If throttle is set to false, the mbps parameter does not take effect and throttling is disabled. If throttle is set to true, throttling is enabled.
            "concurrent":6,                       // The concurrency level.
            "mbps":"12"                           // The throttling rate, in MB/s.
        }
    }
}

Writer script parameters

  • All parameters

    Parameter

    Description

    Required

    Default

    datasource

    The name of the data source. In script mode, the value of this parameter must be the same as the name of the added data source.

    Yes

    None

    table

    The name of the destination table to which you want to synchronize data.

    Yes

    None

    writeMode

    The write mode. Valid values:

    • insert: corresponds to INSERT INTO in the codeless UI.

    • update: corresponds to ON DUPLICATE KEY UPDATE in the codeless UI.

    • replace: corresponds to REPLACE INTO in the codeless UI.

    For more information about the modes and examples, see the Details of the writeMode parameter section below.

    Note

    For PolarDB for PostgreSQL, only the insert mode is supported. To update data and prevent primary key conflicts, delete the duplicate data before you run the batch synchronization task. The following methods are recommended:

    • Method 1: In the preSql parameter (which corresponds to Pre-Import Statement in the codeless UI), configure a TRUNCATE statement to clear the destination table.

    • Method 2: Process the destination table in an upstream node to prevent primary key conflicts during data synchronization.

    No

    insert

    column

    The destination columns to which you want to write data. Separate the columns with commas (,). Example: "column": ["id", "name", "age"]. To write data to all columns in order, use an asterisk (*). Example: "column": ["*"].

    Yes

    None

    preSql

    Specifies one or more SQL statements to execute before the task starts. The codeless UI supports a single statement, whereas script mode supports multiple statements, such as statements to clear existing data.

    No

    None

    postSql

    Specifies one or more SQL statements to execute after the task is complete. The codeless UI supports a single statement, whereas script mode supports multiple statements, such as a statement to add a timestamp.

    No

    None

    batchSize

    The number of records to submit in each batch. Larger values can improve throughput by reducing network interactions with PolarDB, but an excessively large value may cause an out-of-memory (OOM) error.

    No

    1024

    updateColumn

    The columns to be updated when a primary key or unique index conflict occurs. This parameter is valid only when writeMode is set to update. You can specify multiple columns, separated by commas. Example: "updateColumn": ["name", "age"].

    Note

    This parameter is supported only for PolarDB for MySQL.

    No

    None

  • Details of the writeMode parameter

    Comparison

    insert (corresponds to INSERT INTO in the codeless UI)

    update (corresponds to ON DUPLICATE KEY UPDATE in the codeless UI)

    replace (corresponds to REPLACE INTO in the codeless UI)

    Conflict handling strategy

    If a primary key or unique index conflict occurs, the conflicting row is not written to the destination table and is processed as dirty data.

    If no primary key or unique index conflict occurs, this mode functions the same as the INSERT INTO mode. If a conflict occurs, the new row updates only the specified fields of the existing row.

    If no primary key or unique index conflict occurs, this mode functions the same as the INSERT INTO mode. If a conflict occurs, the existing row is deleted and the new row is inserted, effectively replacing all of its fields.

    Data example

    • Source table

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | lisi    |     |
      +----+---------+-----+
    • Original destination table

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 2  | wangwu  |     |
      +----+---------+-----+
    • After the task is run, one row is written to the destination table and one row is recorded as dirty data.

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | wangwu  |     |
      +----+---------+-----+
    • Scenario 1: The task is configured to synchronize only some columns: "column": ["id","name"]

      • Source table

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    |     |
        +----+---------+-----+
      • Original destination table

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 2  | wangwu  |  3  |
        +----+---------+-----+
      • After the task is run, two rows are written to the destination table and no dirty data is recorded.

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    | 3   |
        +----+---------+-----+
    • Scenario 2: The task is configured to synchronize all columns: "column": ["id","name","age"]

      • Source table

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    |     |
        +----+---------+-----+
      • Original destination table

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 2  | wangwu  |  3  |
        +----+---------+-----+
      • After the task is run, two rows are written to the destination table and no dirty data is recorded.

        +----+---------+-----+
        | id | name    | age |
        +----+---------+-----+
        | 1  | zhangsan| 1   |
        | 2  | lisi    |     |
        +----+---------+-----+
    • Source table

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | lisi    |     |
      +----+---------+-----+
    • Original destination table

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 2  | wangwu  |  3  |
      +----+---------+-----+
    • After the task is run, two rows are written to the destination table and no dirty data is recorded.

      +----+---------+-----+
      | id | name    | age |
      +----+---------+-----+
      | 1  | zhangsan| 1   |
      | 2  | lisi    |     |
      +----+---------+-----+