SQL语法和示例

更新时间:

云消息队列 MQTT 版Kafka规则支持在数据流转过程中通过SQL对数据进行提取、转化和过滤等操作,本文为您介绍SQL语法和示例。

SQL语法

基本格式

合法的SQL规则格式为:

SELECT <字段名> FROM <主题> [WHERE <条件>]
  • SELECT子句用于选出需要保留的字段,或是对数据进行变换。

  • FROM子句用于选择数据源,可以是消息主题或事件主题。

  • WHERE子句用于对结果进行过滤,所用字段可以来自于payload,也可以是其他消息参数。

SQL参数

  • 发送消息云消息队列 MQTT 版发送消息参数如下所示:

    字段

    类型

    描述

    channelId

    String

    TCP连接标识ID

    clientId

    String

    客户端ID

    topic

    String

    发送的MQTT消息topic

    bornTime

    Long

    时间

    payload

    Byte[]

    消息体

    userProps

    String

    5.0协议,用户属性

    msgId

    String

    系统生成的消息唯一ID,不是协议SDK自带的消息ID

  • 事件消息云消息队列 MQTT 版默认事件和可访问参数如下所示:

    事件

    TOPIC

    连接

    $event/connect

    断连

    $event/close

    订阅

    $event/subscribe

    取消订阅

    $event/unsubscribe

    消息确认

    $event/ack

    连接事件

    字段

    类型

    描述

    channelId

    String

    TCP连接标识ID

    clientId

    String

    客户端ID。

    bornTime

    Long

    时间

    protocol

    String

    协议版本:

    • MQTT_5

    • MQTT_3_1_1

    clientIp

    String

    客户端IP

    username

    String

    MQTT连接用户名

    cleanSession

    String

    是否保存离线

    keepAlive

    Integer

    心跳保活时间间隔(秒)

    accepted

    Boolean

    连接是否被接受

    cleanStart

    Boolean

    5.0协议,是否开启新会话

    expiryInterval

    Integer

    5.0协议,会话过期时间

    断连事件

    字段

    类型

    描述

    channelId

    String

    TCP连接标识ID

    clientId

    String

    客户端ID

    bornTime

    Long

    时间

    reason

    String

    断连信息

    订阅事件

    字段

    类型

    描述

    channelId

    String

    TCP连接标识ID

    clientId

    String

    客户端ID

    topic

    String

    订阅的Topic

    bornTime

    Long

    时间

    qos

    Integer

    QoS级别

    取消订阅事件

    字段

    类型

    描述

    channelId

    String

    TCP连接标识ID

    clientId

    String

    客户端ID

    topic

    String

    取消订阅的Topic

    bornTime

    Long

    时间

    消息确认事件

    字段

    类型

    描述

    channelId

    String

    TCP连接标识ID

    clientId

    String

    客户端ID

    topic

    String

    消息Topic

    bornTime

    Long

    时间

    msgId

    String

    系统生成的消息唯一ID,不是协议SDK自带的消息ID

    payload

    Byte[]

    消息体

运算符号

符号名

符号作用

返回值

+

加法

加和

-

减法

差值

*

乘法

乘积

/

除法

商值

||

字符串连接

连接后的字符串

=

比较两者是否相等,可以比较基本类型变量是否相同,也可比较字符串是否完全一致

true/false

[ ]

获取集合类型变量中的值

数组元素,或Map中键值对的值

NOT

对一个 Boolean 类型变量进行取反

true/false

AND

与运算的结果

AND两侧均为true时,返回true;反之返回false

OR

或运算的结果

OR两侧均为false时,返回false;反之返回true

比较符号

符号名

符号作用

返回值

>

大于

true/false

<

小于

true/false

>=

大于等于

true/false

<=

小于等于

true/false

<>

不等于

true/false

IS NULL

判断左侧变量是否为 null

true/false

IS NOT NULL

判断左侧变量是否不为 null

true/false

x BETWEEN y AND z

判断 x 是否大于等于 y 且小于等于 z

true/false

x NOT BETWEEN y AND z

判断 x 是否小于 y 或大于 z

true/false

x LIKE y

判断 x 是否匹配模式 y

true/false

x NOT LIKE y

判断 x 是否不匹配模式 y

true/false

SQL限制

  • 请使用payload字段访问消息载荷。

  • 可以使用半角句号(.)来访问payload内部的嵌套结构。

  • 请使用完整路径名称访问数据,暂不支持使用别名访问数据。例如:SELECT payload as p, p.data as info FROM "myTopic/a",该SQL语句中使用payload的别名p来访问数据。

  • 消息载荷中的字段名、SELECT子句中创建的别名不能与保留关键字相同。如果需要使用保留关键字作为字段名或别名,请将其放置在半角双引号("")内。详情请参见保留关键字

  • SELECT子句后的结果字段之间请用半角逗号(,)分隔,且最后一个字段后面不加半角逗号(,)。例如:SELECT payload as p, clientId as id, qos

  • FROM子句有且只能包含一个数据源,不能在FROM后引用多个数据源。

  • FROM子句后面的数据源需要将其放置在半角双引号("")内。例如:"myTopic/#"

  • SQL语句中除数据源外的字符串值,将其放置在半角单引号('')内。例如: 'Hello'

  • 暂不支持GROUP BYHAVING关键字。

  • SQL语句中包含WHERE子句,且WHERE子句后面的条件没有被满足,则当前消息/事件不会被转发,也不会有任何结果输出。

  • SQL语句中包含中文的字符串,则需要在字符串前加上_utf8。例如,SELECT _utf8'你好' as hello。若消息体或参数中有字段包含中文字符,在访问相关字段时无需添加 _utf8

