文档

数据总线

更新时间:
一键部署

1. 数据集成介绍

数据集成标准化的目标是规范应用之间数据的传递方式和表达方式。

  • 传递方式:即应用之间的数据如何流通。平台提供了对数据进行增删改查的4个API,以及HTTP2方式的消息订阅机制。

  • 表达方式:即应用之间如何对数据内容有一致的理解。为了实现这个目标,需要做到如下两点,一是数据结构需要由小二后台统一管控;二是应用集成对接之前(比如应用上架的时候)需要声明本应用对哪些数据模型产生什么样的数据操作(如查询、新增,或者订阅)。

基于以上逻辑,为了实现应用的数据集成能力,分别需要执行相应的应用声明、应用开发、应用集成。image.png

2. 应用声明

应用的声明是指应用在上架到市场时,由应用开发者自行声明的,包含两部分声明内容:应用涉及到的数据模型;以及对这些模型所做的相应的操作。这里指出应用对数据模型的操作,目的有两个,一是用户可以感知应用对数据的操作范围,二是平台根据声明的操作决定对应用的操作权限。image.png

3. 应用集成

所谓的“数据集成”,是指应用之间数据共享的方式,比如两个应用,当涉及到相同的数据模型时,是共享一份数据,还是独立管理各自的数据。通过应用声明,应用在上架到应用市场之后,用户就能感知应用与数据之间的关系。目前通用的集成方案是以项目为隔离维度的。也就是说,在一个项目内的所有应用,他们关联的相同的数据模型,会被默认放在同一个隔离区域ScopeID内。

4. 应用开发 - 数据操作

应用之间共享数据时,需要向提平台操作数据,或者从平台订阅数据。数据操作方面,这里主要涉及4个接口:新增数据、查询数据、删除数据、修改数据。详见后面的API参考。同时,为了便于应用开发,平台提供了SDK,简化开发。

4.1 SDK介绍

4.1.1 Java SDK

1. 依赖

依赖的三方库:

<dependency>
  <groupId>com.aliyun.api.gateway</groupId>
  <artifactId>sdk-core-java</artifactId>
  <version>1.0.4</version>
</dependency>

依赖的工具类: 

git clone https://github.com/aliyun/iotx-api-gateway-client.git

2. 示例代码

import com.alibaba.cloudapi.sdk.model.ApiResponse;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.iotx.api.client.IoTApiClientBuilderParams;
import com.aliyun.iotx.api.client.IoTApiRequest;
import com.aliyun.iotx.api.client.SyncApiClient;

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;

public class Demo {

    // 依赖的工具类从 https://github.com/aliyun/iotx-api-gateway-client 获取
    public static void main(String[] args) throws UnsupportedEncodingException{
        String appKey = "YOUR_APP_KEY";
        String appSecret = "YOUR_APP_SECRET";
        String modelId = "YOUR_MODEL_ID";
        String dataId = "YOUR_DATA_ID";

        dataQuery(appKey, appSecret, modelId, dataId);
    }

     public static void dataQuery(String appKey, String appSecret,String modelId,String dataId) throws UnsupportedEncodingException {
         IoTApiClientBuilderParams ioTApiClientBuilderParams =
                 new IoTApiClientBuilderParams();
         // 填写应用的appkey信息
         ioTApiClientBuilderParams.setAppKey(appKey);
         ioTApiClientBuilderParams.setAppSecret(appSecret);
         SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
         IoTApiRequest request = new IoTApiRequest();
         // 设置api的版本
         request.setApiVer("0.0.5");
         // 接口参数
         request.putParam("modelId",modelId);
         ArrayList returnFields = new ArrayList();
         returnFields.add("*");
         request.putParam("returnFields", returnFields);

         JSONArray conditions = new JSONArray();
         JSONObject condition = new JSONObject();
         condition.put("fieldName", "id");
         condition.put("operate", "eq");
         condition.put("value", dataId);
         conditions.add(condition);
         request.putParam("conditions", conditions);

         ApiResponse response = syncClient.postBody("api.link.aliyun.com",
                 "/data/model/data/query", request, true);
         int code = response.getCode();
         String res = new String(response.getBody());
         System.out.println("response code = " + response.getCode() + " response = " + new String(response.getBody(), "UTF-8"));
     }
}

