本文为您介绍如何调试和运行使用Hologres连接器的DataStream作业。
本地调试需要正确下载额外JAR包并配置依赖,详情请参见本地运行和调试包含连接器的作业。
基于DataStream的Hologres Source开发指南
您可以创建如下所示的DataStream API程序来使用Hologres Source。具体的操作步骤如下:
创建执行环境。
您可以使用StreamExecutionEnvironment的getExecutionEnvironment方法自动发现可用的Flink执行环境。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
配置基础参数。
您需要创建一个Flink Configuration类的实例来携带相关参数,详情请参见参数配置(VVR 11及以上版本);参数名称为全大写字母、并使用下划线分割。必须提供的参数如下所示。
Configuration config = new Configuration(); config.setString(HologresConfigs.ENDPOINT, "Hologres Endpoint"); config.setString(HologresConfigs.USERNAME, "Hologres 用户名"); config.setString(HologresConfigs.PASSWORD, "Hologres 密码"); config.setString(HologresConfigs.DATABASE, "Hologres 数据库名称"); config.setString(HologresConfigs.TABLE, "Hologres 表名");
此外,您还可以自定义Hologres Source在CDC模式下的行为。如果您希望启用全增量一体模式消费Hologres表的数据,那么应该将
BINLOG
选项设置为true
。此外,您还可以使用BINLOG_CHANGE_LOG_MODE
选项配置输出的Changelog模式:// 此选项配置为 false 时,不会消费 Binlog 增量数据。 config.set(HologresConfigs.BINLOG, true); // 此选项配置为 ALL 时,会下发完整的、包含 +I、-U、+U、-D 的变更数据。 config.set(HologresConfigs.BINLOG_CHANGE_LOG_MODE, BinlogChangeLogMode.ALL); // 或者,此选项配置为 UPSERT 时,会下发仅包含 +I、+U、-D 的变更数据。 config.set(HologresConfigs.BINLOG_CHANGE_LOG_MODE, BinlogChangeLogMode.UPSERT); // 或者,此选项配置为 ALL_AS_APPEND_ONLY 时,变更数据会作为 +I 记录下发。 config.set(HologresConfigs.BINLOG_CHANGE_LOG_MODE, BinlogChangeLogMode.ALL_AS_APPEND_ONLY);
最后,请将Configuration实例转换为HologresConnectionParam:
HologresConnectionParam param = new HologresConnectionParam(config);
构造Hologres Source Schema。使用TableSchema提供的Builder来快速构建表结构描述对象。
TableSchema schema = TableSchema.builder() .field("id", DataTypes.BIGINT()) .field("name", DataTypes.STRING()) .build();
构造HologresBinlogSource。
HologresBinlogSource source = new HologresBinlogSource( param, schema, config, StartupMode.INITIAL, // 可以调整这个枚举值来控制启动位点 "Hologres 表名", "", // 谓词下推,如果只需扫描部分数据可在此传递过滤条件 -1, // 最多读取的数据条数。设置为 -1 代表不限制 Collections.emptySet(), // 在这个集合里的值会被视为 NULL 处理 Collections.emptyList() // 需要携带的元数据列 );
构造完成后,您就可以在DataStream算子链中使用该Source了。
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Holo source").print(); env.execute();
基于DataStream的Hologres Lookup Source开发指南
您也可以将任意Hologres表作为维表来使用。具体的操作步骤如下:
创建执行环境,并配置基础参数。
定义维表Schema。
TableSchema dimSchema = TableSchema.builder() .field("id", DataTypes.INT().notNull()) .field("number", DataTypes.BIGINT()) .primaryKey("id") .build();
构造Hologres Lookup Function。
HologresTableSchema tableSchema = HologresTableSchema.get(hologresConnectionParam); // 此选项可配置为 NONE、LRU 或 ALL。 CacheConfig cacheConfig = CacheConfig.createCacheConfig(config, "NONE"); // 设定查询关联字段列表。 String[] lookupKeys = hologresTableSchema.get().getPrimaryKeys(); // 创建 Hologres Reader。 AbstractHologresReader<RowData> hologresReader = HologresJDBCReader.createTableReader( param, dimSchema, lookupKeys, tableSchema); // 可以根据需要使用 HologresAsyncLookupFunction 或者 HologresLookupFunction。 var lookupFunction = new HologresLookupFunction( "dim_table", schema, lookupKeys, cacheConfig.getCacheStrategy(), hologresReader, true);
构造完成后,您就可以在DataStream算子链中使用该Lookup Source了。
基于 DataStream 的 Hologres Sink 开发指南
创建执行环境,并配置基础参数。
定义结果表的Schema。
TableSchema sinkSchema = TableSchema.builder() .field("id", DataTypes.INT().notNull()) .field("number", DataTypes.BIGINT()) .primaryKey("id") .build();
构造Hologres Sink Writer。
// 创建 Hologres Writer。 AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter( param, schema, HologresTableSchema.get(param), new Integer[0]);
构造Hologres Sink,即可在DataStream作业中作为Sink使用了。
HologresSinkFunction sinkFunction = new HologresSinkFunction(param, hologresWriter);
env.from(...) .addSink(sinkFunction); env.execute();