边缘应用数据总线对接
1.整体介绍
平台提供了对数据进行增删改查的4个API接口。如果两个应用,涉及到相同的数据模型时,是共享一份数据,还是独立管理各自的数据。通过应用声明,应用在上架到应用市场之后,用户就能感知应用与数据之间的关系。目前通用的集成方案是以项目为隔离维度的。也就是说,在一个项目内的所有应用,他们关联的相同的数据模型,会被默认放在同一个隔离区域ScopeID内。
2.数据操作API文档
2.1 新增数据接口
接口描述
API版本  | 0.0.3  | 
授权类型  | APPSIGN  | 
协议  | HTTP  | 
请求方法  | Post  | 
域名(环境变量中获取)  | System.getenv("iot.hosting.api.domain")  | 
路径  | /data/model/data/insert  | 
入参说明
入参名称  | 数据类型  | 是否必须  | 入参示例  | 入参描述  | 
modelId  | 字符串  | 是  | test_model1  | 模型id  | 
scopeId  | 字符串  | 否  | 1C35315598694F  | 业务隔离id  | 
appId  | 字符串  | 否  | AADB1F0EBD0411B9  | 应用id  | 
properties  | 复杂对象  | 是  | {"name":"xxx", "age":18}  | 模型对应的字段名称以及字段值的JSON数据  | 
出参列表
出参名称  | 数据类型  | 出参描述  | 
code  | 整型  | 响应码, 200: 成功  | 
message  | 字符串  | 错误消息  | 
localizedMsg  | 字符串  | 本地语言错误消息  | 
data  | 长整型  | 响应结果,返回的长整型数据是增加的数据的id  | 
请求示例
/**
 * 系统环境变量中获取的
 */
public static final String appkey = System.getenv("iot.hosting.appKey");
public static final String appSecret = System.getenv("iot.hosting.appSecret");
//边缘端数据模型服务路由
private static final String DATA_EDGE_PATH = System.getenv("iot.hosting.api.domain");
public static void main(String[] args) throws UnsupportedEncodingException {
    IoTApiClientBuilderParams ioTApiClientBuilderParams =
      new IoTApiClientBuilderParams();
    ioTApiClientBuilderParams.setAppKey("你的<AppKey>");
    ioTApiClientBuilderParams.setAppSecret("你的<AppSecret>");
    SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
    IoTApiRequest request = new IoTApiRequest();
    //设置api的版本
    request.setApiVer("0.0.3");
    //如果需要登录,设置当前的会话的token
    request.setIotToken("xxxxxxxxxxxxxxx");
    // 接口参数
    JSONObject properties = new JSONObject();
    request.putParam("modelId","value1");
    request.putParam("scopeId","value2");
    request.putParam("appId","value3");
    request.putParam("properties",properties);
    //请求参数域名、path、request
    ApiResponse response = syncClient.postBody(DATA_EDGE_PATH,
      "/data/model/data/insert", request, true);
    System.out.println( "response code = " + response.getCode()
      + " response = " + new String(response.getBody(), "UTF-8"));
}返回结果示例 JSON
{   
    "id": "3826f64d-6c7b-4d21-8214-8b23771b763a",
    "code": 200,
    "localizedMsg": null,
    "message": null,
    "data": 791
}失败返回结果示例 JSON
{
    "id": "f561d973-9094-479f-81fd-95ec3e7271f5",
    "code": 52009,
    "localizedMsg": "传入的参数和模型字段不匹配:name1",
    "message": "传入的参数和模型字段不匹配:name1",
    "data": null
}2.2 删除数据接口
接口描述
API版本  | 0.0.2  | 
授权类型  | APPSIGN  | 
协议  | HTTP  | 
请求方法  | Post  | 
域名(环境变量中获取)  | System.getenv("iot.hosting.api.domain")  | 
路径  | /data/model/data/delete  | 
入参说明
入参名称  | 数据类型  | 是否必须  | 入参示例  | 入参描述  | 
modelId  | 字符串  | 是  | test_model1  | 模型id  | 
scopeId  | 字符串  | 否  | 1C35315598694F  | 业务隔离id  | 
appId  | 字符串  | 否  | AADB1F0EBD0411B9  | 应用id  | 
conditions  | 复杂对象数组  | 是  | [{"fieldName": "id", "operate": "eq", "value": 7}]  | 可以输入多个条件 fieldName 字段名称 operate 操作符 value 字段值  | 
出参列表
出参名称  | 数据类型  | 出参描述  | 
code  | 整型  | 响应码, 200: 成功  | 
message  | 字符串  | 错误消息  | 
localizedMsg  | 字符串  | 本地语言错误消息  | 
data  | 长整型  | 响应结果返回的长整型数据是增加的数据的id  | 
请求示例
/**
 * 系统环境变量中获取的
 */
