本文为您介绍如何使用云数据库Tair连接器(Redis开源版)。
背景信息
阿里云数据库Tair是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,充分满足高吞吐、低延迟及弹性变配的业务需求,更多内容详情请参见什么是云数据库 Tair(兼容 Redis)。
Redis连接器支持的信息如下。
类别 | 详情 |
类别 | 详情 |
支持类型 | 维表和结果表 |
支持模式 | 流模式 |
数据格式 | String |
特有监控指标 |
指标含义详情,请参见监控指标说明。 |
API 种类 | SQL |
是否支持更新或删除结果表数据 | 是 |
前提条件
使用限制
目前Redis连接器是仅提供Best Effort语义,无法保证数据的Exactly Once,需要您自行保证语义上的幂等性。
维表使用限制有:
仅支持读取Redis数据存储中STRING和HASHMAP类型的数据。
维表的字段必须为STRING,且必须声明且只能声明一个主键。
维表JOIN时,ON条件必须包含主键的等值条件。
已知缺陷及解决方案
实时计算引擎VVR 8.0.9版本缓存功能存在问题,需要在结果表WITH参数中添加 'sink.buffer-flush.max-rows' = '0' 禁用。
语法结构
CREATE TABLE redis_table (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED -- 必填。
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'mode' = 'STRING' -- 结果表时必填。
);
WITH参数
通用
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
connector | 表类型。 | String | 是 | 无 | 固定值为redis。 |
host | Redis Server连接地址。 | String | 是 | 无 | 推荐您使用内网地址。 由于网络延迟和带宽限制等因素,连接公网地址时可能会出现不稳定的情况。 |
port | Redis Server连接端口。 | Int | 否 | 6379 | 无。 |
password | Redis数据库密码。 | String | 否 | 空字符串,表示不进行校验。 | 无。 |
dbNum | 选择操作的数据库编号。 | Int | 否 | 0 | 无。 |
clusterMode | Redis集群是否为集群模式。 | Boolean | 否 | false | 无。 |
hostAndPorts | Redis集群的主机和端口号。 如果启用了集群模式,且不需要连接高可用,可以通过host和port配置项只配置其中一台主机,也可以只配置该项。该配置项的优先级高于独立的host和port配置项。 | String | 否 | 空 | 如果 |
key-prefix | 表主键值的前缀。 | String | 否 | 无 | 配置后,Redis维表和结果表的主键字段值在查询或者写入Redis时会被自动添加前缀,该前缀是由键前缀(key-prefix)和其后的前缀分隔符(key-prefix-delimiter)组成。 仅实时计算引擎VVR 8.0.7及以上版本支持该参数。 |
key-prefix-delimiter | 表主键值与表主键值前缀之间的分隔符。 | String | 否 | 无 | |
connection.pool.max-total | 连接池可以分配的最大连接数。 | Int | 否 | 8 | 仅实时计算引擎VVR 8.0.9及以上版本支持该参数。 |
connection.pool.max-idle | 连接池中最大空闲连接数。 | Int | 否 | 8 | |
connection.pool.min-idle | 连接池中最小空闲连接数。 | Int | 否 | 0 | |
connect.timeout | 建立连接的超时时间。 | Duration | 否 | 3000ms | |
socket.timeout | 从Redis服务器接收数据的超时时间(即套接字超时)。 | Duration | 否 | 3000ms |
结果表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
mode | 对应Redis的数据结构。 | String | 是 | 无 | 云数据库Tair结果表支持5种Redis数据结构,其DDL必须按指定格式定义且主键必须被定义。详情请参见Redis结果表数据结构格式。 |
flattenHash | 是否按照多值模式写入HASHMAP类型数据。 | Boolean | 否 | false | 参数取值如下:
|
ignoreDelete | 是否忽略Retraction消息。 | Boolean | 否 | false | 参数取值如下:
|
expiration | 为写入数据对应的Key设置TTL。 | Long | 否 | 0,代表不设置TTL。 | 如果该参数的值大于0,则写入数据对应的Key会被设置相应的TTL,单位为毫秒。 仅实时计算引擎VVR 4.0.13及以上版本支持该参数。 |
sink.buffer-flush.max-rows | 缓存可保存的最大记录数。 | Int | 否 | 200 | 缓存记录包括所有追加、修改和删除的事件,超过最大记录数时将刷写缓存。
|
sink.buffer-flush.interval | 缓存刷写时间间隔。 | Duration | 否 | 1000ms | 异步刷写缓存。
|
维表独有
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
参数 | 说明 | 数据类型 | 是否必填 | 默认值 | 备注 |
mode | 读取Redis的数据类型。 | String | 否 | STRING | 参数取值如下: STRING:不指定时,默认以STRING类型读取。 HASHMAP:当指定mode为HASHMAP时,将按照多值模式读取HASHMAP类型数据。 此时DDL需要声明多个非主键字段,主键字段值对应key,每个非主键字段的字段名对应field,字段值对应value。 仅实时计算引擎VVR 8.0.7及以上版本支持该参数。 如果您需要以单值模式读取HASHMAP类型数据时,请配置hashName参数。 |
hashName | 单值模式读取HASHMAP类型数据时使用的key。 | String | 否 | 无 | 如果您未指定mode参数,还希望以单值模式读取HASHMAP类型数据。此时,您需要配置hashName。 此时DDL仅需要声明两个字段,第一个主键字段的字段值对应field,第二个非主键字段的字段值对应value。 |
cache | 缓存策略。 | String | 否 | None | 云数据库Tair维表支持以下缓存策略: None(默认值):无缓存。 LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。 ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在。全量的Cache有一个过期时间,过期后会重新加载一遍全量Cache。 仅实时计算引擎VVR 8.0.3及以上版本支持ALL缓存策略。 ALL缓存策略目前仅支持单值模式读取hashmap类型数据(即hashName参数不为空,mode参数为空时)。 需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。 |
cacheSize | 缓存大小。 | Long | 否 | 10000 | 当选择LRU缓存策略时,需要设置缓存大小。 |
cacheTTLMs | 缓存超时时长,单位为毫秒。 | Long | 否 | 无 | cacheTTLMs配置和cache有关: 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。 如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。 |
cacheEmpty | 是否缓存空结果。 | Boolean | 否 | true | 无。 |
cacheReloadTimeBlackList | 更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。 | String | 否 | 无 | 格式为2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情况如下所示: 用英文逗号(,)来分隔多个黑名单。 用箭头(->)来分割黑名单的起始结束时间。 |
Redis结果表数据结构格式
类型 | 格式 | Redis插入数据的命令 |
类型 | 格式 | Redis插入数据的命令 |
STRING类型 | DDL为两列:
|
|
LIST类型 | DDL为两列:
|
|
SET类型 | DDL为两列:
|
|
HASHMAP类型 | 默认情况下,DDL为三列:
|
|
flattenHash参数配置为true时,DDL支持多列,以4列的情况为例:
|
| |
SORTEDSET类型 | DDL为三列:
|
|
类型映射
类型 | Redis字段类型 | Flink字段类型 |
类型 | Redis字段类型 | Flink字段类型 |
通用 | STRING | STRING |
结果表独有 | SCORE | DOUBLE |
因为Redis的SCORE类型应用于SORTEDSET(有序集合),所以需要手动为每个Value设置一个DOUBLE类型的SCORE,Value才能按照该SCORE从小到大进行排序。
使用示例
结果表
写入STRING类型数据:在代码示例中,
redis_sink
结果表中user_id
列的值会作为key,login_time
列的值会作为value写入到Redis中。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- 用户ID login_time STRING -- 登录时间戳 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_logins', -- Kafka主题 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 数据格式为JSON 'scan.startup.mode' = 'earliest-offset' -- 从最早的消息开始消费 ); CREATE TEMPORARY TABLE redis_sink ( user_id STRING, -- Redis的key login_time STRING, -- Redis的value PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'STRING', -- 使用STRING模式 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;
多值模式写入HASHMAP类型数据:在代码示例中,
redis_sink
结果表中的order_id
列的值会作为key,product_name
列的值会作为field为product_name的value,quantity
列的值会作为field为quantity的value,amount
列的值会作为field为amount的value,写入到Redis中。CREATE TEMPORARY TABLE kafka_source ( order_id STRING, -- 订单 ID product_name STRING, -- 商品名称 quantity STRING, -- 商品数量 amount STRING -- 订单金额 ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', -- Kafka主题 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 数据格式为JSON 'scan.startup.mode' = 'earliest-offset' -- 从最早的消息开始消费 ); CREATE TEMPORARY TABLE redis_sink ( order_id STRING, -- 订单ID,作为Redis的key product_name STRING, -- 商品名称,作为Redis Hash的field quantity STRING, -- 商品数量,作为Redis Hash的field amount STRING, -- 订单金额,作为Redis Hash的field PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', -- 使用HASHMAP模式 'flattenHash' = 'true', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;
单值模式写入HASHMAP类型数据:在代码示例中,
redis_sink
结果表中的order_id
列的值会作为key,product_name
列的值会作为field,quantity
列的值会作为value写入到Redis中。CREATE TEMPORARY TABLE kafka_source ( order_id STRING, -- 订单ID product_name STRING, -- 商品名称 quantity STRING -- 商品数量 ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', -- Kafka主题 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 数据格式为JSON 'scan.startup.mode' = 'earliest-offset' -- 从最早的消息开始消费 ); CREATE TEMPORARY TABLE redis_sink ( order_id STRING, -- Redis的key product_name STRING, -- Redis的field quantity STRING, -- Redis的value PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;
维表
读取STRING类型数据:在代码示例中,
redis_dim
维表中的user_id
列的值对应key,user_name
列的值对应value。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- 用户ID proctime AS PROCTIME() -- 处理时间 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka主题 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 数据格式为JSON 'scan.startup.mode' = 'earliest-offset' -- 从最早的消息开始消费 ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- 用户ID(Redis key) user_name STRING, -- 用户名(Redis value) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', -- Redis主机地址 'port' = 'yourPort', -- Redis端口 'password' = 'yourPassword', -- Redis密码 'mode' = 'STRING' -- 使用STRING模式 ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- 用户ID redis_user_id STRING, -- Redis中的用户ID user_name STRING -- 用户名 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- 用户ID(来自 Kafka) t2.user_id, -- Redis中的用户ID t2.user_name -- 用户名(来自Redis) FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;
多值模式读取HASHMAP类型数据:在代码示例中,
redis_dim
维表中的user_id
列的值对应key,user_name
列的值对应field为user_name的value,email
列的值对应field为email的value,register_time
列的值对应field为register_time的value。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- 用户ID click_time TIMESTAMP(3), -- 点击时间 proctime AS PROCTIME() -- 处理时间 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka主题 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 数据格式为JSON 'scan.startup.mode' = 'earliest-offset' ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- 用户ID(Redis key) user_name STRING, -- 用户名(Redis field-value对的一部分) email STRING, -- 邮箱(Redis field-value对的一部分) register_time STRING, -- 注册时间(Redis field-value对的一部分) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword', 'mode' = 'HASHMAP' -- 使用HASHMAP模式 ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- 用户ID user_name STRING, -- 用户名 email STRING, -- 邮箱 register_time STRING, -- 注册时间 click_time TIMESTAMP(3) -- 点击时间 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- 用户ID t2.user_name, -- 用户名 t2.email, -- 邮箱 t2.register_time, -- 注册时间 t1.click_time -- 点击时间 FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;
单值模式读取HASHMAP类型数据:在代码示例中,
hashName
参数的值testKey为key,redis_dim
维表中的user_id
列的值对应field,user_name
列的值对应value。CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- 用户ID proctime AS PROCTIME() -- 处理时间 ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka主题 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker地址 'format' = 'json', -- 数据格式为JSON 'scan.startup.mode' = 'earliest-offset' -- 从最早的消息开始消费 ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- 用户ID(Redis hash field) user_name STRING, -- 用户名(Redis hash value) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', -- Redis主机地址 'port' = 'yourPort', -- Redis端口 'password' = 'yourPassword',-- Redis密码 'hashName' = 'testkey' -- 固定的Redis hash名称 ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- 用户ID redis_user_id STRING, -- Redis中的用户ID user_name STRING -- 用户名 ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- 用户ID(来自Kafka) t2.user_id, -- Redis中的用户ID t2.user_name -- 用户名(来自 Redis) FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;
- 本页导读 (1)
- 背景信息
- 前提条件
- 使用限制
- 已知缺陷及解决方案
- 语法结构
- WITH参数
- 通用
- 结果表独有
- 维表独有
- Redis结果表数据结构格式
- 类型映射
- 使用示例