全部产品
云市场
云游戏

边缘应用数据总线对接

更新时间:2020-03-09 18:22:52

1.整体介绍

平台提供了对数据进行增删改查的4个API接口。如果两个应用,涉及到相同的数据模型时,是共享一份数据,还是独立管理各自的数据。通过应用声明,应用在上架到应用市场之后,用户就能感知应用与数据之间的关系。
目前通用的集成方案是以项目为隔离维度的。也就是说,在一个项目内的所有应用,他们关联的相同的数据模型,会被默认放在同一个隔离区域ScopeID内。

2.数据模型API文档

2.1 新增数据模型接口

接口描述

API版本 0.0.3
授权类型 APPSIGN
协议 HTTPS
请求方法 Post
域名(环境变量中获取) System.getenv(“iot.hosting.api.domain”)
路径 /data/model/data/insert

入参说明

入参名称 数据类型 是否必须 入参示例 入参描述
modelId 字符串 test_model1 模型id
scopeId 字符串 1C35315598694F 业务隔离id
appId 字符串 AADB1F0EBD0411B9 应用id
properties 复杂对象 {“name”:”xxx”,”age”:18} 模型对应的字段名称以及字段值的json数据

出参列表

出参名称 数据类型 出参描述
code 整形 响应码, 200: 成功
message 字符串 错误消息
localizedMsg 字符串 本地语言错误消息
data 长整型 响应结果 返回的长整型数据是增加的数据的id

请求示例

  1. /**
  2. * 系统环境变量中获取的
  3. */
  4. public static final String appkey = System.getenv("iot.hosting.appKey");
  5. public static final String appSecret = System.getenv("iot.hosting.appSecret");
  6. //边缘端数据模型服务路由
  7. private static final String DATA_EDGE_PATH = System.getenv("iot.hosting.api.domain");
  8. public static void main(String[] args) throws UnsupportedEncodingException {
  9. IoTApiClientBuilderParams ioTApiClientBuilderParams =
  10. new IoTApiClientBuilderParams();
  11. ioTApiClientBuilderParams.setAppKey("你的<AppKey>");
  12. ioTApiClientBuilderParams.setAppSecret("你的<AppSecret>");
  13. SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
  14. IoTApiRequest request = new IoTApiRequest();
  15. //设置api的版本
  16. request.setApiVer("0.0.3");
  17. //如果需要登陆,设置当前的会话的token
  18. request.setIotToken("xxxxxxxxxxxxxxx");
  19. // 接口参数
  20. JSONObject properties = new JSONObject();
  21. request.putParam("modelId","value1");
  22. request.putParam("scopeId","value2");
  23. request.putParam("appId","value3");
  24. request.putParam("properties",properties);
  25. //请求参数域名、path、request
  26. ApiResponse response = syncClient.postBody(DATA_EDGE_PATH,
  27. "/data/model/data/insert", request, true);
  28. System.out.println( "response code = " + response.getCode()
  29. + " response = " + new String(response.getBody(), "UTF-8"));
  30. }

返回结果示例 JSON

  1. {
  2. "id": "3826f64d-6c7b-4d21-8214-8b23771b763a"
  3. "code": 200,
  4. "localizedMsg": null,
  5. "message": null,
  6. "data": 791
  7. }

失败返回结果示例 JSON

  1. {
  2. "id": "f561d973-9094-479f-81fd-95ec3e7271f5",
  3. "code": 52009,
  4. "localizedMsg": "传入的参数和模型字段不匹配:name1",
  5. "message": "传入的参数和模型字段不匹配:name1",
  6. "data": null
  7. }

2.2 删除数据模型接口

接口描述

API版本 0.0.2
授权类型 APPSIGN
协议 HTTPS
请求方法 Post
域名(环境变量中获取) System.getenv(“iot.hosting.api.domain”)
路径 /data/model/data/delete

入参说明

入参名称 数据类型 是否必须 入参示例 入参描述
modelId 字符串 test_model1 模型id
scopeId 字符串 1C35315598694F 业务隔离id
appId 字符串 AADB1F0EBD0411B9 应用id
conditions 复杂对象数组 [{“fieldName”: “id”,”operate”: “eq”,”value”: 7}] 可以输入多个条件 fieldName 字段名称 operate 操作符 value 字段值

出参列表

出参名称 数据类型 出参描述
code 整形 响应码, 200: 成功
message 字符串 错误消息
localizedMsg 字符串 本地语言错误消息
data 长整型 响应结果 返回的长整型数据是增加的数据的id

