数据总线
1. 数据集成介绍
数据集成标准化的目标是规范应用之间数据的传递方式和表达方式。
传递方式:即应用之间的数据如何流通。平台提供了对数据进行增删改查的4个API,以及HTTP2方式的消息订阅机制。
表达方式:即应用之间如何对数据内容有一致的理解。为了实现这个目标,需要做到如下两点,一是数据结构需要由小二后台统一管控;二是应用集成对接之前(比如应用上架的时候)需要声明本应用对哪些数据模型产生什么样的数据操作(如查询、新增,或者订阅)。
基于以上逻辑,为了实现应用的数据集成能力,分别需要执行相应的应用声明、应用开发、应用集成。
2. 应用声明
应用的声明是指应用在上架到市场时,由应用开发者自行声明的,包含两部分声明内容:应用涉及到的数据模型;以及对这些模型所做的相应的操作。这里指出应用对数据模型的操作,目的有两个,一是用户可以感知应用对数据的操作范围,二是平台根据声明的操作决定对应用的操作权限。
3. 应用集成
所谓的“数据集成”,是指应用之间数据共享的方式,比如两个应用,当涉及到相同的数据模型时,是共享一份数据,还是独立管理各自的数据。通过应用声明,应用在上架到应用市场之后,用户就能感知应用与数据之间的关系。目前通用的集成方案是以项目为隔离维度的。也就是说,在一个项目内的所有应用,他们关联的相同的数据模型,会被默认放在同一个隔离区域ScopeID内。
4. 应用开发 - 数据操作
应用之间共享数据时,需要向提平台操作数据,或者从平台订阅数据。数据操作方面,这里主要涉及4个接口:新增数据、查询数据、删除数据、修改数据。详见后面的API参考。同时,为了便于应用开发,平台提供了SDK,简化开发。
4.1 SDK介绍
4.1.1 Java SDK
1. 依赖
依赖的三方库:
<dependency>
<groupId>com.aliyun.api.gateway</groupId>
<artifactId>sdk-core-java</artifactId>
<version>1.0.4</version>
</dependency>
依赖的工具类:
git clone https://github.com/aliyun/iotx-api-gateway-client.git
2. 示例代码
import com.alibaba.cloudapi.sdk.model.ApiResponse;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.iotx.api.client.IoTApiClientBuilderParams;
import com.aliyun.iotx.api.client.IoTApiRequest;
import com.aliyun.iotx.api.client.SyncApiClient;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
public class Demo {
// 依赖的工具类从 https://github.com/aliyun/iotx-api-gateway-client 获取
public static void main(String[] args) throws UnsupportedEncodingException{
String appKey = "YOUR_APP_KEY";
String appSecret = "YOUR_APP_SECRET";
String modelId = "YOUR_MODEL_ID";
String dataId = "YOUR_DATA_ID";
dataQuery(appKey, appSecret, modelId, dataId);
}
public static void dataQuery(String appKey, String appSecret,String modelId,String dataId) throws UnsupportedEncodingException {
IoTApiClientBuilderParams ioTApiClientBuilderParams =
new IoTApiClientBuilderParams();
// 填写应用的appkey信息
ioTApiClientBuilderParams.setAppKey(appKey);
ioTApiClientBuilderParams.setAppSecret(appSecret);
SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
IoTApiRequest request = new IoTApiRequest();
// 设置api的版本
request.setApiVer("0.0.5");
// 接口参数
request.putParam("modelId",modelId);
ArrayList returnFields = new ArrayList();
returnFields.add("*");
request.putParam("returnFields", returnFields);
JSONArray conditions = new JSONArray();
JSONObject condition = new JSONObject();
condition.put("fieldName", "id");
condition.put("operate", "eq");
condition.put("value", dataId);
conditions.add(condition);
request.putParam("conditions", conditions);
ApiResponse response = syncClient.postBody("api.link.aliyun.com",
"/data/model/data/query", request, true);
int code = response.getCode();
String res = new String(response.getBody());
System.out.println("response code = " + response.getCode() + " response = " + new String(response.getBody(), "UTF-8"));
}
}
4.1.2 Python SDK
1. 依赖
依赖的三方库:无
2. 示例代码
Step 1: 下载示例工程代码。示例工程基于Python 2.7,Python3请自行适配。
git clone https://github.com/aliyun/api-gateway-demo-sign-python.git
Step 2: 修改signature_composer.py文件第49行
修改前: string_to_sign.append(_build_resource(uri=uri, body=body))
修改后: string_to_sign.append(uri)
Step 3: 修改ClientDemo.py如下,执行调用
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import json
sys_path = sys.path[0]
if not os.path.isdir(sys_path):
sys_path = os.path.dirname(sys_path)
os.chdir(sys_path)
from com.aliyun.api.gateway.sdk import client
from com.aliyun.api.gateway.sdk.http import request
from com.aliyun.api.gateway.sdk.common import constant
host = "https://api.link.aliyun.com"
url = "/data/model/data/query"
cli = client.DefaultClient(app_key="YOUR_APP_KEY", app_secret="YOUR_APP_SECRET")
req_post = request.Request(host=host, protocol=constant.HTTPS, url=url, method="POST", time_out=30000)
req_post.set_content_type(constant.CONTENT_TYPE_STREAM)
body = {
"request":{
"apiVer":"0.0.2"
},
"params":{
"modelId":"EMPLOYEE",
"returnFields":["employee_no", "name"],
"conditions":[{"fieldName":"id","value":"1","operate":"mt"}],
"pageSize": 100,
"pageNum": 1
},
"version":"1.0"
}
bb = json.dumps(body).encode("utf-8")
req_post.set_body(bb)
print '==================================================='
print str(cli.execute(req_post)).decode('string_escape')
print '==================================================='
4.1.3 JS SDK
1. 依赖
$ # save into package.json dependencies with -S
$ npm install aliyun-api-gateway -S
$ # you can use cnpm for fast install
$ cnpm install aliyun-api-gateway -S
2. 示例代码
'use strict';
const co = require('co');
const Client = require('aliyun-api-gateway').Client;
const client = new Client('YOUR_APP_KEY','YOUR_APP_SECRET');
co(function* () {
var url = 'http://api.link.aliyun.com/test1234';
var result = yield client.post(url, {
data: {
'testtest': 'query1Value'
},
headers: {
accept: 'application/json'
}
});
console.log(JSON.stringify(result));
});
4.2 数据操作API
4.2.1 API 描述
API名称 |
API描述 |
API Path |
API 版本 |
新增数据 |
基于已经创建且被授权写入的模型,进行数据的新增。 |
/data/model/data/insert |
0.0.3 |
删除数据 |
基于已经创建且被授权删除的模型,进行数据的更新。 |
/data/model/data/delete |
0.0.2 |
修改数据 |
基于已经创建且被授权更新的模型,进行数据的更新。 |
/data/model/data/update |
0.0.2 |
查询数据 |
基于已经创建且被授权查询的模型,进行数据的查询。 |
/data/model/data/query |
0.0.3 |
获取文件上传地址 |
对于模型中指定为“图片”标签的字段,在该字段需要填写文件名,并且文件名需要通过该接口获取,该接口同时还会返回一个URL供用户上传文件。该地址有效期是10秒。 |
/data/model/data/upload |
0.0.1 |
获取文件下载地址 |
对于模型中指定为“图片”标签的字段,在该字段的内容是一个系统分配的文件名,用户可以通过本接口,传入这个文件名,获取真实的文件下载地址。该地址有效期是10秒。 |
/data/model/data/download |
0.0.1 |
4.2.2 API 约定
日期类型的参数传入格式:当前时间到格林威治时间1970年01月01日00时00分00秒的毫秒数。
数量查询的一些约定:
单次查询最多返回200条数据,未指定分页参数情况下,查询返回满足条件的前200条,可根据返回参数中的hasNext判断是否有更多数据。
若需要按照指定条件返回数据总数,则指定返回参数为COUNT,API返回参数中会带有COUNT以及对应的值。
更新删除的约定:单次操作,最多支持200条数据。
运算符定义:
运算符 |
含义 |
备注 |
eq |
equals |
相等 |
neq |
not equals |
不相等 |
lt |
less than |
小于 |
lteq |
less than or equals |
小于等于 |
mt |
more than |
大于 |
mteq |
more than or equals |
大于等于 |
bt |
between |
在..之间 |
in |
in |
在..之内 |
nin |
not in |
不在..之内 |
nul |
is null |
为空 |
nnul |
is not null |
不为空 |
数据包括系统属性,API中不允许赋值和更新系统属性,系统属性如下:
属性 |
描述 |
id |
数据主键 |
creator |
数据创建者 |
modifier |
数据修改者 |
gmt_create |
数据创建时间 |
gmt_modified |
数据修改时间 |
4.2.3 数据新增API
1. 请求参数
参数 |
类型 |
描述 |
是否必传 |
modelId |
String |
数据模型id |
是 |
properties |
JSON |
数据字段键值对,增加的字段的键只能是模型包含的字段,否则会报错,其中 Boolean属性的property传入"true"和"false"或者0和1。如:{"name":"xxx", "age":18} |
是 |
scopeId |
String |
经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。 |
否 |
appId |
String |
对于SaaS应用,需要填该值 |
否 |
2. 返回参数
参数 |
类型 |
描述 |
data |
Long |
数据主键id |
3. 请求示例
{
"request": {
"apiVer": "0.0.3" // api版本号
},
"id": 1508212818676,// request里的全局唯一id透传
"params": {
"modelId":"XXX123", //数据模型id
"scopeId":"fdbsdj1dfjdubgxxx", //数据模型id
"properties":{
"BRAND":"BMW",
"MODE":"5",
"CREATE_DATE":1526969423,
"LONG_SIZE":3.3,
"LONG_SIZE":2.2
}
},
"version": "1.0" // 请求协议版本
}
4. 返回示例
{
"code": 200,
"message": "success",
"localizedMsg": null,
"data": 12345
}
5. 返回码
状态码 |
描述 |
其他说明 |
200 |
成功 |
|
460 |
参数验证异常 |
会带有验证异常的详细说明 |
500 |
服务异常 |
server error |
52002 |
无访问权限 |
|
52005 |
找不到目标存储 |
|
52009 |
参数和模型定义不匹配 |
|
52011 |
数据类型校验错误 |
4.2.4 数据删除API
1. 请求参数
参数 |
类型 |
是否必传 |
描述 |
modelId |
String |
是 |
数据模型id |
conditions |
JSON |
否 |
数据条件,由字段名、运算符、比较值组成一个condition。如:[{"fieldName": "id", "operate": "eq", "value": 7}] |
appId |
String |
否 |
对于SaaS应用,需要填该值 |
scopeId |
String |
否 |
经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。 |
其中condition针对不同类型的属性支持的运算符:
属性 |
支持的运算符 |
Integer |
eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul |
String |
eq,neq,nnul,in,nin |
Double |
eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul |
Boolean |
eq,nnul |
Date |
eq,neq,lt,lteq,mt,mteq,bt,nnul |
2. 返回参数
参数 |
类型 |
描述 |
data |
Integer |
删除数据的条数 |
3. 请求示例
{
"request": {
"apiVer": "0.1.0" // api版本号
},
"id": 1508212818676,// request里的全局唯一id透传
"params": {
"modelId":"XXX123",
"conditions":[
{"fieldName":"BRAND","value":"BMW","operate":"eq"},
{"fieldName":"MODE","value":"X5","operate":"eq"}
]
},
"version": "1.0" // 请求协议版本
}
4. 返回示例
{
"code": 200,
"message": "success",
"localizedMsg": null,
"data":100
}
5. 返回码
状态码 |
描述 |
说明 |
200 |
成功 |
|
460 |
参数验证异常 |
会带有验证异常的详细说明 |
500 |
服务异常 |
server error |
52002 |
无访问权限 |
4.2.5 数据修改API
1. 请求参数
参数 |
类型 |
是否必传 |
描述 |
modelId |
String |
是 |
数据模型id |
updateDetails |
JSON |
是 |
更新的具体字段和值,字段需要是模型中包含的字段,否则会报错,其中 Boolean属性的property传入"true"和"false"或者0和1。如:{"name": "xxxx", "age":20} |
conditions |
JSON |
否 |
条件,由字段名、运算符、比较值组成一个condition,格式如下:[{"fieldName": "id", "operate": "eq", "value": 7}]。fieldName表示字段的名称;operate表示操作符,操作符见上表;value表示值。 |
appId |
String |
否 |
对于SaaS应用,需要填该值 |
scopeId |
String |
否 |
经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。 |
其中condition针对不同类型的属性支持的运算符:
属性 |
支持的运算符 |
|
Integer |
eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul |
|
String |
eq,neq,nnul,in,nin |
|
Double |
eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul |
|
Boolean |
eq,nnul |
|
Date |
eq,neq,lt,lteq,mt,mteq,bt,nnul |
2. 返回参数
参数 |
类型 |
描述 |
data |
Integer |
更新数据的条数 |
3. 请求示例
{
"request": {
"apiVer": "0.0.3" // api版本号
},
"id": 1508212818676,// request里的全局唯一id透传
"params": {
"modelId":"XXX123",
"conditions":[
{"fieldName":"BRAND","value":"BMW","operate":"eq"},
{"fieldName":"MODE","value":"X5","operate":"eq"}
],
"updateDetails":{
"LONG_SIZE":4,
"WIDTH_SIZE":3
}
},
"version": "1.0" // 请求协议版本
}
4. 返回示例
{
"code": 200,
"message": "success",
"localizedMsg": null,
"data":100
}
5. 返回码
状态码 |
描述 |
|
200 |
成功 |
|
460 |
参数验证异常 |
会带有验证异常的详细说明 |
500 |
服务异常 |
server error |
52002 |
无访问权限 |
|
52006 |
资源更新错误 |
|
52011 |
数据类型校验错误 |
4.2.6 数据查询API
1. 请求参数
参数 |
类型 |
是否必传 |
说明 |
modelId |
String |
是 |
数据模型ID |
returnFields |
JSON |
是 |
指定返回的字段。1.若期望返回所有字段,则传入参数为["*"];2.若期望返回数据总数,则传入参数为["COUNT"]。如:["*"]或者["name","age"]["COUNT"] |
conditions |
JSON |
否 |
条件,由字段名、运算符、比较值组成一个condition。如:[{"fieldName": "id", "operate": "eq", "value": 7}] |
orderBy |
JSON |
否 |
排序条件,由增序或降序以及排序字段组成。如:{"asc":"true", "orderByFields":["name","age"]} |
pageNum |
Integer |
否 |
分页页数 |
pageSize |
Integer |
否 |
分页每页数量 |
appId |
String |
否 |
对于SaaS应用,需要填该值 |
scopeId |
String |
否 |
经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。 |
其中condition针对不同类型的属性支持的运算符:
属性 |
支持的运算符 |
Integer |
eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul |
String |
eq,neq,nnul,in,nin |
Double |
eq,neq,lt,lteq,mt,mteq,bt,in,nin,nnul |
Boolean |
eq,nnul |
Date |
eq,neq,lt,lteq,mt,mteq,bt,nnul |
2. 返回参数
参数 |
类型 |
描述 |
data |
String |
查询返回数据内容,其中,Boolean类型的属性返回值为0或1 |
3. 请求示例-1
{
"request": {
"apiVer": "0.0.2" // api版本号
},
"id": 1508212818676,// request里的全局唯一id透传
"params": {
"modelId":"XXX123",
"returnFields":["MODE","ENGINE"],
"conditions":[
{"fieldName":"BRAND","value":"BMW","operate":"eq"},
{"fieldName":"WIDTH_SIZE","value":1,"value2":2,"operate":"bt"},
{"fieldName":"LONG_SIZE","value":"3","operate":"mt"}
],
"orderBy":{
"asc":true,
"orderByFields":["CREATE_DATA","MODE"]
},
"pageNum":1,
"pageSize":10
}
},
"version": "1.0" // 请求协议版本
}
4. 返回示例-1
{
"code": 200,
"localizedMsg": null,
"data": "{\"count\":1,
\"hasNext\":false,
\"items\":[{\"gmt_create\":1551872701000,\"MODE\":\"33\",\"ENGINE\":\"44\",\"id\":2,\"gmt_modified\":1551872714000}],
\"pageNum\":1,
\"pageSize\":10}",
"message": "success"
}
5. 请求示例-2
{
"request": {
"apiVer": "0.1.0" // api版本号
},
"id": 1508212818676,// request里的全局唯一id透传
"params": {
"modelId":"XXX123",
"returnFields":["COUNT",
"conditions":[
{"fieldName":"BRAND","value":"BMW","operate":"eq"},
{"fieldName":"WIDTH_SIZE","value":1,"value2":2,"operate":"bt"},
{"fieldName":"LONG_SIZE","value":"3","operate":"mt"}
],
"pageNum":1,
"pageSize":10
}
},
"version": "1.0" // 请求协议版本
}
6. 返回示例-2
{
"code": 200,
"localizedMsg": null,
"data": "{
\"hasNext\":false,
\"items\":[{
\"COUNT\":100
}]
}"
"message": "success"
}
7. 返回码
状态码 |
描述 |
|
200 |
成功 |
|
460 |
参数验证异常 |
会带有验证异常的详细说明 |
500 |
服务异常 |
server error |
52002 |
无访问权限 |
|
52005 |
目标存储未找到 |
4.2.7 获取文件上传地址API
1. 请求参数
参数 |
类型 |
是否必传 |
描述 |
modelId |
String |
是 |
数据模型id |
version |
String |
是 |
数据模型的版本号 |
fileSize |
Integer |
是 |
文件大小,以字节为单位,目前系统不支持5M以上文件 |
attrName |
String |
是 |
属性名称,模型中包含的属性名称,不包含会报错进行提示 |
fileType |
String |
是 |
文件类型,目前系统只支持bmp、png、gif、jpg |
appId |
String |
否 |
对于SaaS应用,需要填该值 |
scopeId |
String |
否 |
经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。 |
2. 返回参数
参数 |
类型 |
描述 |
url |
String |
数据上传的url,url有效时间10s |
fileName |
String |
文件名称,系统随机分配的文件名称,用户在得到这个文件名之后,应该将其放在相应模型数据的文件类型的字段上,例如:xxxxx.jpg |
3. 请求示例
{
"request": {
"apiVer": "0.0.1" // api版本号
},
"id": 1508212818671, // request里的全局唯一id透传
"params": {
"modelId":"XXX123", //数据模型id
"fileSize":"3.2", //文件大小
"attrName":"name", //属性名称
"fileType":"jpg", //文件类型
"version":"1.0", //模型版本
},
"version": "1.0" // 请求协议版本
}
4. 返回示例
{
"code": 200, // 返回是否成功,只要不是200说明返回不成功
"message": "success", // 如果失败,会返回失败的信息描述
"localizedMsg": null,
"data": {"url":"http://xxx.xxx.xx","fileName":"xxx.jpg"} // 返回的数据
}
5. 返回码
状态码 |
描述 |
|
200 |
成功 |
|
460 |
参数验证异常 |
会带有验证异常的详细说明 |
500 |
服务异常 |
server error |
52002 |
无访问权限 |
|
52005 |
找不到目标存储 |
|
52009 |
参数和模型定义不匹配 |
|
52011 |
数据类型校验错误 |
|
52064 |
属性字段没有相应的图片标签 |
|
52063 |
文件大小不能大于5M |
4.2.8 获取文件下载地址API
1. 请求参数
参数 |
类型 |
是否必传 |
描述 |
modelId |
String |
是 |
数据模型id |
version |
String |
是 |
数据模型的版本号 |
attrName |
String |
是 |
属性名称,模型中包含的属性名称,不包含会报错进行提示 |
fileName |
String |
是 |
文件名称,必须为获取文件上传地址API中返回的fileName参数,例如:xxxxx.jpg |
appId |
String |
否 |
对于SaaS应用,需要填该值 |
scopeId |
String |
否 |
经常是项目的id,该参数非必填,一般上架应用被授权之后,会被默认绑定到一个scope中,因此当前操作会被默认操作到被绑定到的这一个scope中。但是,对于集成应用,也有可能被绑定到多个scope中,此时该操作需要填入scopeId。 |
2. 返回参数
参数 |
类型 |
描述 |
url |
String |
数据上传的url,url有效时间10s |
3. 请求示例
{
"request": {
"apiVer": "0.0.1" // api版本号
},
"id": 1508212818671, // request里的全局唯一id透传
"params": {
"modelId":"XXX123", //数据模型id
"scopeId":"fdbsdj1dfjdubgxxx", //业务隔离id
"attrName":"name", //属性名称
"fileName":"xdsfxv.jpg", //文件类型
"version":"1.0", //模型版本
},
"version": "1.0" // 请求协议版本
}
4. 返回示例
{
"code": 200, // 返回是否成功,只要不是200说明返回不成功
"message": "success", // 如果失败,会返回失败的信息描述
"localizedMsg": null,
"data": {"url":"http://xxx.xxx.xx"} // 返回的数据下载查看的url
}
5. 返回码
状态码 |
描述 |
|
200 |
成功 |
|
460 |
参数验证异常 |
会带有验证异常的详细说明 |
500 |
服务异常 |
server error |
52002 |
无访问权限 |
|
52005 |
找不到目标存储 |
|
52009 |
参数和模型定义不匹配 |
|
52011 |
数据类型校验错误 |
|
52064 |
属性字段没有相应的图片标签 |
5. 应用开发 - 数据订阅
应用可以通过AMQP方式,订阅数据的变更消息(新增、删除、修改)。应用自身不需要发布数据的变更消息到通道中,这些消息的产生是由应用通过上面的数据操作API,对数据进行操作之后,由平台产生。平台产生时间之后,会将消息通过订阅关系发送到订阅方。在这个消息通信中,每一个应用实例,由AppKey标识身份。对于单租户型的应用,每一个AppKey代表了一次应用分发的实例;但是,对于SaaS应用,他的一个AppKey代表了该应用对应的所有租户的身份,因此,在SaaS应用按照AppKey得到消息之后,需要自行根据订阅到的数据内容中的AppID字段,将数据对应到不用的用户中。
5.1 SDK介绍
5.1.1 Java SDK
1. 依赖引用
在工程中添加 maven 依赖接入 SDK。
<!-- amqp 1.0 qpid client -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.40.0</version>
</dependency>
<!-- util for base64-->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
2. 身份认证
使用服务端订阅功能,需要基于AppKey进行身份认证并建立连接。该AppKey根据应用类型不同,有两种来源:
对于独立单租户托管并分发的应用,该AppKey是由托管平台在应用实例化分发部署应用时,自动产生,并注入在主机的环境变量中,应用事先并不知道该值,需要动态从环境变量中获取。
对于共享型应用,该AppKey是该SaaS应用在注册多租户接口时,从平台创建,并硬编码在应用中。
public static void main(String[] args) throws Exception {
String clientId = UUID.randomUUID().toString(); //填写唯一的id,比如uuid
String random = UUID.randomUUID().toString().replaceAll("-", "").substring(0, 6);
//按照阿里云IoT规范,组装UserName
String userName = clientId + "|authMode=appkey"
+ ",signMethod=SHA256"
+ ",random=" + random
+ ",appKey=" + "{appkey:纯数字}"
//+ ",iotInstanceId=" + iotInstanceId, //如果是企业实例ID,那么需要补充这个参数。如果是公共实例ID,那么隐藏这个参数
+ ",groupId=" + "{appkey:纯数字}"
+ "|";
//按照阿里云IoT规范,计算签名,组装password
String signContent = "random=" + random;
String password = doSign(signContent, "{app-secret}", "HmacSHA256");
//如果是公共实例,那么使用如下链接地址。如果是企业实例,需要在实例管理中查看amqp链接地址。
String connectionUrl =
"failover:(amqps://{aliyun-uid}.iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)"
+ "?failover.maxReconnectAttempts=10&failover.reconnectDelay=30";
Hashtable<String, String> hashtable = new Hashtable<String, String>();
hashtable.put("connectionfactory.SBCF", connectionUrl);
hashtable.put("queue.QUEUE", "default");
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
Context context = new InitialContext(hashtable);
ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");
Destination queue = (Destination)context.lookup("QUEUE");
Connection connection = cf.createConnection(userName, password);
MyJmsConnectionListener myJmsConnectionListener = new MyJmsConnectionListener();
((JmsConnection)connection).addConnectionListener(myJmsConnectionListener);
// Create Session
// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()
// Session.AUTO_ACKNOWLEDGE: SDK自动ack(推荐)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
// Create Receiver Link
MessageConsumer consumer = session.createConsumer(queue);
}
/**
* 按照指定签名算法计算password签名
*/
private static String doSign(String toSignString, String secret, String signMethod) throws Exception {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);
Mac mac = Mac.getInstance(signMethod);
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(toSignString.getBytes());
return Hex.encodeHexString(rawHmac);
}
}
注:connectionUrlTemplate 的写法如下所示:
上海:
公共实例:amqps://${uid}.iot-amqp.cn-shanghai.aliyuncs.com:5671
购买的实例:在LP控制台“实例管理”--“查看终端节点”里找到AMQP接入点
3. 设置消息接受接口
使用AMQP方式,建立连接之前,需要提供消息接收接口,用于处理回调的消息,需要通过 ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener)来设置,其中 myJmsConnectionListener是一个自定义实现JmsConnectionListener接口的类,如下所示:
class MyJmsConnectionListener impJmsConnectionListener() {
/**
* 连接成功建立
* @param remoteURI
*/
@Override
public void onConnectionEstablished(URI remoteURI) {
logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);
}
/**
* 最终连接失败,尝试过最大重试次数之后
* @param error
*/
@Override
public void onConnectionFailure(Throwable error) {
logger.error("onConnectionFailure, {}", error.getMessage());
}
/**
* 连接中断
* @param remoteURI
*/
@Override
public void onConnectionInterrupted(URI remoteURI) {
logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);
}
/**
* 连接中断后又自动重连上
* @param remoteURI
*/
@Override
public void onConnectionRestored(URI remoteURI) {
logger.info("onConnectionRestored, remoteUri:{}", remoteURI);
}
@Override
public void onInboundMessage(JmsInboundMessageDispatch envelope) {
logger.info("onInboundMessage, {}", envelope);
JmsMessage message = envelope.getMessage();
try {
byte[] body = message.getBody(byte[].class);
String content = new String(body);
String topic = message.getStringProperty("topic");
String messageId = message.getStringProperty("messageId");
logger.info("receive message"
+ ", topic = " + topic
+ ", messageId = " + messageId
+ ", content = " + content);
//如果创建Session选择的是Session.CLIENT_ACKNOWLEDGE,这里需要手动ack
//Thread.sleep(2000);
message.acknowledge();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onSessionClosed(Session session, Throwable cause) {
logger.error("onSessionClosed, " + session, cause);
}
@Override
public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {
logger.error("onConsumerClosed, " + consumer, cause);
}
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {
logger.error("onProducerClosed, " + producer, cause);
}
}
其中的content内容是一个JSON对象字符串,字段信息如下:
// 模型ID
String modelId;
// 变更数据的ID
List<Long> dataIds;
// 操作类型:insert/update/delete
String operateType;
// 订阅的appId, 以appKey授权时为空
String appId;
String scopeId;
// 模型的逻辑隔离id
String logicalModelIsoId;
// 模型实例id
String modelInstanceId;
// 数据实例id
String dataInstanceId;