Hologres Writer实现了导入数据至交互式分析(Hologres)的功能,您可以把多种数据源的数据导入Hologres进行实时分析。

注意 Hologres Writer仅支持使用新增和使用独享数据集成资源组,不支持使用使用公共资源组自定义资源组

使用限制

Hologres Writer不支持写入数据至Hologres的外部表。

实现原理

Hologres Writer通过数据同步框架获取Reader生成的协议数据,根据writeMode的配置选择不同的写入方式,并根据conflictMode的配置决定写入数据时的冲突解决策略:
  • writeModeSDK模式时,通过Hologres的Holohub来进行同步导入服务写入数据,为您提供最优的性能。
  • writeModeSQL模式时,通过PSQL的INSERT INTO命令(JDBC方式)写入数据,建议您采用此写入模式同步。
您可以通过配置conflictMode,决定新导入的数据和已有数据的主键发生冲突时,如何处理新导入的数据:
  • conflictModeReplace模式时,新数据覆盖旧数据。
  • conflictModeIgnore模式时,忽略新数据。
在不同的writeMode下,conflictMode的实现方式也不同。writeModeSDK模式时,通过设置Hologres Table属性来改变此次导入的冲突解决模式。
注意 conflictMode仅适用于有主键的表。

参数说明

参数 描述 是否必选 默认值
endpoint 目标交互式分析(Hologres)实例对应的endpoint,格式为instance-id-region-endpoint.hologres.aliyuncs.com:port。您可以从交互式分析实例的管理页面获取。
endpoint包括公网、经典网络和VPC三种网络类型,请根据数据集成资源组和Hologres实例所在的网络环境选择正确的endpoint类型,否则会出现网络不通或者性能受限的情况:
  • 公网示例:instance-id-region-endpoint.hologres.aliyuncs.com:port
  • 经典网络示例:instance-id-region-endpoint-internal.hologres.aliyuncs.com:port
  • VPC示例:instance-id-region-endpoint-vpc.hologres.aliyuncs.com:port

通常建议数据集成资源组和Hologres实例在同一个地域的同一个可用区,以确保网络连通,实现最大性能。

accessId 访问Hologres的accessId
accessKey 访问Hologres的accessKey,请确保该密钥对目标表有写入权限。
database Hologres实例内部数据库的名称。
table Hologres的表名称,目前支持表名称中包含Schema,例如schema_name.table_name
writeMode writeMode包括SDKSQL(INSERT INTO),详情请参见实现原理
在脚本模式中,SDK的可选配置如下:
  • maxCommitSize:定义HoloWriter聚合的最大值。该参数可选,默认值为1,048,576 Byte。
  • maxRetryCount:定义HoloWriter通过SDK模式写入的最大重试次数。该参数可选,默认值为500
  • retryInterval:定义HoloWriter通过SDK模式写入出错时的重试间隔,单位为ms。该参数可选,默认值为1,000
conflictMode conflictMode包括ReplaceIgnore,详情请参见实现原理
column 定义导入目标表的数据列,必须包含目标表的主键集合。例如["*"]表示全部列。
partition 针对分区表,表示分区Column以及对应的Value,格式为column=value
说明
  • 目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。
  • 请确认该参数和表DDL的分区配置匹配。
,表示非分区表

向导开发介绍

  1. 选择数据源。
    配置同步任务的数据来源数据去向数据来源
    参数 描述
    数据源 通常输入您配置的数据源名称。
    即上述参数说明中的table
    写入模式 即上述参数说明中的writeMode
    写入冲突策略 即上述参数说明中的conflictMode
  2. 字段映射,即上述参数说明中的column,左侧的源头表字段和右侧的目标表字段为一一对应关系。字段映射
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
    自动排版 可以根据相应的规律自动排版。
  3. 通道控制。通道配置
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发介绍

使用脚本模式开发的详情请参见通过脚本模式配置任务。配置非分区表和分区表的示例如下:
  • 配置非分区表
    • 配置从内存产生的数据导入至Hologres普通表,示例为通过SDK(极速写入)模式导入的配置。
      {
          "type":"job",
          "version":"2.0",//版本号。
          "steps":[
              {
                  "stepType":"stream",
                  "parameter":{},
                  "name":"Reader",
                  "category":"reader"
              },
              {
                  "stepType":"holo",
                  "parameter":{
                    "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
                      "accessId": "<yourAccessKeyId>", //访问Hologres的accessId。
                      "accessKey": "<yourAccessKeySecret>", //访问Hologres的accessKey。
                      "database": "postgres",
                      "table": "<yourTableName>",
                      "writeMode": "sdk",
                      "conflictMode": "replace",
                      "column" : [
                          "tag",
                          "id",
                          "title"
                      ],
                      "maxCommitSize": 1048576,
                      "maxRetryCount": 500
                  },
                  "name":"Writer",
                  "category":"writer"
              }
          ],
          "setting":{
              "errorLimit":{
                  "record":"0"//错误记录数。
              },
              "speed":{
                      "throttle":true,//当throttle值为flase时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
                      "concurrent":1, //作业并发数。
                      "mbps":"12"//限流
              }
          },
          "order":{
              "hops":[
                  {
                      "from":"Reader",
                      "to":"Writer"
                  }
              ]
          }
      }
    • Hologres表的DDL语句,如下所示。
      begin;
      drop table if exists test_holowriter_sdk_replace;
      create table test_holowriter_sdk_replace(
        tag text not null, 
        id int not null, 
        body text not null
        primary key (tag, id));
        call set_table_property('test_holowriter_sdk_replace', 'orientation', 'column');
        call set_table_property('test_holowriter_sdk_replace', 'shard_count', '3');
      commit;
  • 配置分区表
    • 配置从内存产生的数据同步至Hologres分区表的子表,示例为通过SDK(极速写入)模式导入的配置。
      说明 请注意partition的配置。
      {
          "type":"job",
          "version":"2.0",//版本号。
          "steps":[
              {
                  "stepType":"stream",
                  "parameter":{},
                  "name":"Reader",
                  "category":"reader"
              },
              {
                  "stepType":"holo",
                  "parameter":{
                    "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
                      "accessId": "<yourAccessKeyId>", //访问Hologres的accessId。
                      "accessKey": "<yourAccessKeySecret>", //访问Hologres的accessKey。
                      "database": "postgres",
                      "table": "<yourTableName>",
                      "writeMode": "sdk",
                      "conflictMode": "ignore",
                      "column" : [
                          "*"
                      ],
                      "partition": "tag=foo",
                      "maxCommitSize": 1048576,
                      "maxRetryCount": 500
                  },
                  "name":"Writer",
                  "category":"writer"
              }
          ],
          "setting":{
              "errorLimit":{
                  "record":"0"//错误记录数。
              },
              "speed":{
                  "throttle":true,//当throttle值为flase时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
                  "concurrent":1, //作业并发数。
                  "mbps":"12"//限流
              }
          },
          "order":{
              "hops":[
                  {
                      "from":"Reader",
                      "to":"Writer"
                  }
              ]
          }
      }
    • Hologres表的DDL语句,如下所示。
      begin;
      drop table if exists test_holowriter_part_table_sdk_ignore;
      create table test_holowriter_part_table_sdk_ignore(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id))
        partition by list( tag );
        call set_table_property('test_holowriter_part_table_sdk_ignore', 'orientation', 'column');
        call set_table_property('test_holowriter_part_table_sdk_ignore', 'shard_count', '3');
      commit;