4.1.2 Python SDK

1. 依赖

依赖的三方库:无

2. 示例代码

Step 1: 下载示例工程代码。示例工程基于Python 2.7,Python3请自行适配。

git clone https://github.com/aliyun/api-gateway-demo-sign-python.git

Step 2: 修改signature_composer.py文件第49行

修改前: string_to_sign.append(_build_resource(uri=uri, body=body))
修改后: string_to_sign.append(uri)

Step 3: 修改ClientDemo.py如下,执行调用

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import sys
import json

sys_path = sys.path[0]
if not os.path.isdir(sys_path):
    sys_path = os.path.dirname(sys_path) 
os.chdir(sys_path)

from com.aliyun.api.gateway.sdk import client
from com.aliyun.api.gateway.sdk.http import request
from com.aliyun.api.gateway.sdk.common import constant

host = "https://api.link.aliyun.com"
url = "/data/model/data/query"
cli = client.DefaultClient(app_key="YOUR_APP_KEY", app_secret="YOUR_APP_SECRET")
req_post = request.Request(host=host, protocol=constant.HTTPS, url=url, method="POST", time_out=30000)
req_post.set_content_type(constant.CONTENT_TYPE_STREAM)

body = {
    "request":{
        "apiVer":"0.0.2"
    },
    "params":{
        "modelId":"EMPLOYEE",
        "returnFields":["employee_no", "name"],
        "conditions":[{"fieldName":"id","value":"1","operate":"mt"}],
        "pageSize": 100,
        "pageNum": 1
    },
    "version":"1.0"
}
bb = json.dumps(body).encode("utf-8")

req_post.set_body(bb)

print '==================================================='
print str(cli.execute(req_post)).decode('string_escape')
print '==================================================='

4.1.3 JS SDK

1. 依赖

$ # save into package.json dependencies with -S
$ npm install aliyun-api-gateway -S
$ # you can use cnpm for fast install
$ cnpm install aliyun-api-gateway -S

2. 示例代码

'use strict';
const co = require('co');
const Client = require('aliyun-api-gateway').Client;
const client = new Client('YOUR_APP_KEY','YOUR_APP_SECRET');

co(function* () {
  var url = 'http://api.link.aliyun.com/test1234';

  var result = yield client.post(url, {
    data: {
      'testtest': 'query1Value'
    },
    headers: {
      accept: 'application/json'
    }
  });

  console.log(JSON.stringify(result));
});

4.2 数据操作API

4.2.1 API 描述

API名称

API描述

API Path

API 版本

新增数据

基于已经创建且被授权写入的模型,进行数据的新增。

/data/model/data/insert

0.0.3

删除数据

基于已经创建且被授权删除的模型,进行数据的更新。

/data/model/data/delete

0.0.2

修改数据

基于已经创建且被授权更新的模型,进行数据的更新。

/data/model/data/update

0.0.2

查询数据

基于已经创建且被授权查询的模型,进行数据的查询。

/data/model/data/query

0.0.3

获取文件上传地址

对于模型中指定为“图片”标签的字段,在该字段需要填写文件名,并且文件名需要通过该接口获取,该接口同时还会返回一个URL供用户上传文件。该地址有效期是10秒。

/data/model/data/upload

0.0.1

获取文件下载地址

对于模型中指定为“图片”标签的字段,在该字段的内容是一个系统分配的文件名,用户可以通过本接口,传入这个文件名,获取真实的文件下载地址。该地址有效期是10秒。

/data/model/data/download

0.0.1

