本文为您介绍如何使用DataStream连接器,即通过DataStream的方式读写数据。
Datastream连接器的依赖和使用
如果您通过DataStream的方式读写数据,则需要使用对应的DataStream连接器连接实时计算Flink版。Maven中央库中已经放置了VVR DataStream连接器,以供您在作业开发时直接使用。
请使用我们在支持的连接器中指明提供DataStream API的连接器。如果某个连接器未注明提供DataStream API,请勿自行使用,因为未来接口和参数可能会被修改。
DataStream连接器均添加了商业化加密保护,直接运行会报错。如需本地调试和运行,请参见本地运行和调试包含连接器的作业。
您可以选择以下任意一种方式来使用连接器:
(推荐)上传连接器JAR包作为附加依赖文件引入
在作业的Maven POM文件中添加您需要的连接器作为项目依赖,其作用域为provided。
说明${vvr.version}
是作业运行环境引擎版本,如您的作业运行在vvr-8.0.9-flink-1.17
版本引擎上,其对应的Flink版本为1.17.2
。建议您使用最新的引擎,具体版本详见引擎版本。由于将连接器的JAR包作为附加依赖文件引入,则无需将该依赖打入JAR包中,所以需要声明作用域为
provided
。
<!-- MySQL 连接器依赖 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-mysql</artifactId> <version>${vvr.version}</version> <scope>provided</scope> </dependency>
如果您有开发新连接器或者拓展现有连接器功能的需求,项目还需要依赖连接器公共包
flink-connector-base
或ververica-connector-common
。<!-- Flink 连接器公共接口基础依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <!-- 阿里云连接器公共接口基础依赖 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-common</artifactId> <version>${vvr.version}</version> </dependency>
部署JAR作业并在附加依赖文件项中添加相应的连接器JAR包。您可以上传您自己开发的连接器,也可以上传实时计算Flink版提供的连接器。
直接将连接器作为项目依赖打进作业JAR包
在作业的Maven POM文件中添加您需要的连接器作为项目依赖。例如引入Kafka连接器和MySQL连接器。
说明${vvr.version}
是作业运行环境引擎版本,如您的作业运行在vvr-8.0.9-flink-1.17
版本引擎上,其对应的Flink版本为1.17.2
。建议您使用最新的引擎,具体版本详见引擎。由于将连接器作为项目依赖直接打入JAR包,它们必须在默认作用域(compile)中。
<!-- Kafka 连接器依赖 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${vvr.version}</version> </dependency> <!-- MySQL 连接器依赖 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-mysql</artifactId> <version>${vvr.version}</version> </dependency>
如果您有开发新连接器或者拓展现有连接器功能的需求,项目还需要依赖连接器公共包
flink-connector-base
或ververica-connector-common
。<!-- Flink 连接器公共接口基础依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> </dependency> <!-- 阿里云连接器公共接口基础依赖 --> <dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-common</artifactId> <version>${vvr.version}</version> </dependency>
为了避免JAR包依赖冲突,请您注意以下几点:
${flink.version}
为作业运行对应的Flink版本。请使用与作业部署页面选择的VVR引擎所使用的Flink版本一致。例如您在部署页面选择的引擎为vvr-8.0.9-flink-1.17
,其对应的Flink版本为1.17.2
,建议您使用最新的引擎,具体版本详见引擎版本。Flink相关依赖,作用域请使用provided,即在依赖中添加
<scope>provided</scope>
。主要包含org.apache.flink
组下以flink-
开头的非Connector依赖。Flink源代码中只有明确标注了@Public或者@PublicEvolving的才是公开供用户调用的方法,阿里云实时计算Flink版只对这些方法的兼容性做出产品保证。
如果是Flink服务内置的Connector支持的DataStream API,建议使用其内置的依赖。
相关文档
完整的开发参考示例详情请参见JAR作业开发。
更多支持Datastream的连接器请参见支持的连接器。
DataStream连接器均添加了商业化加密保护,直接运行会报错。如需本地调试和运行,请参见本地运行和调试包含连接器的作业。