public static final String appkey = System.getenv("iot.hosting.appKey");
public static final String appSecret = System.getenv("iot.hosting.appSecret");
//边缘端数据模型服务路由
private static final String DATA_EDGE_PATH = System.getenv("iot.hosting.api.domain");
public static void main(String[] args) throws UnsupportedEncodingException {
    IoTApiClientBuilderParams ioTApiClientBuilderParams =
      new IoTApiClientBuilderParams();
    ioTApiClientBuilderParams.setAppKey("你的<AppKey>");
    ioTApiClientBuilderParams.setAppSecret("你的<AppSecret>");
    SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
    IoTApiRequest request = new IoTApiRequest();
    //设置api的版本
    request.setApiVer("0.0.2");
    //如果需要登录,设置当前的会话的token
    request.setIotToken("xxxxxxxxxxxxxxx");
    // 接口参数
    request.putParam("modelId","value1");
    request.putParam("scopeId","value2");
    request.putParam("appId","value3");
    request.putParam("conditions", JSON.parseArray("[{\"fieldName\": \"id\",\"operate\": \"eq\",\"value\": 1862}]"));
    //请求参数域名、path、request
    ApiResponse response = syncClient.postBody(DATA_EDGE_PATH,
      "/data/model/data/delete", request, true);
    System.out.println( "response code = " + response.getCode()
      + " response = " + new String(response.getBody(), "UTF-8"));
}返回结果示例 JSON
{
    "code": 200,
    "data": 2,
    "message": "success"
}失败返回结果示例 JSON
{
    "code": 500,
    "data": null,
    "message": "系统错误.删除数据异常"
}2.3 修改数据接口
接口描述
API版本  | 0.0.2  | 
授权类型  | APPSIGN  | 
协议  | HTTP  | 
请求方法  | Post  | 
域名(环境变量中获取)  | System.getenv("iot.hosting.api.domain")  | 
路径  | /data/model/data/update  | 
入参说明
入参名称  | 数据类型  | 是否必须  | 入参示例  | 入参描述  | 
modelId  | 字符串  | 是  | test_model1  | 模型id  | 
scopeId  | 字符串  | 否  | 1C35315598694F  | 业务隔离id  | 
appId  | 字符串  | 否  | AADB1F0EBD0411B9  | 应用id  | 
conditions  | 复杂对象数组  | 否  | [{"fieldName": "id", "operate": "eq", "value": 7}]  | 可以输入多个条件 fieldName 字段名称 operate 操作符 value 字段值  | 
updateDetails  | 复杂对象  | 是  | {"name": "demo", "age":20}  | 模型字段名称与字段值的一个JSON数据  | 
出参列表
出参名称  | 数据类型  | 出参描述  | 
code  | 整型  | 响应码, 200: 成功  | 
message  | 字符串  | 错误消息  | 
localizedMsg  | 字符串  | 本地语言错误消息  | 
data  | 长整型  | 响应结果返回的长整型数据是增加的数据的id  | 
请求示例
/**
 * 系统环境变量中获取的
 */
