如果您通过DataStream的方式读写MaxCompute数据,则需要使用MaxCompute DataStream Connector连接Flink全托管。本文为您介绍如何在Flink全托管控制台上使用MaxCompute DataStream Connector来读写MaxCompute数据。

背景信息

MaxCompute DataStream Connector基于MaxCompute SQL Connector代码开发,连接配置与对应SQL配置完全相同,SQL配置详情请参见全量MaxCompute源表WITH参数增量MaxCompute源表WITH参数MaxCompute结果表WITH参数

Maven中央库中已经放置了VVR Connector,以供您在作业开发时直接使用。您可以通过以下任何一种方式来使用DataStream Connector:

(推荐)直接将Connector作为项目依赖打进作业JAR包

  1. 在Maven项目的pom.xml文件中添加以下配置以引用SNAPSHOT仓库。
    <repositories>
      <repository>
        <id>oss.sonatype.org-snapshot</id>
        <name>OSS Sonatype Snapshot Repository</name>
        <url>http://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
      <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
    </repositories>
  2. 检查您的settings.xml配置文件中是否存在<mirrorOf>*</mirrorOf>配置。

    如果存在<mirrorOf>*</mirrorOf>配置,则需要将此配置改为<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>

    修改的目的是为了避免SNAPSHOT仓库被覆盖,因为mirrorOf中只使用星号(*)会导致第一步中配置的两个repository被覆盖。

  3. 在作业的Maven POM文件中添加您需要的Connector作为项目依赖。
    • MaxCompute源表和结果表依赖
      <dependency>
          <groupId>com.alibaba.ververica</groupId>
          <artifactId>ververica-connector-odps</artifactId>
          <version>${connector.version}</version>
      </dependency>
    • MaxCompute增量源表依赖
      <dependency>
          <groupId>com.alibaba.ververica</groupId>
          <artifactId>ververica-connector-continuous-odps</artifactId>
          <version>${connector.version}</version>
      </dependency>
    每个Connector版本对应的Connector类型可能不同,建议您使用最新版本。Connector版本、VVR/Flink版本和Connector类型的对应关系请参见Connector列表。完整的依赖信息请参见MaxCompute示例代码中的pom.xml文件。
    注意
    • 您需要在SNAPSHOT仓库(oss.sonatype.org)查找带SNAPSHOT的Connector版本,在Maven中央库(search.maven.org)上会查找不到。
    • 在使用多个Connector时,请注意META-INF目录需要Merge,即在pom.xml文件中添加如下代码。
      <transformers>
          <!-- The service transformer is needed to merge META-INF/services files -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
              <projectName>Apache Flink</projectName>
              <encoding>UTF-8</encoding>
          </transformer>
      </transformers>
  4. 修改MaxCompute连接配置和Schema信息。
    类别 说明
    MaxCompute连接配置 替换成您MaxCompute的链接配置信息。与对应SQL配置完全相同,详情请参见:
    Schema信息 表的模式信息。
    • MaxCompute源表
      VVR提供了SourceFunction的实现类ODPSStreamSource来读取MaxCompute表数据。以下为构建MaxCompute Source读取分区数据的示例。
      //MaxCompute连接配置。
      Configuration conf = new Configuration();
      conf.setString("endpoint", "yourEndpoint");
      conf.setString("tunnelEndpoint","YourTunnelEndpoint");
      conf.setString("project", "yourProjectName");
      conf.setString("tablename", "yourTableName");
      conf.setString("accessid", "yourAccessKeyID");
      conf.setString("accesskey", "yourAccessKeySecret");
      conf.setString("partition", "ds=yourPartitionName");
      
      //Schema信息。
      TableSchema schema = org.apache.flink.table.api.TableSchema.builder()
              .field("a", DataTypes.STRING())
              .field("b", DataTypes.STRING())
              .build();
      
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
      //构建MaxCompute Source Function。
      ODPSStreamSource odpsSource =
          new OdpsSourceBuilder(schema, conf).buildSourceFunction();
      DataStreamSource<RowData> source = env.addSource(odpsSource);
      source.addSink(new PrintSinkFunction<>());
      env.execute("odps source");
      说明
      • 构建类OdpsSourceBuilder不在VVR Connector依赖中,是MaxCompute示例代码中提供的以方便构建ODPSStreamSource。
      • MaxCompute源表是以RowData格式(GenericRowData)读取数据的。
    • MaxCompute增量源表
      VVR提供了SourceFunction的实现类ContinuousODPSStreamSource来增量读取MaxCompute表数据。以下为构建MaxCompute Source增量读取分区数据的示例。
      //MaxCompute连接配置。
      Configuration conf = new Configuration();
      conf.setString("endpoint", "yourEndpoint");
      conf.setString("tunnelEndpoint","YourTunnelEndpoint");
      conf.setString("project", "yourProjectName");
      conf.setString("tablename", "yourTableName");
      conf.setString("accessid", "yourAccessKeyID");
      conf.setString("accesskey", "yourAccessKeySecret");
      conf.setString("partition", "ds=yourPartitionName");
      
      //Schema信息。
      TableSchema schema = org.apache.flink.table.api.TableSchema.builder()
              .field("a", DataTypes.STRING())
              .field("b", DataTypes.STRING())
              .build();
      
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
      //构建Continuous MaxCompute Source Function。
      ContinuousODPSStreamSource continuousOdpsSource =
              new OdpsSourceBuilder(schema, conf).buildContinuousOdpsSource();
      DataStreamSource<RowData> source = env.addSource(continuousOdpsSource);
      source.addSink(new PrintSinkFunction<>());
      env.execute("continuous odps source");
      说明
      • 构建类OdpsSourceBuilder不在VVR Connector依赖中,是示例代码中提供的以方便构建ODPSStreamSource。
      • MaxCompute增量源表是以RowData格式(GenericRowData)读取数据的。
    • MaxCompute结果表
      VVR提供了OutputFormat的实现类OdpsOutputFormat,基于MaxCompute TableTunnel实现对表或分区进行数据上传。以下为构建OdpsOutputFormat上传数据到特定分区的代码示例。
      //MaxCompute连接配置。
      DescriptorProperties properties = new DescriptorProperties();
      properties.putString("endpoint", "yourEndpoint");
      properties.putString("tunnelEndpoint","YourTunnelEndpoint");
      properties.putString("project", "yourProjectName");
      properties.putString("tablename", "yourTableName");
      properties.putString("accessid", "yourAccessKeyID");
      properties.putString("accesskey", "yourAccessKeySecret");
      properties.putString("partition", "ds=yourPartitionName");
      
      //Schema信息。
      TableSchema schema = TableSchema.builder()
              .field("a", DataTypes.STRING())
              .field("b", DataTypes.STRING())
              .build();
      
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
      //MaxCompute Sink在Checkpoint时提交已上传的数据, 来尽量实现exactly-once。
      env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE);
      
      // 构建MaxCompute Sink Function。
      TupleOutputFormatSinkFunction<Row> odpsSink = new TupleOutputFormatSinkFunction<>(
              new OdpsOutputFormat(schema, properties));
      
      env.fromCollection(
              Arrays.asList(Row.of("aa", "111"), Row.of("bb", "222"))
      ).map(new MapFunction<Row, Tuple2<Boolean, Row>>() {
          @Override
          public Tuple2<Boolean, Row> map(Row row) throws Exception {
              return Tuple2.of(true, row);
          }
      }).addSink(odpsSink);
      env.execute("odps sink");
      说明
      • MaxCompute Sink在Checkpoint时以及任务结束时才会提交已经上传的数据(数据在提交后才可见)。如果您要保证数据实时可见,则需要开启Checkpoint,更详细内容参见实现原理
      • MaxCompute Sink接收的数据格式是Tuple2<Boolean, Row>,因此在写入前需要通过MapFunction将数据转化为该格式。

上传Connector JAR包到Flink全托管开发控制台后,填写配置信息

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击资源上传
  4. 单击上传资源,选择您要上传的目标Connector的JAR包。

    您可以上传您自己开发的Connector,也可以上传Flink全托管产品提供的Connector。Flink全托管产品提供的Connector官方JAR包的下载地址,请参见Connector列表

  5. 在目标作业开发页面附加依赖文件项,选择目标Connector的JAR包。