请求示例

  1. /**
  2. * 系统环境变量中获取的
  3. */
  4. public static final String appkey = System.getenv("iot.hosting.appKey");
  5. public static final String appSecret = System.getenv("iot.hosting.appSecret");
  6. //边缘端数据模型服务路由
  7. private static final String DATA_EDGE_PATH = System.getenv("iot.hosting.api.domain");
  8. public static void main(String[] args) throws UnsupportedEncodingException {
  9. IoTApiClientBuilderParams ioTApiClientBuilderParams =
  10. new IoTApiClientBuilderParams();
  11. ioTApiClientBuilderParams.setAppKey("你的<AppKey>");
  12. ioTApiClientBuilderParams.setAppSecret("你的<AppSecret>");
  13. SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
  14. IoTApiRequest request = new IoTApiRequest();
  15. //设置api的版本
  16. request.setApiVer("0.0.2");
  17. //如果需要登陆,设置当前的会话的token
  18. request.setIotToken("xxxxxxxxxxxxxxx");
  19. // 接口参数
  20. request.putParam("modelId","value1");
  21. request.putParam("scopeId","value2");
  22. request.putParam("appId","value3");
  23. request.putParam("conditions", JSON.parseArray("[{\"fieldName\": \"id\",\"operate\": \"eq\",\"value\": 1862}]"));
  24. //请求参数域名、path、request
  25. ApiResponse response = syncClient.postBody(DATA_EDGE_PATH,
  26. "/data/model/data/delete", request, true);
  27. System.out.println( "response code = " + response.getCode()
  28. + " response = " + new String(response.getBody(), "UTF-8"));
  29. }

返回结果示例 JSON

  1. {
  2. "code": 200,
  3. "data": 2,
  4. "message": "success"
  5. }

失败返回结果示例 JSON

  1. {
  2. "code": 500,
  3. "data": null
  4. "message": "系统错误.删除数据异常"
  5. }

2.3 修改数据模型接口

接口描述

API版本 0.0.2
授权类型 APPSIGN
协议 HTTPS
请求方法 Post
域名(环境变量中获取) System.getenv(“iot.hosting.api.domain”)
路径 /data/model/data/update

入参说明

入参名称 数据类型 是否必须 入参示例 入参描述
modelId 字符串 test_model1 模型id
scopeId 字符串 1C35315598694F 业务隔离id
appId 字符串 AADB1F0EBD0411B9 应用id
conditions 复杂对象数组 [{“fieldName”: “id”,”operate”: “eq”,”value”: 7}] 可以输入多个条件 fieldName 字段名称 operate 操作符 value 字段值
updateDetails 复杂对象 {“name”:”xxxx”,”age”:20} 模型字段名称与字段值的一个json数据

出参列表

出参名称 数据类型 出参描述
code 整形 响应码, 200: 成功
message 字符串 错误消息
localizedMsg 字符串 本地语言错误消息
data 长整型 响应结果 返回的长整型数据是增加的数据的id

请求示例

  1. /**
  2. * 系统环境变量中获取的
  3. */
  4. public static final String appkey = System.getenv("iot.hosting.appKey");
  5. public static final String appSecret = System.getenv("iot.hosting.appSecret");
  6. //边缘端数据模型服务路由
  7. private static final String DATA_EDGE_PATH = System.getenv("iot.hosting.api.domain");
  8. public static void main(String[] args) throws UnsupportedEncodingException {
  9. IoTApiClientBuilderParams ioTApiClientBuilderParams =
  10. new IoTApiClientBuilderParams();
  11. ioTApiClientBuilderParams.setAppKey("你的<AppKey>");
  12. ioTApiClientBuilderParams.setAppSecret("你的<AppSecret>");
  13. SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
  14. IoTApiRequest request = new IoTApiRequest();
  15. //设置api的版本
  16. request.setApiVer("0.0.2");
  17. //如果需要登陆,设置当前的会话的token
  18. request.setIotToken("xxxxxxxxxxxxxxx");
  19. // 接口参数
  20. request.putParam("modelId","value1");
  21. request.putParam("scopeId","value2");
  22. request.putParam("appId","value3");
  23. JSONObject updateDetails = new JSONObject();
  24. updateDetails.put("key",value);
  25. request.putParam("updateDetails", updateDetails);
  26. //请求参数域名、path、request
  27. ApiResponse response = syncClient.postBody(DATA_EDGE_PATH,
  28. "/data/model/data/update", request, true);
  29. System.out.println( "response code = " + response.getCode()
  30. + " response = " + new String(response.getBody(), "UTF-8"));
  31. }

