对于需要控制和反馈的场景,物联网平台提供同步服务的能力。本文以华东2(上海)地域的产品和设备为例,介绍如何实现同步服务调用。

前提条件

已在物联网平台控制台对应实例下,创建产品和设备,并获取设备证书信息(ProductKey、DeviceName和DeviceSerect)。具体操作,请参见创建产品创建设备

背景信息

同步服务调用是用户服务器调用物联网平台的云端API,通过物联网平台下发服务请求到设备端,设备端在一定时间内完成服务逻辑的执行并返回结果到物联网平台的过程。用户服务器通过同步服务控制设备,并获取到设备返回结果。物联网平台的同步服务调用通过RRPC实现,详细信息,请参见设备服务调用

使用说明

  • 只有设备端在线后,才能调用同步服务。
  • 调用同步服务的最大超时时间为8秒,若8秒内物联网平台未收到回复,则返回超时错误。

准备开发环境

示例使用的开发环境如下:

步骤一:创建同步服务

  1. 登录物联网控制台
  2. 实例概览页面,找到对应的实例,单击实例进入实例详情页面。
    注意 目前华东2(上海)、华北2(北京)、华南1(深圳)地域开通了企业版实例服务。其他地域,请跳过此步骤。
    实例概览
  3. 在左侧导航栏,选择设备管理 > 产品,找到您的产品,单击产品名称右侧的查看
  4. 产品详情页面,选择功能定义页签,然后单击编辑草稿,在默认模块下添加自定义功能(服务)。
    1. 配置同步服务,如下图所示。
      添加自定义服务
    2. 单击增加参数,分别添加如下图所示的输入参数输出参数
      图 1. 输入参数
      输入参数
      图 2. 输出参数
      输出参数
    3. 单击确认,然后单击发布上线,完成添加物模型服务。

