Hologres Writer实现了导入数据至实时数仓Hologres的功能,您可以把多种数据源的数据导入Hologres进行实时分析。

使用限制

  • Hologres Writer不支持写入数据至Hologres的外部表。
  • Hologres Writer仅支持使用独享数据集成资源组,不支持使用其他类型的数据集成资源组。

支持的Hologres版本

Hologres支持的版本:0.7、0.8、0.9、0.10、1.1、1.2、1.3。

支持的字段类型

不支持UUID类型的字段。
字段类型离线读(Hologres Reader)离线写(Hologres Writer)实时写
UUID不支持不支持不支持
CHAR 支持支持支持
NCHAR 支持支持支持
VARCHAR 支持支持支持
LONGVARCHAR 支持支持支持
NVARCHAR 支持支持支持
LONGNVARCHAR 支持支持支持
CLOB支持支持支持
NCLOB 支持支持支持
SMALLINT 支持支持支持
TINYINT支持支持支持
INTEGER 支持支持支持
BIGINT 支持支持支持
NUMERIC 支持支持支持
DECIMAL 支持支持支持
FLOAT 支持支持支持
REAL 支持支持支持
DOUBLE 支持支持支持
TIME 支持支持支持
DATE 支持支持支持
TIMESTAMP 支持支持支持
BINARY 支持支持支持
VARBINARY 支持支持支持
BLOB 支持支持支持
LONGVARBINARY 支持支持支持
BOOLEAN 支持支持支持
BIT 支持支持支持
JSON支持支持支持
JSONB支持支持支持

实现原理

Hologres Writer通过数据同步框架获取Reader生成的协议数据,根据writeMode(写入模式)的配置选择不同的写入方式,并根据conflictMode(冲突策略)的配置决定写入数据时的冲突解决策略:
  • (推荐使用)writeModeSQL模式时,通过PSQL的INSERT INTO命令(JDBC方式)写入数据,能提供更好的写入性能,建议您采用此写入模式同步数据。
  • writeModeSDK模式时,通过Hologres的写入接口来进行同步数据,后续该模式将逐步不再使用,建议使用SQL模式写入数据。
您可以通过配置conflictMode,决定新导入的数据和已有数据的主键发生冲突时,如何处理新导入的数据:
重要 conflictMode仅适用于有主键的表。具体写入原理和性能,详情请参考技术原理
  • conflictModeReplace(整行更新)模式时,新数据覆盖旧数据,整行所有列全部覆盖,没有配置列映射的字段会强制写NULL。
  • conflictModeUpdate(更新)模式时,新数据覆盖旧数据,只覆盖配置有列映射的字段。
  • conflictModeIgnore(忽略)模式时,忽略新数据。

在不同的writeMode下,conflictMode的实现方式也不同。writeModeSDK 模式时,通过设置Hologres Table属性来改变此次导入的冲突解决模式。

参数说明

参数描述是否必选默认值
endpoint目标交互式分析(Hologres)实例对应的endpoint,格式为instance-id-region-endpoint.hologres.aliyuncs.com:port。您可以从交互式分析实例的管理页面获取。
endpoint包括公网、经典网络和VPC三种网络类型,请根据数据集成资源组和Hologres实例所在的网络环境选择正确的endpoint类型,否则会出现网络不通或者性能受限的情况:
  • 公网示例:instance-id-region-endpoint.hologres.aliyuncs.com:port
  • 经典网络示例:instance-id-region-endpoint-internal.hologres.aliyuncs.com:port
  • VPC示例:instance-id-region-endpoint-vpc.hologres.aliyuncs.com:port

通常建议数据集成资源组和Hologres实例在同一个地域的同一个可用区,以确保网络连通,实现最大性能。

accessId访问Hologres的accessId
accessKey访问Hologres的accessKey,请确保该密钥对目标表有写入权限。
databaseHologres实例内部数据库的名称。
tableHologres的表名称,目前支持表名称中包含Schema,例如schema_name.table_name
writeModewriteMode包括SDKSQL(INSERT INTO),详情请参见实现原理
在脚本模式中,SDK的可选配置如下:
  • maxCommitSize:定义HoloWriter聚合的最大值。该参数可选,默认值为1,048,576 Byte。
  • maxRetryCount:定义HoloWriter通过SDK模式写入的最大重试次数。该参数可选,默认值为500
  • retryInterval:定义HoloWriter通过SDK模式写入出错时的重试间隔,单位为ms。该参数可选,默认值为1,000
conflictModeconflictMode包括ReplaceUpdateIgnore,详情请参见实现原理
column定义导入目标表的数据列,必须包含目标表的主键集合。例如["*"]表示全部列。
partition针对分区表,表示分区Column以及对应的Value,格式为column=value
说明
  • 目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。
  • 请确认该参数和表DDL的分区配置匹配。
,表示非分区表
truncate写入Holo表之前是否需要清空目标表。
  • true:清空目标表。
    说明
    • 目前仅支持清空非分区表和静态分区表,不支持清空动态分区表,如果您是动态分区表,并且设置了参数值为true,同步任务将会异常退出。
    • 如果您是静态分区表,并设置了参数值为true,则会清空该分区子表数据,不会清空父表数据。
  • false:不清空目标表。
false

