DataStream连接器设置方法

本文为您介绍如何使用DataStream连接器,即通过DataStream的方式读写数据。

Datastream连接器的依赖和使用

如果您通过DataStream的方式读写数据,则需要使用对应的DataStream连接器连接实时计算Flink版。Maven中央库中已经放置了VVR DataStream连接器,以供您在作业开发时直接使用。

重要

请使用我们在支持的连接器中指明提供DataStream API的连接器。如果某个连接器未注明提供DataStream API,请勿自行使用,因为未来接口和参数可能会被修改。

DataStream连接器均添加了商业化加密保护,直接运行会报错。如需本地调试和运行,请参见本地运行和调试包含连接器的作业

您可以选择以下任意一种方式来使用连接器:

(推荐)上传连接器JAR包作为附加依赖文件引入

  1. 在作业的Maven POM文件中添加您需要的连接器作为项目依赖,其作用域为provided。

    说明
    • ${vvr.version}是作业运行环境引擎版本,如您的作业运行在vvr-8.0.9-flink-1.17版本引擎上,其对应的Flink版本为1.17.2。建议您使用最新的引擎,具体版本详见引擎版本

    • 由于将连接器的JAR包作为附加依赖文件引入,则无需将该依赖打入JAR包中,所以需要声明作用域为provided

    <!-- MySQL 连接器依赖 -->
      <dependency>
          <groupId>com.alibaba.ververica</groupId>
          <artifactId>ververica-connector-mysql</artifactId>
          <version>${vvr.version}</version>
          <scope>provided</scope>
      </dependency>
  2. 如果您有开发新连接器或者拓展现有连接器功能的需求,项目还需要依赖连接器公共包flink-connector-baseververica-connector-common

    <!-- Flink 连接器公共接口基础依赖 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 阿里云连接器公共接口基础依赖 -->
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-common</artifactId>
        <version>${vvr.version}</version>
    </dependency>
  3. 部署JAR作业并在附加依赖文件项中添加相应的连接器JAR包。您可以上传您自己开发的连接器,也可以上传实时计算Flink版提供的连接器

    image

直接将连接器作为项目依赖打进作业JAR

  1. 在作业的Maven POM文件中添加您需要的连接器作为项目依赖。例如引入Kafka连接器和MySQL连接器。

    说明
    • ${vvr.version}是作业运行环境引擎版本,如您的作业运行在vvr-8.0.9-flink-1.17版本引擎上,其对应的Flink版本为1.17.2。建议您使用最新的引擎,具体版本详见引擎

    • 由于将连接器作为项目依赖直接打入JAR包,它们必须在默认作用域(compile)中。

            <!-- Kafka 连接器依赖 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-kafka</artifactId>
                <version>${vvr.version}</version>
            </dependency>
            <!-- MySQL 连接器依赖 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-mysql</artifactId>
                <version>${vvr.version}</version>
            </dependency>
  2. 如果您有开发新连接器或者拓展现有连接器功能的需求,项目还需要依赖连接器公共包flink-connector-baseververica-connector-common

            <!-- Flink 连接器公共接口基础依赖 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-base</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- 阿里云连接器公共接口基础依赖 -->
            <dependency>
                <groupId>com.alibaba.ververica</groupId>
                <artifactId>ververica-connector-common</artifactId>
                <version>${vvr.version}</version>
            </dependency>
重要

为了避免JAR包依赖冲突,请您注意以下几点:

  • ${flink.version}为作业运行对应的Flink版本。请使用与作业部署页面选择的VVR引擎所使用的Flink版本一致。例如您在部署页面选择的引擎为vvr-8.0.9-flink-1.17,其对应的Flink版本为1.17.2,建议您使用最新的引擎,具体版本详见引擎版本

  • Flink相关依赖,作用域请使用provided,即在依赖中添加<scope>provided</scope>。主要包含org.apache.flink组下以flink-开头的非Connector依赖。

  • Flink源代码中只有明确标注了@Public或者@PublicEvolving的才是公开供用户调用的方法,阿里云实时计算Flink版只对这些方法的兼容性做出产品保证。

  • 如果是Flink服务内置的Connector支持的DataStream API,建议使用其内置的依赖。

相关文档