public static final String appkey = System.getenv("iot.hosting.appKey");
public static final String appSecret = System.getenv("iot.hosting.appSecret");
//边缘端数据模型服务路由
private static final String DATA_EDGE_PATH = System.getenv("iot.hosting.api.domain");
public static void main(String[] args) throws UnsupportedEncodingException {
    IoTApiClientBuilderParams ioTApiClientBuilderParams =
      new IoTApiClientBuilderParams();
    ioTApiClientBuilderParams.setAppKey("你的<AppKey>");
    ioTApiClientBuilderParams.setAppSecret("你的<AppSecret>");
    SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
    IoTApiRequest request = new IoTApiRequest();
    //设置api的版本
    request.setApiVer("0.0.2");
    //如果需要登录,设置当前的会话的token
    request.setIotToken("xxxxxxxxxxxxxxx");
    // 接口参数
    request.putParam("modelId","value1");
    request.putParam("scopeId","value2");
    request.putParam("appId","value3");
    JSONObject updateDetails = new JSONObject();
    updateDetails.put("key",value);
    request.putParam("updateDetails", updateDetails);
    //请求参数域名、path、request
    ApiResponse response = syncClient.postBody(DATA_EDGE_PATH,
      "/data/model/data/update", request, true);
    System.out.println( "response code = " + response.getCode()
      + " response = " + new String(response.getBody(), "UTF-8"));
}返回结果示例 JSON
{
    "code": 200,
    "data": 3,
    "message": "success"
}失败返回结果示例 JSON
{
    "code": 500,
    "data": null
    "message": "系统错误.删除数据异常"
}2.4 查询数据接口
接口描述
API版本  | 0.0.3  | 
授权类型  | APPSIGN  | 
协议  | HTTP  | 
请求方法  | Post  | 
域名(环境变量中获取)  | System.getenv("iot.hosting.api.domain")  | 
路径  | /data/model/data/query  | 
入参说明
入参名称  | 数据类型  | 是否必须  | 入参示例  | 入参描述  | 
modelId  | 字符串  | 是  | test_model1  | 模型id  | 
scopeId  | 字符串  | 否  | 1C35315598694F  | 业务隔离id  | 
appId  | 字符串  | 否  | AADB1F0EBD0411B9  | 应用id  | 
pageSize  | 整型  | 是  | 缺省默认值20  | 页大小,最大200  | 
pageNum  | 整型  | 是  | 缺省默认值1  | 页码  | 
conditions  | 复杂对象数组  | 否  | [{"fieldName": "id", "operate": "比较符", "value": 7}]  | 可以输入多个条件 fieldName 字段名称 operate 操作符 value 字段值,不同类型比较符参考下述列表  | 
returnFields  | 复杂对象数组  | 是  | ["*"]或者["name", "age"]  | 1.若期望返回所有字段,则传入参数为["*"];2.若期望返回模型的某几个字段,输入字段的名称  | 
orderBy  | 复杂对象  | 否  | {"asc": "true", "orderByFields": ["name", "age"]}  | asc=true为升序,需要模型的哪个字段进行排序,依次传入模型的字段名称  | 
比较符参考表
operate  | 对应数据类型  | 
eq  | boolean、String、Date、Integer、Double  | 
neq  | String  | 
in  | String、Integer、Double  | 
nin  | String、Integer、Double  | 
It  | Date、Integer、Double  | 
Iteq  | Date、Integer、Double  | 
mt  | Date、Integer、Double  | 
mteq  | Date、Integer、Double  | 
bt  | Date、Integer、Double  | 
出参列表
出参名称  | 数据类型  | 出参描述  | 
code  | 整型  | 响应码, 200: 成功  | 
message  | 字符串  | 错误消息  | 
localizedMsg  | 字符串  | 本地语言错误消息  | 
data  | 长整型  | data格式示列 {\"count\":1,\"hasNext\":false,\"items\":[{\"gmt_create\":1551872701000,\"name\":\"33\",\"id\":2,\"gmt_modified\":1551872714000}],\"pageNum\":1,\"pageSize\":10}包含页大小pageSize,当前页码pageNum,数据总数count以及数据的集合items还有是否还有下一页hasNext,其中items中返回的参数就是创建模型时定义的模型的字段,以及字段对应的值  | 
请求示例
/**
 * 系统环境变量中获取的
 */
public static final String appkey = System.getenv("iot.hosting.appKey");
public static final String appSecret = System.getenv("iot.hosting.appSecret");
//边缘端数据模型服务路由
private static final String DATA_EDGE_PATH = System.getenv("iot.hosting.api.domain");
public static void main(String[] args) throws UnsupportedEncodingException {
    IoTApiClientBuilderParams ioTApiClientBuilderParams =
      new IoTApiClientBuilderParams();
    ioTApiClientBuilderParams.setAppKey("你的<AppKey>");
    ioTApiClientBuilderParams.setAppSecret("你的<AppSecret>");
    SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
    IoTApiRequest request = new IoTApiRequest();
    //设置api的版本
    request.setApiVer("0.0.2");
    // 接口参数
    request.putParam("modelId","value1");
    request.putParam("scopeId","value2");
    request.putParam("appId","value3");
    List<String> returnFields = Lists.newArrayList("sceneId", "userId", "ctrlColumn  ", "condition","val");
    request.putParam("returnFields", returnFields);
    request.putParam("conditions", JSON.parseArray("[{\"fieldName\": \"id\",\"operate\": \"eq\",\"value\": 1862}]"));
    //请求参数域名、path、request
    ApiResponse response = syncClient.postBody(DATA_EDGE_PATH,
      "/data/model/data/query", request, true);
    System.out.println( "response code = " + response.getCode()
      + " response = " + new String(response.getBody(), "UTF-8"));
}返回结果示例 JSON
{
    "id": "4636f64d-5c8b-4d21-8214-8b237715721c",
    "code": 200,
    "message": null,
    "localizedMsg": null,
    "data": "{\"count\":1,\"hasNext\":false,\"items\":[{\"gmt_create\":1551872701000,\"name\":\"33\",\"id\":2,\"gmt_modified\":1551872714000}],\"pageNum\":1,\"pageSize\":10}"
}失败返回结果示例 JSON
{
    "id": "f678d543-5436-897f-76fd-45ec3e1243a9",
    "code": 52009,
    "localizedMsg": "传入的参数和模型字段不匹配:name1",
    "message": "传入的参数和模型字段不匹配:name1",
    "data": null
}2.5 获取文件上传地址接口
接口描述
API版本  | 0.0.1  | 
授权类型  | APPSIGN  | 
协议  | HTTP  | 
请求方法  | Post  | 
域名(环境变量中获取)  | System.getenv("iot.hosting.api.domain")  | 
路径  | /data/model/data/upload  | 
入参说明
入参名称  | 数据类型  | 是否必须  | 入参示例  | 入参描述  | 
modelId  | 字符串  | 是  | test_model1  | 模型id  | 
scopeId  | 字符串  | 否  | 1C35315598694F  | 业务隔离id  | 
appId  | 字符串  | 否  | AADB1F0EBD0411B9  | 应用id  | 
fileSize  | 整型  | 是  | 4096  | 文件大小,以字节为单位,目前系统不支持5M以上文件  | 
attrName  | 字符串  | 是  | name  | 属性名称,模型中包含的属性名称,不包含会报错进行提示  | 
fileType  | 字符串  | 是  | jpg  | 文件类型,目前系统只支持bmp,png,gif,jpg  | 
version  | 字符串  | 否  | 1.0  | 模型版本  | 
出参列表
出参名称  | 数据类型  | 出参描述  | 
code  | 整型  | 响应码, 200: 成功  | 
message  | 字符串  | 错误消息  | 
localizedMsg  | 字符串  | 本地语言错误消息  | 
data  | 长整型  | 响应结果  | 
请求示例
/**
 * 系统环境变量中获取的
 */
public static final String appkey = System.getenv("iot.hosting.appKey");
public static final String appSecret = System.getenv("iot.hosting.appSecret");
//边缘端数据模型服务路由
private static final String DATA_EDGE_PATH = System.getenv("iot.hosting.api.domain");
public static void main(String[] args) throws UnsupportedEncodingException {
    IoTApiClientBuilderParams ioTApiClientBuilderParams =
      new IoTApiClientBuilderParams();
    ioTApiClientBuilderParams.setAppKey("你的<AppKey>");
    ioTApiClientBuilderParams.setAppSecret("你的<AppSecret>");
    SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
    IoTApiRequest request = new IoTApiRequest();
    //设置api的版本
    request.setApiVer("0.0.2");
    // 接口参数
    request.putParam("scopeId","value1");
    request.putParam("appId","value2");
    request.putParam("modelId","value3");
    request.putParam("fileSize","value4");
    request.putParam("attrName","value5");
    request.putParam("fileType","value6");
    request.putParam("version","value7");
    ApiResponse response = syncClient.postBody(DATA_EDGE_PATH,
      "/data/model/data/upload", request, true);
    System.out.println( "response code = " + response.getCode()
      + " response = " + new String(response.getBody(), "UTF-8"));
}返回结果示例 JSON
{
    "id": "6fr2c332-c1db-417c-aa15-8c5trg3r5d92",
    "code": 200,
    "message": null,
    "localizedMsg": null,
    "data": {
        "fileName": "5269712352e5.jpg",
        "url": "https://*****.example.com/***/file/5269712352e5.jpg?Expires=1557902379&OSSAccessKeyId=uyedjYL******&Signature=sotMFFIq4RP%2BWJSDScE8SxvO******"
    }
}失败返回结果示例 JSON
{
    "id": "f561d973-9094-479f-81fd-95ec3e7271f5",
    "code": 52002,
    "localizedMsg": "没有字段name1的访问权限",
    "message": "没有字段name1的访问权限",
    "data": null
}2.6 获取文件下载地址接口
接口描述
API版本  | 0.0.1  | 
授权类型  | APPSIGN  | 
协议  | HTTP  | 
请求方法  | Post  | 
域名(环境变量中获取)  | System.getenv("iot.hosting.api.domain")  | 
路径  | /data/model/data/download  | 
入参说明
入参名称  | 数据类型  | 是否必须  | 入参示例  | 入参描述  | 
modelId  | 字符串  | 是  | test_model1  | 模型id  | 
scopeId  | 字符串  | 否  | 1C35315598694F  | 业务隔离id  | 
appId  | 字符串  | 否  | AADB1F0EBD0411B9  | 应用id  | 
attrName  | 字符串  | 是  | name  | 属性名称,模型中包含的属性名称,不包含会报错进行提示  | 
fileName  | 字符串  | 是  | demo.jpg  | 文件名称,必须为获取文件上传地址API中返回的fileName参数  | 
version  | 字符串  | 否  | 1.0  | 模型版本  | 
出参列表
出参名称  | 数据类型  | 出参描述  | 
code  | 整型  | 响应码, 200: 成功  | 
message  | 字符串  | 错误消息  | 
localizedMsg  | 字符串  | 本地语言错误消息  | 
data  | 长整型  | 响应结果  | 
请求示例
/**
 * 系统环境变量中获取的
 */
public static final String appkey = System.getenv("iot.hosting.appKey");
public static final String appSecret = System.getenv("iot.hosting.appSecret");
//边缘端数据模型服务路由
private static final String DATA_EDGE_PATH = System.getenv("iot.hosting.api.domain");
public static void main(String[] args) throws UnsupportedEncodingException {
    IoTApiClientBuilderParams ioTApiClientBuilderParams =
      new IoTApiClientBuilderParams();
    ioTApiClientBuilderParams.setAppKey("你的<AppKey>");
    ioTApiClientBuilderParams.setAppSecret("你的<AppSecret>");
    SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
    IoTApiRequest request = new IoTApiRequest();
    //设置api的版本
    request.setApiVer("0.0.1");
    request.setId("42423423");
    //如果需要登录,设置当前的会话的token
    request.setIotToken("xxxxxxxxxxxxxxx");
    // 接口参数
    request.putParam("fileName","value1");
    request.putParam("scopeId","value2");
    request.putParam("modelId","value3");
    request.putParam("appId","value4");
    request.putParam("attrName","value5");
    request.putParam("fileName","value6");
    request.putParam("version","value7");
    //请求参数域名、path、request
    ApiResponse response = syncClient.postBody("api.link.aliyun.com",
      "/data/model/data/download", request, true);
    System.out.println( "response code = " + response.getCode()
      + " response = " + new String(response.getBody(), "UTF-8"));返回结果示例 JSON
{
    "id": "6fr2c332-c1db-417c-aa15-8c5trg3r5d92",
    "code": 200,
    "message": null,
    "localizedMsg": null,
    "data": {
        "url": "https://example.com/***/file/52697123****.jpg?Expires=1557902379&OSSAccessKeyId=uyedjYL******&Signature=sotMFFIq4RP%2BWJSDScE8SxvO******"
    }
}失败返回结果示例 JSON
{
    "id": "f561d973-9094-479f-81fd-95ec3e7271f5",
    "code": 52002,
    "localizedMsg": "没有字段name1的访问权限",
    "message": "没有字段name1的访问权限",
    "data": null
}2.7 数据变更消息订阅
当总线中的数据发生增删改等数据变更行为时,应用可以订阅这类数据事件。大概分成三个阶段:获取订阅队列信息、订阅并获取消息、解析消息。
2.7.1 获取订阅队列信息接口
接口描述
查询该应用的订阅队列信息
接口路径
/dop/meta/subscribe/queue
接口参数
参数名称  | 参数类型  | 必填  | 备注  | 
scopeId  | 字符串  | 否  | 业务隔离id  | 
appKey  | 字符串  | 否  | 应用id(从网关系统参数中获得)  | 
返回值
名称  | 类型  | 可选  | 备注  | 
endpoint  | 字符串  | 否  | rabbit-mq 服务地址  | 
port  | 字符串  | 否  | rabbit-mq 监听端口  | 
user  | 字符串  | 否  | rabbit-mq 用户名  | 
password  | 字符串  | 否  | rabbit-mq 密码。  | 
queue  | 字符串  | 否  | 该应用队列名称  | 
virtualHost  | 字符串  | 否  | 虚拟地址  | 
返回值示例
{   
    "id": "3826f64d-6c7b-4d21-8214-8b23771b763a",
    "code": 200,
    "endpoint": "localhost",
    "port": 1000,
    "user": "12****",
    "password": "****",
    "queue": "/abc/ddd/12****/****",
    "virtualHost":"demo"
}参考代码
//环境变量中获取,测试时候可以自行填写
private static final String appKey = System.getenv("iot.hosting.appKey");
private static final String appSecrete = System.getenv("iot.hosting.appSecret");
//边缘端数据模型服务路由
private static final String apiHost = System.getenv("iot.hosting.api.domain");
//http
@Value("${iot.hosting.api.schema}")
private boolean apiSchema;
//发送请求获得订阅队列信息
public String getSubscriberQueue() throws UnsupportedEncodingException {
    //创建边缘网关客户端
    logger.info("subscriberProxyImpl, appKey:{}, appSecrete:{}, apiHost:{}, apiSchema:{}",
            appKey, appSecrete, apiHost, apiSchema);
    IoTApiClientBuilderParams ioTApiClientBuilderParams = new IoTApiClientBuilderParams();
    ioTApiClientBuilderParams.setAppKey(appKey);
    ioTApiClientBuilderParams.setAppSecret(appSecrete);
    SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
    String subscriberQueuePath = "/dop/meta/subscribe/queue";
    IoTApiRequest request = new IoTApiRequest();
    // 接口参数--可以按照需求进行填写
    request.putParam("appKey", appKey);
    request.putParam("key", "value");
    //发送请求获得订阅队列信息
    ApiResponse apiResponse = syncClient.postBody(apiHost,
            subscriberQueuePath, request, false);
    logger.info("response code = " + apiResponse.getCode()
            + " response = " + new String(apiResponse.getBody(), "UTF-8"));
    System.out.println(new String(apiResponse.getBody(), "UTF-8"));
    return getResponseBody(apiResponse);
}
//解析边缘网关返回值
private String getResponseBody(ApiResponse apiResponse) {
    if (apiResponse == null || apiResponse.getCode() != 200) {
        return null;
    }
    String jsonObjectString;
    if (StringUtils.isEmpty(apiResponse.getBodyStr())) {
        try {
            jsonObjectString = new String(apiResponse.getBody(), "utf-8");
        } catch (UnsupportedEncodingException e) {
            return null;
        }
    } else {
        jsonObjectString = apiResponse.getBodyStr();
    }
    if (StringUtils.isEmpty(jsonObjectString)) {
        return null;
    }
    JSONObject iotxResultJson = JSON.parseObject(jsonObjectString);
    int iotCode = iotxResultJson.getInteger("code");
    if (iotCode != 200) {
        return null;
    }
    return iotxResultJson.getString("data");
}2.7.2 订阅队列
参考代码:
String queue = getResponseBody(apiResponse);
JSONObject jsonObject = (JSONObject) JSON.parse(queue);
String endpoint = jsonObject.getString("endpoint");
Integer port = jsonObject.getInteger("port");
String user = jsonObject.getString("user");
String password = jsonObject.getString("password");
String queueName = jsonObject.getString("queue");
String virtualHost = jsonObject.getString("virtualHost");
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(endpoint);
connectionFactory.setPort(port);
connectionFactory.setUsername(user);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
// 得到连接
try {
    connection = connectionFactory.newConnection();
    channel = connection.createChannel();
    AMQP.Queue.DeclareOk queueDeclare = null;
    channel.basicQos(64);
    channel.basicConsume(queueName, false, "ConsumerTag", new EdgeConsumer(channel));
} catch (IOException e) {
    e.printStackTrace();
} catch (TimeoutException e) {
    logger.error("连接消息队列失败:" + e);
    try {
        close();
    } catch (Exception e1) {
        logger.error("关闭消息连接:" + e1);
    }
    return getResponseBody(apiResponse);
}2.7.3 推送消息解析
参考代码:
//获取订阅消息的解析
JSONObject jsonObject= JSON.parseObject(String.valueOf(apiResponse.getBody()));
//模型ID
String modelId = jsonObject2.getString("modelId");
//操作类型,update表示更新,insert表示新增,delete表示删除
String operationType = jsonObject2.getString("operationType");
// 元素为long类型数值,数据在dop的唯一标示
JSONArray dataIds = jsonObject2.getJSONArray("dataIds");