云数据库Tair(Redis开源版)

更新时间:2024-11-28 07:36:37

本文为您介绍如何使用云数据库Tair连接器(Redis开源版)。

背景信息

阿里云数据库Tair是兼容开源Redis协议标准、提供内存加硬盘混合存储的数据库服务,基于高可靠双机热备架构及可平滑扩展的集群架构,充分满足高吞吐、低延迟及弹性变配的业务需求,更多内容详情请参见什么是云数据库 Tair(兼容 Redis)

Redis连接器支持的信息如下。

类别

详情

类别

详情

支持类型

维表和结果表

支持模式

流模式

数据格式

String

特有监控指标

  • 维表:无

  • 结果表:

    • numBytesOut

    • numRecordsOutPerSecond

    • numBytesOutPerSecond

    • currentSendTime

说明

指标含义详情,请参见监控指标说明

API 种类

SQL

是否支持更新或删除结果表数据

前提条件

使用限制

  • 目前Redis连接器是仅提供Best Effort语义,无法保证数据的Exactly Once,需要您自行保证语义上的幂等性。

  • 维表使用限制有:

    • 仅支持读取Redis数据存储中STRINGHASHMAP类型的数据。

    • 维表的字段必须为STRING,且必须声明且只能声明一个主键。

    • 维表JOIN时,ON条件必须包含主键的等值条件。

已知缺陷及解决方案

实时计算引擎VVR 8.0.9版本缓存功能存在问题,需要在结果表WITH参数中添加 'sink.buffer-flush.max-rows' = '0' 禁用。

语法结构