4.2.2 API 约定

  1. 日期类型的参数传入格式:当前时间到格林威治时间1970年01月01日00时00分00秒的毫秒数。

  2. 数量查询的一些约定:

    1. 单次查询最多返回200条数据,未指定分页参数情况下,查询返回满足条件的前200条,可根据返回参数中的hasNext判断是否有更多数据。

    2. 若需要按照指定条件返回数据总数,则指定返回参数为COUNT,API返回参数中会带有COUNT以及对应的值。

  3. 更新删除的约定:单次操作,最多支持200条数据。

  4. 运算符定义:

运算符

含义

备注

eq

equals

相等

neq

not equals

不相等

lt

less than

小于

lteq

less than or equals

小于等于

mt

more than

大于

mteq

more than or equals

大于等于

bt

between

在..之间

in

in

在..之内

nin

not in

不在..之内

nul

is null

为空

nnul

is not null

不为空

  1. 数据包括系统属性,API中不允许赋值和更新系统属性,系统属性如下:

属性

描述

id

数据主键

creator

数据创建者

modifier

数据修改者

gmt_create

数据创建时间

gmt_modified

数据修改时间

4.2.3 数据新增API

1. 请求参数

参数

类型

描述

是否必传

modelId

String

数据模型id

properties

JSON

数据字段键值对,增加的字段的键只能是模型包含的字段,否则会报错,其中 Boolean属性的property传入"true"和"false"或者0和1。如:{"name":"xxx", "age":18}

scopeId

String

经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。

appId

String

对于SaaS应用,需要填该值

2. 返回参数

参数

类型

描述

data

Long

数据主键id

3. 请求示例

{
    "request": {
        "apiVer": "0.0.3" // api版本号
    },
    "id": 1508212818676,// request里的全局唯一id透传
    "params": {
       "modelId":"XXX123",  //数据模型id
       "scopeId":"fdbsdj1dfjdubgxxx",  //数据模型id
       "properties":{
            "BRAND":"BMW",
            "MODE":"5",
            "CREATE_DATE":1526969423,
            "LONG_SIZE":3.3,
            "LONG_SIZE":2.2              
        }

    },
    "version": "1.0" // 请求协议版本
}

4. 返回示例

{
  "code": 200,
  "message": "success",
  "localizedMsg": null,
  "data": 12345      
}

5. 返回码

状态码

描述

其他说明

200

成功

460

参数验证异常

会带有验证异常的详细说明

500

服务异常

server error

52002

无访问权限

52005

找不到目标存储

52009

参数和模型定义不匹配

52011

数据类型校验错误

4.2.4 数据删除API

1. 请求参数

参数

类型

是否必传

描述

modelId

String

数据模型id

conditions

JSON

数据条件,由字段名、运算符、比较值组成一个condition。如:[{"fieldName": "id", "operate": "eq", "value": 7}]

appId

String

对于SaaS应用,需要填该值

scopeId

String

经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。

其中condition针对不同类型的属性支持的运算符:

属性

支持的运算符

Integer

eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul

String

eq,neq,nnul,in,nin

Double

eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul

Boolean

eq,nnul

Date

eq,neq,lt,lteq,mt,mteq,bt,nnul

2. 返回参数

参数

类型

描述

data

Integer

删除数据的条数

3. 请求示例

{
    "request": {
        "apiVer": "0.1.0" // api版本号
    },
    "id": 1508212818676,// request里的全局唯一id透传
    "params": {  
       "modelId":"XXX123",
       "conditions":[
          {"fieldName":"BRAND","value":"BMW","operate":"eq"},
          {"fieldName":"MODE","value":"X5","operate":"eq"}
        ]                
    },
    "version": "1.0" // 请求协议版本
}

4. 返回示例

{
  "code": 200,
  "message": "success",
  "localizedMsg": null,
  "data":100   
}

5. 返回码

状态码

描述

说明

200

成功

460

参数验证异常

