本文为您介绍如何使用云数据库Tair连接器(Redis开源版)。
背景信息
阿里云数据库Tair是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,充分满足高吞吐、低延迟及弹性变配的业务需求,更多内容详情请参见什么是云数据库 Tair(兼容 Redis)。
Redis连接器支持的信息如下。
| 类别 | 详情 | 
| 支持类型 | 维表和结果表 | 
| 支持模式 | 流模式 | 
| 数据格式 | String | 
| 特有监控指标 | 
 说明  指标含义详情,请参见监控指标说明。 | 
| API 种类 | SQL | 
| 是否支持更新或删除结果表数据 | 是 | 
前提条件
使用限制
- 目前Redis连接器是仅提供Best Effort语义,无法保证数据的Exactly Once,需要您自行保证语义上的幂等性。 
- 维表使用限制有: - 仅支持读取Redis数据存储中STRING和HASHMAP类型的数据。 
- 维表的字段必须为STRING,且必须声明且只能声明一个主键。 
- 维表JOIN时,ON条件必须包含主键的等值条件。 
 
已知缺陷及解决方案
实时计算引擎VVR 8.0.9版本缓存功能存在问题,需要在结果表WITH参数中添加 'sink.buffer-flush.max-rows' = '0' 禁用。
语法结构
CREATE TABLE redis_table (
  col1 STRING,
  col2 STRING,
  PRIMARY KEY (col1) NOT ENFORCED -- 必填。
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>',
  'mode' = 'STRING'  -- 结果表时必填。
);WITH参数
通用
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| connector | 表类型。 | String | 是 | 无 | 固定值为redis。 | 
| host | Redis Server连接地址。 | String | 是 | 无 | 推荐您使用内网地址。 说明  由于网络延迟和带宽限制等因素,连接公网地址时可能会出现不稳定的情况。 | 
| port | Redis Server连接端口。 | Int | 否 | 6379 | 无。 | 
| password | Redis数据库密码。 | String | 否 | 空字符串,表示不进行校验。 | 无。 | 
| dbNum | 选择操作的数据库编号。 | Int | 否 | 0 | 无。 | 
| clusterMode | Redis集群是否为集群模式。 | Boolean | 否 | false | 无。 | 
| hostAndPorts | Redis集群的主机和端口号。 说明  如果启用了集群模式,且不需要连接高可用,可以通过host和port配置项只配置其中一台主机,也可以只配置该项。该配置项的优先级高于独立的host和port配置项。 | String | 否 | 空 | 如果 | 
| key-prefix | 表主键值的前缀。 | String | 否 | 无 | 配置后,Redis维表和结果表的主键字段值在查询或者写入Redis时会被自动添加前缀,该前缀是由键前缀(key-prefix)和其后的前缀分隔符(key-prefix-delimiter)组成。 说明  仅实时计算引擎VVR 8.0.7及以上版本支持该参数。 | 
| key-prefix-delimiter | 表主键值与表主键值前缀之间的分隔符。 | String | 否 | 无 | |
| connection.pool.max-total | 连接池可以分配的最大连接数。 | Int | 否 | 8 | 说明  仅实时计算引擎VVR 8.0.9及以上版本支持该参数。 | 
| connection.pool.max-idle | 连接池中最大空闲连接数。 | Int | 否 | 8 | |
| connection.pool.min-idle | 连接池中最小空闲连接数。 | Int | 否 | 0 | |
| connect.timeout | 建立连接的超时时间。 | Duration | 否 | 3000ms | |
| socket.timeout | 从Redis服务器接收数据的超时时间(即套接字超时)。 | Duration | 否 | 3000ms | |
| cacert.filepath | SSL/TLS证书文件的完整路径,文件格式必须为jks。 | String | 否 | 无,表示不开启SSL/TLS加密。 | 参考开启TLS加密下载CA证书,并在作业的附加依赖文件中上传,上传后,CA证书将存储在/flink/usrlib目录下。如何在附加依赖文件中上传文件,详情请参见部署作业。示例: 说明  仅实时计算引擎VVR 11.1及以上版本支持该参数。 | 
结果表独有
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| mode | 对应Redis的数据类型。 | String | 是 | 无 | 云数据库Tair结果表支持5种Redis数据类型,其DDL必须按指定格式定义且主键必须被定义。详情请参见Redis结果表数据类型格式。 | 
| flattenHash | 是否按照多值模式写入HASHMAP类型数据。 | Boolean | 否 | false | 参数取值如下: 
 说明  
 | 
