流数据分析的源表是指流式数据存储,流式数据存储驱动流数据分析的运行。因此,每个流数据分析任务必须提供至少一个流式数据存储。

创建源表的语法如下:

CREATE TABLE tableName
    (columnName dataType [, columnName dataType ]*)
    | PRIMARY KEY (key_part,...)
    [ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];

边缘端消息总线(EdgeBus)

示例:

create table property (
    propertyName varchar,
    propertyValue varchar,
    productKey varchar,
    deviceName varchar,
    gmtCreate varchar,
    ts varchar,
    tstamp as to_timestamp (cast (ts as bigint)),
    WATERMARK wk FOR tstamp as withOffset (tstamp, 2000)
) with (
    type = 'edgebus',
    jsonParser = 'device_property'
);

WITH参数说明如下:

参数 是否必选 描述
type 定义源表类型为edgebus。
jsonParser 定义消息解析器,默认为default,取值如下:
  • device_property:属性消息,不符合属性消息格式的消息将会被丢弃
  • device_event:事件消息,不符合事件消息格式的消息将会被丢弃
  • default:不使用解析器,直接使用json格式字符串,不丢弃任何消息

不同的jsonParser对应不同的字段,详情请见下文字段说明内容。

字段说明:

  • jsonParser = 'device_property'

    示例:

    create table property (
        propertyName varchar,
        propertyValue varchar,
        productKey varchar,
        deviceName varchar,
        gmtCreate varchar,
        ts varchar,
        tstamp as to_timestamp (cast (ts as bigint)),
        WATERMARK wk FOR tstamp as withOffset (tstamp, 2000)
    ) with (
        type = 'edgebus',
        jsonParser = 'device_property'
    );
    字段名 类型 描述
    productKey String 产品的唯一标识ProductKey。
    deviceName String 设备名称,该产品下设备唯一标识。
    propertyName String 属性名。
    propertyValue String 属性值。
    ts String 消息产生时间。
    gmtCreate String 流数据分析接收到消息的时间。
  • jsonParser = 'device_event'

    示例:

    create table event (
        eventCode varchar,
        params varchar,
        productKey varchar,
        deviceName varchar,
        gmtCreate varchar,
        ts varchar,
        tstamp as to_timestamp (cast (ts as bigint)),
        WATERMARK wk FOR tstamp as withOffset (tstamp, 2000)
    ) with (
        type = 'edgebus',
        jsonParser = 'device_event'
    );
    字段名 类型 描述
    productKey String 产品的唯一标识ProductKey。
    deviceName String 设备名称,该产品下设备唯一标识。
    eventCode String 事件标识符。
    params String 事件参数。
    ts String 事件产生的时间。
    gmtCreate String 流数据分析收到事件消息的时间。
  • jsonParser = 'default'

    示例:

    create table property (
        json varchar,
        jsonType varchar,
        gmtCreate as to_timestamp (
            cast (json_value (json, '$.gmtCreate') as bigint)
        ),
        deviceName as json_value (json, '$.deviceName'),
        productKey as json_value (json, '$.productKey'),
        ts as to_timestamp (
            cast (
                json_value (json, '$.items.temperature.time') as bigint
            )
        ),
        temperature as cast (
            json_value (json, '$.items.temperature.value') as int
        )
    ) with (
        type = 'edgebus'
    );
    字段名 描述
    json 从消息路由获取到的消息内容,该消息内容的格式为字符串型json格式,详细示例请参见本文下方Json格式内容。
    jsonType 从消息路由获取到的消息类型。
    • device_property
    • device_event

    Json格式:

    • jsonType = device_property
      {
          "gmtCreate": 1510292739881,
          "items": {
              "attribute_9": {
                  "time": 1510292697471,
                  "value": 560542025
              },
              "attribute_8": {
                  "time": 1510292697470,
                  "value": 715665571
              }
          },
          "productKey": "X5eCzh6fEH7",
          "deviceName": "xxxxxxxxxxxxxxxx"
      }
    • jsonType = device_event
      {
          "value": {
              "BrokenInfo": {
                  "Power": "on",
                  "structParam": {
                      "param1": "abc",
                      "param2": 123
                  }
              },
              "eventCode2": {
              }
          },
          "time": 1510799670074,
          "productKey": "5RS5XTnNADg",
          "deviceName": "xxxxxxxxxxxxxxxx",
          "gmtCreate": 1510799670074
      }