CREATE TABLE redis_table (
  col1 STRING,
  col2 STRING,
  PRIMARY KEY (col1) NOT ENFORCED -- 必填。
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>',
  'mode' = 'STRING'  -- 结果表时必填。
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值为redis。

    host

    Redis Server连接地址。

    String

    推荐您使用内网地址。

    说明

    由于网络延迟和带宽限制等因素,连接公网地址时可能会出现不稳定的情况。

    port

    Redis Server连接端口。

    Int

    6379

    无。

    password

    Redis数据库密码。

    String

    空字符串,表示不进行校验。

    无。

    dbNum

    选择操作的数据库编号。

    Int

    0

    无。

    clusterMode

    Redis集群是否为集群模式。

    Boolean

    false

    无。

    hostAndPorts

    Redis集群的主机和端口号。

    说明

    如果启用了集群模式,且不需要连接高可用,可以通过hostport配置项只配置其中一台主机,也可以只配置该项。该配置项的优先级高于独立的hostport配置项。

    String

    如果ClusterMode = true,同时需要支持Jedis到自建Redis集群连接的高可用,必须配置该项。配置格式为字符串:"host1:port1,host2:port2"

    key-prefix

    表主键值的前缀。

    String

    配置后,Redis维表和结果表的主键字段值在查询或者写入Redis时会被自动添加前缀,该前缀是由键前缀(key-prefix)和其后的前缀分隔符(key-prefix-delimiter)组成。

    说明

    仅实时计算引擎VVR 8.0.7及以上版本支持该参数。

    key-prefix-delimiter

    表主键值与表主键值前缀之间的分隔符。

    String

    connection.pool.max-total

    连接池可以分配的最大连接数。

    Int

    8

    说明

    仅实时计算引擎VVR 8.0.9及以上版本支持该参数。

    connection.pool.max-idle

    连接池中最大空闲连接数。

    Int

    8

    connection.pool.min-idle

    连接池中最小空闲连接数。

    Int

    0

    connect.timeout

    建立连接的超时时间。

    Duration

    3000ms

    socket.timeout

    Redis服务器接收数据的超时时间(即套接字超时)。

    Duration

    3000ms

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    mode

    对应Redis的数据结构。

    String

    云数据库Tair结果表支持5Redis数据结构,其DDL必须按指定格式定义且主键必须被定义。详情请参见Redis结果表数据结构格式

    flattenHash

    是否按照多值模式写入HASHMAP类型数据。

    Boolean

    false

    参数取值如下:

    • true:按照多值模式写入。此时,您需要在DDL中声明多个非主键字段,主键字段值对应key,每个非主键字段的字段名对应一个field,字段值对应该fieldvalue。

    • false:按照单值模式写入。此时您需要在DDL中声明三个字段,第一个主键字段的字段值对应key,第二个非主键字段的字段值对应field,第三个非主键字段的字段值对应value。

    说明
    • 该参数仅在mode参数取值为HASHMAP时生效。

    • 仅实时计算引擎VVR 8.0.7及以上版本支持该参数。

    ignoreDelete

    是否忽略Retraction消息。

    Boolean

    false

    参数取值如下:

    • true:收到Retraction消息时,忽略Retraction消息。

    • false:收到Retraction消息时,同时删除数据对应的key及已插入的数据。

    expiration

    为写入数据对应的Key设置TTL。

    Long

    0,代表不设置TTL。

    如果该参数的值大于0,则写入数据对应的Key会被设置相应的TTL,单位为毫秒。

    说明

    仅实时计算引擎VVR 4.0.13及以上版本支持该参数。

    sink.buffer-flush.max-rows

    缓存可保存的最大记录数。

    Int

    200

    缓存记录包括所有追加、修改和删除的事件,超过最大记录数时将刷写缓存。

    说明
    • 仅实时计算引擎VVR 8.0.9及以上版本支持该参数。

    • 仅适用于非Redis集群实例,可以设置为0禁用该参数。

    sink.buffer-flush.interval

    缓存刷写时间间隔。

    Duration

    1000ms

    异步刷写缓存。

    说明
    • 仅实时计算引擎VVR 8.0.9及以上版本支持该参数。

    • 仅适用于非Redis集群实例,可以设置为0禁用该参数。

  • 维表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    mode

    读取Redis的数据类型。

    String

    STRING

    参数取值如下:

    • STRING:不指定时,默认以STRING类型读取。

    • HASHMAP:当指定modeHASHMAP时,将按照多值模式读取HASHMAP类型数据。

      此时DDL需要声明多个非主键字段,主键字段值对应key,每个非主键字段的字段名对应field,字段值对应value。

    说明
    • 仅实时计算引擎VVR 8.0.7及以上版本支持该参数。

    • 如果您需要以单值模式读取HASHMAP类型数据时,请配置hashName参数。

    hashName

    单值模式读取HASHMAP类型数据时使用的key。

    String

    如果您未指定mode参数,还希望以单值模式读取HASHMAP类型数据。此时,您需要配置hashName。

    此时DDL仅需要声明两个字段,第一个主键字段的字段值对应field,第二个非主键字段的字段值对应value。

    cache

    缓存策略。

    String

    None

    云数据库Tair维表支持以下缓存策略:

    • None(默认值):无缓存。

    • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

    • ALL:缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在。全量的Cache有一个过期时间,过期后会重新加载一遍全量Cache。

    重要
    • 仅实时计算引擎VVR 8.0.3及以上版本支持ALL缓存策略。

    • ALL缓存策略目前仅支持单值模式读取hashmap类型数据(即hashName参数不为空,mode参数为空时)。

    • 需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

    cacheSize

    缓存大小。

    Long

    10000

    当选择LRU缓存策略时,需要设置缓存大小。

    cacheTTLMs

    缓存超时时长,单位为毫秒。

    Long

    cacheTTLMs配置和cache有关:

    • 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。

    • 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。

    • 如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。

    cacheEmpty

    是否缓存空结果。

    Boolean

    true

    无。

    cacheReloadTimeBlackList

    更新时间黑名单。在缓存策略选择为ALL时,启用更新时间黑名单,防止在此时间内做Cache更新(例如双11场景)。

    String

    格式为2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情况如下所示:

    • 用英文逗号(,)来分隔多个黑名单。

    • 用箭头(->)来分割黑名单的起始结束时间。

Redis结果表数据结构格式

类型

格式

Redis插入数据的命令

类型

格式

Redis插入数据的命令

STRING类型

DDL为两列:

  • 1列为key,STRING类型。

  • 2列为value,STRING类型。

set key value

LIST类型

DDL为两列:

  • 1列为key,STRING类型。

  • 2列为value,STRING类型。

lpush key value

SET类型

DDL为两列:

  • 1列为key,STRING类型。

  • 2列为value,STRING类型。

sadd key value

HASHMAP类型

默认情况下,DDL为三列:

  • 1列为key,STRING类型。

  • 2列为field,STRING类型。

  • 3列为value,STRING类型。

hmset key field value

flattenHash参数配置为true时,DDL支持多列,以4列的情况为例:

  • 1列为key,STRING类型。

  • 2列的字段名(假设为col1)对应一个field,字段值(假设为value1)对应该fieldvalue,STRING类型。

  • 3列的字段名(假设为col2)对应一个field,字段值(假设为value2)对应该fieldvalue,STRING类型。

  • 4列的字段名(假设为col3)对应一个field,字段值(假设为value3)对应该fieldvalue,STRING类型。

hmset key col1 value1 col2 value2 col3 value3

SORTEDSET类型

DDL为三列:

  • 1列为key,STRING类型。

  • 2列为score,DOUBLE类型。

  • 3列为value,STRING类型。

zadd key score value

类型映射

类型

Redis字段类型

Flink字段类型

类型

Redis字段类型

Flink字段类型

通用

STRING

STRING

结果表独有

SCORE

DOUBLE

说明

因为RedisSCORE类型应用于SORTEDSET(有序集合),所以需要手动为每个Value设置一个DOUBLE类型的SCORE,Value才能按照该SCORE从小到大进行排序。

使用示例

  • 结果表

    • 写入STRING类型数据:在代码示例中,redis_sink结果表中col1列的值会作为key,col2列的值会作为value写入到Redis中。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        col2 STRING
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'STRING',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM datagen_source;
    • 单值模式写入HASHMAP类型数据:在代码示例中,redis_sink结果表中的col1列的值会作为key,col2列的值会作为field,col3列的值会作为value写入到Redis中。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        col2 STRING,
        col3 STRING
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM datagen_source;
    • 多值模式写入HASHMAP类型数据:在代码示例中,redis_sink结果表中的col1列的值会作为key,col2列的值会作为fieldcol2value,col3列的值会作为fieldcol3value,col4列的值会作为fieldcol4value,写入到Redis中。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',
        'flattenHash' = 'true',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM datagen_source;
  • 维表

    • 读取STRING类型数据:在代码示例中,redis_dim维表中的col1列的值对应key,col2列的值对应value。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        proctime as PROCTIME()
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT t1.col1, t2.col1, t2.col2
      FROM datagen_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.col1 = t2.col1;
    • 单值模式读取HASHMAP类型数据:在代码示例中,hashName参数的值testKeykey,redis_dim维表中的col1列的值对应field,col2列的值对应value。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        proctime as PROCTIME()
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        col1 STRING,
        col2 STRING,
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>',
        'hashName' = 'testkey'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT t1.col1, t2.col1, t2.col2
      FROM datagen_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.col1 = t2.col1;
    • 多值模式读取HASHMAP类型数据:在代码示例中,redis_dim维表中的col1列的值对应key,col2列的值对应fieldcol2value,col3列的值对应fieldcol3value,col4列的值对应fieldcol4value。

      CREATE TEMPORARY TABLE datagen_source (
        col1 STRING,
        proctime as PROCTIME()
      ) WITH (
        'connector' = 'datagen',
        'number-of-rows' = '100'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING
        PRIMARY KEY (col1) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = '<yourHost>',
        'port' = '<yourPort>',
        'password' = '<yourPassword>',
        'mode' = 'HASHMAP'
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        col1 STRING,
        col2 STRING,
        col3 STRING,
        col4 STRING
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT t1.col1, t2.col2, t2.col3, t2.col4
      FROM datagen_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.col1 = t2.col1;
  • 本页导读 (1)
  • 背景信息
  • 前提条件
  • 使用限制
  • 已知缺陷及解决方案
  • 语法结构
  • WITH参数
  • Redis结果表数据结构格式
  • 类型映射
  • 使用示例
AI助理

点击开启售前

在线咨询服务

你好,我是AI助理

可以解答问题、推荐解决方案等