| ignoreDelete | 是否忽略Retraction消息。 | Boolean | 否 | false | 参数取值如下: 
 | 
| expiration | 为写入数据对应的Key设置TTL。 | Long | 否 | 0,代表不设置TTL。 | 如果该参数的值大于0,则写入数据对应的Key会被设置相应的TTL,单位为毫秒。 | 
| sink.buffer-flush.max-rows | 缓存可保存的最大记录数。 | Int | 否 | 200 | 缓存记录包括所有追加、修改和删除的事件,超过最大记录数时将刷写缓存。 说明  
 | 
| sink.buffer-flush.interval | 缓存刷写时间间隔。 | Duration | 否 | 1000ms | 异步刷写缓存。 说明  
 | 
维表独有
| 参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 | 
| mode | 读取Redis的数据类型。 | String | 否 | STRING | 参数取值如下: STRING:默认以STRING类型读取。 HASHMAP:按照多值模式读取HASHMAP类型数据。 此时DDL需要声明多个非主键字段。 
 说明  仅实时计算引擎VVR 8.0.7及以上版本支持该参数。 如果您需要以单值模式读取HASHMAP类型数据时,请配置hashName参数。 | 
| hashName | 单值模式读取HASHMAP类型数据时使用的key。 | String | 否 | 无 | 如果您未指定mode参数,还希望以单值模式读取HASHMAP类型数据。此时,您需要配置hashName。 此时DDL仅需要声明两个字段,第一个主键字段的字段值对应field,第二个非主键字段的字段值对应value。 | 
| cache | 缓存策略。 | String | 否 | None | 云数据库Tair维表支持以下缓存策略: None(默认值):无缓存。 LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。 ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在。全量的Cache有一个过期时间,过期后会重新加载一遍全量Cache。 重要  
 | 
| cacheSize | 缓存大小。 | Long | 否 | 10000 | 当选择LRU缓存策略时,需要设置缓存大小。 | 
| cacheTTLMs | 缓存超时时长,单位为毫秒。 | Long | 否 | 无 | cacheTTLMs配置和cache有关: 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。 如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。 | 
| cacheEmpty | 是否缓存空结果。 | Boolean | 否 | true | 无。 | 
| cacheReloadTimeBlackList | 更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 | String | 否 | 无 | 格式为: 
 分隔符的使用情况如下所示: 
 | 
| async | 是否异步返回数据。 | Boolean | 否 | false | 
 | 
Redis结果表数据类型格式
| 类型 | 格式 | Redis插入数据的命令 | 
| STRING类型 | DDL为两列: 
 | 
 | 
| LIST类型 | DDL为两列: 
 | 
 | 
| SET类型 | DDL为两列: 
 | 
 | 
| HASHMAP类型 | 默认情况下,DDL为三列: 
 | 
 | 
| flattenHash参数配置为true时,DDL支持多列,以4列的情况为例: 
 | 
 | |
| SORTEDSET类型 | DDL为三列: 
 | 
 | 