会带有验证异常的详细说明

500

服务异常

server error

52002

无访问权限

4.2.5 数据修改API

1. 请求参数

参数

类型

是否必传

描述

modelId

String

数据模型id

updateDetails

JSON

更新的具体字段和值,字段需要是模型中包含的字段,否则会报错,其中 Boolean属性的property传入"true"和"false"或者0和1。如:{"name": "xxxx", "age":20}

conditions

JSON

条件,由字段名、运算符、比较值组成一个condition,格式如下:[{"fieldName": "id", "operate": "eq", "value": 7}]。fieldName表示字段的名称;operate表示操作符,操作符见上表;value表示值。

appId

String

对于SaaS应用,需要填该值

scopeId

String

经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。

其中condition针对不同类型的属性支持的运算符:

属性

支持的运算符

Integer

eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul

String

eq,neq,nnul,in,nin

Double

eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul

Boolean

eq,nnul

Date

eq,neq,lt,lteq,mt,mteq,bt,nnul

2. 返回参数

参数

类型

描述

data

Integer

更新数据的条数

3. 请求示例

{
    "request": {
        "apiVer": "0.0.3" // api版本号
    },
    "id": 1508212818676,// request里的全局唯一id透传
    "params": {  
       "modelId":"XXX123",
       "conditions":[
          {"fieldName":"BRAND","value":"BMW","operate":"eq"},
          {"fieldName":"MODE","value":"X5","operate":"eq"}
        ],
       "updateDetails":{
          "LONG_SIZE":4,
          "WIDTH_SIZE":3
        }                     
    },
    "version": "1.0" // 请求协议版本
}

4. 返回示例

{
  "code": 200,
  "message": "success",
  "localizedMsg": null,
  "data":100   
}

5. 返回码

状态码

描述

200

成功

460

参数验证异常

会带有验证异常的详细说明

500

服务异常

server error

52002

无访问权限

52006

资源更新错误

52011

数据类型校验错误

4.2.6 数据查询API

1. 请求参数

参数

类型

是否必传

说明

modelId

String

数据模型ID

returnFields

JSON

指定返回的字段。1.若期望返回所有字段,则传入参数为["*"];2.若期望返回数据总数,则传入参数为["COUNT"]。如:["*"]或者["name","age"]["COUNT"]

conditions

JSON

条件,由字段名、运算符、比较值组成一个condition。如:[{"fieldName": "id", "operate": "eq", "value": 7}]

orderBy

JSON

排序条件,由增序或降序以及排序字段组成。如:{"asc":"true", "orderByFields":["name","age"]}

pageNum

Integer

分页页数

pageSize

Integer

分页每页数量

appId

String

对于SaaS应用,需要填该值

scopeId

String

经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。

其中condition针对不同类型的属性支持的运算符:

属性

支持的运算符

Integer

eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul

String

eq,neq,nnul,in,nin

Double

eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul

Boolean

eq,nnul

Date

eq,neq,lt,lteq,mt,mteq,bt,nnul

2. 返回参数

参数

类型

描述

data

String

查询返回数据内容,其中,Boolean类型的属性返回值为0或1

3. 请求示例-1

{
    "request": {
        "apiVer": "0.0.2" // api版本号
    },
    "id": 1508212818676,// request里的全局唯一id透传
    "params": {        
       "modelId":"XXX123",
       "returnFields":["MODE","ENGINE"],
       "conditions":[
          {"fieldName":"BRAND","value":"BMW","operate":"eq"},
          {"fieldName":"WIDTH_SIZE","value":1,"value2":2,"operate":"bt"},
          {"fieldName":"LONG_SIZE","value":"3","operate":"mt"}
        ],
       "orderBy":{
          "asc":true,
          "orderByFields":["CREATE_DATA","MODE"]
        },
        "pageNum":1,
        "pageSize":10
       }            
    },
    "version": "1.0" // 请求协议版本
}

