Doris数据源

DataWorks数据集成支持使用Doris Writer导入表数据至Doris。本文为您介绍DataWorks的Doris数据同步能力支持情况。

支持的Doris版本

Doris Writer使用的驱动版本是MySQL Driver 5.1.47,该驱动支持的内核版本如下。驱动能力详情请参见Doris官网文档

Doris 版本

是否支持

0.x.x

支持

1.1.x

支持

1.2.x

支持

2.x

支持

使用限制

数据集成仅支持离线写入Doris。

支持的字段类型

不同Doris版本支持不同的数据类型和聚合模型。各版本Doris的全量字段类型请参见Doris的官方文档,下面为您介绍Doris当前主要字段的支持情况。

类型

支持模型

Doris版本

离线写入(Doris Writer)

SMALLINT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

INT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

BIGINT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

LARGEINT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

FLOAT

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

DOUBLE

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

DECIMAL

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

DECIMALV3

Aggregate,Unique,Duplicate

1.2.1+、2.x

支持

DATE

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

DATETIME

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

DATEV2

Aggregate,Unique,Duplicate

1.2.x、2.x

支持

DATATIMEV2

Aggregate,Unique,Duplicate

1.2.x、2.x

支持

CHAR

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

VARCHAR

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

STRING

Aggregate,Unique,Duplicate

0.x.x、1.1.x、1.2.x、2.x

支持

VARCHAR

Aggregate,Unique,Duplicate

1.1.x、1.2.x、2.x

支持

ARRAY

Duplicate

1.2.x、2.x

支持

JSONB

Aggregate,Unique,Duplicate

1.2.x、2.x

支持

HLL

Aggregate

0.x.x、1.1.x、1.2.x、2.x

支持

BITMAP

Aggregate

0.x.x、1.1.x、1.2.x、2.x

支持

QUANTILE_STATE

Aggregate

1.2.x、2.x

支持

实现原理

Doris Writer通过Doris原生支持的StreamLoad方式导入数据,Doris Writer会将Reader端读取到的数据缓存在内存中,并拼接成文本,然后批量导入至Doris数据库。更多详情请参见Doris官方文档

数据同步前准备

在DataWorks上进行数据同步前,您需要参考本文提前在Doris侧进行数据同步环境准备,以便在DataWorks上进行Doris数据同步任务配置与执行时服务正常。以下为您介绍Doris同步前的相关环境准备。

准备工作1:确认Doris的版本

数据集成对Doris版本有要求,您可以参考上文支持的Doris版本章节,查看当前待同步的Doris是否符合版本要求。您可以在Doris的官方网站下载对应的版本,并且安装。

准备工作2:创建账号,并配置账号权限

您需要规划一个数据仓库的登录账号用于后续操作,同时,您需要为该账号设置密码,以便后续连接到数据仓库。如果您要使用Doris默认的root用户进行登录,那么您需要为root用户设置密码。root用户默认没有密码,您可以在Doris上执行SQL来设置密码:

SET PASSWORD FOR 'root' = PASSWORD('密码')

准备工作3:配置Doris的网络连接

使用StreamLoad导入数据,需要访问FE节点的私网地址。如果访问FE的公网地址,则会被重定向到BE节点的内网IP(数据操作问题)。因此,您需要将Doris的网络与数据集成使用的Serverless资源组或独享数据集成资源组打通,使之通过内网地址进行访问。网络打通的具体操作可以参考网络连通方案

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

下面对Doris数据源的几个配置项进行说明:

  • JdbcUrl:请填写JDBC连接串,包含IP、端口号、数据库和连接参数。支持公网IP和私网IP,如果使用公网IP,请确保数据集成资源组能够正常访问Doris所在的主机。

  • FE endpoint:请填写FE节点的IP和端口。如果您的集群中有多个FE节点,可以配置多个FE节点的IP和端口,每个IP和端口以逗号分隔,例如ip1:port1,ip2:port2。在测试连通性时,会对所有的FE endpoint做连通性测试。

  • 用户名:请填写Doris数据库的用户名。

  • 密码:请填写Doris数据库对应用户的密码。

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

单表离线同步任务配置指导

附录:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

