Hologres数据源

Hologres数据源为您提供读取和写入Hologres双向通道的功能,本文为您介绍DataWorks的Hologres数据同步的能力支持情况。

支持的版本

Hologres支持的版本:0.7、0.8、0.9、0.10、1.1、1.2、1.3。

使用限制

离线读写

  • Hologres数据源支持使用Serverless资源组(推荐)独享数据集成资源组

  • Hologres Writer不支持写入数据至Hologres的外部表。

  • Hologres数据源连通性获取Hologres端点的逻辑:

    • 当前地域的Hologres实例,Hologres端点获取顺序:any Tunnel > single Tunnel > Public(公网)

    • 跨地域的Hologres实例,Hologres端点获取顺序:Public(公网) > single Tunnel

整库实时写

支持的字段类型

字段类型

离线读(Hologres Reader)

离线写(Hologres Writer)

实时写

UUID

不支持

不支持

不支持

CHAR

支持

支持

支持

NCHAR

支持

支持

支持

VARCHAR

支持

支持

支持

LONGVARCHAR

支持

支持

支持

NVARCHAR

支持

支持

支持

LONGNVARCHAR

支持

支持

支持

CLOB

支持

支持

支持

NCLOB

支持

支持

支持

SMALLINT

支持

支持

支持

TINYINT

支持

支持

支持

INTEGER

支持

支持

支持

BIGINT

支持

支持

支持

NUMERIC

支持

支持

支持

DECIMAL

支持

支持

支持

FLOAT

支持

支持

支持

REAL

支持

支持

支持

DOUBLE

支持

支持

支持

TIME

支持

支持

支持

DATE

支持

支持

支持

TIMESTAMP

支持

支持

支持

BINARY

支持

支持

支持

VARBINARY

支持

支持

支持

BLOB

支持

支持

支持

LONGVARBINARY

支持

支持

支持

BOOLEAN

支持

支持

支持

BIT

支持

支持

支持

JSON

支持

支持

支持

JSONB

支持

支持

支持

实现原理

离线读写

Hologres Reader通过PSQL读取Hologres表中的数据,根据表的Shard Count发起多个并发,每个Shard对应一个Select并发任务:

  • Hologres在创建表时,在同一个CREATE TABLE事务中,通过CALL set_table_property('table_name', 'shard_count', 'xx')配置表的Shard Count。

    默认情况下,使用数据库默认的Shard Count,具体数值取决于Hologres实例的配置。

  • Select语句通过表的内置列hg_shard_id的Shard筛选数据。

离线写

Hologres Writer通过数据同步框架获取Reader生成的协议数据,根据conflictMode(冲突策略)的配置决定写入数据时的冲突解决策略。

您可以通过配置conflictMode,决定新导入的数据和已有数据的主键发生冲突时,如何处理新导入的数据:

重要

conflictMode仅适用于有主键的表。具体写入原理和性能,详情请参考技术原理

  • conflictModeReplace(整行更新)模式时,新数据覆盖旧数据,整行所有列全部覆盖,没有配置列映射的字段会强制写NULL。

  • conflictModeUpdate(更新)模式时,新数据覆盖旧数据,只覆盖配置有列映射的字段。

  • conflictModeIgnore(忽略)模式时,忽略新数据。

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建并管理数据源详细的配置参数解释可在配置界面查看对应参数的文案提示

数据同步任务开发

数据同步任务的配置入口和通用配置流程可参见下文的配置指导。

单表离线同步任务配置指导

单表、整库实时同步任务配置指导

操作流程请参见DataStudio侧实时同步任务配置

单表、整库全增量实时写任务配置指导

操作流程请参见数据集成侧同步任务配置

附录:脚本Demo与参数说明

离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要按照统一的脚本格式要求,在任务脚本中编写相应的参数,详情请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下数据源的参数配置详情。