返回结果示例 JSON

  1. {
  2. "code": 200,
  3. "data": 3,
  4. "message": "success"
  5. }

失败返回结果示例 JSON

  1. {
  2. "code": 500,
  3. "data": null
  4. "message": "系统错误.删除数据异常"
  5. }

2.4 查询数据模型接口

接口描述

API版本 0.0.3
授权类型 APPSIGN
协议 HTTPS
请求方法 Post
域名(环境变量中获取) System.getenv(“iot.hosting.api.domain”)
路径 /data/model/data/query

入参说明

入参名称 数据类型 是否必须 入参示例 入参描述
modelId 字符串 test_model1 模型id
scopeId 字符串 1C35315598694F 业务隔离id
appId 字符串 AADB1F0EBD0411B9 应用id
pageSize
整形 缺省默认值20 页大小 最大200
pageNum 整形 缺省默认值1 页码
conditions 复杂对象数组 [{“fieldName”: “id”,”operate”: “比较符”,”value”: 7}] 可以输入多个条件 fieldName 字段名称 operate 操作符 value 字段值,不同类型比较符参考下述列表
returnFields 复杂对象数组 [“*”]或者[“name”,”age”] 返回全部输入*,返回模型的某几个字段,输入字段的名称
orderBy 复杂对象 {“asc”:”true”,”orderByFields”:[“name”,”age”]} asc=true为升序,需要模型的哪个字段进行排序,依次传入模型的字段名称

比较符参考表

operate 对应数据类型
eq boolean、String、Date、Integer、Double
neq String
in String、Date、Integer、Double
nin String、Date、Integer、Double
It Date、Integer、Double
Iteg Date、Integer、Double
mt Date、Integer、Double
mteq Date、Integer、Double
bt Date、Integer、Double

出参列表

出参名称 数据类型 出参描述
code 整形 响应码, 200: 成功
message 字符串 错误消息
localizedMsg 字符串 本地语言错误消息
data 长整型 data格式示列 {\"count\":1,\"hasNext\":false,\"items\":[{\"gmt_create\":1551872701000,\"name\":\"33\",\"id\":2,\"gmt_modified\":1551872714000}],\"pageNum\":1,\"pageSize\":10}包含页大小pageSize,当前页码pageNum,数据总数count以及数据的集合items还有是否还有下一页hasNext,其中items中返回的参数就是创建模型时定义的模型的字段,以及字段对应的值

请求示例

  1. /**
  2. * 系统环境变量中获取的
  3. */
  4. public static final String appkey = System.getenv("iot.hosting.appKey");
  5. public static final String appSecret = System.getenv("iot.hosting.appSecret");
  6. //边缘端数据模型服务路由
  7. private static final String DATA_EDGE_PATH = System.getenv("iot.hosting.api.domain");
  8. public static void main(String[] args) throws UnsupportedEncodingException {
  9. IoTApiClientBuilderParams ioTApiClientBuilderParams =
  10. new IoTApiClientBuilderParams();
  11. ioTApiClientBuilderParams.setAppKey("你的<AppKey>");
  12. ioTApiClientBuilderParams.setAppSecret("你的<AppSecret>");
  13. SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
  14. IoTApiRequest request = new IoTApiRequest();
  15. //设置api的版本
  16. request.setApiVer("0.0.2");
  17. // 接口参数
  18. request.putParam("modelId","value1");
  19. request.putParam("scopeId","value2");
  20. request.putParam("appId","value3");
  21. List<String> returnFields = Lists.newArrayList("sceneId", "userId", "ctrlColumn ", "condition","val");
  22. request.putParam("returnFields", returnFields);
  23. request.putParam("conditions", JSON.parseArray("[{\"fieldName\": \"id\",\"operate\": \"eq\",\"value\": 1862}]"));
  24. //请求参数域名、path、request
  25. ApiResponse response = syncClient.postBody(DATA_EDGE_PATH,
  26. "/data/model/data/query", request, true);
  27. System.out.println( "response code = " + response.getCode()
  28. + " response = " + new String(response.getBody(), "UTF-8"));
  29. }

返回结果示例 JSON

  1. {
  2. "id": "4636f64d-5c8b-4d21-8214-8b237715721c"
  3. "code": 200,
  4. "message": null
  5. "localizedMsg": null,
  6. "data": "{\"count\":1,\"hasNext\":false,\"items\":[{\"gmt_create\":1551872701000,\"name\":\"33\",\"id\":2,\"gmt_modified\":1551872714000}],\"pageNum\":1,\"pageSize\":10}"
  7. }
  8. }

