DataWorks数据集成支持读写Doris。本文为您介绍DataWorks的Doris数据同步能力支持情况。
支持的Doris版本
Doris Writer使用的驱动版本是MySQL Driver 5.1.47,该驱动支持的内核版本如下。驱动能力详情请参见Doris官网文档。
Doris 版本  | 是否支持  | 
0.x.x  | 支持  | 
1.1.x  | 支持  | 
1.2.x  | 支持  | 
2.x  | 支持  | 
使用限制
数据集成仅支持离线写入Doris。
支持的字段类型
不同Doris版本支持不同的数据类型和聚合模型。各版本Doris的全量字段类型请参见Doris的官方文档,下面为您介绍Doris当前主要字段的支持情况。
类型  | 支持模型  | Doris版本  | 离线写入(Doris Writer)  | 
SMALLINT  | Aggregate,Unique,Duplicate  | 0.x.x、1.1.x、1.2.x、2.x  | 支持  | 
INT  | Aggregate,Unique,Duplicate  | 0.x.x、1.1.x、1.2.x、2.x  | 支持  | 
BIGINT  | Aggregate,Unique,Duplicate  | 0.x.x、1.1.x、1.2.x、2.x  | 支持  | 
LARGEINT  | Aggregate,Unique,Duplicate  | 0.x.x、1.1.x、1.2.x、2.x  | 支持  | 
FLOAT  | Aggregate,Unique,Duplicate  | 0.x.x、1.1.x、1.2.x、2.x  | 支持  | 
DOUBLE  | Aggregate,Unique,Duplicate  | 0.x.x、1.1.x、1.2.x、2.x  | 支持  | 
DECIMAL  | Aggregate,Unique,Duplicate  | 0.x.x、1.1.x、1.2.x、2.x  | 支持  | 
DECIMALV3  | Aggregate,Unique,Duplicate  | 1.2.1+、2.x  | 支持  | 
DATE  | Aggregate,Unique,Duplicate  | 0.x.x、1.1.x、1.2.x、2.x  | 支持  | 
DATETIME  | Aggregate,Unique,Duplicate  | 0.x.x、1.1.x、1.2.x、2.x  | 支持  | 
DATEV2  | Aggregate,Unique,Duplicate  | 1.2.x、2.x  | 支持  | 
DATATIMEV2  | Aggregate,Unique,Duplicate  | 1.2.x、2.x  | 支持  | 
CHAR  | Aggregate,Unique,Duplicate  | 0.x.x、1.1.x、1.2.x、2.x  | 支持  | 
VARCHAR  | Aggregate,Unique,Duplicate  | 0.x.x、1.1.x、1.2.x、2.x  | 支持  | 
STRING  | Aggregate,Unique,Duplicate  | 0.x.x、1.1.x、1.2.x、2.x  | 支持  | 
VARCHAR  | Aggregate,Unique,Duplicate  | 1.1.x、1.2.x、2.x  | 支持  | 
ARRAY  | Duplicate  | 1.2.x、2.x  | 支持  | 
JSONB  | Aggregate,Unique,Duplicate  | 1.2.x、2.x  | 支持  | 
HLL  | Aggregate  | 0.x.x、1.1.x、1.2.x、2.x  | 支持  | 
BITMAP  | Aggregate  | 0.x.x、1.1.x、1.2.x、2.x  | 支持  | 
QUANTILE_STATE  | Aggregate  | 1.2.x、2.x  | 支持  | 
实现原理
Doris Writer通过Doris原生支持的StreamLoad方式导入数据,Doris Writer会将Reader端读取到的数据缓存在内存中,并拼接成文本,然后批量导入至Doris数据库。更多详情请参见Doris官方文档。
数据同步前准备
在DataWorks上进行数据同步前,您需要参考本文提前在Doris侧进行数据同步环境准备,以便在DataWorks上进行Doris数据同步任务配置与执行时服务正常。以下为您介绍Doris同步前的相关环境准备。
确认Doris的版本
数据集成对Doris版本有要求,您可以参考上文支持的Doris版本章节,查看当前待同步的Doris是否符合版本要求。您可以在Doris的官方网站下载对应的版本,并且安装。
创建账号,并配置账号权限
您需要规划一个数据仓库的登录账号用于后续操作,同时,您需要为该账号设置密码,以便后续连接到数据仓库。如果您要使用Doris默认的root用户进行登录,那么您需要为root用户设置密码。root用户默认没有密码,您可以在Doris上执行SQL来设置密码:
SET PASSWORD FOR 'root' = PASSWORD('密码')配置Doris的网络连接
使用StreamLoad导入数据,需要访问FE节点的私网地址。如果访问FE的公网地址,则会被重定向到BE节点的内网IP(数据操作问题)。因此,您需要将Doris的网络与数据集成使用的Serverless资源组或独享数据集成资源组打通,使之通过内网地址进行访问。网络打通的具体操作可以参考网络连通方案。
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见数据源管理,详细的配置参数解释可在配置界面查看对应参数的文案提示。
下面对Doris数据源的几个配置项进行说明:
JdbcUrl:请填写JDBC连接串,包含IP、端口号、数据库和连接参数。支持公网IP和私网IP,如果使用公网IP,请确保数据集成资源组能够正常访问Doris所在的主机。
FE endpoint:请填写FE节点的IP和端口。如果您的集群中有多个FE节点,可以配置多个FE节点的IP和端口,每个IP和端口以逗号分隔,例如
ip1:port1,ip2:port2。在测试连通性时,会对所有的FE endpoint做连通性测试。用户名:请填写Doris数据库的用户名。
密码:请填写Doris数据库对应用户的密码。
数据同步任务开发
数据同步任务的配置入口和通用配置流程可参见下文的配置指导。
脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。
附录:脚本Demo与参数说明
离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见脚本模式配置,以下为您介绍脚本模式下数据源的参数配置详情。
Reader脚本Demo
{
  "type": "job",
  "version": "2.0",//版本号。
  "steps": [
    {
      "stepType": "doris",//插件名。
      "parameter": {
        "column": [//列名。
          "id"
        ],
        "connection": [
          {
            "querySql": [
              "select a,b from join1 c join join2 d on c.id = d.id;"
            ],
            "datasource": ""//数据源名称。
          }
        ],
        "where": "",//过滤条件。
        "splitPk": "",//切分键。
        "encoding": "UTF-8"//编码格式。
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {
      "record": "0"//错误记录数。
    },
    "speed": {
      "throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
      "concurrent": 1,//作业并发数。
      "mbps": "12"//限流,此处1mbps = 1MB/s。
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}Reader脚本参数
脚本参数名  | 描述  | 是否必选  | 默认值  | 
datasource  | 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。  | 是  | 无  | 
table  | 选取的需要同步的表名称。一个数据集成任务只能从一张表中读取数据。 table用于配置范围的高级用法示例如下: 
 说明  任务会读取匹配到的所有表,具体读取这些表中column配置项指定的列。如果表不存在,或者读取的列不存在,会导致任务失败。  | 是  | 无  | 
column  | 所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息 。默认使用所有列配置,例如 
  | 是  | 无  | 
splitFactor  | 切分因子,可以配置同步数据的切分份数,如果配置了多并发,会按照并发数 * splitFactor份来切分。例如,并发数=5,splitFactor=5,则会按照5*5=25份来切分,在5个并发线程上执行。 说明  建议取值范围:1~100,过大会导致内存溢出。  | 否  | 5  | 
splitPk  | Doris Reader进行数据抽取时,如果指定splitPk,表示您希望使用splitPk代表的字段进行数据分片,数据同步因此会启动并发任务进行数据同步,提高数据同步的效率。 
  | 否  | 无  | 
where  | 筛选条件,在实际业务场景中,往往会选择当天的数据进行同步,将where条件指定为 
  | 否  | 无  | 
querySql(高级模式,向导模式不支持此参数的配置)  | 在部分业务场景中,where配置项不足以描述所筛选的条件,您可以通过该配置项来自定义筛选SQL。配置该项后,数据同步系统会忽略tables、columns和splitPk配置项,直接使用该项配置的内容对数据进行筛选。例如,需要进行多表join后同步数据,使用 说明  querySql需要区分大小写,例如,写为querysql会不生效。  | 否  | 无  | 
Writer脚本Demo
{
  "stepType": "doris",//插件名。
  "parameter":
  {
    "postSql"://执行数据同步任务之后率先执行的SQL语句。
    [],
    "preSql":
    [],//执行数据同步任务之前率先执行的SQL语句。
    "datasource":"doris_datasource",//数据源名。
    "table": "doris_table_name",//表名。
    "column":
    [
      "id",
      "table_id",
      "table_no",
      "table_name",
      "table_status"
    ],
    "loadProps":{
      "column_separator": "\\x01",//指定CSV格式的列分隔符
      "line_delimiter": "\\x02"//指定CSV格式的行分隔符
    }
  },
  "name": "Writer",
  "category": "writer"
}Writer脚本参数
参数  | 描述  | 是否必选  | 默认值  | 
datasource  | 数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。  | 是  | 无  | 
table  | 选取的需要同步的表名称。  | 是  | 无  | 
column  | 目标表需要写入数据的字段,字段之间用英文逗号分隔。例如  | 是  | 无  | 
preSql  | 执行数据同步任务之前,需率先执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句,例如,执行前清空表中的旧数据。  | 否  | 无  | 
postSql  | 执行数据同步任务之后执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句,例如,加上某个时间戳。  | 否  | 无  | 
maxBatchRows  | 每批次导入数据的最大行数,与batchSize共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。  | 否  | 500000  | 
batchSize  | 每批次导入数据的最大数据量,与maxBatchRows共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。  | 否  | 104857600  | 
maxRetries  | 每批次导入数据失败后的重试次数。  | 否  | 3  | 
labelPrefix  | 每批次上传文件的 label 前缀。最终的label将有  | 否  | datax_doris_writer_  | 
loadProps  | StreamLoad的请求参数,主要用于配置导入的数据格式。默认以CSV格式导入。如果loadProps没有配置,则采用默认的CSV格式,以 如果您需要指定为JSON格式导入,则配置如下所示。  | 否  | 无  | 
聚合类型脚本(Doris Writer写入聚合类型)
Doris Writer支持写入聚合类型,但是需要在脚本模式中做额外的配置。如下所示。
例如,对于如下一张Doris的表,其中,uuid是bitmap类型(聚合类型)、sex是HLL类型(聚合类型)。
CREATE TABLE `example_table_1` (
  `user_id` int(11) NULL,
  `date` varchar(10) NULL DEFAULT "10.5",
  `city` varchar(10) NULL,
  `uuid` bitmap BITMAP_UNION NULL, -- 聚合类型
  `sex` HLL HLL_UNION  -- 聚合类型
) ENGINE=OLAP AGGREGATE KEY(`user_id`, `date`,`city`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`user_id`) BUCKETS 32往表中插入原始数据:
user_id,date,city,uuid,sex
0,T0S4Pb,abc,43,'54'
1,T0S4Pd,fsd,34,'54'
2,T0S4Pb,fa3,53,'64'
4,T0S4Pb,fwe,87,'64'
5,T0S4Pb,gbr,90,'56'
2,iY3GiHkLF,234,100,'54'通过Doris Writer写入聚合类型时,不仅需要在writer.parameter.column中指定该列,还需要在writer.parameter.loadProps.columns中配置聚合函数。例如,为uuid使用聚合函数bitmap_hash,为sex使用聚合函数hll_hash。
脚本模式示例:
{
    "stepType": "doris",//插件名。
    "writer":
    {
        "parameter":
        {
            "column":
            [
                "user_id",
                "date",
                "city",
                "uuid",// 聚合类型bitmap
                "sex"// 聚合类型HLL
            ],
            "loadProps":
            {
                "format": "csv",
                "column_separator": "\\x01",
                "line_delimiter": "\\x02",
                "columns": "user_id,date,city,k1,uuid=bitmap_hash(k1),k2,sex=hll_hash(k2)"// 需要指定聚合函数
            },
            "postSql":
            [
                "select count(1) from example_tbl_3"
            ],
            "preSql":
            [],
            "datasource":"doris_datasource",//数据源名。
                    "table": "doris_table_name",//表名。
        }
          "name": "Writer",
              "category": "writer"
    }
}