Reader脚本Demo

  • 配置非分区表

    • 配置从Hologres非分区表读取数据至内存,如下所示。

      {
        "type":"job",
        "version":"2.0",//版本号。
        "steps":[
          {
            "stepType":"holo",//插件名。
            "parameter":{
              "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
              "accessId": "***************", //访问Hologres的accessId。
              "accessKey": "*******************", //访问Hologres的accessKey。
              "database": "postgres",
              "table": "holo_reader_****",
              "column" : [ //字段。
                "tag",
                "id",
                "title"
              ]
            },
            "name":"Reader",
            "category":"reader"
          },
          {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
          }
        ],
        "setting":{
          "errorLimit":{
            "record":"0"//错误记录数。
          },
          "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1,//作业并发数。 
                        "mbps":"12"//限流,此处1mbps = 1MB/s。
          }
        },
        "order":{
          "hops":[
            {
              "from":"Reader",
              "to":"Writer"
            }
          ]
        }
      }
    • Hologres表的DDL语句,如下所示。

      begin;
      drop table if exists holo_reader_basic_src;
      create table holo_reader_basic_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id));
        call set_table_property('holo_reader_basic_src', 'orientation', 'column');
        call set_table_property('holo_reader_basic_src', 'shard_count', '3');
      commit;
  • 配置分区表

    • 配置从内存产生的数据同步至Hologres分区表的子表。

      说明

      请注意partition的配置。

      {
        "type":"job",
        "version":"2.0",//版本号。
        "steps":[
          {
            "stepType":"holo",//插件名。
            "parameter":{
              "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
              "accessId": "***************", //访问Hologres的accessId。
              "accessKey": "*******************", //访问Hologres的accessKey。
              "database": "postgres",
              "table": "holo_reader_basic_****",
              "partition": "tag=foo",
              "column" : [
                "*"
              ],
              "fetchSize": "100"
            },
            "name":"Reader",
            "category":"reader"
          },
          {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
          }
        ],
        "setting":{
          "errorLimit":{
            "record":"0"//错误记录数。
          },
          "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1,//作业并发数。
                        "mbps":"12"//限流,此处1mbps = 1MB/s。
          }
        },
        "order":{
          "hops":[
            {
              "from":"Reader",
              "to":"Writer"
            }
          ]
        }
      }
    • Hologres表的DDL语句,如下所示。

      begin;
      drop table if exists holo_reader_basic_part_src;
      create table holo_reader_basic_part_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id))
        partition by list( tag );
        call set_table_property('holo_reader_basic_part_src', 'orientation', 'column');
        call set_table_property('holo_reader_basic_part_src', 'shard_count', '3');
      commit;
      
      create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo');
      
      # 确保分区表子表已经创建且导入数据。
      postgres=# \d+ holo_reader_basic_part_src
                               Table "public.holo_reader_basic_part_src"
       Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description 
      --------+---------+-----------+----------+---------+----------+--------------+-------------
       tag    | text    |           | not null |         | extended |              | 
       id     | integer |           | not null |         | plain    |              | 
       title  | text    |           | not null |         | extended |              | 
       body   | text    |           |          |         | extended |              | 
      Partition key: LIST (tag)
      Indexes:
          "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id)
      Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')

Reader脚本参数

参数

描述

是否必选

默认值

endpoint

目标交互式分析(Hologres)实例对应的endpoint,格式为instance-id-region-endpoint.hologres.aliyuncs.com:port。您可以从交互式分析实例的管理页面获取。

endpoint包括经典网络、公网和VPC三种网络类型,请根据数据集成资源组和Hologres实例所在的网络环境选择正确的endpoint类型,否则会出现网络不通或者性能受限的情况:

  • 经典网络示例:instance-id-region-endpoint-internal.hologres.aliyuncs.com:port

  • 公网示例:instance-id-region-endpoint.hologres.aliyuncs.com:port

  • VPC示例:instance-id-region-endpoint-vpc.hologres.aliyuncs.com:port

通常建议数据集成资源组和Hologres实例配在同一个地域的同一个可用区,以保证网络端口连通,实现最大性能。

accessId

访问Hologres的accessId

accessKey

访问Hologres的accessKey,请确保该密钥对目标表有写入权限。

database

Hologres实例内部数据库的名称。

table

Hologres的表名称,如果是分区表,请指定父表的名称。

column

定义导入目标表的数据列,必须包含目标表的主键集合。例如["*"]表示全部列。

partition

针对分区表,表示分区Column以及对应的Value,格式为column=value

重要
  • 目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。

  • 请确认该参数和表DDL的分区配置匹配。

  • 请确认对应的子表已经创建,且已经导入数据。

,表示非分区表。

fetchSize

指定使用Select语句一次性读取数据的条数。

1,000

