Hologres Writer实现了导入数据至实时数仓Hologres的功能,您可以把多种数据源的数据导入Hologres进行实时分析。

使用限制

实现原理

Hologres Writer通过数据同步框架获取Reader生成的协议数据,根据writeMode的配置选择不同的写入方式,并根据conflictMode的配置决定写入数据时的冲突解决策略:
  • (推荐使用)writeModeSQL模式时,通过PSQL的INSERT INTO命令(JDBC方式)写入数据,能提供更好的写入性能,建议您采用此写入模式同步数据。
  • writeModeSDK模式时,通过Hologres的写入接口来进行同步数据,后续该模式将逐步不再使用,建议使用SQL模式写入数据。
您可以通过配置conflictMode,决定新导入的数据和已有数据的主键发生冲突时,如何处理新导入的数据:
  • conflictModeReplace模式时,新数据覆盖旧数据。
  • conflictModeIgnore模式时,忽略新数据。
在不同的writeMode下,conflictMode的实现方式也不同。writeModeSDK 模式时,通过设置Hologres Table属性来改变此次导入的冲突解决模式。
注意 conflictMode仅适用于有主键的表。

参数说明

参数 描述 是否必选 默认值
endpoint 目标交互式分析(Hologres)实例对应的endpoint,格式为instance-id-region-endpoint.hologres.aliyuncs.com:port。您可以从交互式分析实例的管理页面获取。
endpoint包括公网、经典网络和VPC三种网络类型,请根据数据集成资源组和Hologres实例所在的网络环境选择正确的endpoint类型,否则会出现网络不通或者性能受限的情况:
  • 公网示例:instance-id-region-endpoint.hologres.aliyuncs.com:port
  • 经典网络示例:instance-id-region-endpoint-internal.hologres.aliyuncs.com:port
  • VPC示例:instance-id-region-endpoint-vpc.hologres.aliyuncs.com:port

通常建议数据集成资源组和Hologres实例在同一个地域的同一个可用区,以确保网络连通,实现最大性能。

accessId 访问Hologres的accessId
accessKey 访问Hologres的accessKey,请确保该密钥对目标表有写入权限。
database Hologres实例内部数据库的名称。
table Hologres的表名称,目前支持表名称中包含Schema,例如schema_name.table_name
writeMode writeMode包括SDKSQL(INSERT INTO),详情请参见实现原理
在脚本模式中,SDK的可选配置如下:
  • maxCommitSize:定义HoloWriter聚合的最大值。该参数可选,默认值为1,048,576 Byte。
  • maxRetryCount:定义HoloWriter通过SDK模式写入的最大重试次数。该参数可选,默认值为500
  • retryInterval:定义HoloWriter通过SDK模式写入出错时的重试间隔,单位为ms。该参数可选,默认值为1,000
conflictMode conflictMode包括ReplaceIgnore,详情请参见实现原理
column 定义导入目标表的数据列,必须包含目标表的主键集合。例如["*"]表示全部列。
partition 针对分区表,表示分区Column以及对应的Value,格式为column=value
说明
  • 目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。
  • 请确认该参数和表DDL的分区配置匹配。
,表示非分区表
truncate 写入Holo表之前是否需要清空目标表。
说明 仅脚本模式支持配置该参数。配置方式请参见脚本开发介绍
  • true:清空目标表。
    说明 目前仅支持清空非分区表和静态分区表。不支持清空动态分区表,如果您是动态分区表,并且设置了参数值为true,同步任务将会异常退出。
  • false:不清空目标表。
false

向导开发介绍

  1. 选择数据源。
    配置同步任务的数据来源数据去向数据来源
    参数 描述
    数据源 通常输入您配置的数据源名称。第一次数据同步需要先创建Hologres数据源,详情请参见配置Hologres数据源
    需要同步的Hologres目标表,可以通过一键生成目标表自动创建,也可以提前在Hologres中创建。即上述参数说明中的table
    说明
    • 一键生成目标表创建的仅是基础表,请根据实际业务修改建表语句并添加相应的索引,以获得更好的性能,详情请参见CREATE TABLE
    • 支持自动路由分区,若是通过一键生成目标表创建表,需要将表创建为分区表,以实现分区自动路由的功能,若是无对应分区子表则会自动创建,详情请参见CREATE PARTITION TABLE
    分区信息 Hologres的分区键。
    写入模式 即上述参数说明中的writeMode
    • (推荐)SQL(INSERT INTO),默认使用SQL模式,性能好。
    • SDK,后续逐步不再使用,不推荐使用此模式。
    写入冲突策略 即上述参数说明中的conflictMode
    最大连接数 JDBC使用的最大连接数,仅在SQL模式下使用,在开启任务时请确保实例有充足的空闲连接。一个任务最多使用9个连接,即一个任务使用9个连接。
  2. 字段映射,即上述参数说明中的column,左侧的源头表字段和右侧的目标表字段为一一对应关系。字段映射
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
    自动排版 可以根据相应的规律自动排版。
  3. 通道控制。通道配置
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组概述新增和使用独享数据集成资源组

