Doris

更新时间:
复制 MD 格式

The Doris data source enables you to read data from and write data to Doris databases for large-scale data processing. This topic describes how to use DataWorks to synchronize data with Doris.

Supported data types

Different Doris versions support different data types and aggregation models. For information about all data types supported in each Doris version, see the official Doris documentation. The following table describes the mainly supported data types.

Data type

Supported model

Doris version

SMALLINT

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

INT

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

BIGINT

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

LARGEINT

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

FLOAT

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

DOUBLE

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

DECIMAL

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

DECIMALV3

Aggregate,Unique,Duplicate

Versions later than 1.2.1, 2.x

DATE

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

DATETIME

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

DATEV2

Aggregate,Unique,Duplicate

1.2.x, 2.x

DATATIMEV2

Aggregate,Unique,Duplicate

1.2.x, 2.x

CHAR

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

VARCHAR

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

STRING

Aggregate,Unique,Duplicate

0.x.x, 1.1.x, 1.2.x, 2.x

VARCHAR

Aggregate,Unique,Duplicate

1.1.x, 1.2.x, 2.x

ARRAY

Duplicate

1.2.x, 2.x

JSONB

Aggregate,Unique,Duplicate

1.2.x, 2.x

HLL

Aggregate

0.x.x, 1.1.x, 1.2.x, 2.x

BITMAP

Aggregate

0.x.x, 1.1.x, 1.2.x, 2.x

QUANTILE_STATE

Aggregate

1.2.x, 2.x

Prepare an ApsaraDB for OceanBase environment before data synchronization

Before you use DataWorks to synchronize data to a Doris data source, you must prepare a Doris environment. This ensures that a data synchronization task can be configured and can synchronize data to the Doris data source as expected. The following information describes how to prepare a Doris environment for data synchronization to a Doris data source.

Create an account and grant permissions

You must create an account that is used to log on to the Doris database for subsequent operations. You must specify a password for the account for subsequent connections to the Doris database. If you want to use the default root user of Doris to log on to the Doris database, you must specify a password for the root user. By default, the root user does not have a password. You can execute an SQL statement in Doris to specify the password:

SET PASSWORD FOR 'root' = PASSWORD('Password')

Configure the network connection for Doris

To use the StreamLoad method to write data, you need to access the private IP address of an FE node. If you access the public IP address of the FE node, you are redirected to the private IP address of a BE node. For more information about the redirection, see Data operation issues. In this case, you must establish network connections between your data source and a serverless resource group or an exclusive resource group for Data Integration to enable the resource group to access the data source over an internal network. For more information about how to establish a network connection between the Doris database and a resource group, see Network connectivity solutions.

Add a data source

Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data source management. You can view parameter descriptions in the DataWorks console to understand the meanings of the parameters when you add a data source.

Take note of the configuration requirements for the following configuration items of the Doris data source:

  • JdbcUrl: Enter the JDBC connection string, which includes the IP address, port number, database, and connection parameters. Both public and private IP addresses are supported. If you use a public IP address, make sure that the Data Integration resource group can access the host where your Doris instance is located.

  • FE endpoint: Enter the IP addresses and ports of the FE nodes. If your cluster has multiple FE nodes, you can enter multiple endpoints separated by commas, for example, ip1:port1,ip2:port2. When you test the connection, DataWorks tests the connectivity to all specified FE endpoints.

  • Username: Enter the username for accessing the Doris database.

  • Password: Enter the password that corresponds to the username.

Develop a data synchronization task

For information about the entry point for and the procedure of configuring a synchronization task, see the following configuration guides.

Appendix: Code and parameters

Configure a batch synchronization task by using the code editor

If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Use the Code Editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.

Reader script demo