{
  "type": "job",
  "version": "2.0",//版本号。
  "steps": [
    {
      "stepType": "doris",//插件名。
      "parameter": {
        "column": [//列名。
          "id"
        ],
        "connection": [
          {
            "querySql": [
              "select a,b from join1 c join join2 d on c.id = d.id;"
            ],
            "datasource": ""//数据源名称。
          }
        ],
        "where": "",//过滤条件。
        "splitPk": "",//切分键。
        "encoding": "UTF-8"//编码格式。
      },
      "name": "Reader",
      "category": "reader"
    },
    {
      "stepType": "stream",
      "parameter": {},
      "name": "Writer",
      "category": "writer"
    }
  ],
  "setting": {
    "errorLimit": {
      "record": "0"//错误记录数。
    },
    "speed": {
      "throttle": true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
      "concurrent": 1,//作业并发数。
      "mbps": "12"//限流,此处1mbps = 1MB/s。
    }
  },
  "order": {
    "hops": [
      {
        "from": "Reader",
        "to": "Writer"
      }
    ]
  }
}

Reader脚本参数

脚本参数名

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。

table

选取的需要同步的表名称。一个数据集成任务只能从一张表中读取数据。

table用于配置范围的高级用法示例如下:

  • 您可以通过配置区间读取分库分表,例如'table_[0-99]'表示读取'table_0''table_1''table_2'直到'table_99'

  • 如果您的表数字后缀的长度一致,例如'table_000''table_001''table_002'直到'table_999',您可以配置为'"table": ["table_00[0-9]", "table_0[10-99]", "table_[100-999]"]'

说明

任务会读取匹配到的所有表,具体读取这些表中column配置项指定的列。如果表不存在,或者读取的列不存在,会导致任务失败。

column

所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息 。默认使用所有列配置,例如[ * ]

  • 支持列裁剪:列可以挑选部分列进行导出。

  • 支持列换序:列可以不按照表schema信息顺序进行导出。

  • 支持常量配置:您需要按照Doris SQL语法格式,例如["id","table","1","'mingya.wmy'","'null'","to_char(a+1)","2.3","true"]

    • id为普通列名。

    • table为包含保留字的列名。

    • 1为整型数字常量。

    • 'mingya.wmy'为字符串常量(注意需要加上一对单引号)。

    • 关于null

      • " "表示空。

      • null表示null。

      • 'null'表示null这个字符串。

    • to_char(a+1)为计算字符串长度函数。

    • 2.3为浮点数。

    • true为布尔值。

  • column必须显示指定同步的列集合,不允许为空。

splitPk

Doris Reader进行数据抽取时,如果指定splitPk,表示您希望使用splitPk代表的字段进行数据分片,数据同步因此会启动并发任务进行数据同步,提高数据同步的效能。

  • 推荐splitPk用户使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。

  • 目前splitPk仅支持整型数据切分,不支持字符串、浮点和日期等其他类型 。如果您指定其他非支持类型,忽略splitPk功能,使用单通道进行同步。

  • 如果不填写splitPk,包括不提供splitPk或者splitPk值为空,数据同步视作使用单通道同步该表数据 。

where

筛选条件,在实际业务场景中,往往会选择当天的数据进行同步,将where条件指定为gmt_create>$bizdate

  • where条件可以有效地进行业务增量同步。如果不填写where语句,包括不提供where的key或value,数据同步均视作同步全量数据。

  • 不可以将where条件指定为limit 10,这不符合Doris SQL WHERE子句约束。

querySql(高级模式,向导模式不支持此参数的配置)

在部分业务场景中,where配置项不足以描述所筛选的条件,您可以通过该配置项来自定义筛选SQL。配置该项后,数据同步系统会忽略tables、columns和splitPk配置项,直接使用该项配置的内容对数据进行筛选。例如,需要进行多表join后同步数据,使用select a,b from table_a join table_b on table_a.id = table_b.id。当您配置querySql时,Doris Reader直接忽略table、column、where和splitPk条件的配置,querySql优先级大于tablecolumnwheresplitPk选项。datasource通过它解析出用户名和密码等信息。

说明

querySql需要区分大小写,例如,写为querysql会不生效。

Writer脚本Demo