SQL示例

选择指定主题的消息

应用场景:使用SQL选中特定主题,将命中主题的消息载荷提取出来。允许主题名称中使用通配符。

SQL示例

  • SQL 语句:

    SELECT
     payload as column1
    FROM
     "myTopic/#"
  • 输入消息示例:

    {
      "msg": "myMsg"
    }
  • 处理结果:

    {
      "column1": {
        "msg": "myMsg"
      }
    }

从特定Topic的消息中提取字段

SQL示例

  • Topict/test的消息中提取所有字段:

    SELECT
     *
    FROM
     "t/test"
  • Topict/#的消息中提取所有字段:

    SELECT
     *
    FROM
     "t/#"
  • Topic能够匹配到t/#的消息中提取clientIdtopicbornTime字段:

    SELECT
     clientId, topic, bornTime
    FROM
     "t/#"
  • 从连接事件($event/connect)中获取channelIdclientIp字段:

    SELECT
     channelId, clientIp
    FROM
     "$event/connect"
  • 筛选所有订阅事件($event/subscribe)中Topict/#qos0clientId

    说明

    mqttMatch函数用于校验原Topic是否匹配含通配符的目标Topic。在FROM子句中,可以直接使用含通配符的Topic作为数据源;在FROM子句之外,只能使用mqttMatch函数判断Topic是否与含通配符的目标Topic匹配。当数据源为事件时,若需要使用Topic通配符进行过滤,请在WHERE子句中使用mqttMatch函数。

    SELECT
     clientId
    FROM
     "$event/subscribe"
    WHERE
     mqttMatch(topic, 't/#') and qos = 0

payload中获取嵌套值/使用数组(JSON类型输入)

应用场景:使用SQLJSON类型的嵌套数据中获取值,并利用下标访问数组元素。

SQL示例

  • SQL 语句:

    SELECT
     JSON_VALUE(payload, '$.data[0].age') as age,
     JSON_QUERY(payload, '$.data[1]') as info
    FROM
     "myTopic/#"
    重要
    • 数组的下标从0开始。

    • 需要使用JSON_QUERY函数获取载荷中的非基本类型数据,使用JSON_VALUE函数获取载荷中的基本类型数据。函数第一项参数固定为payload,第二项参数是被半角单引号('')包装的数据路径。

    • JSON_VALUE返回值默认为字符串类型。若需要整数、小数等其它类型,可通过CAST函数进行转换。

  • 输入消息示例:

    {
      "data": [
        { "age": 25, "name": "alice" },
        { "age": 30, "name": "bob" }
      ]
    }
  • 处理结果:

    {
      "age" : "25",
      "info" : {
        "age" : 30,
        "name" : "bob"
      }
    }

函数使用(JSON类型输入)

应用场景:在SQL中使用规则引擎提供的函数进行复杂数据处理,在使用消息载荷数据时需要JSON_VALUE进行提取。更多函数使用方法,请参见SQL函数示例

本示例以输入类型为JSON格式为例进行说明。

SQL示例

  • SQL 语句:

    SELECT
     REVERSE(JSON_VALUE(payload, '$.data[0].name')) as revName,
     sqrt(JSON_VALUE(payload, '$.data[0].age') * 2 + JSON_VALUE(payload, '$.data[1].age')) as sqrtAge
    FROM
     "myTopic/#"
  • 输入消息示例:

    {
      "data": [
        { "age": 25, "name": "alice" },
        { "age": 30, "name": "bob" }
      ]
    }
  • 处理结果:

    {
      "revName" : "ecila",
      "sqrtAge" : 8.94427190999916
    }

保留关键字

保留关键字列表

当字段名或别名与下列保留关键字相同且未在半角双引号("")内时,会导致 SQL 无法正常解析。保留关键字大小写不敏感,即大小写均会被保留。保留关键字列表如下所示:

