The Doris data source enables you to read data from and write data to Doris databases for large-scale data processing. This topic describes how to use DataWorks to synchronize data with Doris.
Supported data types
Different Doris versions support different data types and aggregation models. For information about all data types supported in each Doris version, see the official Doris documentation. The following table describes the mainly supported data types.
Data type | Supported model | Doris version |
SMALLINT | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
INT | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
BIGINT | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
LARGEINT | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
FLOAT | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
DOUBLE | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
DECIMAL | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
DECIMALV3 | Aggregate,Unique,Duplicate | Versions later than 1.2.1, 2.x |
DATE | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
DATETIME | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
DATEV2 | Aggregate,Unique,Duplicate | 1.2.x, 2.x |
DATATIMEV2 | Aggregate,Unique,Duplicate | 1.2.x, 2.x |
CHAR | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
VARCHAR | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
STRING | Aggregate,Unique,Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
VARCHAR | Aggregate,Unique,Duplicate | 1.1.x, 1.2.x, 2.x |
ARRAY | Duplicate | 1.2.x, 2.x |
JSONB | Aggregate,Unique,Duplicate | 1.2.x, 2.x |
HLL | Aggregate | 0.x.x, 1.1.x, 1.2.x, 2.x |
BITMAP | Aggregate | 0.x.x, 1.1.x, 1.2.x, 2.x |
QUANTILE_STATE | Aggregate | 1.2.x, 2.x |
Prepare an ApsaraDB for OceanBase environment before data synchronization
Before you use DataWorks to synchronize data to a Doris data source, you must prepare a Doris environment. This ensures that a data synchronization task can be configured and can synchronize data to the Doris data source as expected. The following information describes how to prepare a Doris environment for data synchronization to a Doris data source.
Create an account and grant permissions
You must create an account that is used to log on to the Doris database for subsequent operations. You must specify a password for the account for subsequent connections to the Doris database. If you want to use the default root user of Doris to log on to the Doris database, you must specify a password for the root user. By default, the root user does not have a password. You can execute an SQL statement in Doris to specify the password:
SET PASSWORD FOR 'root' = PASSWORD('Password')Configure the network connection for Doris
To use the StreamLoad method to write data, you need to access the private IP address of an FE node. If you access the public IP address of the FE node, you are redirected to the private IP address of a BE node. For more information about the redirection, see Data operation issues. In this case, you must establish network connections between your data source and a serverless resource group or an exclusive resource group for Data Integration to enable the resource group to access the data source over an internal network. For more information about how to establish a network connection between the Doris database and a resource group, see Network connectivity solutions.
Add a data source
Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data source management. You can view parameter descriptions in the DataWorks console to understand the meanings of the parameters when you add a data source.
Take note of the configuration requirements for the following configuration items of the Doris data source:
JdbcUrl: Enter the JDBC connection string, which includes the IP address, port number, database, and connection parameters. Both public and private IP addresses are supported. If you use a public IP address, make sure that the Data Integration resource group can access the host where your Doris instance is located.
FE endpoint: Enter the IP addresses and ports of the FE nodes. If your cluster has multiple FE nodes, you can enter multiple endpoints separated by commas, for example,
ip1:port1,ip2:port2. When you test the connection, DataWorks tests the connectivity to all specified FE endpoints.Username: Enter the username for accessing the Doris database.
Password: Enter the password that corresponds to the username.
Develop a data synchronization task
For information about the entry point for and the procedure of configuring a synchronization task, see the following configuration guides.
For more information about the configuration procedure, see Configure a batch synchronization task by using the codeless UI and Configure a batch synchronization task by using the code editor.
For information about all parameters that are configured and the code that is run when you use the code editor to configure a batch synchronization task, see Appendix: Code and parameters.
Appendix: Code and parameters
Configure a batch synchronization task by using the code editor
If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Use the Code Editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.
Reader script demo
{
"type": "job",
"version": "2.0",// The version number.
"steps": [
{
"stepType": "doris",// The plug-in name.
"parameter": {
"column": [// The names of the columns.
"id"
],
"connection": [
{
"querySql": [
"select a,b from join1 c join join2 d on c.id = d.id;"
],
"datasource": ""// The name of the data source.
}
],
"where": "",// The WHERE clause.
"splitPk": "",// The shard key.
"encoding": "UTF-8"// The encoding format.
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"// The maximum number of dirty data records allowed.
},
"speed": {
"throttle": true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent": 1,// The maximum number of parallel threads.
"mbps": "12"// The maximum transmission rate. Unit: MB/s.
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}Reader script parameters
Parameter | Description | Required | Default value |
datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
table | The name of the table from which you want to read data. Each synchronization task can be used to synchronize data from only one table. For a sharded table, you can use the table parameter to specify the partitions from which you want to read data. Examples:
Note Doris Reader reads data from the columns that are specified by the column parameter in the partitions that are specified by the table parameter. If a specified partition or column does not exist, the synchronization task fails. | Yes | No default value |
column | The columns that you want to synchronize. The columns are described in a JSON array. By default, all columns are synchronized. For example, use
| Yes | No default value |
splitPk | To improve read performance, you can use the splitPk parameter to specify a shard key. Data Integration uses this key to partition the data and run concurrent tasks.
| No | No default value |
where | The filter condition. In many business scenarios, you may want to synchronize only the data from the current day. To do this, you can specify the where condition as
| No | No default value |
querySql (advanced parameter, which is available only in the code editor) | In some business scenarios, the where parameter may not be sufficient to describe the desired filter conditions. You can use this parameter to specify a custom SQL query for filtering. If you configure this parameter, Data Integration ignores the table, column, where, and splitPk parameters, and uses the custom query to retrieve data. For example, to join multiple tables before synchronization, you can use a query like Note The name of the querySql parameter is case-sensitive. For example, querysql does not take effect. | No | No default value |
Writer script demo
{
"stepType": "doris",// The plug-in name.
"parameter":
{
"postSql":// The SQL statement that you want to execute after the synchronization task is run.
[],
"preSql":
[],// The SQL statement that you want to execute before the synchronization task is run.
"datasource":"doris_datasource",// The name of the data source.
"table": "doris_table_name",// The name of the table.
"column":
[
"id",
"table_id",
"table_no",
"table_name",
"table_status"
],
"loadProps":{
"column_separator": "\\x01",// The column delimiter of data in the CSV format.
"line_delimiter": "\\x02"// The row delimiter of data in the CSV format.
}
},
"name": "Writer",
"category": "writer"
}Writer script parameters
Parameter | Description | Required | Default value |
datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
table | The name of the table from which you want to read data. | Yes | No default value |
column | The destination columns to write data to. Specify column names in an array, for example, | Yes | No default value |
preSql | The SQL statements to execute before the data synchronization task starts. In the codeless UI, you can execute only one SQL statement. In the code editor, you can execute multiple SQL statements. For example, you can use these statements to clear existing data from the table. | No | No default value |
postSql | The SQL statement that you want to execute after the synchronization task is run. For example, you can set this parameter to the SQL statement that is used to add a timestamp. You can execute only one SQL statement on the codeless UI and multiple SQL statements in the code editor. | No | No default value |
maxBatchRows | The maximum number of rows that you can write to the destination table at a time. Both this parameter and the batchSize parameter determine the number of data records that you can write to the destination table at a time. Each time the cached data reaches the value of either parameter, the writer starts to write the data to the destination table. | No | 500000 |
batchSize | The maximum amount of data that you can write to the destination table at a time. Both this parameter and the maxBatchRows parameter determine the number of data records that you can write to the destination table at a time. Each time the cached data reaches the value of either parameter, the writer starts to write the data to the destination table. | No | 104857600 |
maxRetries | The maximum number of retries allowed after you failed to write multiple data records to the destination table at a time. | No | 3 |
labelPrefix | The label prefix for each uploaded file batch. The final label is a combination of | No | datax_doris_writer_ |
loadProps | The request parameters for StreamLoad, mainly used to configure the import data format. By default, data is imported in CSV format. If the loadProps parameter is not configured, the default CSV format is used, with If you want to write data in the JSON format, use the following settings: | No | No default value |
Write data of aggregate types
You can use Doris Writer to write data to columns of specific aggregation types. When you use Doris Writer to write data to columns of specific aggregation types, you must configure the additional parameters.
For example, in the following Doris table, uuid is of the bitmap type (aggregation type) and sex is of the HLL type (aggregation type).
CREATE TABLE `example_table_1` (
`user_id` int(11) NULL,
`date` varchar(10) NULL DEFAULT "10.5",
`city` varchar(10) NULL,
`uuid` bitmap BITMAP_UNION NULL, -- Aggregation type
`sex` HLL HLL_UNION -- Aggregation type
) ENGINE=OLAP AGGREGATE KEY(`user_id`, `date`,`city`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`user_id`) BUCKETS 32Insert raw data into the table:
user_id,date,city,uuid,sex
0,T0S4Pb,abc,43,'54'
1,T0S4Pd,fsd,34,'54'
2,T0S4Pb,fa3,53,'64'
4,T0S4Pb,fwe,87,'64'
5,T0S4Pb,gbr,90,'56'
2,iY3GiHkLF,234,100,'54'When you use Doris Writer to write data to a column of an aggregation type, you must specify the column in writer.parameter.column and configure an aggregate function in writer.parameter.loadProps.columns. For example, you must use the aggregate function bitmap_hash for the uuid column and use the aggregate function hll_hash for the sex column.
Sample code:
{
"stepType": "doris",// The plug-in name.
"writer":
{
"parameter":
{
"column":
[
"user_id",
"date",
"city",
"uuid",// The aggregation type is bitmap.
"sex"// The aggregation type is HLL.
],
"loadProps":
{
"format": "csv",
"column_separator": "\\x01",
"line_delimiter": "\\x02",
"columns": "user_id,date,city,k1,uuid=bitmap_hash(k1),k2,sex=hll_hash(k2)"// You must specify the aggregate functions.
},
"postSql":
[
"select count(1) from example_tbl_3"
],
"preSql":
[],
"datasource":"doris_datasource",// The name of the data source.
"table": "doris_table_name",// The name of the table.
}
"name": "Writer",
"category": "writer"
}
}