本文为您介绍如何创建实时计算Flink版自定义结果表,自定义结果表可以满足您各种差异化的输出需求。

注意
  • 本文仅适用于Blink 1.4.5及以上版本。
  • 本文仅适用于独享模式。

搭建环境

您可以通过以下两种方式搭建自定义结果表的开发环境:
  • 直接使用示例中的环境。
    为了便于您快速开发业务,实时计算Flink版提供如下自定义结果表示例:
    说明 示例中已为您配置对应版本的开发环境,您无需搭建环境。
  • 下载JAR包自行搭建环境
    说明 Maven工程中引用以下依赖包时,Scope设置为<scope>provided</scope>
    • 实时计算Flink版3.0版本
      • JAR包下载
        请在POM文件中添加如下信息,完成flink-table_2.11JAR包的自动下载。
        <profiles>
          <profile>
             <id>allow-snapshots</id>
                <activation><activeByDefault>true</activeByDefault></activation>
             <repositories>
               <repository>
                 <id>snapshots-repo</id>
                 <url>https://oss.sonatype.org/content/repositories/snapshots</url>
                 <releases><enabled>false</enabled></releases>
                 <snapshots><enabled>true</enabled></snapshots>
               </repository>
             </repositories>
           </profile>
        </profiles>
      • 依赖包
         <dependencies>
          <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>blink-connector-common</artifactId>
            <version>blink-3.2.1-SNAPSHOT</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>blink-connector-custom</artifactId>
            <version>blink-3.2.1-SNAPSHOT</version>
            <scope>provided</scope>
          </dependency>
          <dependency>
            <groupId>com.alibaba.blink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>blink-3.2.1-SNAPSHOT</version>
            <scope>provided</scope>
          </dependency>
        </dependencies>
    • 实时计算Flink版2.0版本
      • JAR包下载
      • 依赖包
          <dependencies>
            <dependency>
              <groupId>com.alibaba.blink</groupId>
              <artifactId>blink-table</artifactId>
              <version>blink-2.2.4-SNAPSHOT</version>
              <scope>provided</scope>
            </dependency>
            <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-table_2.11</artifactId>
              <version>blink-2.2.4-SNAPSHOT</version>
              <scope>provided</scope>
            </dependency>
            <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-core</artifactId>
              <version>blink-2.2.4-SNAPSHOT</version>
              <scope>provided</scope>
            </dependency>
            <dependency>
              <groupId>com.alibaba.blink</groupId>
              <artifactId>blink-connector-common</artifactId>
              <version>blink-2.2.4-SNAPSHOT</version>
              <scope>provided</scope>
            </dependency>
            <dependency>
              <groupId>com.alibaba.blink</groupId>
              <artifactId>blink-connector-custom</artifactId>
              <version>blink-2.2.4-SNAPSHOT</version>
              <scope>provided</scope>
            </dependency>
          </dependencies>
    • 实时计算Flink版1.0版本
      • JAR包下载
      • 依赖包
              <dependencies>
                <dependency>
                    <groupId>com.alibaba.blink</groupId>
                    <artifactId>blink-connector-common</artifactId>
                    <version>blink-1.4-SNAPSHOT</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>com.alibaba.blink</groupId>
                    <artifactId>blink-connector-custom</artifactId>
                    <version>blink-1.4-SNAPSHOT</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                    <version>blink-1.4-SNAPSHOT</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-core</artifactId>
                    <version>blink-1.4-SNAPSHOT</version>
                    <scope>provided</scope>
                </dependency>
                <dependency>
                    <groupId>com.alibaba.blink</groupId>
                    <artifactId>blink-table</artifactId>
                    <version>blink-1.4-SNAPSHOT</version>
                    <scope>provided</scope>
                </dependency>
            </dependencies>

接口说明

自定义结果表Class需要继承自定义Sink插件的基类CustomSinkBase,并使用如下方法实现。
protected Map<String,String> userParamsMap;// userParamsMap是自定义SQL的WITH语句中定义的键值对,所有的键均为小写。
protected Set<String> primaryKeys;// primaryKeys是自定义的主键字段名。
protected List<String> headerFields;// headerFields是标记为header的字段列表。
protected RowTypeInfo rowTypeInfo;//字段类型和名称。
/**
 * 初始化方法。每次初始建立和Failover的时候会调用一次。
 * 
 * @param taskNumber taskNumber为当前节点的编号。
 * @param numTasks   numTasks为Sink节点的总数。
 * @throws IOException
 */
public abstract void open(int taskNumber,int numTasks) throws IOException;

/**
 * close方法,释放资源。
 *
 * @throws IOException
 */
public abstract void close() throws IOException;

/**
 * 处理插入单行数据。
 *
 * @param row
 * @throws IOException
 */
public abstract void writeAddRecord(Row row) throws IOException;

/**
 * 处理删除单行数据。
 *
 * @param row
 * @throws IOException
 */
public abstract void writeDeleteRecord(Row row) throws IOException;

/**
 * 如果进行批量插入,该方法需要把线程中缓存的数据全部刷入下游存储;如果不进行批量插入,可以不使用该方法。
 *
 * @throws IOException
 */
public abstract void sync() throws IOException;

/** 
* 返回类名。 
*/ 
public String getName();

自定义Redis结果表示例

在实时计算Flink版开发控制台上传JAR包,引用资源之后,对于自定义的Sink插件,需要指明type = 'custom',并且指明实现接口的Class。自定义Redis结果表示例请参见示例
create table in_table(
    kv varchar 
)with(
    type = 'random'
);

create table out_table(
    `key` varchar,
    `value` varchar
)with(
    type = 'custom',
    class = 'com.alibaba.blink.connector.custom.demo.RedisSink',
    //1. 可以定义更多自定义参数, 在open函数中通过userParamsMap获取。
    //2. with参数里key大小写不敏感。在实时计算Flink版中,参数key的值直接处理为全小写。建议您在引用数据存储的DDL中使用小写声明key。
    host = 'r-uf****.redis.rds.aliyuncs.com',
    port = '6379',
    db = '0',
    batchsize = '10',
    password = '<yourHostPassword>'
);

insert into out_table
select
substring(kv,0,4) as `key`,
substring(kv,0,6) as `value`
from in_table;
Redis Sink插件的参数说明如下。
参数 说明 是否必填 备注
host Redis实例内网连接地址(host)
port Redis实例端口号
password Redis连接密码
db Redis Database编号 默认值为0,表示db0。
batchsize 每次批量写入的条数 默认值为1,表示不批量写入。