步骤二:开发设备端

  1. 打开IntelliJ IDEA,创建一个Maven工程。例如syncServiceDemo
  2. 在工程中的pom.xml文件中,配置Maven仓库地址。
    <repositories>
        <repository>
            <id>aliyunmaven</id>
            <name>aliyun maven</name>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>aliyunmavensnapshot</id>
            <name>aliyun maven snapshot</name>
            <url>https://maven.aliyun.com/nexus/content/repositories/snapshots</url>
        </repository>
    </repositories>
  3. 在工程中的pom.xml文件中,添加Maven依赖,然后单击Load Maven Changes图标,完成依赖包下载。
    <dependency>
      <groupId>com.aliyun.alink.linksdk</groupId>
      <artifactId>iot-linkkit-java</artifactId>
      <version>1.2.0.1</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.8.1</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.83</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>23.0</version>
    </dependency>

    更多信息,请参见远程配置

  4. 在工程syncServiceDemo的路径/src/main/java下,创建Java类。例如SyncServiceProcessor.java,输入以下代码。
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    import com.alibaba.fastjson.JSONObject;
    import com.aliyun.alink.dm.api.DeviceInfo;
    import com.aliyun.alink.dm.api.InitResult;
    import com.aliyun.alink.linkkit.api.ILinkKitConnectListener;
    import com.aliyun.alink.linkkit.api.IoTMqttClientConfig;
    import com.aliyun.alink.linkkit.api.LinkKit;
    import com.aliyun.alink.linkkit.api.LinkKitInitParams;
    import com.aliyun.alink.linksdk.cmp.connect.channel.MqttPublishRequest;
    import com.aliyun.alink.linksdk.cmp.core.base.AMessage;
    import com.aliyun.alink.linksdk.cmp.core.base.ARequest;
    import com.aliyun.alink.linksdk.cmp.core.base.AResponse;
    import com.aliyun.alink.linksdk.cmp.core.base.ConnectState;
    import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener;
    import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSendListener;
    import com.aliyun.alink.linksdk.tools.AError;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    
    /**
     * 同步服务调用<br>
     */
    public class SyncServiceProcessor {
    
      // ===================需要用户填写的参数,开始===========================
      // 修改Config.*的参数为您的实际信息
      // 站点id,根据实际站点获取对应id
      private static String regionId = "cn-shanghai";
      // 产品productKey,设备证书信息之一
      private static String productKey = "Config.productKey";
      // 设备名称deviceName,设备证书信息之一
      private static String deviceName = "Config.deviceName";
      // 设备密钥deviceSecret,设备证书信息之一
      private static String deviceSecret = "Config.deviceSecret";
      //设备所属物联网平台实例的ID,旧版公共实例传入空值
      private static String iotInstanceId = "";
      // ===================需要用户填写的参数,结束===========================
    
      private static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
          Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("http2-downlink-handle-%d").build(),
          new ThreadPoolExecutor.AbortPolicy());
    
      /**
       * 1、启动本程序模拟设备在线 <br>
       * 2、启动InvokeSyncService发起服务调用 <br>
       * 
       * @param args
       * @throws InterruptedException
       */
      public static void main(String[] args) throws InterruptedException {
    
        // 下行数据监听
        registerNotifyListener();
    
        // 设备接入
        connect(productKey, deviceName, deviceSecret);
      }
    
      /**
       * 建立连接
       * 
       * @param productKey 产品key
       * @param deviceName 设备名称
       * @param deviceSecret 设备密钥
       * @throws InterruptedException
       */
      private static void connect(String productKey, String deviceName, String deviceSecret) throws InterruptedException {
    
        // 初始化参数
        LinkKitInitParams params = new LinkKitInitParams();
    
        // 设置 Mqtt 初始化参数
        IoTMqttClientConfig config = new IoTMqttClientConfig();
        // channelHost为设备的MQTT接入地址。若设备属于企业版实例,修改值为iotInstanceId + ".mqtt.iothub.aliyuncs.com:1883"
        config.channelHost = productKey + ".iot-as-mqtt." + regionId + ".aliyuncs.com:1883";
        config.productKey = productKey;
        config.deviceName = deviceName;
        config.deviceSecret = deviceSecret;
        params.mqttClientConfig = config;
    
        // 设置初始化设备证书信息,用户传入
        DeviceInfo deviceInfo = new DeviceInfo();
        deviceInfo.productKey = productKey;
        deviceInfo.deviceName = deviceName;
        deviceInfo.deviceSecret = deviceSecret;
        params.deviceInfo = deviceInfo;
    
        // 初始化
        LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
          public void onError(AError aError) {
            System.out.println("init failed !! code=" + aError.getCode() + ",msg=" + aError.getMsg() + ",subCode="
                + aError.getSubCode() + ",subMsg=" + aError.getSubMsg());
          }
    
          public void onInitDone(InitResult initResult) {
            System.out.println("init success !!");
          }
        });
    
        // 确保初始化成功后才执行后面的步骤,可以根据实际情况适当延长这里的延时
        Thread.sleep(2000);
      }
    
      /**
       * 发布消息
       * 
       * @param topic 发送消息的topic
       * @param payload 发送的消息内容
       */
      private static void publish(String topic, String payload) {
        MqttPublishRequest request = new MqttPublishRequest();
        request.topic = topic;
        request.payloadObj = payload;
        request.qos = 0;
        LinkKit.getInstance().getMqttClient().publish(request, new IConnectSendListener() {
          @Override
          public void onResponse(ARequest aRequest, AResponse aResponse) {
          }
    
          @Override
          public void onFailure(ARequest aRequest, AError aError) {
          }
        });
      }
    
      /**
       * 监听下行数据
       */
      private static void registerNotifyListener() {
        LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
          @Override
          public boolean shouldHandle(String connectId, String topic) {
            return true;
          }
    
          @Override
          public void onNotify(String connectId, String topic, AMessage aMessage) {
            // 接收同步服务调用,并响应
            // 这里另启线程,避免回调堵塞
            executorService.submit(() -> doService(topic, aMessage));
          }
    
          @Override
          public void onConnectStateChange(String connectId, ConnectState connectState) {
          }
        });
      }
    
      /**
       * 处理同步服务调用
       * 
       * @param topic 服务调用指令的topic
       * @param aMessage 服务调用指令的内容
       */
      private static void doService(String topic, AMessage aMessage) {
    
        String content = new String((byte[]) aMessage.getData());
        System.out.println("服务请求Topic:" + topic);
        System.out.println("服务指令:" + content);
    
        /**
         * 服务请求 aMessage 消息体内容是一个json
         *   {
         *     "id": "123",
         *     "version": "1.0",
         *     "params": // 服务参数,取决于服务定义
         *       {
         *         "input": 50
         *       },
         *     "method": "thing.service.{tsl.service.identifier}"
         *   }
         */
        JSONObject request = JSONObject.parseObject(content);
        JSONObject params = request.getJSONObject("params");
        if (!params.containsKey("input")) { // 检查入参
          return;
        }
        Integer input = params.getInteger("input"); // 获取入参
    
        /**
         * 服务响应格式消息体内容是一个json
         *   {
         *     "id": "123",  // 同上面服务请求的id
         *     "code": 200,  // 200表示成功
         *     "data": {}    // 服务返回,取决于服务定义
         *   }
         */
        JSONObject response = new JSONObject();
        JSONObject data = new JSONObject();
        data.put("output", input + 1);
        response.put("id", request.get("id"));
        response.put("code", 200);
        response.put("data", data);
    
        // 服务响应
        String respTopic = topic.replace("/request/", "/response/");
        publish(respTopic, response.toString());
        System.out.println("服务响应Topic:" + respTopic);
        System.out.println("服务响应内容:" + response.toString());
      }
    }

    关于设备通用code内容,请参见设备端通用code

  5. 在以上代码中配置实际设备相关参数。
    参数 示例 说明
    region cn-shanghai 您物联网平台设备所在地域的代码。地域代码表达方法,请参见地域列表
    productKey a1W**** 您添加设备后,保存的设备证书信息,请参见获取设备证书

    您也可在控制台中设备的设备详情页面查看。

    deviceName device2
    deviceSecret ff01e59d1a***
    iotInstanceId "" 实例ID。您可在物联网平台控制台实例概览页面,查看当前实例的ID。
    • 若有ID值,必须传入该ID值。
    • 若无实例概览页面或ID值,传入空值,即iotInstanceId = ''

    实例的详细说明,请参见实例概述

    config.channelHost productKey + ".iot-as-mqtt." + regionId + ".aliyuncs.com:1883 设备的MQTT接入地址。
    • 旧版公共实例:config.channelHost = productKey + ".iot-as-mqtt." + regionId + ".aliyuncs.com:1883";
    • 新版公共实例和企业版实例:config.channelHost = iotInstanceId + ".mqtt.iothub.aliyuncs.com:1883";

    详细说明,请参见查看实例终端节点

