Hologres Reader实现了从交互式分析(Hologres) 数仓导出数据的功能,您可以根据数据集成标准协议从Hologres表中导出数据至其它数据源。

背景信息

注意
Hologres Reader通过PSQL读取Hologres表中的数据,根据表的Shard Count发起多个并发,每个Shard对应一个Select并发任务:
  • Hologres在创建表时,在同一个CREATE TABLE事务中,通过CALL set_table_property('table_name', 'shard_count', 'xx')配置表的Shard Count。

    默认情况下,使用数据库默认的Shard Count,具体数值取决于Hologres实例的配置。

  • Select语句通过表的内置列hg_shard_id的Shard筛选数据。

参数说明

参数 描述 是否必选 默认值
endpoint 目标交互式分析(Hologres)实例对应的endpoint,格式为instance-id-region-endpoint.hologres.aliyuncs.com:port。您可以从交互式分析实例的管理页面获取。
endpoint包括经典网络、公网和VPC三种网络类型,请根据数据集成资源组和Hologres实例所在的网络环境选择正确的endpoint类型,否则会出现网络不通或者性能受限的情况:
  • 经典网络示例:instance-id-region-endpoint-internal.hologres.aliyuncs.com:port
  • 公网示例:instance-id-region-endpoint.hologres.aliyuncs.com:port
  • VPC示例:instance-id-region-endpoint-vpc.hologres.aliyuncs.com:port

通常建议数据集成资源组和Hologres实例配在同一个地域的同一个可用区,以保证网络端口连通,实现最大性能。

accessId 访问Hologres的accessId
accessKey 访问Hologres的accessKey,请确保该密钥对目标表有读取权限。
database Hologres实例内部数据库的名称。
table Hologres的表名称,如果是分区表,请指定父表的名称。
说明 当前插件不支持view。
column 定义导入目标表的数据列,必须包含目标表的主键集合。例如["*"]表示全部列。
partition 针对分区表,表示分区Column以及对应的Value,格式为column=value
注意
  • 目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。
  • 请确认该参数和表DDL的分区配置匹配。
  • 请确认对应的子表已经创建,且已经导入数据。
,表示非分区表。
fetchSize 指定使用Select语句一次性读取数据的条数。 1,000

向导开发介绍

  1. 选择数据源。
    配置同步任务的数据来源数据去向数据源
    参数 描述
    数据源 通常输入您配置的数据源名称。
    即上述参数说明中的table
    数据过滤 您将要同步数据的筛选条件,SQL语法与选择的数据源一致,请勿填写where关键字。
  2. 字段映射,即上述参数说明中的column
    左侧的源头表字段和右侧的目标表字段为一一对应关系。单击添加一行可以增加单个字段,鼠标放至需要删除的字段上,即可单击删除图标进行删除 。字段映射
    参数 描述
    同名映射 单击同名映射,可以根据名称建立相应的映射关系,请注意匹配数据类型。
    同行映射 单击同行映射,可以在同行建立相应的映射关系,请注意匹配数据类型。
    取消映射 单击取消映射,可以取消建立的映射关系。
    自动排版 可以根据相应的规律自动排版。
    手动编辑源表字段 请手动编辑字段,一行表示一个字段,首尾空行会被采用,其他空行会被忽略。
    添加一行
    • 可以输入常量,输入的值需要使用英文单引号,例如'abc’'123’等。
    • 可以配合调度参数使用,例如${bizdate}等。
    • 可以输入关系数据库支持的函数,例如now()count(1)等。
    • 如果您输入的值无法解析,则类型显示为未识别。
  3. 通道控制。通道配置
    参数 描述
    任务期望最大并发数 数据同步任务内,可以从源并行读取或并行写入数据存储端的最大线程数。向导模式通过界面化配置并发数,指定任务所使用的并行度。
    同步速率 设置同步速率可以保护读取端数据库,以避免抽取速度过大,给源库造成太大的压力。同步速率建议限流,结合源库的配置,请合理配置抽取速率。
    错误记录数 错误记录数,表示脏数据的最大容忍条数。
    分布式处理能力

    数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置,详情请参见独享数据集成资源组新增和使用独享数据集成资源组