Writer脚本Demo

  • 配置非分区表

    • 配置从内存产生的数据导入至Hologres普通表,示例为通过JDBC模式导入的配置。

      {
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "mysql",
                  "parameter": {
                      "envType": 0,
                          "datasource": "<mysql_source_name>",
                      "column": [
                          "<column1>",
                          "<column2>",
                          ......,
                          "<columnN>"
                      ],
                      "connection": [
                          {
                              "datasource": "<mysql_source_name>",//mysql数据源名
                              "table": [
                                  "<mysql_table_name>"
                              ]
                          }
                      ],
                      "where": "",
                      "splitPk": "",
                      "encoding": "UTF-8"
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "holo",
                  "parameter": {
                      "maxConnectionCount": 9,
                      "datasource": "<holo_sink_name>",//Hologres数据源名称
                      "truncate":true,//清理规则。
                      "conflictMode": "ignore",
                      "envType": 0,
                      "column": [
                          "<column1>",
                          "<column2>",
                          ......,
                          "<columnN>"
                      ],
                      "table": "<holo_table_name>"
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "setting": {
              "executeMode": null,
              "errorLimit": {
                  "record": ""
              },
              "speed": {
                  "concurrent": 2,//作业并发数
                  "throttle": false//限流
              }
          },
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          }
      }
    • Hologres表的DDL语句,如下所示。

      begin;
      drop table if exists mysql_to_holo_test;
      create table mysql_to_holo_test(
        tag text not null,
        id int not null,
        body text not null,
        brrth date,
        primary key (tag, id));
        call set_table_property('mysql_to_holo_test', 'orientation', 'column');
        call set_table_property('mysql_to_holo_test', 'distribution_key', 'id');
        call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth');
      commit;
  • 配置分区表

    说明
    • 目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。

    • 请确认该参数和表DDL的分区配置匹配。

    • 配置从内存产生的数据同步至Hologres分区表的子表。

      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "mysql",
            "parameter": {
              "envType": 0,
              "datasource": "<mysql_source_name>",
              "column": [
                "<column1>",
                "<column2>",
                  ......,
                "<columnN>"
              ],
              "connection": [
                {
                  "datasource": "<mysql_source_name>",
                  "table": [
                    "<mysql_table_name>"
                  ]
                }
              ],
              "where": "",
              "splitPk": "<mysql_pk>",//mysql的pk字段
              "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
          },
          {
            "stepType": "holo",
            "parameter": {
              "maxConnectionCount": 9,
              "partition": "<partition_key>",//Hologres分区键
              "datasource": "<holo_sink_name>",//Hologres数据源名
              "conflictMode": "ignore",
              "envType": 0,
              "column": [
                "<column1>",
                "<column2>",
                  ......,
                "<columnN>"
              ],
              "table": "<holo_table_name>"
            },
            "name": "Writer",
            "category": "writer"
          }
        ],
        "setting": {
          "executeMode": null,
          "errorLimit": {
            "record": ""
          },
          "speed": {
            "concurrent": 2,//作业并发数
            "throttle": false//限流
          }
        },
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        }
      }
    • Hologres表的DDL语句,如下所示。

      BEGIN;
      CREATE TABLE public.hologres_parent_table(
        a text ,
        b int,
        c timestamp,
        d text,
        ds text,
        primary key(ds,b)
        )
        PARTITION BY LIST(ds);
      CALL set_table_property('public.hologres_parent_table', 'orientation', 'column');
      CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201215');
      CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201216');
      CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201217');
      COMMIT;

Writer脚本参数

参数

描述

是否必选

默认值

endpoint

目标交互式分析(Hologres)实例对应的endpoint,格式为instance-id-region-endpoint.hologres.aliyuncs.com:port。您可以从交互式分析实例的管理页面获取。

endpoint包括公网、经典网络和VPC三种网络类型,请根据数据集成资源组和Hologres实例所在的网络环境选择正确的endpoint类型,否则会出现网络不通或者性能受限的情况:

  • 公网示例:instance-id-region-endpoint.hologres.aliyuncs.com:port

  • 经典网络示例:instance-id-region-endpoint-internal.hologres.aliyuncs.com:port

  • VPC示例:instance-id-region-endpoint-vpc.hologres.aliyuncs.com:port

通常建议数据集成资源组和Hologres实例在同一个地域的同一个可用区,以确保网络连通,实现最大性能。

accessId

访问Hologres的accessId

accessKey

访问Hologres的accessKey,请确保该密钥对目标表有写入权限。

database

Hologres实例内部数据库的名称。

table

Hologres的表名称,目前支持表名称中包含Schema,例如schema_name.table_name

conflictMode

conflictMode包括ReplaceUpdateIgnore,详情请参见实现原理

column

定义导入目标表的数据列,必须包含目标表的主键集合。例如["*"]表示全部列。

partition

针对分区表,表示分区Column以及对应的Value,格式为column=value

说明
  • 目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。

  • 请确认该参数和表DDL的分区配置匹配。

,表示非分区表

truncate

写入Holo表之前是否需要清空目标表。

  • true:清空目标表。

    说明
    • 目前仅支持清空非分区表和静态分区表,不支持清空动态分区表,如果您是动态分区表,并且设置了参数值为true,同步任务将会异常退出。

    • 如果您是静态分区表,并设置了参数值为true,则会清空该分区子表数据,不会清空父表数据。

  • false:不清空目标表。

false