如果您通过DataStream的方式读写Hologres数据,则需要使用Hologres DataStream Connector连接Flink全托管。本文为您介绍如何在Flink全托管控制台上使用Hologres DataStream Connector来读写Hologres数据。
背景信息
Hologres DataStream Connector的连接配置与对应Hologres SQL Connector配置完全相同,SQL配置详情请参见Hologres源表WITH参数、Hologres结果表WITH参数或Hologres维表WITH参数。
Maven中央库中已经放置了VVR Connector,以供您在作业开发时直接使用。您可以通过以下任何一种方式来使用DataStream Connector:
(推荐)直接将Connector作为项目依赖打进作业JAR包
- 在Maven项目的pom.xml文件中添加以下配置以引用SNAPSHOT仓库。
<repositories> <repository> <id>oss.sonatype.org-snapshot</id> <name>OSS Sonatype Snapshot Repository</name> <url>http://oss.sonatype.org/content/repositories/snapshots</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>apache.snapshots</id> <name>Apache Development Snapshot Repository</name> <url>https://repository.apache.org/content/repositories/snapshots/</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories>
- 检查您的settings.xml配置文件中是否存在
<mirrorOf>*</mirrorOf>
配置。如果存在
<mirrorOf>*</mirrorOf>
配置,则需要将此配置改为<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。修改的目的是为了避免SNAPSHOT仓库被覆盖,因为mirrorOf中只使用星号(*)会导致第一步中配置的两个repository被覆盖。
- 在作业的Maven POM文件中添加您需要的Connector作为项目依赖。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-hologres</artifactId> <version>${connector.version}</version> </dependency>
每个Connector版本对应的Connector类型可能不同,建议您使用最新版本。Connector版本、VVR/Flink版本和Connector类型的对应关系请参见Connector列表。完整的依赖信息请参见Hologres示例代码中的pom.xml文件。注意- 您需要在SNAPSHOT仓库(oss.sonatype.org)查找带SNAPSHOT的Connector版本,在Maven中央库(search.maven.org)上会查找不到。
- 在使用多个Connector时,请注意META-INF目录需要Merge,即在pom.xml文件中添加如下代码。
<transformers> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> <projectName>Apache Flink</projectName> <encoding>UTF-8</encoding> </transformer> </transformers>
- 修改Hologres连接配置和Schema信息。
类别 说明 Hologres连接配置 替换成您Hologres的连接配置信息。与对应SQL配置完全相同,详情请参见Hologres源表WITH参数、Hologres结果表WITH参数或Hologres维表WITH参数。 Schema信息 表的模式信息。表的类型映射与SQL中表的映射关系完全一致,详情请参见Hologres源表类型映射、Hologres结果表类型映射或Hologres维表类型映射。 - Hologres源表
VVR提供了RichInputFormat的实现类HologresBulkreadInputFormat来读取Hologres表数据。以下为构建Hologres Source读取表数据的示例。
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。 TableSchema schema = TableSchema.builder() .field("a", DataTypes.INT()) .build(); // Hologres的相关参数,具体参数含义请参见SQL档。 Configuration config = new Configuration(); config.setString(HologresConfigs.ENDPOINT, "yourEndpoint"); config.setString(HologresConfigs.USERNAME, "yourUserName"); config.setString(HologresConfigs.PASSWORD, "yourPassword"); config.setString(HologresConfigs.DATABASE, "yourDatabaseName"); config.setString(HologresConfigs.TABLE, "yourTableName"); // 构建JDBC Options。 JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config); String query = JDBCUtils.getSimpleSelectFromStatement( jdbcOptions.getTable(), schema.getFieldNames()); // 构建HologresBulkreadInputFormat读取表数据。 HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(jdbcOptions, schema, query); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType()); env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo) .print(); env.execute();
- Hologres Binlog源表
VVR提供了Source的实现类HologresBinlogSource来读取Hologres Binlog数据。以下为构建Hologres Binlog Source的示例。
// 初始化读取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定义部分字段。 TableSchema schema = TableSchema.builder() .field("a", DataTypes.INT()) .build(); // Hologres的相关参数,具体参数含义请参见SQL文档。 Configuration config = new Configuration(); config.setString(HologresConfigs.ENDPOINT, "yourEndpoint"); config.setString(HologresConfigs.USERNAME, "yourUserName"); config.setString(HologresConfigs.PASSWORD, "yourPassword"); config.setString(HologresConfigs.DATABASE, "yourDatabaseName"); config.setString(HologresConfigs.TABLE, "yourTableName"); config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true); // 构建JDBC Options。 JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config); jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions)); RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions); // 构建Hologres Binlog Source。 long startTimeMs = 0; HologresBinlogSource<RowData> source = new HologresBinlogSource<>( schema, config, jdbcOptions, recordConverter, startTimeMs); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print(); env.execute();
说明- 方法buildRecordConverter不在VVR Connector依赖中,是示例代码中提供的方法。
- Hologres Binlog注意事项和实现原理等详情,请参见Flink实时消费Hologres。
- Hologres结果表
VVR提供了OutputFormatSinkFunction的实现类HologresSinkFunction来写入数据。以下为构建Hologres Sink的示例。
// 初始化读取的表的Schema。 TableSchema schema = TableSchema.builder() .field("a", DataTypes.INT()) .field("b", DataTypes.STRING()) .build(); // Hologres的相关参数,具体参数含义请参见SQL文档。 Configuration config = new Configuration(); config.setString(HologresConfigs.ENDPOINT, "yourEndpoint"); config.setString(HologresConfigs.USERNAME, "yourUserName"); config.setString(HologresConfigs.PASSWORD, "yourPassword"); config.setString(HologresConfigs.DATABASE, "yourDatabaseName"); config.setString(HologresConfigs.TABLE, "yourTableName"); config.setBoolean(HologresConfigs.USE_RPC_MODE, true); HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config); // 构建Hologres Writer,以RowData的方式写入数据。 AbstractHologresWriter<RowData> hologresWriter = buildHologresWriter(schema, config, hologresConnectionParam); // 构建Hologres Sink。 HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType()); int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE); env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo) .addSink(sinkFunction); env.execute();
说明 方法buildHologresWriter不在VVR Connector依赖中,是示例代码中提供的方法。
- Hologres源表
上传Connector JAR包到Flink全托管开发控制台后,填写配置信息
- 登录实时计算管理控制台。
- 在Flink全托管页签,单击目标工作空间操作列下的控制台。
- 在左侧导航栏,单击资源上传。
- 单击上传资源,选择您要上传的目标Connector的JAR包。
您可以上传您自己开发的Connector,也可以上传Flink全托管产品提供的Connector。Flink全托管产品提供的Connector官方JAR包的下载地址,请参见Connector列表。
- 在目标作业开发页面附加依赖文件项,选择目标Connector的JAR包。