本文为您介绍实时计算如何通过实时数据API实时写入数据至Hologres。

前提条件

说明
  • 请确保开通的实时计算与Hologres地域一致,以免连接失败。
  • Blink 3.6之前的版本未内置Hologres Connector。您可以提交工单获取需要的JAR文件。
  • 当Hologres中接收数据的表已设置主键,默认按照主键更新实时写入的数据。

操作步骤

  1. 创建Hologres表。
    在Hologres中建一张用于接收数据的表。建表SQL语句示例如下。
    create table blink_test(a int primary key, b text);
  2. 创建实时计算作业。
    实时计算通过调用实时数据API接口,实时写入数据至Hologres。操作步骤如下所示:
    1. 获取Endpoint。
      在Hologres商业化版本的实例中执行以下命令获取实时数据API的Endpoint。
      show hg_datahub_endpoints;
    2. 新建作业。
      登录实时计算控制台,在实时计算页面新建作业,SQL语句示例如下。
      create table randomSource (a int, b VARCHAR, c VARCHAR, d DOUBLE, e BIGINT) with (type = 'random');
      
      create table test (
        a int,
        b VARCHAR,
        c VARCHAR,
        PRIMARY KEY (a)
      ) with (
        type = 'custom',
        tableFactoryClass = 'com.alibaba.blink.connectors.hologres.table.factory.HologresTableFactory',
        `endpoint` = '$ip:$port', //Hologres实时数据API的地址以及端口号
        `username` = '当前阿里云账号的Access ID',
        `password` = '当前阿里云账号的Access Key',
        `dbName` = '当前Hologres的数据库名',
        `tableName` = 'blink_test', //当前Hologres接收数据的表名
        `batchSize` = '100'
      );
      
      insert
        into test
      select
        a,b,c
      from
        randomSource;
      858
      注意 Blink 3.6之前的版本未内置Hologres Connector,您需要获取并安装相关的JAR文件,修改Sink表如下参数:
      • 配置type参数为custom
      • 新增参数tableFactoryClass = 'com.alibaba.blink.connectors.hologres.table.factory.HologresTableFactory'
    3. 上线作业。
      完成新建作业后,单击编辑框的语法检查,如果显示成功,则表明语法正确。
      单击保存保存作业,并单击上线,提交作业至生产环境。根据业务情况填写作业上线配置,示例如下:
      • 配置初始资源4
      • 配置资源配置5
    4. 启动作业。
      提交作业至生产环境后,需要手动启动作业。
      单击页面右上方的运维,跳转进入运维界面,选择需要启动的作业,单击右上角的启动6
  3. Hologres实时查询。
    查询Hologres中用于接收数据的表,就可以实时获取到已写入的数据。SQL查询语句示例如下。
    select * from blink_test;
    7

数据类型映射

实时计算数据类型与Hologres数据类型的映射关系请参见数据类型