本文为您介绍如何创建实时计算Flink版云数据库Redis版结果表以及创建过程中涉及的WITH参数、类型映射和属性字段。

注意
  • 本文仅支持实时计算Flink版3.2.0及以上版本。
  • 实时计算Flink版Redis结果表支持自建Redis服务。

Remote Dictionary Server(Redis)是一个开源的、使用ANSIC语言编写的Key-Value存储系统,其优势在于高性能与具有丰富的数据类型。实时计算Flink版支持使用Redis作为流式数据存储维表或结果表。

语法示例

目前Redis结果表支持5种Redis数据结构,其DDL定义如下:
  • STRING类型
    DDL为两列:第1列为key,第2列为value。Redis插入数据的命令为set key value
    create table resik_output (
      a varchar,
      b varchar,
      primary key(a)
    ) with (
      type = 'redis',
      mode = 'string',
      host = '${redisHost}', -- 例如,'127.0.0.1'。
      port = '${redisPort}', -- 例如,'6379'。
      dbNum = '${dbNum}', -- 默认为0。
      ignoreDelete = 'true' -- 收到Retraction时,是否删除之前插入的数据,默认为false。
    );
  • LIST类型
    DDL为两列:第1列为key,第2列为value。Redis插入数据的命令为lpush key value
    create table resik_output (
      a varchar,
      b varchar,
      primary key(a)
    ) with (
      type = 'redis',
      mode = 'list',
      host = '${redisHost}', -- 例如,'127.0.0.1'。
      port = '${redisPort}', -- 例如,'6379'。
      dbNum = '${dbNum}', -- 默认为0。
      ignoreDelete = 'true' -- 收到Retraction时,是否删除之前插入的数据,默认为false。
    );
  • SET类型
    DDL为两列:第1列为key,第2列为value。Redis插入数据的命令为sadd key value
    create table resik_output (
      a varchar,
      b varchar,
      primary key(a)
    ) with (
      type = 'redis',
      mode = 'set',
      host = '${redisHost}', -- 例如,'127.0.0.1'。
      port = '${redisPort}', -- 例如,'6379'。
      dbNum = '${dbNum}', -- 默认为0。
      ignoreDelete = 'true' -- 收到Retraction时,是否删除之前插入的数据,默认为false。
    );
  • HASHMAP类型
    DDL为三列:第1列为key,第2列为hash_key,第3列为hash_key对应的hash_value。Redis插入数据的命令为hmset key hash_key hash_value
    create table resik_output (
      a varchar,
      b varchar, 
      c varchar,
      primary key(a)
    ) with (
      type = 'redis',
      mode = 'hashmap',
      host = '${redisHost}', -- 例如,'127.0.0.1'。
      port = '${redisPort}', -- 例如,'6379'。
      dbNum = '${dbNum}', -- 默认为0。
      ignoreDelete = 'true' -- 收到Retraction时,是否删除之前插入的数据,默认为false。
    );
  • SORTEDSET类型
    DDL为三列:第1列为key,第2列为score,第3列为value。Redis插入数据的命令为add key score value
    create table resik_output (
      a varchar,
      b double,  --必须为DOUBLE类型。
      c varchar,
      primary key(a)
    ) with (
      type = 'redis',
      mode = 'sortedset',
      host = '${redisHost}', -- 例如,'127.0.0.1'。
      port = '${redisPort}', -- 例如,'6379'。
      dbNum = '${dbNum}', -- 默认为0。
      ignoreDelete = 'true' -- 收到Retraction时,是否删除之前插入的数据,默认为false。
    );

WITH参数

参数 参数说明 是否必填 取值
type 结果表类型 固定值为redis
mode 对应Redis的数据结构 取值如下:
  • string
  • list
  • set
  • hashmap
  • sortedset
host Redis Server对应地址 取值示例:127.0.0.1
port Redis Server对应端口 默认值为6379。
dbNum Redis Server对应数据库序号 默认值为0。
ignoreDelete 是否忽略Retraction消息 默认值为false,可取值为true或false。如果设置为true,收到Retraction时,同时删除数据对应的key及之前插入的数据。
password Redis Server对应的密码 默认值为空,不进行权限验证。

类型映射

Redis和实时计算Flink版字段类型对应关系如下。建议您使用该对应关系进行DDL声明。

Redis字段类型 实时计算Flink版字段类型
STRING VARCHAR
SCORE DOUBLE
说明 因为Redis的SCORE类型应用于SORTEDSET(有序集合),所以需要手动为每个Value设置一个DOUBLE类型的SCORE,Value才能按照该SCORE从小到大进行排序。

代码示例

包含Redis结果表的实时计算Flink版作业代码示例如下。
CREATE TABLE random_stream (
  v VARCHAR, 
  p VARCHAR) with (
    type = 'random'
);

create table resik_output (
  a VARCHAR,
  b VARCHAR,
  primary key(a) 
) with (
  type = 'redis',
  mode = 'string',
  host = '<yourRedisHost>',
  password = '<yourRedisPassword>'
);

INSERT INTO resik_output 
SELECT v, p
FROM random_stream;