脚本开发介绍

  • 配置非分区表
    • 配置从Hologres非分区表读取数据至内存,如下所示。
      {
          "type":"job",
          "version":"2.0",//版本号。
          "steps":[
              {
                  "stepType":"holo",//插件名。
                  "parameter":{
                      "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
                      "accessId": "***************", //访问Hologres的accessId。
                      "accessKey": "*******************", //访问Hologres的accessKey。
                      "database": "postgres",
                      "table": "holo_reader_****",
                      "column" : [ //字段。
                          "tag",
                          "id",
                          "title"
                      ]
                  },
                  "name":"Reader",
                  "category":"reader"
              },
              {
                  "stepType":"stream",
                  "parameter":{},
                  "name":"Writer",
                  "category":"writer"
              }
          ],
          "setting":{
              "errorLimit":{
                  "record":"0"//错误记录数。
              },
              "speed":{
                  "throttle":true,//当throttle值为flase时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
                  "concurrent":1,//作业并发数。 
                        "mbps":"12"//限流
              }
          },
          "order":{
              "hops":[
                  {
                      "from":"Reader",
                      "to":"Writer"
                  }
              ]
          }
      }
    • Hologres表的DDL语句,如下所示。
      begin;
      drop table if exists holo_reader_basic_src;
      create table holo_reader_basic_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id));
        call set_table_property('holo_reader_basic_src', 'orientation', 'column');
        call set_table_property('holo_reader_basic_src', 'shard_count', '3');
      commit;
  • 配置分区表
    • 配置从内存产生的数据同步至Hologres分区表的子表,示例为通过SDK模式导入的配置。
      说明 请注意partition的配置。
      {
          "type":"job",
          "version":"2.0",//版本号。
          "steps":[
              {
                  "stepType":"holo",//插件名。
                  "parameter":{
                      "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
                      "accessId": "***************", //访问Hologres的accessId。
                      "accessKey": "*******************", //访问Hologres的accessKey。
                      "database": "postgres",
                      "table": "holo_reader_basic_****",
                      "partition": "tag=foo",
                      "column" : [
                          "*"
                      ],
                      "fetchSize": "100"
                  },
                  "name":"Reader",
                  "category":"reader"
              },
              {
                  "stepType":"stream",
                  "parameter":{},
                  "name":"Writer",
                  "category":"writer"
              }
          ],
          "setting":{
              "errorLimit":{
                  "record":"0"//错误记录数。
              },
              "speed":{
                  "throttle":true,//当throttle值为flase时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
                  "concurrent":1,//作业并发数。
                        "mbps":"12"//限流
              }
          },
          "order":{
              "hops":[
                  {
                      "from":"Reader",
                      "to":"Writer"
                  }
              ]
          }
      }
    • Hologres表的DDL语句,如下所示。
      begin;
      drop table if exists holo_reader_basic_part_src;
      create table holo_reader_basic_part_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id))
        partition by list( tag );
        call set_table_property('holo_reader_basic_part_src', 'orientation', 'column');
        call set_table_property('holo_reader_basic_part_src', 'shard_count', '3');
      commit;
      
      create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo');
      
      # 确保分区表子表已经创建且导入数据。
      postgres=# \d+ holo_reader_basic_part_src
                               Table "public.holo_reader_basic_part_src"
       Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description 
      --------+---------+-----------+----------+---------+----------+--------------+-------------
       tag    | text    |           | not null |         | extended |              | 
       id     | integer |           | not null |         | plain    |              | 
       title  | text    |           | not null |         | extended |              | 
       body   | text    |           |          |         | extended |              | 
      Partition key: LIST (tag)
      Indexes:
          "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id)
      Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')