失败返回结果示例 JSON

  1. {
  2. "id": "f678d543-5436-897f-76fd-45ec3e1243a9",
  3. "code": 52009,
  4. "localizedMsg": "传入的参数和模型字段不匹配:name1",
  5. "message": "传入的参数和模型字段不匹配:name1",
  6. "data": null
  7. }

2.5 上传文件数据模型接口

接口描述

API版本 0.0.1
授权类型 APPSIGN
协议 HTTPS
请求方法 Post
域名(环境变量中获取) System.getenv(“iot.hosting.api.domain”)
路径 /data/model/data/upload

入参说明

入参名称 数据类型 是否必须 入参示例 入参描述
modelId 字符串 test_model1 模型id
scopeId 字符串 1C35315598694F 业务隔离id
appId 字符串 AADB1F0EBD0411B9 应用id
fileSize
整形 4096 文件大小,以字节为单位,目前系统不支持5M以上文件
attrName 字符串 name 属性名称,模型中包含的属性名称,不包含会报错进行提示
fileType 字符串 jpg 文件类型,目前系统只支持bmp,png,gif,jpg
version 字符串
1.0 模型版本

出参列表

出参名称 数据类型 出参描述
code 整形 响应码, 200: 成功
message 字符串 错误消息
localizedMsg 字符串 本地语言错误消息
data 长整型 响应结果

请求示例

  1. /**
  2. * 系统环境变量中获取的
  3. */
  4. public static final String appkey = System.getenv("iot.hosting.appKey");
  5. public static final String appSecret = System.getenv("iot.hosting.appSecret");
  6. //边缘端数据模型服务路由
  7. private static final String DATA_EDGE_PATH = System.getenv("iot.hosting.api.domain");
  8. public static void main(String[] args) throws UnsupportedEncodingException {
  9. IoTApiClientBuilderParams ioTApiClientBuilderParams =
  10. new IoTApiClientBuilderParams();
  11. ioTApiClientBuilderParams.setAppKey("你的<AppKey>");
  12. ioTApiClientBuilderParams.setAppSecret("你的<AppSecret>");
  13. SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
  14. IoTApiRequest request = new IoTApiRequest();
  15. //设置api的版本
  16. request.setApiVer("0.0.2");
  17. // 接口参数
  18. request.putParam("scopeId","value1");
  19. request.putParam("appId","value2");
  20. request.putParam("modelId","value3");
  21. request.putParam("fileSize","value4");
  22. request.putParam("attrName","value5");
  23. request.putParam("fileType","value6");
  24. request.putParam("version","value7");
  25. ApiResponse response = syncClient.postBody(DATA_EDGE_PATH,
  26. "/data/model/data/upload", request, true);
  27. System.out.println( "response code = " + response.getCode()
  28. + " response = " + new String(response.getBody(), "UTF-8"));
  29. }

