数据总线
1. 数据集成介绍
数据集成标准化的目标是规范应用之间数据的传递方式和表达方式。
传递方式:即应用之间的数据如何流通。平台提供了对数据进行增删改查的4个API,以及HTTP2方式的消息订阅机制。
表达方式:即应用之间如何对数据内容有一致的理解。为了实现这个目标,需要做到如下两点,一是数据结构需要由小二后台统一管控;二是应用集成对接之前(比如应用上架的时候)需要声明本应用对哪些数据模型产生什么样的数据操作(如查询、新增,或者订阅)。
基于以上逻辑,为了实现应用的数据集成能力,分别需要执行相应的应用声明、应用开发、应用集成。
2. 应用声明
应用的声明是指应用在上架到市场时,由应用开发者自行声明的,包含两部分声明内容:应用涉及到的数据模型;以及对这些模型所做的相应的操作。这里指出应用对数据模型的操作,目的有两个,一是用户可以感知应用对数据的操作范围,二是平台根据声明的操作决定对应用的操作权限。
3. 应用集成
所谓的“数据集成”,是指应用之间数据共享的方式,比如两个应用,当涉及到相同的数据模型时,是共享一份数据,还是独立管理各自的数据。通过应用声明,应用在上架到应用市场之后,用户就能感知应用与数据之间的关系。目前通用的集成方案是以项目为隔离维度的。也就是说,在一个项目内的所有应用,他们关联的相同的数据模型,会被默认放在同一个隔离区域ScopeID内。
4. 应用开发 - 数据操作
应用之间共享数据时,需要向提平台操作数据,或者从平台订阅数据。数据操作方面,这里主要涉及4个接口:新增数据、查询数据、删除数据、修改数据。详见后面的API参考。同时,为了便于应用开发,平台提供了SDK,简化开发。
4.1 SDK介绍
4.1.1 Java SDK
1. 依赖
依赖的三方库:
<dependency>
<groupId>com.aliyun.api.gateway</groupId>
<artifactId>sdk-core-java</artifactId>
<version>1.0.4</version>
</dependency>
依赖的工具类:
git clone https://github.com/aliyun/iotx-api-gateway-client.git
2. 示例代码
import com.alibaba.cloudapi.sdk.model.ApiResponse;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.iotx.api.client.IoTApiClientBuilderParams;
import com.aliyun.iotx.api.client.IoTApiRequest;
import com.aliyun.iotx.api.client.SyncApiClient;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
public class Demo {
// 依赖的工具类从 https://github.com/aliyun/iotx-api-gateway-client 获取
public static void main(String[] args) throws UnsupportedEncodingException{
String appKey = "YOUR_APP_KEY";
String appSecret = "YOUR_APP_SECRET";
String modelId = "YOUR_MODEL_ID";
String dataId = "YOUR_DATA_ID";
dataQuery(appKey, appSecret, modelId, dataId);
}
public static void dataQuery(String appKey, String appSecret,String modelId,String dataId) throws UnsupportedEncodingException {
IoTApiClientBuilderParams ioTApiClientBuilderParams =
new IoTApiClientBuilderParams();
// 填写应用的appkey信息
ioTApiClientBuilderParams.setAppKey(appKey);
ioTApiClientBuilderParams.setAppSecret(appSecret);
SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
IoTApiRequest request = new IoTApiRequest();
// 设置api的版本
request.setApiVer("0.0.5");
// 接口参数
request.putParam("modelId",modelId);
ArrayList returnFields = new ArrayList();
returnFields.add("*");
request.putParam("returnFields", returnFields);
JSONArray conditions = new JSONArray();
JSONObject condition = new JSONObject();
condition.put("fieldName", "id");
condition.put("operate", "eq");
condition.put("value", dataId);
conditions.add(condition);
request.putParam("conditions", conditions);
ApiResponse response = syncClient.postBody("api.link.aliyun.com",
"/data/model/data/query", request, true);
int code = response.getCode();
String res = new String(response.getBody());
System.out.println("response code = " + response.getCode() + " response = " + new String(response.getBody(), "UTF-8"));
}
}
4.1.2 Python SDK
1. 依赖
依赖的三方库:无
2. 示例代码
Step 1: 下载示例工程代码。示例工程基于Python 2.7,Python3请自行适配。
git clone https://github.com/aliyun/api-gateway-demo-sign-python.git
Step 2: 修改signature_composer.py文件第49行
修改前: string_to_sign.append(_build_resource(uri=uri, body=body))
修改后: string_to_sign.append(uri)
Step 3: 修改ClientDemo.py如下,执行调用
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import json
sys_path = sys.path[0]
if not os.path.isdir(sys_path):
sys_path = os.path.dirname(sys_path)
os.chdir(sys_path)
from com.aliyun.api.gateway.sdk import client
from com.aliyun.api.gateway.sdk.http import request
from com.aliyun.api.gateway.sdk.common import constant
host = "https://api.link.aliyun.com"
url = "/data/model/data/query"
cli = client.DefaultClient(app_key="YOUR_APP_KEY", app_secret="YOUR_APP_SECRET")
req_post = request.Request(host=host, protocol=constant.HTTPS, url=url, method="POST", time_out=30000)
req_post.set_content_type(constant.CONTENT_TYPE_STREAM)
body = {
"request":{
"apiVer":"0.0.2"
},
"params":{
"modelId":"EMPLOYEE",
"returnFields":["employee_no", "name"],
"conditions":[{"fieldName":"id","value":"1","operate":"mt"}],
"pageSize": 100,
"pageNum": 1
},
"version":"1.0"
}
bb = json.dumps(body).encode("utf-8")
req_post.set_body(bb)
print '==================================================='
print str(cli.execute(req_post)).decode('string_escape')
print '==================================================='
4.1.3 JS SDK
1. 依赖
$ # save into package.json dependencies with -S
$ npm install aliyun-api-gateway -S
$ # you can use cnpm for fast install
$ cnpm install aliyun-api-gateway -S
2. 示例代码
'use strict';
const co = require('co');
const Client = require('aliyun-api-gateway').Client;
const client = new Client('YOUR_APP_KEY','YOUR_APP_SECRET');
co(function* () {
var url = 'http://api.link.aliyun.com/test1234';
var result = yield client.post(url, {
data: {
'testtest': 'query1Value'
},
headers: {
accept: 'application/json'
}
});
console.log(JSON.stringify(result));
});
4.2 数据操作API
4.2.1 API 描述
API名称 | API描述 | API Path | API 版本 |
新增数据 | 基于已经创建且被授权写入的模型,进行数据的新增。 | /data/model/data/insert | 0.0.3 |
删除数据 | 基于已经创建且被授权删除的模型,进行数据的更新。 | /data/model/data/delete | 0.0.2 |
修改数据 | 基于已经创建且被授权更新的模型,进行数据的更新。 | /data/model/data/update | 0.0.2 |
查询数据 | 基于已经创建且被授权查询的模型,进行数据的查询。 | /data/model/data/query | 0.0.3 |
获取文件上传地址 | 对于模型中指定为“图片”标签的字段,在该字段需要填写文件名,并且文件名需要通过该接口获取,该接口同时还会返回一个URL供用户上传文件。该地址有效期是10秒。 | /data/model/data/upload | 0.0.1 |
获取文件下载地址 | 对于模型中指定为“图片”标签的字段,在该字段的内容是一个系统分配的文件名,用户可以通过本接口,传入这个文件名,获取真实的文件下载地址。该地址有效期是10秒。 | /data/model/data/download | 0.0.1 |
4.2.2 API 约定
日期类型的参数传入格式:当前时间到格林威治时间1970年01月01日00时00分00秒的毫秒数。
数量查询的一些约定:
单次查询最多返回200条数据,未指定分页参数情况下,查询返回满足条件的前200条,可根据返回参数中的hasNext判断是否有更多数据。
若需要按照指定条件返回数据总数,则指定返回参数为COUNT,API返回参数中会带有COUNT以及对应的值。
更新删除的约定:单次操作,最多支持200条数据。
运算符定义:
运算符 | 含义 | 备注 |
eq | equals | 相等 |
neq | not equals | 不相等 |
lt | less than | 小于 |
lteq | less than or equals | 小于等于 |
mt | more than | 大于 |
mteq | more than or equals | 大于等于 |
bt | between | 在..之间 |
in | in | 在..之内 |
nin | not in | 不在..之内 |
nul | is null | 为空 |
nnul | is not null | 不为空 |
数据包括系统属性,API中不允许赋值和更新系统属性,系统属性如下:
属性 | 描述 |
id | 数据主键 |
creator | 数据创建者 |
modifier | 数据修改者 |
gmt_create | 数据创建时间 |
gmt_modified | 数据修改时间 |
4.2.3 数据新增API
1. 请求参数
参数 | 类型 | 描述 | 是否必传 |
modelId | String | 数据模型id | 是 |
properties | JSON | 数据字段键值对,增加的字段的键只能是模型包含的字段,否则会报错,其中 Boolean属性的property传入"true"和"false"或者0和1。如:{"name":"xxx", "age":18} | 是 |
scopeId | String | 经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。 | 否 |
appId | String | 对于SaaS应用,需要填该值 | 否 |
2. 返回参数
参数 | 类型 | 描述 |
data | Long | 数据主键id |
3. 请求示例
{
"request": {
"apiVer": "0.0.3" // api版本号
},
"id": 1508212818676,// request里的全局唯一id透传
"params": {
"modelId":"XXX123", //数据模型id
"scopeId":"fdbsdj1dfjdubgxxx", //数据模型id
"properties":{
"BRAND":"BMW",
"MODE":"5",
"CREATE_DATE":1526969423,
"LONG_SIZE":3.3,
"LONG_SIZE":2.2
}
},
"version": "1.0" // 请求协议版本
}
4. 返回示例
{
"code": 200,
"message": "success",
"localizedMsg": null,
"data": 12345
}
5. 返回码
状态码 | 描述 | 其他说明 |
200 | 成功 | |
460 | 参数验证异常 | 会带有验证异常的详细说明 |
500 | 服务异常 | server error |
52002 | 无访问权限 | |
52005 | 找不到目标存储 | |
52009 | 参数和模型定义不匹配 | |
52011 | 数据类型校验错误 |
4.2.4 数据删除API
1. 请求参数
参数 | 类型 | 是否必传 | 描述 |
modelId | String | 是 | 数据模型id |
conditions | JSON | 否 | 数据条件,由字段名、运算符、比较值组成一个condition。如:[{"fieldName": "id", "operate": "eq", "value": 7}] |
appId | String | 否 | 对于SaaS应用,需要填该值 |
scopeId | String | 否 | 经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。 |
其中condition针对不同类型的属性支持的运算符:
属性 | 支持的运算符 |
Integer | eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul |
String | eq,neq,nnul,in,nin |
Double | eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul |
Boolean | eq,nnul |
Date | eq,neq,lt,lteq,mt,mteq,bt,nnul |
2. 返回参数
参数 | 类型 | 描述 |
data | Integer | 删除数据的条数 |
3. 请求示例
{
"request": {
"apiVer": "0.1.0" // api版本号
},
"id": 1508212818676,// request里的全局唯一id透传
"params": {
"modelId":"XXX123",
"conditions":[
{"fieldName":"BRAND","value":"BMW","operate":"eq"},
{"fieldName":"MODE","value":"X5","operate":"eq"}
]
},
"version": "1.0" // 请求协议版本
}
4. 返回示例
{
"code": 200,
"message": "success",
"localizedMsg": null,
"data":100
}
5. 返回码
状态码 | 描述 | 说明 |
200 | 成功 | |
460 | 参数验证异常 | 会带有验证异常的详细说明 |
500 | 服务异常 | server error |
52002 | 无访问权限 |
4.2.5 数据修改API
1. 请求参数
参数 | 类型 | 是否必传 | 描述 |
modelId | String | 是 | 数据模型id |
updateDetails | JSON | 是 | 更新的具体字段和值,字段需要是模型中包含的字段,否则会报错,其中 Boolean属性的property传入"true"和"false"或者0和1。如:{"name": "xxxx", "age":20} |
conditions | JSON | 否 | 条件,由字段名、运算符、比较值组成一个condition,格式如下:[{"fieldName": "id", "operate": "eq", "value": 7}]。fieldName表示字段的名称;operate表示操作符,操作符见上表;value表示值。 |
appId | String | 否 | 对于SaaS应用,需要填该值 |
scopeId | String | 否 | 经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。 |
其中condition针对不同类型的属性支持的运算符:
属性 | 支持的运算符 | |
Integer | eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul | |
String | eq,neq,nnul,in,nin | |
Double | eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul | |
Boolean | eq,nnul | |
Date | eq,neq,lt,lteq,mt,mteq,bt,nnul |
2. 返回参数
参数 | 类型 | 描述 |
data | Integer | 更新数据的条数 |
3. 请求示例
{
"request": {
"apiVer": "0.0.3" // api版本号
},
"id": 1508212818676,// request里的全局唯一id透传
"params": {
"modelId":"XXX123",
"conditions":[
{"fieldName":"BRAND","value":"BMW","operate":"eq"},
{"fieldName":"MODE","value":"X5","operate":"eq"}
],
"updateDetails":{
"LONG_SIZE":4,
"WIDTH_SIZE":3
}
},
"version": "1.0" // 请求协议版本
}
4. 返回示例
{
"code": 200,
"message": "success",
"localizedMsg": null,
"data":100
}
5. 返回码
状态码 | 描述 | |
200 | 成功 | |
460 | 参数验证异常 | 会带有验证异常的详细说明 |
500 | 服务异常 | server error |
52002 | 无访问权限 | |
52006 | 资源更新错误 | |
52011 | 数据类型校验错误 |
4.2.6 数据查询API
1. 请求参数
参数 | 类型 | 是否必传 | 说明 |
modelId | String | 是 | 数据模型ID |
returnFields | JSON | 是 | 指定返回的字段。1.若期望返回所有字段,则传入参数为["*"];2.若期望返回数据总数,则传入参数为["COUNT"]。如:["*"]或者["name","age"]["COUNT"] |
conditions | JSON | 否 | 条件,由字段名、运算符、比较值组成一个condition。如:[{"fieldName": "id", "operate": "eq", "value": 7}] |
orderBy | JSON | 否 | 排序条件,由增序或降序以及排序字段组成。如:{"asc":"true", "orderByFields":["name","age"]} |
pageNum | Integer | 否 | 分页页数 |
pageSize | Integer | 否 | 分页每页数量 |
appId | String | 否 | 对于SaaS应用,需要填该值 |
scopeId | String | 否 | 经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。 |
其中condition针对不同类型的属性支持的运算符:
属性 | 支持的运算符 |
Integer | eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul |
String | eq,neq,nnul,in,nin |
Double | eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul |
Boolean | eq,nnul |
Date | eq,neq,lt,lteq,mt,mteq,bt,nnul |
2. 返回参数
参数 | 类型 | 描述 |
data | String | 查询返回数据内容,其中,Boolean类型的属性返回值为0或1 |
3. 请求示例-1
{
"request": {
"apiVer": "0.0.2" // api版本号
},
"id": 1508212818676,// request里的全局唯一id透传
"params": {
"modelId":"XXX123",
"returnFields":["MODE","ENGINE"],
"conditions":[
{"fieldName":"BRAND","value":"BMW","operate":"eq"},
{"fieldName":"WIDTH_SIZE","value":1,"value2":2,"operate":"bt"},
{"fieldName":"LONG_SIZE","value":"3","operate":"mt"}
],
"orderBy":{
"asc":true,
"orderByFields":["CREATE_DATA","MODE"]
},
"pageNum":1,
"pageSize":10
}
},
"version": "1.0" // 请求协议版本
}
4. 返回示例-1
{
"code": 200,
"localizedMsg": null,
"data": "{\"count\":1,
\"hasNext\":false,
\"items\":[{\"gmt_create\":1551872701000,\"MODE\":\"33\",\"ENGINE\":\"44\",\"id\":2,\"gmt_modified\":1551872714000}],
\"pageNum\":1,
\"pageSize\":10}",
"message": "success"
}
5. 请求示例-2
{
"request": {
"apiVer": "0.1.0" // api版本号
},
"id": 1508212818676,// request里的全局唯一id透传
"params": {
"modelId":"XXX123",
"returnFields":["COUNT",
"conditions":[
{"fieldName":"BRAND","value":"BMW","operate":"eq"},
{"fieldName":"WIDTH_SIZE","value":1,"value2":2,"operate":"bt"},
{"fieldName":"LONG_SIZE","value":"3","operate":"mt"}
],
"pageNum":1,
"pageSize":10
}
},
"version": "1.0" // 请求协议版本
}
6. 返回示例-2
{
"code": 200,
"localizedMsg": null,
"data": "{
\"hasNext\":false,
\"items\":[{
\"COUNT\":100
}]
}"
"message": "success"
}
7. 返回码
状态码 | 描述 | |
200 | 成功 | |
460 | 参数验证异常 | 会带有验证异常的详细说明 |
500 | 服务异常 | server error |
52002 | 无访问权限 | |
52005 | 目标存储未找到 |
4.2.7 获取文件上传地址API
1. 请求参数
参数 | 类型 | 是否必传 | 描述 |
modelId | String | 是 | 数据模型id |
version | String | 是 | 数据模型的版本号 |
fileSize | Integer | 是 | 文件大小,以字节为单位,目前系统不支持5M以上文件 |
attrName | String | 是 | 属性名称,模型中包含的属性名称,不包含会报错进行提示 |
fileType | String | 是 | 文件类型,目前系统只支持bmp、png、gif、jpg |
appId | String | 否 | 对于SaaS应用,需要填该值 |
scopeId | String | 否 | 经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。 |
2. 返回参数
参数 | 类型 | 描述 |
url | String | 数据上传的url,url有效时间10s |
fileName | String | 文件名称,系统随机分配的文件名称,用户在得到这个文件名之后,应该将其放在相应模型数据的文件类型的字段上,例如:xxxxx.jpg |
3. 请求示例
{
"request": {
"apiVer": "0.0.1" // api版本号
},
"id": 1508212818671, // request里的全局唯一id透传
"params": {
"modelId":"XXX123", //数据模型id
"fileSize":"3.2", //文件大小
"attrName":"name", //属性名称
"fileType":"jpg", //文件类型
"version":"1.0", //模型版本
},
"version": "1.0" // 请求协议版本
}
4. 返回示例
{
"code": 200, // 返回是否成功,只要不是200说明返回不成功
"message": "success", // 如果失败,会返回失败的信息描述
"localizedMsg": null,
"data": {"url":"http://xxx.xxx.xx","fileName":"xxx.jpg"} // 返回的数据
}
5. 返回码
状态码 | 描述 | |
200 | 成功 | |
460 | 参数验证异常 | 会带有验证异常的详细说明 |
500 | 服务异常 | server error |
52002 | 无访问权限 | |
52005 | 找不到目标存储 | |
52009 | 参数和模型定义不匹配 | |
52011 | 数据类型校验错误 | |
52064 | 属性字段没有相应的图片标签 | |
52063 | 文件大小不能大于5M |
4.2.8 获取文件下载地址API
1. 请求参数
参数 | 类型 | 是否必传 | 描述 |
modelId | String | 是 | 数据模型id |
version | String | 是 | 数据模型的版本号 |
attrName | String | 是 | 属性名称,模型中包含的属性名称,不包含会报错进行提示 |
fileName | String | 是 | 文件名称,必须为获取文件上传地址API中返回的fileName参数,例如:xxxxx.jpg |
appId | String | 否 | 对于SaaS应用,需要填该值 |
scopeId | String | 否 | 经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。 |
2. 返回参数
参数 | 类型 | 描述 |
url | String | 数据上传的url,url有效时间10s |
3. 请求示例
{
"request": {
"apiVer": "0.0.1" // api版本号
},
"id": 1508212818671, // request里的全局唯一id透传
"params": {
"modelId":"XXX123", //数据模型id
"scopeId":"fdbsdj1dfjdubgxxx", //业务隔离id
"attrName":"name", //属性名称
"fileName":"xdsfxv.jpg", //文件类型
"version":"1.0", //模型版本
},
"version": "1.0" // 请求协议版本
}
4. 返回示例
{
"code": 200, // 返回是否成功,只要不是200说明返回不成功
"message": "success", // 如果失败,会返回失败的信息描述
"localizedMsg": null,
"data": {"url":"http://xxx.xxx.xx"} // 返回的数据下载查看的url
}
5. 返回码
状态码 | 描述 | |
200 | 成功 | |
460 | 参数验证异常 | 会带有验证异常的详细说明 |
500 | 服务异常 | server error |
52002 | 无访问权限 | |
52005 | 找不到目标存储 | |
52009 | 参数和模型定义不匹配 | |
52011 | 数据类型校验错误 | |
52064 | 属性字段没有相应的图片标签 |
5. 应用开发 - 数据订阅
应用可以通过AMQP方式,订阅数据的变更消息(新增、删除、修改)。应用自身不需要发布数据的变更消息到通道中,这些消息的产生是由应用通过上面的数据操作API,对数据进行操作之后,由平台产生。平台产生时间之后,会将消息通过订阅关系发送到订阅方。在这个消息通信中,每一个应用实例,由AppKey标识身份。对于单租户型的应用,每一个AppKey代表了一次应用分发的实例;但是,对于SaaS应用,他的一个AppKey代表了该应用对应的所有租户的身份,因此,在SaaS应用按照AppKey得到消息之后,需要自行根据订阅到的数据内容中的AppID字段,将数据对应到不用的用户中。
5.1 SDK介绍
5.1.1 Java SDK
1. 依赖引用
在工程中添加 maven 依赖接入 SDK。
<!-- amqp 1.0 qpid client -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.40.0</version>
</dependency>
<!-- util for base64-->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
2. 身份认证
使用服务端订阅功能,需要基于AppKey进行身份认证并建立连接。该AppKey根据应用类型不同,有两种来源:
对于独立单租户托管并分发的应用,该AppKey是由托管平台在应用实例化分发部署应用时,自动产生,并注入在主机的环境变量中,应用事先并不知道该值,需要动态从环境变量中获取。
对于共享型应用,该AppKey是该SaaS应用在注册多租户接口时,从平台创建,并硬编码在应用中。
public static void main(String[] args) throws Exception {
String clientId = UUID.randomUUID().toString(); //填写唯一的id,比如uuid
String random = UUID.randomUUID().toString().replaceAll("-", "").substring(0, 6);
//按照阿里云IoT规范,组装UserName
String userName = clientId + "|authMode=appkey"
+ ",signMethod=SHA256"
+ ",random=" + random
+ ",appKey=" + "{appkey:纯数字}"
//+ ",iotInstanceId=" + iotInstanceId, //如果是企业实例ID,那么需要补充这个参数。如果是公共实例ID,那么隐藏这个参数
+ ",groupId=" + "{appkey:纯数字}"
+ "|";
//按照阿里云IoT规范,计算签名,组装password
String signContent = "random=" + random;
String password = doSign(signContent, "{app-secret}", "HmacSHA256");
//如果是公共实例,那么使用如下链接地址。如果是企业实例,需要在实例管理中查看amqp链接地址。
String connectionUrl =
"failover:(amqps://{aliyun-uid}.iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)"
+ "?failover.maxReconnectAttempts=10&failover.reconnectDelay=30";
Hashtable<String, String> hashtable = new Hashtable<String, String>();
hashtable.put("connectionfactory.SBCF", connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");
Connection connection = cf.createConnection(userName, password);
MyJmsConnectionListener myJmsConnectionListener = new MyJmsConnectionListener();
((JmsConnection)connection).addConnectionListener(myJmsConnectionListener);
// Create Session
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()
// Session.AUTO_ACKNOWLEDGE: SDK自动ack(推荐)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// Create Receiver Link
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(messageListener);
}
/**
* 按照指定签名算法计算password签名
*/
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Hex.encodeHexString(rawHmac);
}
}
注:connectionUrlTemplate 的写法如下所示:
上海:
公共实例:amqps://${uid}.iot-amqp.cn-shanghai.aliyuncs.com:5671
购买的实例:在LP控制台“实例管理”--“查看终端节点”里找到AMQP接入点
3. 设置消息接受接口
使用AMQP方式,建立连接之前,需要提供消息接收接口,用于处理回调的消息,需要通过consumer.setMessageListener(messageListener)来设置,其中messageListener是一个自定义实现MessageListener接口的类,如下所示:
private static MessageListener messageListener = new MessageListener() {
@Override
public void onMessage(final Message message) {
try {
//1.收到消息之后一定要ACK。
// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。
// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。
// message.acknowledge();
//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。
// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。
executorService.submit(new Runnable() {
@Override
public void run() {
processMessage(message);
}
});
} catch (Exception e) {
logger.error("submit task occurs exception ", e);
}
}
};
private static class MyJmsConnectionListener implements JmsConnectionListener() {
/**
* 连接成功建立
* @param remoteURI
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
}
/**
* 最终连接失败,尝试过最大重试次数之后
* @param error
*/
@Override
public void onConnectionFailure(Throwable error) {
logger.error("onConnectionFailure, {}", error.getMessage());
}
/**
* 连接中断
* @param remoteURI
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
}
/**
* 连接中断后又自动重连上
* @param remoteURI
*/
@Override
public void onConnectionRestored(URI remoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
logger.info("onInboundMessage, {}", envelope);
}
@Override
public void onSessionClosed(Session session, Throwable cause) {
logger.error("onSessionClosed, " + session, cause);
}
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
logger.error("onConsumerClosed, " + consumer, cause);
}
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {
logger.error("onProducerClosed, " + producer, cause);
}
}
其中的content内容是一个JSON对象字符串,字段信息如下:
// 模型ID
String modelId;
// 变更数据的ID
List<Long> dataIds;
// 操作类型:insert/update/delete
String operateType;
// 订阅的appId, 以appKey授权时为空
String appId;
String scopeId;
// 模型的逻辑隔离id
String logicalModelIsoId;
// 模型实例id
String modelInstanceId;
// 数据实例id
String dataInstanceId;