{
  "type": "job",
  "version": "2.0",// The version number. 
  "steps": [
    {
      "stepType": "doris",// The plug-in name. 
      "parameter": {
        "column": [// The names of the columns. 
          "id"
        ],
        "connection": [
          {
            "querySql": [
              "select a,b from join1 c join join2 d on c.id = d.id;"
            ],
            "datasource": ""// The name of the data source. 
          }
        ],
        "where": "",// The WHERE clause. 
        "splitPk": "",// The shard key. 
        "encoding": "UTF-8"// The encoding format. 
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {
      "record": "0"// The maximum number of dirty data records allowed. 
    },
    "speed": {
      "throttle": true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true. 
      "concurrent": 1,// The maximum number of parallel threads. 
      "mbps": "12"// The maximum transmission rate. Unit: MB/s. 
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}

Reader script parameters

Parameter

Description

Required

Default value

datasource

The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

Yes

No default value

table

The name of the table from which you want to read data. Each synchronization task can be used to synchronize data from only one table.

For a sharded table, you can use the table parameter to specify the partitions from which you want to read data. Examples:

  • You can configure a range to read data from sharded databases and tables. For example, 'table_[0-99]' specifies that data is read from 'table_0', 'table_1', and 'table_2', up to 'table_99'.

  • If your tables have numeric suffixes of the same length, such as 'table_000', 'table_001', and 'table_002', up to 'table_999', you can configure the parameter as '"table": ["table_00[0-9]", "table_0[10-99]", "table_[100-999]"]'.

Note

Doris Reader reads data from the columns that are specified by the column parameter in the partitions that are specified by the table parameter. If a specified partition or column does not exist, the synchronization task fails.

Yes

No default value

column

The columns that you want to synchronize. The columns are described in a JSON array. By default, all columns are synchronized. For example, use ["*"].

  • Column pruning: You can select a subset of columns to export.

  • The column order can be changed. This indicates that you can specify columns in an order different from the order specified by the schema of the source table.

  • Constant configuration: You must follow the Doris SQL syntax. Example: ["id","`table`","1","'mingya.wmy'","'null'","to_char(a+1)","2.3","true"].

    • id: a column name.

    • table: the name of a column that contains reserved keywords.

    • 1: an integer constant.

    • 'mingya.wmy': a string constant, which is enclosed in single quotation marks (').

    • null:

      • " " indicates an empty string.

      • null indicates a null value.

      • 'null' indicates the string "null".

    • to_char(a+1): a function expression that is used to calculate the length of a string.

    • 2.3: a floating-point constant.

    • true: a Boolean value.

  • The column parameter must explicitly specify all the columns from which you want to read data. This parameter cannot be left empty.

Yes

No default value

splitPk

To improve read performance, you can use the splitPk parameter to specify a shard key. Data Integration uses this key to partition the data and run concurrent tasks.

  • We recommend that you set the splitPk parameter to the name of the primary key column of the table. Data can be evenly distributed to different shards based on the primary key column, instead of being intensively distributed only to specific shards.

  • The splitPk parameter supports sharding for data only of integer data types. If you set the splitPk parameter to a field of an unsupported data type, such as a string, floating point, or date data type, the setting of this parameter is ignored, and a single thread is used to read data.

  • If the splitPk parameter is not provided or is left empty, a single thread is used to read data.

No

No default value

where

The filter condition. In many business scenarios, you may want to synchronize only the data from the current day. To do this, you can specify the where condition as gmt_create > $bizdate.

  • You can use the WHERE clause to read incremental data. If the where parameter is not provided or is left empty, Doris Reader reads all data.

  • Do not set the where parameter to limit 10. This value does not conform to the constraints of Doris on the SQL WHERE clause.

No

No default value

querySql (advanced parameter, which is available only in the code editor)

In some business scenarios, the where parameter may not be sufficient to describe the desired filter conditions. You can use this parameter to specify a custom SQL query for filtering. If you configure this parameter, Data Integration ignores the table, column, where, and splitPk parameters, and uses the custom query to retrieve data. For example, to join multiple tables before synchronization, you can use a query like select a,b from table_a join table_b on table_a.id = table_b.id. The querySql parameter has a higher priority than the table, column, where, and splitPk parameters. The datasource parameter is still required for the system to retrieve connection details such as the username and password.

Note

The name of the querySql parameter is case-sensitive. For example, querysql does not take effect.

No

No default value

Writer script demo

{
  "stepType": "doris",// The plug-in name. 
  "parameter":
  {
    "postSql":// The SQL statement that you want to execute after the synchronization task is run. 
    [],
    "preSql":
    [],// The SQL statement that you want to execute before the synchronization task is run. 
    "datasource":"doris_datasource",// The name of the data source. 
    "table": "doris_table_name",// The name of the table. 
    "column":
    [
      "id",
      "table_id",
      "table_no",
      "table_name",
      "table_status"
    ],
    "loadProps":{
      "column_separator": "\\x01",// The column delimiter of data in the CSV format.
      "line_delimiter": "\\x02"// The row delimiter of data in the CSV format.
    }
  },
  "name": "Writer",
  "category": "writer"
}

Writer script parameters

Parameter

Description

Required

Default value

datasource

The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor.

Yes

No default value

table

The name of the table from which you want to read data.

Yes

No default value

column

The destination columns to write data to. Specify column names in an array, for example, "column":["id","name","age"]. To write data to all columns in order, use an asterisk (*) in the array, for example, "column":["*"].

Yes

No default value

preSql

The SQL statements to execute before the data synchronization task starts. In the codeless UI, you can execute only one SQL statement. In the code editor, you can execute multiple SQL statements. For example, you can use these statements to clear existing data from the table.

No

No default value

postSql

The SQL statement that you want to execute after the synchronization task is run. For example, you can set this parameter to the SQL statement that is used to add a timestamp. You can execute only one SQL statement on the codeless UI and multiple SQL statements in the code editor.

No

No default value

maxBatchRows

The maximum number of rows that you can write to the destination table at a time. Both this parameter and the batchSize parameter determine the number of data records that you can write to the destination table at a time. Each time the cached data reaches the value of either parameter, the writer starts to write the data to the destination table.

No

500000

batchSize

The maximum amount of data that you can write to the destination table at a time. Both this parameter and the maxBatchRows parameter determine the number of data records that you can write to the destination table at a time. Each time the cached data reaches the value of either parameter, the writer starts to write the data to the destination table.

No

104857600

maxRetries

The maximum number of retries allowed after you failed to write multiple data records to the destination table at a time.

No

3

labelPrefix

The label prefix for each uploaded file batch. The final label is a combination of labelPrefix + UUID to ensure that each import is idempotent, preventing data duplication.

No

datax_doris_writer_

loadProps

The request parameters for StreamLoad, mainly used to configure the import data format. By default, data is imported in CSV format. If the loadProps parameter is not configured, the default CSV format is used, with \t as the column delimiter and \n as the row delimiter. The default configuration is as follows:

"loadProps": {
    "format":"csv",
    "column_separator": "\t",
    "line_delimiter": "\n"
}

If you want to write data in the JSON format, use the following settings:

"loadProps": {
    "format": "json"
}

No

No default value

Write data of aggregate types

You can use Doris Writer to write data to columns of specific aggregation types. When you use Doris Writer to write data to columns of specific aggregation types, you must configure the additional parameters.

For example, in the following Doris table, uuid is of the bitmap type (aggregation type) and sex is of the HLL type (aggregation type).

CREATE TABLE `example_table_1` (
  `user_id` int(11) NULL,
  `date` varchar(10) NULL DEFAULT "10.5",
  `city` varchar(10) NULL,
  `uuid` bitmap BITMAP_UNION NULL, -- Aggregation type
  `sex` HLL HLL_UNION  -- Aggregation type
) ENGINE=OLAP AGGREGATE KEY(`user_id`, `date`,`city`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`user_id`) BUCKETS 32

Insert raw data into the table:

user_id,date,city,uuid,sex
0,T0S4Pb,abc,43,'54'
1,T0S4Pd,fsd,34,'54'
2,T0S4Pb,fa3,53,'64'
4,T0S4Pb,fwe,87,'64'
5,T0S4Pb,gbr,90,'56'
2,iY3GiHkLF,234,100,'54'

When you use Doris Writer to write data to a column of an aggregation type, you must specify the column in writer.parameter.column and configure an aggregate function in writer.parameter.loadProps.columns. For example, you must use the aggregate function bitmap_hash for the uuid column and use the aggregate function hll_hash for the sex column.

Sample code:

{
    "stepType": "doris",// The plug-in name. 
    "writer":
    {
        "parameter":
        {
            "column":
            [
                "user_id",
                "date",
                "city",
                "uuid",// The aggregation type is bitmap.
                "sex"// The aggregation type is HLL.
            ],
            "loadProps":
            {
                "format": "csv",
                "column_separator": "\\x01",
                "line_delimiter": "\\x02",
                "columns": "user_id,date,city,k1,uuid=bitmap_hash(k1),k2,sex=hll_hash(k2)"// You must specify the aggregate functions.
            },
            "postSql":
            [
                "select count(1) from example_tbl_3"
            ],
            "preSql":
            [],
            "datasource":"doris_datasource",// The name of the data source. 
                    "table": "doris_table_name",// The name of the table. 
        }
          "name": "Writer",
              "category": "writer"
    }
}