类型映射
| 类型 | Redis字段类型 | Flink字段类型 | 
| 通用 | STRING | STRING | 
| 结果表独有 | SCORE | DOUBLE | 
因为Redis的SCORE类型应用于SORTEDSET(有序集合),所以需要手动为每个Value设置一个DOUBLE类型的SCORE,Value才能按照该SCORE从小到大进行排序。
使用示例
- 结果表 - 写入STRING类型数据:在代码示例中, - redis_sink结果表中- user_id列的值会作为key,- login_time列的值会作为value写入到Redis中。- CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- 用户ID login_time STRING -- 登录时间戳 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_logins', -- Kafka主题 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 数据格式为JSON 'scan.startup.mode' = 'earliest-offset' -- 从最早的消息开始消费 ); CREATE TEMPORARY TABLE redis_sink ( user_id STRING, -- Redis的key login_time STRING, -- Redis的value PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'STRING', -- 使用STRING模式 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;
- 多值模式写入HASHMAP类型数据:在代码示例中, - redis_sink结果表中的- order_id列的值会作为key,- product_name列的值会作为field为product_name的value,- quantity列的值会作为field为quantity的value,- amount列的值会作为field为amount的value,写入到Redis中。- CREATE TEMPORARY TABLE kafka_source ( order_id STRING, -- 订单 ID product_name STRING, -- 商品名称 quantity STRING, -- 商品数量 amount STRING -- 订单金额 ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', -- Kafka主题 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 数据格式为JSON 'scan.startup.mode' = 'earliest-offset' -- 从最早的消息开始消费 ); CREATE TEMPORARY TABLE redis_sink ( order_id STRING, -- 订单ID,作为Redis的key product_name STRING, -- 商品名称,作为Redis Hash的field quantity STRING, -- 商品数量,作为Redis Hash的field amount STRING, -- 订单金额,作为Redis Hash的field PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', -- 使用HASHMAP模式 'flattenHash' = 'true', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;
- 单值模式写入HASHMAP类型数据:在代码示例中, - redis_sink结果表中的- order_id列的值会作为key,- product_name列的值会作为field,- quantity列的值会作为value写入到Redis中。- CREATE TEMPORARY TABLE kafka_source ( order_id STRING, -- 订单ID product_name STRING, -- 商品名称 quantity STRING -- 商品数量 ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', -- Kafka主题 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 数据格式为JSON 'scan.startup.mode' = 'earliest-offset' -- 从最早的消息开始消费 ); CREATE TEMPORARY TABLE redis_sink ( order_id STRING, -- Redis的key product_name STRING, -- Redis的field quantity STRING, -- Redis的value PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;
 
- 维表 - 读取STRING类型数据:在代码示例中, - redis_dim维表中的- user_id列的值对应key,- user_name列的值对应value。- CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- 用户ID proctime AS PROCTIME() -- 处理时间 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka主题 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 数据格式为JSON 'scan.startup.mode' = 'earliest-offset' -- 从最早的消息开始消费 ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- 用户ID(Redis key) user_name STRING, -- 用户名(Redis value) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', -- Redis主机地址 'port' = 'yourPort', -- Redis端口 'password' = 'yourPassword', -- Redis密码 'mode' = 'STRING' -- 使用STRING模式 ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- 用户ID redis_user_id STRING, -- Redis中的用户ID user_name STRING -- 用户名 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- 用户ID(来自 Kafka) t2.user_id, -- Redis中的用户ID t2.user_name -- 用户名(来自Redis) FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;
- 多值模式读取HASHMAP类型数据:在代码示例中, - redis_dim维表中的- user_id列的值对应key,- user_name列的值对应field为user_name的value,- email列的值对应field为email的value,- register_time列的值对应field为register_time的value。- CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- 用户ID click_time TIMESTAMP(3), -- 点击时间 proctime AS PROCTIME() -- 处理时间 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka主题 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 数据格式为JSON 'scan.startup.mode' = 'earliest-offset' ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- 用户ID(Redis key) user_name STRING, -- 用户名(Redis field-value对的一部分) email STRING, -- 邮箱(Redis field-value对的一部分) register_time STRING, -- 注册时间(Redis field-value对的一部分) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword', 'mode' = 'HASHMAP' -- 使用HASHMAP模式 ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- 用户ID user_name STRING, -- 用户名 email STRING, -- 邮箱 register_time STRING, -- 注册时间 click_time TIMESTAMP(3) -- 点击时间 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- 用户ID t2.user_name, -- 用户名 t2.email, -- 邮箱 t2.register_time, -- 注册时间 t1.click_time -- 点击时间 FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;
- 单值模式读取HASHMAP类型数据:在代码示例中, - hashName参数的值testKey为key,- redis_dim维表中的- user_id列的值对应field,- user_name列的值对应value。- CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- 用户ID proctime AS PROCTIME() -- 处理时间 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka主题 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 数据格式为JSON 'scan.startup.mode' = 'earliest-offset' -- 从最早的消息开始消费 ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- 用户ID(Redis hash field) user_name STRING, -- 用户名(Redis hash value) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', -- Redis主机地址 'port' = 'yourPort', -- Redis端口 'password' = 'yourPassword',-- Redis密码 'hashName' = 'testkey' -- 固定的Redis hash名称 ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- 用户ID redis_user_id STRING, -- Redis中的用户ID user_name STRING -- 用户名 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- 用户ID(来自Kafka) t2.user_id, -- Redis中的用户ID t2.user_name -- 用户名(来自 Redis) FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;