向导开发介绍

  1. 选择数据源。

    配置同步任务的数据来源数据去向,同步数据至Hologres时,数据去向配置参数如下。

    参数描述
    数据源通常输入您配置的数据源名称。第一次数据同步需要先创建Hologres数据源,详情请参见配置Hologres数据源
    需要同步的Hologres目标表,可以通过一键生成目标表自动创建,也可以提前在Hologres中创建。即上述参数说明中的table
    说明
    • 一键生成目标表创建的仅是基础表,请根据实际业务修改建表语句并添加相应的索引,以获得更好的性能,详情请参见建表概述
    • 支持自动路由分区,若是通过一键生成目标表创建表,需要将表创建为分区表,以实现分区自动路由的功能,若是无对应分区子表则会自动创建,详情请参见CREATE PARTITION TABLE
    分区信息Hologres的分区键。
    写入模式即上述参数说明中的writeMode
    • (推荐)SQL(INSERT INTO),默认使用SQL模式,性能好。
    • SDK,后续逐步不再使用,不推荐使用此模式。
    写入冲突策略即上述参数说明中的conflictMode
    同步前是否要清空Hologres表即上述参数说明中的truncate
    最大连接数JDBC使用的最大连接数,仅在SQL模式下使用,在开启任务时请确保实例有充足的空闲连接。一个任务最多使用9个连接,即一个任务使用9个连接。
  2. 字段映射,即上述参数说明中的column,左侧的源头表字段和右侧的目标表字段为一一对应关系。字段映射
    参数描述
    同名映射单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射单击取消映射,可以取消建立的映射关系。
    自动排版可以根据相应的规律自动排版。
  3. 通道控制。通道配置
    参数描述
    任务期望最大并发数数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发介绍

使用脚本模式开发的详情请参见通过脚本模式配置离线同步任务。配置非分区表和分区表的示例如下:
  • 配置非分区表
    • 配置从内存产生的数据导入至Hologres普通表,示例为通过JDBC模式导入的配置。
      {
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "mysql",
                  "parameter": {
                      "envType": 0,
                          "datasource": "<mysql_source_name>",
                      "column": [
                          "<column1>",
                          "<column2>",
                          ......,
                          "<columnN>"
                      ],
                      "connection": [
                          {
                              "datasource": "<mysql_source_name>",//mysql数据源名
                              "table": [
                                  "<mysql_table_name>"
                              ]
                          }
                      ],
                      "where": "",
                      "splitPk": "",
                      "encoding": "UTF-8"
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "holo",
                  "parameter": {
                      "maxConnectionCount": 9,
                      "datasource": "<holo_sink_name>",//Hologres数据源名称
                      "truncate":true,//清理规则。
                      "conflictMode": "ignore",
                      "envType": 0,
                      "column": [
                          "<column1>",
                          "<column2>",
                          ......,
                          "<columnN>"
                      ],
                      "writeMode": "insert",
                      "table": "<holo_table_name>"
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "setting": {
              "executeMode": null,
              "errorLimit": {
                  "record": ""
              },
              "speed": {
                  "concurrent": 2,//作业并发数
                  "throttle": false//限流
              }
          },
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          }
      }
    • Hologres表的DDL语句,如下所示。
      begin;
      drop table if exists mysql_to_holo_test;
      create table mysql_to_holo_test(
        tag text not null,
        id int not null,
        body text not null,
        brrth date,
        primary key (tag, id));
        call set_table_property('mysql_to_holo_test', 'orientation', 'column');
        call set_table_property('mysql_to_holo_test', 'distribution_key', 'id');
        call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth');
      commit;
  • 配置分区表
    说明
    • 目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。
    • 请确认该参数和表DDL的分区配置匹配。
    • 配置从内存产生的数据同步至Hologres分区表的子表,示例为通过SQL模式导入的配置。
      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "mysql",
            "parameter": {
              "envType": 0,
              "datasource": "<mysql_source_name>",
              "column": [
                "<column1>",
                "<column2>",
                  ......,
                "<columnN>"
              ],
              "connection": [
                {
                  "datasource": "<mysql_source_name>",
                  "table": [
                    "<mysql_table_name>"
                  ]
                }
              ],
              "where": "",
              "splitPk": "<mysql_pk>",//mysql的pk字段
              "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
          },
          {
            "stepType": "holo",
            "parameter": {
              "maxConnectionCount": 9,
              "partition": "<partition_key>",//Hologres分区键
              "datasource": "<holo_sink_name>",//Hologres数据源名
              "conflictMode": "ignore",
              "envType": 0,
              "column": [
                "<column1>",
                "<column2>",
                  ......,
                "<columnN>"
              ],
              "writeMode": "insert",
              "table": "<holo_table_name>"
            },
            "name": "Writer",
            "category": "writer"
          }
        ],
        "setting": {
          "executeMode": null,
          "errorLimit": {
            "record": ""
          },
          "speed": {
            "concurrent": 2,//作业并发数
            "throttle": false//限流
          }
        },
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        }
      }
    • Hologres表的DDL语句,如下所示。
      BEGIN;
      CREATE TABLE public.hologres_parent_table(
        a text ,
        b int,
        c timestamp,
        d text,
        ds text,
        primary key(ds,b)
        )
        PARTITION BY LIST(ds);
      CALL set_table_property('public.hologres_parent_table', 'orientation', 'column');
      CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201215');
      CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201216');
      CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201217');
      COMMIT;