返回结果示例 JSON

  1. {
  2. "id": "6fr2c332-c1db-417c-aa15-8c5trg3r5d92",
  3. "code": 200,
  4. "message": null,
  5. "localizedMsg": null,
  6. "data": {
  7. "fileName": "5269712352e5.jpg",
  8. "url": "https://xxxxx.xxx.xx.com/xxx/file/5269712352e5.jpg?Expires=1557902379&OSSAccessKeyId=uyedjYLHDH3DF&Signature=sotMFFIq4RP%2BWJSDScE8SxvOlvo%3D"
  9. }

失败返回结果示例 JSON

  1. {
  2. "id": "f561d973-9094-479f-81fd-95ec3e7271f5",
  3. "code": 52002,
  4. "localizedMsg": "没有字段name1的访问权限",
  5. "message": "没有字段name1的访问权限",
  6. "data": null
  7. }

2.6 下载文件数据模型接口

接口描述

API版本 0.0.1
授权类型 APPSIGN
协议 HTTPS
请求方法 Post
域名(环境变量中获取) System.getenv(“iot.hosting.api.domain”)
路径 /data/model/data/download

入参说明

入参名称 数据类型 是否必须 入参示例 入参描述
modelId 字符串 test_model1 模型id
scopeId 字符串 1C35315598694F 业务隔离id
appId 字符串 AADB1F0EBD0411B9 应用id
attrName 字符串 name 属性名称,模型中包含的属性名称,不包含会报错进行提示
version 字符串
1.0 模型版本

出参列表

出参名称 数据类型 出参描述
code 整形 响应码, 200: 成功
message 字符串 错误消息
localizedMsg 字符串 本地语言错误消息
data 长整型 响应结果

请求示例

  1. /**
  2. * 系统环境变量中获取的
  3. */
  4. public static final String appkey = System.getenv("iot.hosting.appKey");
  5. public static final String appSecret = System.getenv("iot.hosting.appSecret");
  6. //边缘端数据模型服务路由
  7. private static final String DATA_EDGE_PATH = System.getenv("iot.hosting.api.domain");
  8. public static void main(String[] args) throws UnsupportedEncodingException {
  9. IoTApiClientBuilderParams ioTApiClientBuilderParams =
  10. new IoTApiClientBuilderParams();
  11. ioTApiClientBuilderParams.setAppKey("你的<AppKey>");
  12. ioTApiClientBuilderParams.setAppSecret("你的<AppSecret>");
  13. SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
  14. IoTApiRequest request = new IoTApiRequest();
  15. //设置api的版本
  16. request.setApiVer("0.0.1");
  17. request.setId("42423423");
  18. //如果需要登陆,设置当前的会话的token
  19. request.setIotToken("xxxxxxxxxxxxxxx");
  20. // 接口参数
  21. request.putParam("fileName","value1");
  22. request.putParam("scopeId","value2");
  23. request.putParam("modelId","value3");
  24. request.putParam("appId","value4");
  25. request.putParam("attrName","value5");
  26. request.putParam("version","value6");
  27. //请求参数域名、path、request
  28. ApiResponse response = syncClient.postBody("api.link.aliyun.com",
  29. "/data/model/data/download", request, true);
  30. System.out.println( "response code = " + response.getCode()
  31. + " response = " + new String(response.getBody(), "UTF-8"));

返回结果示例 JSON

  1. {
  2. "id": "6fr2c332-c1db-417c-aa15-8c5trg3r5d92",
  3. "code": 200,
  4. "message": null,
  5. "localizedMsg": null,
  6. "data": {
  7. "url": "https://xxxxx.xxx.xx.com/xxx/file/5269712352e5.jpg?Expires=1557902379&OSSAccessKeyId=uyedjYLHDH3DF&Signature=sotMFFIq4RP%2BWJSDScE8SxvOlvo%3D"
  8. }
  9. }

失败返回结果示例 JSON

  1. {
  2. "id": "f561d973-9094-479f-81fd-95ec3e7271f5",
  3. "code": 52002,
  4. "localizedMsg": "没有字段name1的访问权限",
  5. "message": "没有字段name1的访问权限",
  6. "data": null
  7. }

2.7 数据变更消息订阅


当总线中的数据发生增删改等数据变更行为时,应用可以订阅这类数据事件。大概分成三个阶段:获取订阅队列信息、订阅并获取消息、解析消息。

2.7.1 获取订阅队列信息接口

接口描述

查询该应用的订阅队列信息

接口路径

/dop/meta/subscribe/queue

接口参数

参数名称 参数类型 必填 备注
scopeId 字符串 业务隔离id
appKey 字符串 应用id(从网关系统参数中获得)

返回值

名称 类型 可选 备注
endpoint 字符串 rabbit-mq 服务地址
port 字符串 rabbit-mq 监听端口
user 字符串 rabbit-mq 用户名
password 字符串 rabbit-mq 密码。
queue 字符串 该应用队列名称
virtualHost 字符串 虚拟地址

返回值示例

  1. {
  2. "id": "3826f64d-6c7b-4d21-8214-8b23771b763a"
  3. "code": 200,
  4. "endpoint": "localhost",
  5. "port": 1000,
  6. "user": "123456",
  7. "password": "xxx",
  8. "queue": "/abc/ddd/123456/xxx",
  9. "virtualHost":"xxx"
  10. }

参考代码

  1. //环境变量中获取,测试时候可以自行填写
  2. private static final String appKey = System.getenv("iot.hosting.appKey");
  3. private static final String appSecrete = System.getenv("iot.hosting.appSecret");
  4. //边缘端数据模型服务路由
  5. private static final String apiHost = System.getenv("iot.hosting.api.domain");
  6. //http
  7. @Value("${iot.hosting.api.schema}")
  8. private boolean apiSchema;
  9. public String getSubscriberQueue() throws UnsupportedEncodingException {
  10. //创建边缘网关客户端
  11. logger.info("subscriberProxyImpl, appKey:{}, appSecrete:{}, apiHost:{}, apiSchema:{}",
  12. appKey, appSecrete, apiHost, apiSchema);
  13. IoTApiClientBuilderParams ioTApiClientBuilderParams = new IoTApiClientBuilderParams();
  14. ioTApiClientBuilderParams.setAppKey(appKey);
  15. ioTApiClientBuilderParams.setAppSecret(appSecrete);
  16. SyncApiClient syncClient = new SyncApiClient(ioTApiClientBuilderParams);
  17. String subscriberQueuePath = "/dop/meta/subscribe/queue";
  18. IoTApiRequest request = new IoTApiRequest();
  19. // 接口参数--可以按照需求进行填写
  20. request.putParam("appKey", appKey);
  21. request.putParam("key", "value");
  22. //发送请求获得订阅队列信息
  23. ApiResponse apiResponse = syncClient.postBody(apiHost,
  24. subscriberQueuePath, request, false);
  25. logger.info("response code = " + apiResponse.getCode()
  26. + " response = " + new String(apiResponse.getBody(), "UTF-8"));
  27. System.out.println(new String(apiResponse.getBody(), "UTF-8"));
  28. //解析边缘网关返回值
  29. private String getResponseBody(ApiResponse apiResponse) {
  30. if (apiResponse == null || apiResponse.getCode() != 200) {
  31. return null;
  32. }
  33. String jsonObjectString;
  34. if (StringUtils.isEmpty(apiResponse.getBodyStr())) {
  35. try {
  36. jsonObjectString = new String(apiResponse.getBody(), "utf-8");
  37. } catch (UnsupportedEncodingException e) {
  38. return null;
  39. }
  40. } else {
  41. jsonObjectString = apiResponse.getBodyStr();
  42. }
  43. if (StringUtils.isEmpty(jsonObjectString)) {
  44. return null;
  45. }
  46. JSONObject iotxResultJson = JSON.parseObject(jsonObjectString);
  47. int iotCode = iotxResultJson.getInteger("code");
  48. if (iotCode != 200) {
  49. return null;
  50. }
  51. return iotxResultJson.getString("data");
  52. }

2.7.2 订阅队列


参考代码:

  1. String queue = getResponseBody(apiResponse);
  2. JSONObject jsonObject = (JSONObject) JSON.parse(queue);
  3. String endpoint = jsonObject.getString("endpoint");
  4. Integer port = jsonObject.getInteger("port");
  5. String user = jsonObject.getString("user");
  6. String password = jsonObject.getString("password");
  7. String queueName = jsonObject.getString("queue");
  8. String virtualHost = jsonObject.getString("virtualHost");
  9. ConnectionFactory connectionFactory = new ConnectionFactory();
  10. connectionFactory.setHost(endpoint);
  11. connectionFactory.setPort(port);
  12. connectionFactory.setUsername(user);
  13. connectionFactory.setPassword(password);
  14. connectionFactory.setVirtualHost(virtualHost);
  15. // 得到 连接
  16. try {
  17. connection = connectionFactory.newConnection();
  18. channel = connection.createChannel();
  19. AMQP.Queue.DeclareOk queueDeclare = null;
  20. queueDeclare = channel.queueDeclare(queueName, true, false, false, null);
  21. channel.basicQos(64);
  22. channel.basicConsume(queueName, false, "ConsumerTag", new EdgeConsumer(channel));
  23. } catch (IOException e) {
  24. e.printStackTrace();
  25. } catch (TimeoutException e) {
  26. logger.error("连接消息队列失败:" + e);
  27. try {
  28. close();
  29. } catch (Exception e1) {
  30. logger.error("关闭消息连接:" + e1);
  31. }
  32. return getResponseBody(apiResponse);
  33. }

2.7.3 推送消息解析


参考代码:

  1. //获取订阅消息的解析
  2. JSONObject jsonObject= JSON.parseObject(String.valueOf(apiResponse.getBody()));
  3. //模型ID
  4. String modelId = jsonObject2.getString("modelId");
  5. //操作类型,update表示更新,insert表示新增,delete表示删除
  6. String operationType = jsonObject2.getString("operationType");
  7. // 元素为long类型数值,数据在dop的唯一标示
  8. JSONArray dataIds = jsonObject2.getJSONArray("dataIds");