4. 返回示例-1

{
    "code": 200,
    "localizedMsg": null,
    "data": "{\"count\":1,
                      \"hasNext\":false,
            \"items\":[{\"gmt_create\":1551872701000,\"MODE\":\"33\",\"ENGINE\":\"44\",\"id\":2,\"gmt_modified\":1551872714000}],
            \"pageNum\":1,
            \"pageSize\":10}",
    "message": "success"
}

5. 请求示例-2

{
    "request": {
        "apiVer": "0.1.0" // api版本号
    },
    "id": 1508212818676,// request里的全局唯一id透传
    "params": {        
       "modelId":"XXX123",
       "returnFields":["COUNT",
       "conditions":[
          {"fieldName":"BRAND","value":"BMW","operate":"eq"},
          {"fieldName":"WIDTH_SIZE","value":1,"value2":2,"operate":"bt"},
          {"fieldName":"LONG_SIZE","value":"3","operate":"mt"}
        ],
        "pageNum":1,
        "pageSize":10
       }            
    },
    "version": "1.0" // 请求协议版本
}

6. 返回示例-2

{
    "code": 200,
    "localizedMsg": null,
    "data": "{
                      \"hasNext\":false,
            \"items\":[{
                    \"COUNT\":100
             }]
            }"
    "message": "success"
}

7. 返回码

状态码

描述

200

成功

460

参数验证异常

会带有验证异常的详细说明

500

服务异常

server error

52002

无访问权限

52005

目标存储未找到

4.2.7 获取文件上传地址API

1. 请求参数

参数

类型

是否必传

描述

modelId

String

数据模型id

version

String

数据模型的版本号

fileSize

Integer

文件大小,以字节为单位,目前系统不支持5M以上文件

attrName

String

属性名称,模型中包含的属性名称,不包含会报错进行提示

fileType

String

文件类型,目前系统只支持bmp、png、gif、jpg

appId

String

对于SaaS应用,需要填该值

scopeId

String

经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。

2. 返回参数

参数

类型

描述

url

String

数据上传的url,url有效时间10s

fileName

String

文件名称,系统随机分配的文件名称,用户在得到这个文件名之后,应该将其放在相应模型数据的文件类型的字段上,例如:xxxxx.jpg

3. 请求示例

{
    "request": {
        "apiVer": "0.0.1"   // api版本号
    },
    "id": 1508212818671,    // request里的全局唯一id透传
    "params": {
       "modelId":"XXX123",  //数据模型id
       "fileSize":"3.2",    //文件大小  
       "attrName":"name",   //属性名称
       "fileType":"jpg",    //文件类型 
       "version":"1.0",     //模型版本  
    },
    "version": "1.0"        // 请求协议版本
}

4. 返回示例

{
    "code": 200, // 返回是否成功,只要不是200说明返回不成功
    "message": "success",  // 如果失败,会返回失败的信息描述
    "localizedMsg": null,
    "data": {"url":"http://xxx.xxx.xx","fileName":"xxx.jpg"} // 返回的数据   
}

5. 返回码

状态码

描述

200

成功

460

参数验证异常

会带有验证异常的详细说明

500

服务异常

server error

52002

无访问权限

52005

找不到目标存储

52009

参数和模型定义不匹配

52011

数据类型校验错误

52064

属性字段没有相应的图片标签

52063

文件大小不能大于5M

4.2.8 获取文件下载地址API

1. 请求参数

参数

类型

是否必传

描述

modelId

String

数据模型id

version

String

数据模型的版本号

attrName

String

属性名称,模型中包含的属性名称,不包含会报错进行提示

fileName

String

文件名称,必须为获取文件上传地址API中返回的fileName参数,例如:xxxxx.jpg

appId

String

对于SaaS应用,需要填该值

scopeId

String

经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。

2. 返回参数

参数

类型

描述

url

String

数据上传的url,url有效时间10s

3. 请求示例