ABS, ALL, ALLOCATE, ALLOW, ALTER, AND, ANY, ARE, ARRAY, ARRAY_MAX_CARDINALITY, AS, ASENSITIVE, ASOF, ASYMMETRIC, AT, ATOMIC, AUTHORIZATION, AVG, 
BEGIN, BEGIN_FRAME, BEGIN_PARTITION, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BY, CALL, CALLED, CARDINALITY, CASCADED, CASE, CAST, CEIL, 
CEILING, CHAR, CHARACTER, CHARACTER_LENGTH, CHAR_LENGTH, CHECK, CLASSIFIER, CLOB, CLOSE, COALESCE, COLLATE, COLLECT, COLUMN, COMMIT, CONDITION, 
CONNECT, CONSTRAINT, CONTAINS, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, 
CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_ROW, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, 
CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CYCLE, DATE, DATETIME, DAY, DEALLOCATE, DEC, DECIMAL, DECLARE, DEFAULT, DEFINE, DELETE, 
DENSE_RANK, DEREF, DESCRIBE, DETERMINISTIC, DISALLOW, DISCONNECT, DISTINCT, DOUBLE, DROP, DYNAMIC, EACH, ELEMENT, ELSE, EMPTY, END, END-EXEC, 
END_FRAME, END_PARTITION, EQUALS, ESCAPE, EVERY, EXCEPT, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, 
FIRST_VALUE, FLOAT, FLOOR, FOR, FOREIGN, FRAME_ROW, FREE, FRIDAY, FROM, FULL, FUNCTION, FUSION, GET, GLOBAL, GRANT, GROUP, GROUPING, GROUPS, HAVING, 
HOLD, HOUR, IDENTITY, IMPORT, IN, INDICATOR, INITIAL, INNER, INOUT, INSENSITIVE, INSERT, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, IS, 
JOIN, JSON_ARRAY, JSON_ARRAYAGG, JSON_EXISTS, JSON_OBJECT, JSON_OBJECTAGG, JSON_QUERY, JSON_SCOPE, JSON_VALUE, LAG, LANGUAGE, LARGE, LAST_VALUE, 
LATERAL, LEAD, LEADING, LEFT, LIKE, LIKE_REGEX, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOWER, MATCH, MATCHES, MATCH_CONDITION, MATCH_NUMBER, 
MATCH_RECOGNIZE, MAX, MEASURE, MEASURES, MEMBER, MERGE, METHOD, MIN, MINUS, MINUTE, MOD, MODIFIES, MODULE, MONDAY, MONTH, MULTISET, NATIONAL, 
NATURAL, NCHAR, NCLOB, NEW, NEXT, NO, NONE, NORMALIZE, NOT, NTH_VALUE, NTILE, NULL, NULLIF, NUMERIC, OCCURRENCES_REGEX, OCTET_LENGTH, OF, OFFSET, 
OLD, OMIT, ON, ONE, ONLY, OPEN, OR, ORDER, ORDINAL, OUT, OUTER, OVER, OVERLAPS, OVERLAY, PARAMETER, PARTITION, PATTERN, PER, PERCENT, PERCENTILE_CONT, 
PERCENTILE_DISC, PERCENT_RANK, PERIOD, PERMUTE, PORTION, POSITION, POSITION_REGEX, POWER, PRECEDES, PRECISION, PREPARE, PREV, PRIMARY, PROCEDURE, 
QUALIFY, RANGE, RANK, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, 
REGR_SXX, REGR_SXY, REGR_SYY, RELEASE, RESET, RESULT, RETURN, RETURNS, REVOKE, RIGHT, ROLLBACK, ROLLUP, ROW, ROWS, ROW_NUMBER, RUNNING, SAFE_CAST, 
SAFE_OFFSET, SAFE_ORDINAL, SATURDAY, SAVEPOINT, SCOPE, SCROLL, SEARCH, SECOND, SEEK, SELECT, SENSITIVE, SESSION_USER, SET, SHOW, SIMILAR, SKIP, 
SMALLINT, SOME, SPECIFIC, SPECIFICTYPE, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQRT, START, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, SUBMULTISET, 
SUBSET, SUBSTRING, SUBSTRING_REGEX, SUCCEEDS, SUM, SUNDAY, SYMMETRIC, SYSTEM, SYSTEM_TIME, SYSTEM_USER, TABLE, TABLESAMPLE, THEN, THURSDAY, TIME, 
TIMESTAMP, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TRAILING, TRANSLATE, TRANSLATE_REGEX, TRANSLATION, TREAT, TRIGGER, TRIM, TRIM_ARRAY, TRUE, 
TRUNCATE, TRY_CAST, TUESDAY, UESCAPE, UNION, UNIQUE, UNKNOWN, UNNEST, UPDATE, UPPER, UPSERT, USER, USING, UUID, VALUE, VALUES, VALUE_OF, VARBINARY, 
VARCHAR, VARIANT, VARYING, VAR_POP, VAR_SAMP, VERSIONING, WEDNESDAY, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, YEAR

保留关键字作别名或字段名时的使用示例

USER是保留关键字之一,在此以USER为例进行说明。

  • 作为别名使用时的示例:

    SELECT
      payload as "user"
    FROM
      "myTopic/#"
  • 当需要访问的字段名为USER或其小写形式时,正确访问方式示例如下:

    SELECT
      payload."user".age as age
    FROM
      "myTopic/#"