创建源表
流数据分析的源表是指流式数据存储,流式数据存储驱动流数据分析的运行。因此,每个流数据分析任务必须提供至少一个流式数据存储。
创建源表的语法如下所示。
CREATE TABLE tableName
(columnName dataType [, columnName dataType ]*)
| PRIMARY KEY (key_part,...)
[ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];
边缘端消息总线(EdgeBus)
示例:
create table property (
propertyName varchar,
propertyValue varchar,
productKey varchar,
deviceName varchar,
gmtCreate varchar,
ts varchar,
tstamp as to_timestamp (cast (ts as bigint)),
WATERMARK wk FOR tstamp as withOffset (tstamp, 2000)
) with (
type = 'edgebus',
jsonParser = 'device_property'
);
WITH
参数说明如下:
参数 | 是否必选 | 描述 |
type | 是 | 定义源表类型为edgebus。 |
jsonParser | 否 | 定义消息解析器,默认为default,取值如下:
不同的jsonParser对应不同的字段,详情请见下文字段说明内容。 |
字段说明:
jsonParser = 'device_property'
示例:
create table property ( propertyName varchar, propertyValue varchar, productKey varchar, deviceName varchar, gmtCreate varchar, ts varchar, tstamp as to_timestamp (cast (ts as bigint)), WATERMARK wk FOR tstamp as withOffset (tstamp, 2000) ) with ( type = 'edgebus', jsonParser = 'device_property' );
字段名 类型 描述 productKey String 产品的唯一标识ProductKey。 deviceName String 设备名称,该产品下设备唯一标识。 propertyName String 属性名。 propertyValue String 属性值。 ts String 消息产生时间。 gmtCreate String 流数据分析接收到消息的时间。 jsonParser = 'device_event'
示例:
create table event ( eventCode varchar, params varchar, productKey varchar, deviceName varchar, gmtCreate varchar, ts varchar, tstamp as to_timestamp (cast (ts as bigint)), WATERMARK wk FOR tstamp as withOffset (tstamp, 2000) ) with ( type = 'edgebus', jsonParser = 'device_event' );
字段名 类型 描述 productKey String 产品的唯一标识ProductKey。 deviceName String 设备名称,该产品下设备唯一标识。 eventCode String 事件标识符。 params String 事件参数。 ts String 事件产生的时间。 gmtCreate String 流数据分析收到事件消息的时间。 jsonParser = 'default'
示例:
create table property ( json varchar, jsonType varchar, gmtCreate as to_timestamp ( cast (json_value (json, '$.gmtCreate') as bigint) ), deviceName as json_value (json, '$.deviceName'), productKey as json_value (json, '$.productKey'), ts as to_timestamp ( cast ( json_value (json, '$.items.temperature.time') as bigint ) ), temperature as cast ( json_value (json, '$.items.temperature.value') as int ) ) with ( type = 'edgebus' );
字段名 描述 json 从消息路由获取到的消息内容,该消息内容的格式为字符串型JSON格式,详细示例请参见本文下方JSON格式内容。 jsonType 从消息路由获取到的消息类型。 - device_property
- device_event
JSON格式:
- jsonType = device_property
{ "gmtCreate": 1510292739881, "items": { "attribute_9": { "time": 1510292697471, "value": 560542025 }, "attribute_8": { "time": 1510292697470, "value": 715665571 } }, "productKey": "X5eCzh6fEH7", "deviceName": "xxxxxxxxxxxxxxxx" }
- jsonType = device_event
{ "value": { "BrokenInfo": { "Power": "on", "structParam": { "param1": "abc", "param2": 123 } }, "eventCode2": { } }, "time": 1510799670074, "productKey": "5RS5XTnNADg", "deviceName": "xxxxxxxxxxxxxxxx", "gmtCreate": 1510799670074 }