{
    "request": {
        "apiVer": "0.0.1" // api版本号
    },
    "id": 1508212818671, // request里的全局唯一id透传
    "params": {
       "modelId":"XXX123",  //数据模型id
       "scopeId":"fdbsdj1dfjdubgxxx",  //业务隔离id 
       "attrName":"name",  //属性名称
       "fileName":"xdsfxv.jpg",  //文件类型 
       "version":"1.0",  //模型版本  
    },
    "version": "1.0" // 请求协议版本
}

4. 返回示例

{
    "code": 200, // 返回是否成功,只要不是200说明返回不成功
    "message": "success",  // 如果失败,会返回失败的信息描述
    "localizedMsg": null,
    "data": {"url":"http://xxx.xxx.xx"} // 返回的数据下载查看的url   
}

5. 返回码

状态码

描述

200

成功

460

参数验证异常

会带有验证异常的详细说明

500

服务异常

server error

52002

无访问权限

52005

找不到目标存储

52009

参数和模型定义不匹配

52011

数据类型校验错误

52064

属性字段没有相应的图片标签

5. 应用开发 - 数据订阅

应用可以通过AMQP方式,订阅数据的变更消息(新增、删除、修改)。应用自身不需要发布数据的变更消息到通道中,这些消息的产生是由应用通过上面的数据操作API,对数据进行操作之后,由平台产生。平台产生时间之后,会将消息通过订阅关系发送到订阅方。在这个消息通信中,每一个应用实例,由AppKey标识身份。对于单租户型的应用,每一个AppKey代表了一次应用分发的实例;但是,对于SaaS应用,他的一个AppKey代表了该应用对应的所有租户的身份,因此,在SaaS应用按照AppKey得到消息之后,需要自行根据订阅到的数据内容中的AppID字段,将数据对应到不用的用户中。

5.1 SDK介绍

5.1.1 Java SDK

1. 依赖引用

在工程中添加 maven 依赖接入 SDK。

<!-- amqp 1.0 qpid client -->
<dependency>
  <groupId>org.apache.qpid</groupId>
  <artifactId>qpid-jms-client</artifactId>
  <version>0.40.0</version>
</dependency>
<!-- util for base64-->
<dependency>
  <groupId>commons-codec</groupId>
  <artifactId>commons-codec</artifactId>
  <version>1.10</version>
</dependency>

2. 身份认证

使用服务端订阅功能,需要基于AppKey进行身份认证并建立连接。该AppKey根据应用类型不同,有两种来源:

  • 对于独立单租户托管并分发的应用,该AppKey是由托管平台在应用实例化分发部署应用时,自动产生,并注入在主机的环境变量中,应用事先并不知道该值,需要动态从环境变量中获取。

  • 对于共享型应用,该AppKey是该SaaS应用在注册多租户接口时,从平台创建,并硬编码在应用中。

