Hologres连接器DataStream调试

本文为您介绍如何调试和运行使用Hologres连接器的DataStream作业。

重要

本地调试需要正确下载额外JAR包并配置依赖,详情请参见本地运行和调试包含连接器的作业

基于DataStreamHologres Source开发指南

您可以创建如下所示的DataStream API程序来使用Hologres Source。具体的操作步骤如下:

  1. 创建执行环境。

    您可以使用StreamExecutionEnvironmentgetExecutionEnvironment方法自动发现可用的Flink执行环境。

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. 配置基础参数。

    您需要创建一个Flink Configuration类的实例来携带相关参数,详情请参见参数配置(VVR 11及以上版本);参数名称为全大写字母、并使用下划线分割。必须提供的参数如下所示。

    Configuration config = new Configuration();
    config.setString(HologresConfigs.ENDPOINT, "Hologres Endpoint");
    config.setString(HologresConfigs.USERNAME, "Hologres 用户名");
    config.setString(HologresConfigs.PASSWORD, "Hologres 密码");
    config.setString(HologresConfigs.DATABASE, "Hologres 数据库名称");
    config.setString(HologresConfigs.TABLE, "Hologres 表名");

    此外,您还可以自定义Hologres SourceCDC模式下的行为。如果您希望启用全增量一体模式消费Hologres表的数据,那么应该将BINLOG选项设置为true。此外,您还可以使用BINLOG_CHANGE_LOG_MODE选项配置输出的Changelog模式:

    // 此选项配置为 false 时,不会消费 Binlog 增量数据。
    config.set(HologresConfigs.BINLOG, true);
    
    // 此选项配置为 ALL 时,会下发完整的、包含 +I、-U、+U、-D 的变更数据。
    config.set(HologresConfigs.BINLOG_CHANGE_LOG_MODE, BinlogChangeLogMode.ALL);
    // 或者,此选项配置为 UPSERT 时,会下发仅包含 +I、+U、-D 的变更数据。
    config.set(HologresConfigs.BINLOG_CHANGE_LOG_MODE, BinlogChangeLogMode.UPSERT);
    // 或者,此选项配置为 ALL_AS_APPEND_ONLY 时,变更数据会作为 +I 记录下发。
    config.set(HologresConfigs.BINLOG_CHANGE_LOG_MODE, BinlogChangeLogMode.ALL_AS_APPEND_ONLY);

    最后,请将Configuration实例转换为HologresConnectionParam:

    HologresConnectionParam param = new HologresConnectionParam(config);
  3. 构造Hologres Source Schema。使用TableSchema提供的Builder来快速构建表结构描述对象。

    TableSchema schema = TableSchema.builder()
                             .field("id", DataTypes.BIGINT())
                             .field("name", DataTypes.STRING())
                             .build();
  4. 构造HologresBinlogSource。

    HologresBinlogSource source =
        new HologresBinlogSource(
            param,
            schema,
            config,
            StartupMode.INITIAL, // 可以调整这个枚举值来控制启动位点
            "Hologres 表名",
            "", // 谓词下推,如果只需扫描部分数据可在此传递过滤条件
            -1, // 最多读取的数据条数。设置为 -1 代表不限制
            Collections.emptySet(), // 在这个集合里的值会被视为 NULL 处理
            Collections.emptyList() // 需要携带的元数据列
        );
  5. 构造完成后,您就可以在DataStream算子链中使用该Source了。

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Holo source").print();
    env.execute();

基于DataStreamHologres Lookup Source开发指南

您也可以将任意Hologres表作为维表来使用。具体的操作步骤如下:

  1. 创建执行环境,并配置基础参数。

    详情请参见基于DataStreamHologres Source开发指南

  2. 定义维表Schema。

    TableSchema dimSchema = TableSchema.builder()
                                .field("id", DataTypes.INT().notNull())
                                .field("number", DataTypes.BIGINT())
                                .primaryKey("id")
                                .build();
  3. 构造Hologres Lookup Function。

    HologresTableSchema tableSchema = HologresTableSchema.get(hologresConnectionParam);
    
    // 此选项可配置为 NONE、LRU 或 ALL。
    CacheConfig cacheConfig = CacheConfig.createCacheConfig(config, "NONE");
    
    // 设定查询关联字段列表。
    String[] lookupKeys = hologresTableSchema.get().getPrimaryKeys();
    
    // 创建 Hologres Reader。
    AbstractHologresReader<RowData> hologresReader =
            HologresJDBCReader.createTableReader(
                    param, dimSchema, lookupKeys, tableSchema);
    
    // 可以根据需要使用 HologresAsyncLookupFunction 或者 HologresLookupFunction。
    var lookupFunction = new HologresLookupFunction(
            "dim_table",
            schema,
            lookupKeys,
            cacheConfig.getCacheStrategy(),
            hologresReader,
            true);
  4. 构造完成后,您就可以在DataStream算子链中使用该Lookup Source了。

基于 DataStream 的 Hologres Sink 开发指南

  1. 创建执行环境,并配置基础参数。

    详情请参见基于DataStreamHologres Source开发指南

  2. 定义结果表的Schema。

    TableSchema sinkSchema = TableSchema.builder()
                                .field("id", DataTypes.INT().notNull())
                                .field("number", DataTypes.BIGINT())
                                .primaryKey("id")
                                .build();
  3. 构造Hologres Sink Writer。

    // 创建 Hologres Writer。
    AbstractHologresWriter<RowData> hologresWriter =
        HologresJDBCWriter.createRowDataWriter(
                param,
                schema,
                HologresTableSchema.get(param),
                new Integer[0]);
  4. 构造Hologres Sink,即可在DataStream作业中作为Sink使用了。

    HologresSinkFunction sinkFunction =
        new HologresSinkFunction(param, hologresWriter);
    env.from(...)
       .addSink(sinkFunction);
       
    env.execute();