数据接入

更新时间:

数据接入服务,是一种接入领域服务,但是无须改造原始服务代码的接入模式,这种模式依赖于云测运行的函数脚本和需要连接的服务端进行连接,并在函数中进行业务逻辑转换,并将转换结果暴露到两条总线上的接入方式。

1.接入原理

image.jpg

2.脚本编写描述

2.1 依赖的二方包

函数脚本通过阿里云函数计算实现,所以需要引入对函数计算SDK的依赖。

<dependency>
  <groupId>com.aliyun.fc.runtime</groupId>
  <artifactId>fc-java-core</artifactId>
  <version>1.2.1</version>
</dependency>

2.2 函数请求参数

函数入口传入参数需转换为基本请求参数,基本请求参数中各属性定义如下:

属性名称

作用

默认参数

id

请求中全局唯一ID

version

请求协议版本

request

系统参数

服务模型:{ “type”:”SERVICE”, “apiPath”:”调用的服务接口apiPath}“数据模型:{ “type”:”DATA”, “dataModelId”:”数据模型ID”}

params

业务参数

1. 服务模型:服务模型中对应接口的入参2. 数据模型:{“startTime”: “unix时间戳,秒级”,“endTime”:”unix时间戳,秒级”}

2.3 函数请求BaseRequest

package yourPackage;

import java.io.Serializable;
import java.util.Map;

public class BaseRequest implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * request里的全局唯一id透传
     */
    private String id;

    /**
     * 请求协议版本
     */
    private String version;

    /**
     * 系统参数
     */
    private Map<String, Object> request;

    /**
     * 业务参数
     */
    private Map<String,Object> params;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getVersion() {
        return version;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public Map<String, Object> getRequest() {
        return request;
    }

    public void setRequest(Map<String, Object> request) {
        this.request = request;
    }

    public Map<String,Object> getParams() {
        return params;
    }

    public void setParams(Map<String,Object> params) {
        this.params = params;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("{");
        sb.append("\"id\":\"")
                .append(id == null ? "" : id).append('\"');
        sb.append(",\"version\":\"")
                .append(version == null ? "" : version).append('\"');
        sb.append(",\"request\":")
                .append(request);
        sb.append(",\"params\":")
                .append(params);
        sb.append('}');
        return sb.toString();
    }
}

2.4 函数响应BaseResponse

package yourPackage;

import java.io.Serializable;

public class BaseResponse<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * request里的全局唯一id透传
     */
    private String id;

    /**
     * code
     */
    private int code = 200;

    /**
     * 失败时必填,错误调试信息;成功时不填
     */
    private String message;

    /**
     * 失败时必填,用户可理解语言描述的错误信息;成功时不填
     */
    private String localizedMsg;

    /**
     * 成功时必填,失败时选填
     */
    private T data;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getCode() {
        return code;
    }

    public void setCode(int code) {
        this.code = code;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public String getLocalizedMsg() {
        return localizedMsg;
    }

    public void setLocalizedMsg(String localizedMsg) {
        this.localizedMsg = localizedMsg;
    }

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("{");
        sb.append("\"id\":\"")
                .append(id == null ? "" : id).append('\"');
        sb.append(",\"code\":")
                .append(code);
        sb.append(",\"message\":\"")
                .append(message == null ? "" : message).append('\"');
        sb.append(",\"localizedMsg\":\"")
                .append(localizedMsg == null ? "" : localizedMsg).append('\"');
        sb.append(",\"data\":")
                .append(data);
        sb.append('}');
        return sb.toString();
    }
}

2.5 Demo的具体实现

package yourPackage;

import com.alibaba.fastjson.JSON;
import com.aliyun.fc.runtime.Context;
import com.aliyun.fc.runtime.FunctionInitializer;
import com.aliyun.fc.runtime.StreamRequestHandler;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class ApplicationAccessDemo implements StreamRequestHandler, FunctionInitializer {
    @Override
    public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
        if (inputStream != null) {
            String request = IOUtils.toString(inputStream, "UTF-8");
            BaseRequest baseRequest = JSON.parseObject(request, BaseRequest.class);
            if (StringUtils.isNotEmpty(request) && baseRequest != null && MapUtils.isEmpty(baseRequest.getRequest())) {
                //定时任务启动时解析传入请求
                baseRequest = JSON.parseObject((String) JSON.parseObject(request).get("payload"), BaseRequest.class);
            }
            String type = MapUtils.getString(baseRequest.getRequest(), "type");
            BaseResponse baseResponse = null;
            if ("SERVICE".equals(type)) {
                String apiPath = MapUtils.getString(baseRequest.getRequest(), "apiPath");
                //TODO 服务模型具体的处理逻辑,返回baseResponse
            } else if ("DATA".equals(type)) {
                String dataModelId = MapUtils.getString(baseRequest.getRequest(), "dataModelId");
                //TODO 数据模型具体的处理逻辑,返回baseResponse
            }

            //输出baseResponse
            try {
                outputStream.write(JSON.toJSONString(baseResponse).getBytes("UTF-8"));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void initialize(Context context) throws IOException {
        //TODO 按需编写初始化操作,如agent的启动等等
    }
}

文件开发完成,打包后在控制台上传即可(mvn clean package -Dmaven.skip.test=true)。注意在上传时函数入口与初始化函数入口的填写,以demo为例,函数入口yourPackage.ApplicationAccessDemo::handleRequest ,初始化入口:yourPackage.ApplicationAccessDemo::initialize。

2.6 网络代理的使用

见网络代理使用文档。

3. 业务操作流程

3.1 创建转换器

在应用服务平台中>数据接入>新增转换器,选择脚本所对应的模型image.jpg

3.2 转换器-上传函数

创建完的转换器中将会展示所有领域模型关联的服务模型和数据模型,每个模型都可以上传一个函数脚本,脚本格式为jar包,并且填写函数的主入口和初始化入口image.jpg

3.3 转换器-数据模型配置

数据模型的对接是通过定期器周期性调用函数,让函数将服务端侧的数据写入数据模型,所以需要配置一个调用时间周期image.jpg

3.4 转换器-手工触发

脚本上传后,可以对脚本进行测试或者手工的触发临时发起

  • 手工触发服务模型,需要在请求参数参数中输入对应的入参json

  • 手工触发数据模型,选择触发的时间段image.jpg

3.5 转换器-绑定至项目

进入具体项目,在项目功能中选择数据接入,并添加转换器image.jpg

3.6 转换器-启动转换器

转换器进入正式运行状态,需要人工启动任务,任务启动后数据模型的定时任务将打开,服务模型可以被项目中其他应用调用image.jpg