public static void main(String[] args) throws Exception {
        String clientId = UUID.randomUUID().toString();  //填写唯一的id,比如uuid
        String random = UUID.randomUUID().toString().replaceAll("-", "").substring(0, 6);

        //按照阿里云IoT规范,组装UserName
        String userName = clientId + "|authMode=appkey"
            + ",signMethod=SHA256"
            + ",random=" + random
            + ",appKey=" + "{appkey:纯数字}"
            //+ ",iotInstanceId=" + iotInstanceId, //如果是企业实例ID,那么需要补充这个参数。如果是公共实例ID,那么隐藏这个参数
            + ",groupId=" + "{appkey:纯数字}"
            + "|";

        //按照阿里云IoT规范,计算签名,组装password
        String signContent = "random=" + random;
        String password = doSign(signContent, "{app-secret}", "HmacSHA256");

        //如果是公共实例,那么使用如下链接地址。如果是企业实例,需要在实例管理中查看amqp链接地址。
        String connectionUrl =
            "failover:(amqps://{aliyun-uid}.iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)"
                + "?failover.maxReconnectAttempts=10&failover.reconnectDelay=30";

        Hashtable<String, String> hashtable = new Hashtable<String, String>();
        hashtable.put("connectionfactory.SBCF", connectionUrl);
        hashtable.put("queue.QUEUE", "default");
        hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        Context context = new InitialContext(hashtable);
        ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
        Destination queue = (Destination)context.lookup("QUEUE");
        Connection connection = cf.createConnection(userName, password);
        MyJmsConnectionListener myJmsConnectionListener = new MyJmsConnectionListener();
        ((JmsConnection)connection).addConnectionListener(myJmsConnectionListener);
        // Create Session
        // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()
        // Session.AUTO_ACKNOWLEDGE: SDK自动ack(推荐)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection.start();
        // Create Receiver Link
        MessageConsumer consumer = session.createConsumer(queue);
  			consumer.setMessageListener(messageListener);
    }

    /**
     * 按照指定签名算法计算password签名
     */
    private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
        SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
        Mac mac = Mac.getInstance(signMethod);
        mac.init(signingKey);
        byte[] rawHmac = mac.doFinal(toSignString.getBytes());
        return Hex.encodeHexString(rawHmac);
    }
}

注:connectionUrlTemplate 的写法如下所示:

上海:
公共实例:amqps://${uid}.iot-amqp.cn-shanghai.aliyuncs.com:5671
购买的实例:在LP控制台“实例管理”--“查看终端节点”里找到AMQP接入点

3. 设置消息接受接口

使用AMQP方式,建立连接之前,需要提供消息接收接口,用于处理回调的消息,需要通过consumer.setMessageListener(messageListener)来设置,其中messageListener是一个自定义实现MessageListener接口的类,如下所示:

private static MessageListener messageListener = new MessageListener() {
    @Override
    public void onMessage(final Message message) {
        try {
            //1.收到消息之后一定要ACK。
            // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
            // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
            // message.acknowledge();
            //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
            // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    processMessage(message);
                }
            });
        } catch (Exception e) {
            logger.error("submit task occurs exception ", e);
        }
    }
};

private static class MyJmsConnectionListener implements JmsConnectionListener() {
    /**
     * 连接成功建立
     * @param remoteURI
     */
    @Override
    public void onConnectionEstablished(URI remoteURI) {
        logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
    }

    /**
     * 最终连接失败,尝试过最大重试次数之后
     * @param error
     */
    @Override
    public void onConnectionFailure(Throwable error) {
        logger.error("onConnectionFailure, {}", error.getMessage());
    }

    /**
     * 连接中断
     * @param remoteURI
     */
    @Override
    public void onConnectionInterrupted(URI remoteURI) {
        logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
    }

    /**
     * 连接中断后又自动重连上
     * @param remoteURI
     */
    @Override
    public void onConnectionRestored(URI remoteURI) {
        logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
    }

    @Override
    public void onInboundMessage(JmsInboundMessageDispatch envelope) {
        logger.info("onInboundMessage, {}", envelope);
    }

    @Override
    public void onSessionClosed(Session session, Throwable cause) {
        logger.error("onSessionClosed, " + session, cause);
    }

    @Override
    public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
        logger.error("onConsumerClosed, " + consumer, cause);
    }

    @Override
    public void onProducerClosed(MessageProducer producer, Throwable cause) {
        logger.error("onProducerClosed, " + producer, cause);
    }
}

其中的content内容是一个JSON对象字符串,字段信息如下:

   // 模型ID
    String modelId;

    // 变更数据的ID
    List<Long> dataIds;

    // 操作类型:insert/update/delete
    String operateType;

    // 订阅的appId, 以appKey授权时为空
    String appId;

    String scopeId;

    // 模型的逻辑隔离id
    String logicalModelIsoId;

    // 模型实例id
    String modelInstanceId;

    //  数据实例id
    String dataInstanceId;