{
  "stepType": "doris",//插件名。
  "parameter":
  {
    "postSql"://执行数据同步任务之后率先执行的SQL语句。
    [],
    "preSql":
    [],//执行数据同步任务之前率先执行的SQL语句。
    "datasource":"doris_datasource",//数据源名。
    "table": "doris_table_name",//表名。
    "column":
    [
      "id",
      "table_id",
      "table_no",
      "table_name",
      "table_status"
    ],
    "loadProps":{
      "column_separator": "\\x01",//指定CSV格式的列分隔符
      "line_delimiter": "\\x02"//指定CSV格式的行分隔符
    }
  },
  "name": "Writer",
  "category": "writer"
}

Writer脚本参数

参数

描述

是否必选

默认值

datasource

数据源名称,脚本模式支持添加数据源,此配置项填写的内容必须与添加的数据源名称保持一致。

table

选取的需要同步的表名称。

column

目标表需要写入数据的字段,字段之间用英文逗号分隔。例如"column":["id","name","age"]。如果要依次写入全部列,使用(*)表示,例如"column":["*"]

preSql

执行数据同步任务之前率先执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句,例如,执行前清空表中的旧数据。

postSql

执行数据同步任务之后执行的SQL语句。目前向导模式仅允许执行一条SQL语句,脚本模式可以支持多条SQL语句,例如,加上某个时间戳。

maxBatchRows

每批次导入数据的最大行数,与batchSize共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。

500000

batchSize

每批次导入数据的最大数据量,与maxBatchRows共同控制每批次的导入数量。每批次数据达到两个阈值之一,即开始导入这一批次的数据。

104857600

maxRetries

每批次导入数据失败后的重试次数。

3

labelPrefix

每批次上传文件的 label 前缀。最终的label将有labelPrefix + UUID组成全局唯一的label,确保数据不会重复导入。

datax_doris_writer_

loadProps

StreamLoad的请求参数,主要用于配置导入的数据格式。默认以CSV格式导入。如果loadProps没有配置,则采用默认的CSV格式,以\t为列分隔符,\n为行分隔符,配置如下所示。

"loadProps": {
    "format":"csv",
    "column_separator": "\t",
    "line_delimiter": "\n"
}

如果您需要指定为JSON格式导入,则配置如下所示。

"loadProps": {
    "format": "json"
}

聚合类型脚本(Doris Writer写入聚合类型)

Doris Writer支持写入聚合类型,但是需要在脚本模式中做额外的配置。如下所示。

例如,对于如下一张Doris的表,其中,uuidbitmap类型(聚合类型)、sexHLL类型(聚合类型)。

CREATE TABLE `example_table_1` (
  `user_id` int(11) NULL,
  `date` varchar(10) NULL DEFAULT "10.5",
  `city` varchar(10) NULL,
  `uuid` bitmap BITMAP_UNION NULL, -- 聚合类型
  `sex` HLL HLL_UNION  -- 聚合类型
) ENGINE=OLAP AGGREGATE KEY(`user_id`, `date`,`city`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`user_id`) BUCKETS 32

往表中插入原始数据:

user_id,date,city,uuid,sex
0,T0S4Pb,abc,43,'54'
1,T0S4Pd,fsd,34,'54'
2,T0S4Pb,fa3,53,'64'
4,T0S4Pb,fwe,87,'64'
5,T0S4Pb,gbr,90,'56'
2,iY3GiHkLF,234,100,'54'

通过Doris Writer写入聚合类型时,不仅需要在writer.parameter.column中指定该列,还需要在writer.parameter.loadProps.columns中配置聚合函数。例如,为uuid使用聚合函数bitmap_hash,为sex使用聚合函数hll_hash

脚本模式示例:

{
    "stepType": "doris",//插件名。
    "writer":
    {
        "parameter":
        {
            "column":
            [
                "user_id",
                "date",
                "city",
                "uuid",// 聚合类型bitmap
                "sex"// 聚合类型HLL
            ],
            "loadProps":
            {
                "format": "csv",
                "column_separator": "\\x01",
                "line_delimiter": "\\x02",
                "columns": "user_id,date,city,k1,uuid=bitmap_hash(k1),k2,sex=hll_hash(k2)"// 需要指定聚合函数
            },
            "postSql":
            [
                "select count(1) from example_tbl_3"
            ],
            "preSql":
            [],
            "datasource":"doris_datasource",//数据源名。
                    "table": "doris_table_name",//表名。
        }
          "name": "Writer",
              "category": "writer"
    }
}