SQL语法和示例
云消息队列 MQTT 版Kafka规则支持在数据流转过程中通过SQL对数据进行提取、转化和过滤等操作,本文为您介绍SQL语法和示例。
SQL语法
基本格式
合法的SQL规则格式为:
SELECT <字段名> FROM <主题> [WHERE <条件>]
SELECT
子句用于选出需要保留的字段,或是对数据进行变换。FROM
子句用于选择数据源,可以是消息主题或事件主题。WHERE
子句用于对结果进行过滤,所用字段可以来自于payload
,也可以是其他消息参数。
SQL参数
发送消息:云消息队列 MQTT 版发送消息参数如下所示:
字段
类型
描述
channelId
String
TCP连接标识ID
clientId
String
客户端ID
topic
String
发送的MQTT消息topic
bornTime
Long
时间
payload
Byte[]
消息体
userProps
String
5.0协议,用户属性
msgId
String
系统生成的消息唯一ID,不是协议SDK自带的消息ID
事件消息:云消息队列 MQTT 版默认事件和可访问参数如下所示:
事件
TOPIC
连接
$event/connect
断连
$event/close
订阅
$event/subscribe
取消订阅
$event/unsubscribe
消息确认
$event/ack
连接事件
字段
类型
描述
channelId
String
TCP连接标识ID
clientId
String
客户端ID。
bornTime
Long
时间
protocol
String
协议版本:
MQTT_5
MQTT_3_1_1
clientIp
String
客户端IP
username
String
MQTT连接用户名
cleanSession
String
是否保存离线
keepAlive
Integer
心跳保活时间间隔(秒)
accepted
Boolean
连接是否被接受
cleanStart
Boolean
5.0协议,是否开启新会话
expiryInterval
Integer
5.0协议,会话过期时间
断连事件
字段
类型
描述
channelId
String
TCP连接标识ID
clientId
String
客户端ID
bornTime
Long
时间
reason
String
断连信息
订阅事件
字段
类型
描述
channelId
String
TCP连接标识ID
clientId
String
客户端ID
topic
String
订阅的Topic
bornTime
Long
时间
qos
Integer
QoS级别
取消订阅事件
字段
类型
描述
channelId
String
TCP连接标识ID
clientId
String
客户端ID
topic
String
取消订阅的Topic
bornTime
Long
时间
消息确认事件
字段
类型
描述
channelId
String
TCP连接标识ID
clientId
String
客户端ID
topic
String
消息Topic
bornTime
Long
时间
msgId
String
系统生成的消息唯一ID,不是协议SDK自带的消息ID
payload
Byte[]
消息体
运算符号
符号名 | 符号作用 | 返回值 |
+ | 加法 | 加和 |
- | 减法 | 差值 |
* | 乘法 | 乘积 |
/ | 除法 | 商值 |
|| | 字符串连接 | 连接后的字符串 |
= | 比较两者是否相等,可以比较基本类型变量是否相同,也可比较字符串是否完全一致 |
|
[ ] | 获取集合类型变量中的值 | 数组元素,或Map中键值对的值 |
NOT | 对一个 Boolean 类型变量进行取反 |
|
AND | 与运算的结果 | 当 |
OR | 或运算的结果 | 当 |
比较符号
符号名 | 符号作用 | 返回值 |
> | 大于 |
|
< | 小于 |
|
>= | 大于等于 |
|
<= | 小于等于 |
|
<> | 不等于 |
|
IS NULL | 判断左侧变量是否为 null |
|
IS NOT NULL | 判断左侧变量是否不为 null |
|
x BETWEEN y AND z | 判断 x 是否大于等于 y 且小于等于 z |
|
x NOT BETWEEN y AND z | 判断 x 是否小于 y 或大于 z |
|
x LIKE y | 判断 x 是否匹配模式 y |
|
x NOT LIKE y | 判断 x 是否不匹配模式 y |
|
SQL限制
请使用
payload
字段访问消息载荷。可以使用半角句号(.)来访问
payload
内部的嵌套结构。请使用完整路径名称访问数据,暂不支持使用别名访问数据。例如:
SELECT payload as p, p.data as info FROM "myTopic/a"
,该SQL语句中使用payload
的别名p
来访问数据。消息载荷中的字段名、
SELECT
子句中创建的别名不能与保留关键字相同。如果需要使用保留关键字作为字段名或别名,请将其放置在半角双引号("")内。详情请参见保留关键字。SELECT
子句后的结果字段之间请用半角逗号(,)分隔,且最后一个字段后面不加半角逗号(,)。例如:SELECT payload as p, clientId as id, qos
。FROM
子句有且只能包含一个数据源,不能在FROM
后引用多个数据源。FROM
子句后面的数据源需要将其放置在半角双引号("")内。例如:"myTopic/#"
。SQL语句中除数据源外的字符串值,将其放置在半角单引号('')内。例如:
'Hello'
。暂不支持
GROUP BY
、HAVING
关键字。若SQL语句中包含
WHERE
子句,且WHERE
子句后面的条件没有被满足,则当前消息/事件不会被转发,也不会有任何结果输出。若SQL语句中包含中文的字符串,则需要在字符串前加上
_utf8
。例如,SELECT _utf8'你好' as hello
。若消息体或参数中有字段包含中文字符,在访问相关字段时无需添加_utf8
。
SQL示例
选择指定主题的消息
应用场景:使用SQL选中特定主题,将命中主题的消息载荷提取出来。允许主题名称中使用通配符。
从特定Topic的消息中提取字段
从payload中获取嵌套值/使用数组(JSON类型输入)
应用场景:使用SQL从JSON类型的嵌套数据中获取值,并利用下标访问数组元素。
函数使用(JSON类型输入)
应用场景:在SQL中使用规则引擎提供的函数进行复杂数据处理,在使用消息载荷数据时需要JSON_VALUE
进行提取。更多函数使用方法,请参见SQL函数示例。
本示例以输入类型为JSON格式为例进行说明。
保留关键字
保留关键字列表
当字段名或别名与下列保留关键字相同且未在半角双引号("")内时,会导致 SQL 无法正常解析。保留关键字大小写不敏感,即大小写均会被保留。保留关键字列表如下所示:
ABS, ALL, ALLOCATE, ALLOW, ALTER, AND, ANY, ARE, ARRAY, ARRAY_MAX_CARDINALITY, AS, ASENSITIVE, ASOF, ASYMMETRIC, AT, ATOMIC, AUTHORIZATION, AVG,
BEGIN, BEGIN_FRAME, BEGIN_PARTITION, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BY, CALL, CALLED, CARDINALITY, CASCADED, CASE, CAST, CEIL,
CEILING, CHAR, CHARACTER, CHARACTER_LENGTH, CHAR_LENGTH, CHECK, CLASSIFIER, CLOB, CLOSE, COALESCE, COLLATE, COLLECT, COLUMN, COMMIT, CONDITION,
CONNECT, CONSTRAINT, CONTAINS, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG,
CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_ROW, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP,
CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CYCLE, DATE, DATETIME, DAY, DEALLOCATE, DEC, DECIMAL, DECLARE, DEFAULT, DEFINE, DELETE,
DENSE_RANK, DEREF, DESCRIBE, DETERMINISTIC, DISALLOW, DISCONNECT, DISTINCT, DOUBLE, DROP, DYNAMIC, EACH, ELEMENT, ELSE, EMPTY, END, END-EXEC,
END_FRAME, END_PARTITION, EQUALS, ESCAPE, EVERY, EXCEPT, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER,
FIRST_VALUE, FLOAT, FLOOR, FOR, FOREIGN, FRAME_ROW, FREE, FRIDAY, FROM, FULL, FUNCTION, FUSION, GET, GLOBAL, GRANT, GROUP, GROUPING, GROUPS, HAVING,
HOLD, HOUR, IDENTITY, IMPORT, IN, INDICATOR, INITIAL, INNER, INOUT, INSENSITIVE, INSERT, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, IS,
JOIN, JSON_ARRAY, JSON_ARRAYAGG, JSON_EXISTS, JSON_OBJECT, JSON_OBJECTAGG, JSON_QUERY, JSON_SCOPE, JSON_VALUE, LAG, LANGUAGE, LARGE, LAST_VALUE,
LATERAL, LEAD, LEADING, LEFT, LIKE, LIKE_REGEX, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOWER, MATCH, MATCHES, MATCH_CONDITION, MATCH_NUMBER,
MATCH_RECOGNIZE, MAX, MEASURE, MEASURES, MEMBER, MERGE, METHOD, MIN, MINUS, MINUTE, MOD, MODIFIES, MODULE, MONDAY, MONTH, MULTISET, NATIONAL,
NATURAL, NCHAR, NCLOB, NEW, NEXT, NO, NONE, NORMALIZE, NOT, NTH_VALUE, NTILE, NULL, NULLIF, NUMERIC, OCCURRENCES_REGEX, OCTET_LENGTH, OF, OFFSET,
OLD, OMIT, ON, ONE, ONLY, OPEN, OR, ORDER, ORDINAL, OUT, OUTER, OVER, OVERLAPS, OVERLAY, PARAMETER, PARTITION, PATTERN, PER, PERCENT, PERCENTILE_CONT,
PERCENTILE_DISC, PERCENT_RANK, PERIOD, PERMUTE, PORTION, POSITION, POSITION_REGEX, POWER, PRECEDES, PRECISION, PREPARE, PREV, PRIMARY, PROCEDURE,
QUALIFY, RANGE, RANK, READS, REAL, RECURSIVE, REF, REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE,
REGR_SXX, REGR_SXY, REGR_SYY, RELEASE, RESET, RESULT, RETURN, RETURNS, REVOKE, RIGHT, ROLLBACK, ROLLUP, ROW, ROWS, ROW_NUMBER, RUNNING, SAFE_CAST,
SAFE_OFFSET, SAFE_ORDINAL, SATURDAY, SAVEPOINT, SCOPE, SCROLL, SEARCH, SECOND, SEEK, SELECT, SENSITIVE, SESSION_USER, SET, SHOW, SIMILAR, SKIP,
SMALLINT, SOME, SPECIFIC, SPECIFICTYPE, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQRT, START, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, SUBMULTISET,
SUBSET, SUBSTRING, SUBSTRING_REGEX, SUCCEEDS, SUM, SUNDAY, SYMMETRIC, SYSTEM, SYSTEM_TIME, SYSTEM_USER, TABLE, TABLESAMPLE, THEN, THURSDAY, TIME,
TIMESTAMP, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TRAILING, TRANSLATE, TRANSLATE_REGEX, TRANSLATION, TREAT, TRIGGER, TRIM, TRIM_ARRAY, TRUE,
TRUNCATE, TRY_CAST, TUESDAY, UESCAPE, UNION, UNIQUE, UNKNOWN, UNNEST, UPDATE, UPPER, UPSERT, USER, USING, UUID, VALUE, VALUES, VALUE_OF, VARBINARY,
VARCHAR, VARIANT, VARYING, VAR_POP, VAR_SAMP, VERSIONING, WEDNESDAY, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, YEAR
保留关键字作别名或字段名时的使用示例
USER
是保留关键字之一,在此以USER
为例进行说明。
作为别名使用时的示例:
SELECT payload as "user" FROM "myTopic/#"
当需要访问的字段名为
USER
或其小写形式时,正确访问方式示例如下:SELECT payload."user".age as age FROM "myTopic/#"