步骤三:开发服务端SDK

  1. 在IntelliJ IDEA中,创建一个Maven工程。例如ServerDemo。
  2. 在工程中的pom.xml文件中,配置Maven仓库地址。
    <repositories>
        <repository>
            <id>aliyunmaven</id>
            <name>aliyun maven</name>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>aliyunmavensnapshot</id>
            <name>aliyun maven snapshot</name>
            <url>https://maven.aliyun.com/nexus/content/repositories/snapshots</url>
        </repository>
    </repositories>
  3. 在工程中的pom.xml文件中,增加Maven依赖,然后单击Load Maven Changes图标,完成依赖包下载。
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-iot</artifactId>
        <version>7.41.0</version>
    </dependency>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>4.5.6</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>

    更多信息,请参见Java SDK使用说明

  4. 在工程ServerDemo的路径/src/main/java下,创建Java类。例如InvokeSyncService.java,输入以下代码。
    import com.alibaba.fastjson.JSONObject;
    import com.aliyuncs.DefaultAcsClient;
    import com.aliyuncs.exceptions.ClientException;
    import com.aliyuncs.exceptions.ServerException;
    import com.aliyuncs.iot.model.v20180120.InvokeThingServiceRequest;
    import com.aliyuncs.iot.model.v20180120.InvokeThingServiceResponse;
    import com.aliyuncs.profile.DefaultProfile;
    import com.aliyuncs.profile.IClientProfile;
    
    /**
     * 同步服务调用<br>
     */
    public class InvokeSyncService {
    
      // ===================需要用户填写的参数开始===========================
      // 修改Config.*的参数为您的实际信息
      // 站点id,根据实际站点获取对应id
      private static String regionId = "cn-shanghai";
      // 用户账号AccessKey
      private static String accessKeyID = "Config.accessKey";
      // 用户账号AccesseKeySecret
      private static String accessKeySecret = "Config.accessKeySecret";
      // 产品productKey,执行服务的设备证书信息之一
      private static String productKey = "Config.productKey";
      // 设备名字deviceName,执行服务的设备证书信息之一
      private static String deviceName = "Config.deviceName";
      //设备所属物联网平台实例的ID,旧版公共实例传入空值
      private static String iotInstanceId = "";
      // ===================需要用户填写的参数结束===========================
    
      /**
       * 1、启动SyncServiceProcessor模拟设备在线 <br>
       * 2、启动本程序发起服务调用 <br>
       * 
       * @param args
       * @throws ServerException
       * @throws ClientException
       */
      public static void main(String[] args) throws ServerException, ClientException {
    
        // 获取服务端请求客户端
        DefaultAcsClient client = null;
        try {
          IClientProfile profile = DefaultProfile.getProfile(regionId, accessKeyID, accessKeySecret);
          DefaultProfile.addEndpoint(regionId, regionId, "Iot", "iot." + regionId + ".aliyuncs.com");
          client = new DefaultAcsClient(profile);
        } catch (Exception e) {
          System.out.println("create OpenAPI Client failed !! exception:" + e.getMessage());
        }
    
        // 填充服务调用的参数
        InvokeThingServiceRequest request = new InvokeThingServiceRequest();
        request.setIotInstanceId(iotInstanceId);
        request.setProductKey(productKey); // 设备证书之productKey
        request.setDeviceName(deviceName); // 设备证书之deviceName
        request.setIdentifier("MySyncService"); // 要调用的服务标识符,取决于服务定义
        JSONObject json = new JSONObject(); // 构造服务入参,服务入参是一个JSON String
        json.put("input", 50); // 取决于服务定义,取值要符合服务定义时配置的参数规格
        request.setArgs(json.toString());
    
        // 获得服务调用响应
        InvokeThingServiceResponse response = client.getAcsResponse(request);
        if (response == null) {
          System.out.println("调用服务异常");
          return;
        }
        System.out.println("requestId:" + response.getRequestId());
        System.out.println("code:" + response.getCode());
        System.out.println("success:" + response.getSuccess());
        System.out.println("error message:" + response.getErrorMessage());
        if (response != null && response.getSuccess()) { // 服务调用成功,仅代表发送服务指令的成功,不代表执行服务本身是否成功
          System.out.println("服务调用成功");
          System.out.println("消息id:" + response.getData().getMessageId());
          System.out.println("服务返回结果:" + response.getData().getResult()); // 仅同步服务有result
        } else {
          System.out.println("服务调用失败");
        }
      }
    
    }

    其中,同步服务调用的更多信息,请参见InvokeThingService

  5. 在以上代码中配置实际设备相关参数。
    参数 示例 说明
    region cn-shanghai 您物联网平台设备所在地域的代码。地域代码表达方法,请参见地域列表
    accessKeySecret LTAI4****jiW5j3 您的阿里云账号的AccessKey ID和AccessKey Secret。

    登录物联网平台控制台,将鼠标移至账号头像上,然后单击AccessKey管理,获取AccessKey ID和AccessKey Secret。

    说明 如果使用RAM用户,您需授予该RAM用户管理物联网平台的权限(AliyunIOTFullAccess),否则将连接失败。授权方法请参见授权RAM用户访问物联网平台
    accessKeySecret iMS8******s
    productKey a1W**** 您添加设备后,保存的设备证书信息,请参见获取设备证书

    您也可在控制台中设备的设备详情页面查看。

    deviceName device2
    iotInstanceId "" 实例ID。您可在物联网平台控制台实例概览页面,查看当前实例的ID。
    • 若有ID值,必须传入该ID值。
    • 若无实例概览页面或ID值,传入空值,即iotInstanceId = ''

    实例的详细说明,请参见实例概述

步骤四:整体联调

  1. 运行程序文件SyncServiceProcessor.java,启动设备。
  2. 运行程序文件InvokeSyncService.java,调用同步服务。
  3. 查看调用服务的日志。
    设备端日志:设备端
    服务端日志:服务端
    您也可在物联网平台控制台对应实例的监控运维 > 日志服务页面,选择对应产品,查看相关日志。云端日志