脚本开发介绍

使用脚本模式开发的详情请参见通过脚本模式配置离线同步任务。配置非分区表和分区表的示例如下:
  • 配置非分区表
    • 配置从内存产生的数据导入至Hologres普通表,示例为通过JDBC模式导入的配置。
      {
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "mysql",
                  "parameter": {
                      "envType": 0,
                          "datasource": "<mysql_source_name>",
                      "column": [
                          "<column1>",
                          "<column2>",
                          ......,
                          "<columnN>"
                      ],
                      "connection": [
                          {
                              "datasource": "<mysql_source_name>",//mysql数据源名
                              "table": [
                                  "<mysql_table_name>"
                              ]
                          }
                      ],
                      "where": "",
                      "splitPk": "",
                      "encoding": "UTF-8"
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "holo",
                  "parameter": {
                      "maxConnectionCount": 9,
                      "datasource": "<holo_sink_name>",//Hologres数据源名称
                      "truncate":true,//清理规则。
                      "conflictMode": "ignore",
                      "envType": 0,
                      "column": [
                          "<column1>",
                          "<column2>",
                          ......,
                          "<columnN>"
                      ],
                      "writeMode": "insert",
                      "table": "<holo_table_name>"
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "setting": {
              "executeMode": null,
              "errorLimit": {
                  "record": ""
              },
              "speed": {
                  "concurrent": 2,//作业并发数
                  "throttle": false//限流
              }
          },
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          }
      }
    • Hologres表的DDL语句,如下所示。
      begin;
      drop table if exists mysql_to_holo_test;
      create table mysql_to_holo_test(
        tag text not null,
        id int not null,
        body text not null,
        brrth date,
        primary key (tag, id));
        call set_table_property('mysql_to_holo_test', 'orientation', 'column');
        call set_table_property('mysql_to_holo_test', 'distribution_key', 'id');
        call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth');
      commit;
  • 配置分区表
    说明
    • 目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。
    • 请确认该参数和表DDL的分区配置匹配。
    • 配置从内存产生的数据同步至Hologres分区表的子表,示例为通过SQL模式导入的配置。
      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "mysql",
            "parameter": {
              "envType": 0,
              "datasource": "<mysql_source_name>",
              "column": [
                "<column1>",
                "<column2>",
                  ......,
                "<columnN>"
              ],
              "connection": [
                {
                  "datasource": "<mysql_source_name>",
                  "table": [
                    "<mysql_table_name>"
                  ]
                }
              ],
              "where": "",
              "splitPk": "<mysql_pk>",//mysql的pk字段
              "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
          },
          {
            "stepType": "holo",
            "parameter": {
              "maxConnectionCount": 9,
              "partition": "<partition_key>",//Hologres分区键
              "datasource": "<holo_sink_name>",//Hologres数据源名
              "conflictMode": "ignore",
              "envType": 0,
              "column": [
                "<column1>",
                "<column2>",
                  ......,
                "<columnN>"
              ],
              "writeMode": "insert",
              "table": "<holo_table_name>"
            },
            "name": "Writer",
            "category": "writer"
          }
        ],
        "setting": {
          "executeMode": null,
          "errorLimit": {
            "record": ""
          },
          "speed": {
            "concurrent": 2,//作业并发数
            "throttle": false//限流
          }
        },
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        }
      }
    • Hologres表的DDL语句,如下所示。
      BEGIN;
      CREATE TABLE public.hologres_parent_table(
        a text ,
        b int,
        c timestamp,
        d text,
        ds text,
        primary key(ds,b)
        )
        PARTITION BY LIST(ds);
      CALL set_table_property('public.hologres_parent_table', 'orientation', 'column');
      CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201215');
      CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201216');
      CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201217');
      COMMIT;