脚本使用指南

在数据集成作业中,输入、转换、输出节点都可以通过自定义脚本来实现功能。

支持的语法

脚本使用JavaScript语言进行编写,语法上支持ES5以及ES6的const和let语法。

ES5的语法及常见API可以参考地址: http://www.ecma-international.org/ecma-262/5.1/index.html

ES6语法

  • const

    • const声明一个只读的常量,一旦声明,常量的值就就不能改变;

    • const的作用域和let一样,只在声明所在的块级作用域内有效。

  • let

    • 在ES5中,只有两种作用域,一种是函数作用域,另一种是全局作用域,这个导致在写脚本的时候容易出现变量覆盖的情况;

    • 所以ES6增加了let语法,用于声明变量,用法类似于var,但是所声明的变量只在let所在的代码块内有效;

    • 建议在脚本开发的时候函数内部尽量使用let来声明变量,全局的采用var。

基础扩展API

我们对标准的JavaScript API进行了相应的扩展,以提供访问数据库,调用IoT服务模型, 同步数据到主数据的能力。

如下:

1. 数据库操作

说明:数据库操作API的使用依赖标准扩展插件包

var iotPluginApi = require('./iotPlugin.js'); //(默认已经引用)

1.1 SQL查询接口 - iotSqlSelect

API: iotSqlSelect(statement)

用途:用于在JS脚本中执行sql select语句

参数

类型

说明

statement

String

需要执行的select sql语句,底层已经适配了数据库,所以这里只需要填写对应的sql语句

返回值

参数

类型

说明

code

Integer

200代表sql语句执行成功

其他代表sql执执行失败,

在使用时需要判断该返回值

data

RowValueObject[]

其中RowValueObject的类型为:

Map<String,Object>;

一个数组对象,数组成员为一个Map<String,Object>对象。

对应sql select出来的每一行内容

Map的Key为列名,Object为该列对应的值。

使用的时候需要判断该数组的长度。

示例:

[

{"key1":1,"key2":2},{"key1":3,"key2":4}

]

使用示例

let sqlselect = "select num, name from dbo.TestTable where num < 10";
let sqlResult = iotPluginApi.iotSqlSelect(sqlselect);
if (sqlResult.code == 200 && sqlResult.data.length > 0 ) {
    var packingList = new Array();
    for (var i = 0; i < sqlResult.data.length; i++) {
        var itemData = new Map();
    //每一行的数据
        itemData["id"] = (sqlResult.data)[i]["num"];
        itemData["name"] = (sqlResult.data)[i]["name];
  }    
}

1.2 SQL执行接口 - iotSqlExecute

API: iotSqlExecute(statement)

用途:用于在JS脚本中执行sql update 或者 delete语句

参数

类型

说明

statement

String

需要执行的update或delete sql语句,底层已经适配了数据库,所以这里只需要填写对应的sql语句

返回值

参数

类型

说明

code

Integer

200代表sql语句执行成功

其他代表sql执执行失败,

在使用时需要判断该返回值

data

Integer

影响的行数

使用示例

let sqlUpdate = 'update dbo.TestTable set name = "abc" where num = 1';
let sqlResult = iotPluginApi.iotSqlExecute(sqlUpdate);
if (sqlResult.code == 200) {
    console.log(sqlResult.data);
}

1.3 事务处理接口 - iotGetTransaction

API: iotGetTransaction()

用途:用于在JS脚本中创建一个事务

参数

类型

说明

返回值:事务对象transaction

方法

作用

说明

transaction.select(statement)

查询数据

参考iotSqlSelect的调用方式

transaction.execute(statement)

修改或删除数据

参考iotSqlExecute的调用方式

transaction.commit()

事务提交

transaction.rollback()

事务回滚

使用示例

    let transaction = iotPluginApi.iotGetTransaction();

    let sql1 = "INSERT INTO dbo.TB1(id,name,remark) VALUES("1,'XXX1','YYY1')";
    let sql2 = "INSERT INTO dbo.TB2(id,name,remark) VALUES("1,'XXX2','YYY2')";

    try {
        transaction.execute(sql1);
        transaction.execute(sql2);
    //事务提交
        let r = transaction.commit();
        console.log("commit:" + r);

        return iotPluginApi.newIoTxResult(200, "OK", r);
    } catch (e) {
        console.log("rollback:" + e);
    //事务回滚
        transaction.rollback();
        return iotPluginApi.newIoTxResult(500, "rollback");
    }

1.4 数据快照接口 - newDbSnapshot

API: newDbSnapshot()

用途:一般用于【数据库查询】输入节点的脚本编辑

参数

类型

说明

statement

String

需要执行的select sql语句,底层已经适配了数据库。所以这里只需要填写对应的sql语句

返回值:快照对象snapshot

使用示例

function snapshot(snapshotType, context) {
    if (snapshotType === 'increment') {
        //扫描整张表来计算增量
        return iotPluginApi.newDbSnapshot("SELECT * FROM dbo.TB");
  }
}

2. API操作

说明:API操作的使用依赖HTTP扩展插件包

var iotPluginApi = require('./httpPlugin.js'); //(默认已经引用)

2.1 HTTP GET

API: httpGet()

用途:一般用于【API查询】输入节点或者【API依赖】输出节点的脚本编辑

参数

类型

说明

apiPath

String

HTTP路径,例如 /api/test

headers

Map<String,String>

需要扩展的HTTP Header

返回值:

参数

类型

说明

code

Integer

200代表HTTP请求成功

其他代表HTTP请求失败,

在使用时需要判断该返回值

data

String

返回的HTTP BODY

使用示例

let response = httpPlugin.httpGet('/api/test');
if (response.code === 200) {
    //这里需要对response.data的数据处理
    console.log(response.data);
}

2.2 HTTP POST

API: httpPost()

用途: 一般用于【API查询】输入节点或者【API依赖】输出节点的脚本编辑

参数

类型

说明

apiPath

String

HTTP路径,例如 /api/test

jsonBody

String

POST的JSON数据

headers

Map<String,String>

需要扩展的HTTP Header

返回值:

参数

类型

说明

code

Integer

200代表HTTP请求成功

其他代表HTTP请求失败,

在使用时需要判断该返回值

data

String

返回的HTTP BODY

使用示例

let response = httpPlugin.httpPostJson('/api/test', '{"name":"abc"}');
if (response.code === 200) {
    //这里需要对response.data的数据处理
    console.log(response.data);
}

3. 日志打印

3.1 API: log(format,..args)

用途:格式化日志打印

参数

类型

说明

format

String

支持%s,%d,%c,%f等通用的格式转换符号

args

Object...

需要打印的参数,可以为多个

返回值

参数

类型

说明

使用示例

var iotPluginApi = require('./iotPlugin.js');
function transform(eventType,msgPayLoadList){
   iotPluginApi.log("toJSONString %s",utils.toJSONString(msgPayLoadList));
}

3.2 API: log(object)

用途: 日志打印

参数

类型

说明

object

Object

需要打印的对象,可以为String,也可以为其他类型,如果是其他类型,则默认转成相应的String打印出来

返回值

参数

类型

说明

使用示例

var iotPluginApi = require('./iotPlugin.js');
function transform(eventType,msgPayLoadList){
    iotPluginApi.log(eventType);
  iotPluginApi.log(msgPayLoadList);
  iotPluginApi.log('this is msgPlayLoad:' +msgPayLoadList);
}

4. 触发报警通知

API: triggerAlarm(param)

用途: 用于在JS脚本中触发一条报警消息到云端的数字工厂,可以指定接收报警的人员,角色和组织。

参数

类型

说明

param

Map<String,Object>

param为一个Map结构

param的具体定义如下

level

Integer

报警级别

0:提醒

1: 故障

2:报警

receiverList

List<String>

消息接收者的code码集合。

如果receiverType为role,则为角色码,如果receiverType为user,则为identityId,如果receiverType为organization,则为组织ID。

receiverType

String

role:角色

user:人员

organization:组织

说明:管理员的特殊角色码为ADMINISTRATOR

title

String

报警的标题

content

String

报警的内容

参数

类型

说明

code

Integer

200代表执行成功

其他代表执行失败,

在使用时需要判断该返回值

data

Object

无返回,为null

使用示例

var iotPluginApi = require('./iotPlugin.js');
var Map = Java.type('java.util.HashMap');

function transform(eventType,msgPayLoadList){
   var transformResultList = new Array();
   var receveiverList = ['ADMINISTRATOR'];

  // 这里填入报警的相关参数
   let triggerParam = new Map();
   triggerParam.put("level",1);
   triggerParam.put("receiverList",receveiverList);
   triggerParam.put("receiverType","role")
   triggerParam.put("title","边缘数据集成报警内容自定义标题");
   triggerParam.put('content',"边缘数据集成报警内容:产线异常");
  
  // 这里为调用扩展API触发报警
   iotPluginApi.triggerAlarm(triggerParam);
  
     return transformResultList;
}

5. 获取设备的属性

API: cloudThingPropertiesGet(param)

用途:获取物联网设备的属性

入参列表

入参名称

数据类型

是否必须

入参描述

param

Map<String,Object>

param

param

param的具体定义如下

iotId

字符串

设备ID,生活物联网平台为设备颁发的ID,设备的唯一标识符。productKey和deviceName为空时,该入参不允许为空。

productKey

字符串

产品的Key,设备证书信息之一。创建产品时,生活物联网平台为该产品颁发的全局唯一标识。当iotId为空时,该参数不允许为空。

deviceName

字符串

设备的名称,设备证书信息之一。在注册设备时,自定义的或系统生成的设备名称,具备产品维度内的唯一性。当iotId为空时,该参数不允许为空。

返回值

参数

类型

说明

code

Integer

200代表执行成功

其他代表执行失败,

在使用时需要判断该返回值

data

Object

无返回,为null

使用示例

var iotPluginApi = require('./iotPlugin.js');
var Map = Java.type('java.util.HashMap');

function transform(eventType,msgPayLoadList){
  let getParam = new Map();
  getParam.put("productKey","a17nEF9qLYJ");
  getParam.put("deviceName","mIwfRzXY9wLpRFz5RfJJ")
  let result = iotPluginApi.cloudThingPropertiesGet(getParam);

  if (result.code == 200) {
      iotPluginApi.log("获取物的属性成功")
      iotPluginApi.log("属性值为 %s",utils.toJSONString(result.data));
  }
  return;
}

6. 设置设备的属性

API: cloudThingPropertiesSet(param)

入参列表

入参名称

数据类型

是否必须

入参示例

入参描述

iotId

字符串

设备ID,物联网平台为设备颁发的ID,设备的唯一标识符。productKey和deviceName为空时,该入参不允许为空

items

JSON

{}

set参数

productKey

字符串

产品的Key,设备证书信息之一。创建产品时,生活物联网平台为该产品颁发的全局唯一标识。当iotId为空时,该参数不允许为空

deviceName

字符串

设备的名称,设备证书信息之一。在注册设备时,自定义的或系统生成的设备名称,具备产品维度内的唯一性。当iotId为空时,该参数不允许为空

返回值

参数

类型

说明

code

Integer

200代表执行成功

其他代表执行失败,

在使用时需要判断该返回值

data

Object

无返回,为null

使用示例

var iotPluginApi = require('./iotPlugin.js');
var Map = Java.type('java.util.HashMap');

function transform(eventType,msgPayLoadList){
  let param = new Map();
  let serviceParam = new Map();
  param.put("productKey","a17nEF9qLYJ");
  param.put("deviceName","mIwfRzXY9wLpRFz5RfJJ");
  serviceParam.put("wendu",1);
  serviceParam.put("shidu",2);
  param.put("items",serviceParam);
  let result = iotPluginApi.cloudThingPropertiesSet(param);

  if (result.code == 200) {
      iotPluginApi.log("设置物的属性成功")
  }
  return;
}

7. 调用设备的服务

API: cloudThingServiceInvoke

入参列表

入参名称

数据类型

是否必须

入参示例

入参描述

iotId

字符串

xxxxxxxx

设备ID,物联网平台为设备颁发的ID,设备的唯一标识符。productKey和deviceName为空时,该入参不允许为空

identifier

字符串

xxxxxxxx

服务的标识符

args

JSON

{}

服务入参

productKey

字符串

产品的Key,设备证书信息之一。创建产品时,生活物联网平台为该产品颁发的全局唯一标识。当iotId为空时,该参数不允许为空

deviceName

字符串

设备的名称,设备证书信息之一。在注册设备时,自定义的或系统生成的设备名称,具备产品维度内的唯一性。当iotId为空时,该参数不允许为空

返回值

参数

类型

说明

code

Integer

200代表执行成功

其他代表执行失败,

在使用时需要判断该返回值

data

Object

无返回,为null

使用示例

var iotPluginApi = require('./iotPlugin.js');
var Map = Java.type('java.util.HashMap');

function transform(eventType,msgPayLoadList){
  let param = new Map();
  let args =new Map();
  param.put("productKey","a17nEF9qLYJ");
  param.put("deviceName","mIwfRzXY9wLpRFz5RfJJ");
  param.put("identifier","start");
  args.put("param1",1);
  args.put("param2",2);
  param.put("args",args);
  let result = iotPluginApi.cloudThingServiceInvoke(param);

  if (result.code == 200) {
      iotPluginApi.log("调用物的服务成功")
      iotPluginApi.log("返回值为 %s",utils.toJSONString(result.data));
  }
  return;
}

8. 工具函数utils

8.1 API: sleep(ms)

用途: 用于在JS脚本中休眠sleep

参数

类型

说明

ms

Integer

需要休眠的毫秒数字

返回值

参数

类型

说明

使用示例

//引用iot扩展Api模块,必须引用
var iotPluginApi = require('./iotPlugin.js');

function transform(eventType,msgPayLoadList){
    utils.sleep(1000);
    iotPluginApi.log("sleep 1 seconds");
}

8.2 API: toJSONString(param);

用途: 用于在JS中将一个JS对象转成JSON字符串5.1 API: sleep(ms);

参数

类型

说明

param

Object

需要转换的JS对象,比如为Map,List可以JSON序列化的对象

返回值

参数

类型

说明

String

返回一个JSON字符串

使用示例

//引用iot扩展Api模块,必须引用
var iotPluginApi = require('./iotPlugin.js');

function transform(eventType,msgPayLoadList){
     iotPluginApi.log("toJSONString %s",utils.toJSONString(msgPayLoadList));
}

8.3 全局变量

读取、存储、删除全局变量,在JS中调用。

方法

说明

utils.getVar(key)

读取一个变量

utils.putVar(key)

存储一个变量

utils.removeVar(key)

删除一个变量

8.4 API: 时间格式化

(1)formatDate(timestamp, unitName, pattern)

参数

类型

说明

timestamp

Long

时间戳

unitName

String

时间单位

NANOSECONDS - 纳秒

MICROSECONDS - 微秒

MILLISECONDS - 毫秒

SECONDS - 秒

DAYS - 天

pattern

String

yyyy-MM-dd hh:mm:ss

返回值

参数

类型

说明

String

格式化时间字符串

返回值

//引用iot扩展Api模块,必须引用
var iotPluginApi = require('./iotPlugin.js');

function transform(eventType,msgPayLoadList){
  masterData["DatetimeX"] = utils.formatDate(msgPayload["DateTimeX"], "NANOSECONDS", "yyyy-MM-dd HH:mm:ss");
  masterData["TimestampX"] = utils.formatDate(msgPayload["TsX"], "NANOSECONDS", "yyyy-MM-dd HH:mm:ss");
  masterData["DateX"] = utils.formatDate(msgPayload["DateX"], "DAYS", "yyyy-MM-dd");
  masterData["TimeX"] = utils.formatDate(msgPayload["TimeX"], "MICROSECONDS", "HH:mm:ss");
}

(2)formatDate(datetime, pattern)

参数

类型

说明

timestamp

Long

时间戳

pattern

String

yyyy-MM-dd hh:mm:ss

返回值

参数

类型

说明

String

格式化时间字符串

返回值

//引用iot扩展Api模块,必须引用
var iotPluginApi = require('./iotPlugin.js');

function transform(eventType,msgPayLoadList){
  let datestr = utils.formatDate(datetime, "yyyy-MM-dd HH:mm:ss");
}

9 环境变量

环境变量在输入/输出/转换节点的脚本函数中,都用一个context记录作业的上下文,context.envConfig.get("参数名") 可以读取到边缘数据集成中已经配置的参数值。

10 JS插件

写好一个插件,上传到边缘数据集成配置中,可以被这个配置中的任务一个作业节点脚本引用。

典型例子:写好一个API登录检测的包,在输入输出节点脚本中调用。

var httpPlugin = require('./../httpPlugin.js');

var token = null;
//上次使用token的时间
var token_time = 0;

function getToken(name, password) {
 if (new Date().getTime() - token_time > 1000*30) {
  //模拟获取Token的过程
  let response = httpPlugin.httpPostJson("/api/getToken", {"name":name, "pwd": password});
  if (response.code === 200) {
   token = response.data;
  } else {
   console.log("GetToken error:" + response.code);
      return null;
  }
 }
 token_time = new Date().getTime();
 return token;
}

module.exports.getToken = getToken;

常见问题

  1. JS的变量提升问题

说明:在脚本中经常需要在函数中实现较多的转换逻辑,而这里需要注意的是JavaScript(ES5)中的var变量的作用域跟Java和C的作用域不同,否则会出现函数中的变量被覆盖的情况,导致各种问题,例如死循环。

在大多数编程语言中,会用花括号{}来形成一个作用域,俗称“块作用域”,例如C语言、C++等。但是在JS中{}并不能产生块作用域,JS中的作用域是依靠函数形成的。

在ECMAScript5中,JS只有两类作用域:全局作用域、函数作用域。

  • 全局作用域:全局对象的作用域,在代码的任何地方都可访问,但有时会被函数作用域覆盖

  • 函数作用域:作用于整个函数范围内,不管到底是在函数中的何处进行声明

在最新的数字工厂脚本引擎版本中,已经支持了ES6的let和const语法,建议在编写脚本的时候使用let和const以避免该问题的产生。

调试方法

1. 调试转换节点的脚本函数

在创建一个数据集成任务后,需要在作业任务的转换节点编写转换函数(即transform函数,默认有参考代码),

在编写完该函数后,可以通过mock数据的方法来运行该函数,并进行日志打印和结果输出。

步骤

1) 登录oc.supet.com,进入平台管理-》边缘计算-》选择一个数据集成任务-》编辑-》任务列表-》

点击转换节点的“配置”按钮;

image.png

mock参数说明:mock参数是在点击调试的时候,transform函数的入参:msgPayLoad 其类型为一个Map<String,Object>类型。

2. 查看JS的调试日志

在调试阶段,可以在云端直接针对某个集成任务进行实时调试,即脚本在边缘运行时,日志可以实时输出到云端的调试窗口中,该功能主要帮助在开发、调试阶段查看脚本的运行结果和日志情况。

2.1 在边缘数据集成中选择一个需要调试的任务,点击调试。

image.png

2.2 通过修改数据库或者其他方法触发数据变化,然后查看脚本的日志打印

3. 查看JS的运行日志

当调试完一个脚本后,需要将该脚本正式下发到边缘应用,以正式数据集成。

数据集成时可能会产生全量数据,增量数据,进而出发脚本运行,如果需要查看运行中的JS日志,可以通过如下方法查看:

进入边缘数据集成-》点击查看日志

输入阿里云账号和密码:

image.png

打开SLS日志窗口后,输入关键字“JsScriptLog”进行搜索查看。

image.png

作业节点脚本函数说明

1. 输入节点

输入节点类型是“数据库增量”、“服务提供”、“API上传”的情况不需要编写脚本。

1.1 数据库查询

方法一:获取整张表的全量数据,增量数据由系统计算两次SELECT之间的差别。

/**
 *
 * @param snapshotType full=全量同步,increment=增量同步,retry=重传数据
 * @param context
 *     context.snapshotOffset:记录目前偏移值,类型是数字、文本、MAP
 *     context.snapshotDiffDelete:是否计算删除增量
 *     context.snapshotRetryKeys:重传的唯一标识,Map类型,只有在snapshotType=retry才有值
 */
function snapshot(snapshotType, context) {
        //扫描整张表读取全量数据
        return iotPluginApi.newDbSnapshot("SELECT * FROM dbo.BG_Item");
}

方法二:完整的实现全量、增量、重传

/**
 *
 * @param snapshotType full=全量同步,increment=增量同步,retry=重传数据
 * @param context
 *     context.snapshotOffset:记录目前偏移值,类型是数字、文本、MAP
 *     context.snapshotDiffDelete:是否计算删除增量
 *     context.snapshotRetryKeys:重传的唯一标识,Map类型,只有在snapshotType=retry才有值
 */
function snapshot(snapshotType, context) {
    if (snapshotType === 'increment') {
        //扫描整张表来计算增量
        return iotPluginApi.newDbSnapshot("SELECT * FROM dbo.BG_Item");

        /* 高级用法:
        //自定义增量
        if (context.snapshotOffset == null) {
            context.snapshotOffset = utils.currentTimeMillis();
        }
        let sql = "SELECT * FROM dbo.TB WHERE timestamp > " + context.snapshotOffset;
        context.snapshotOffset = utils.currentTimeMillis();
        context.snapshotDiffDelete = false;
    return iotPluginApi.newDbSnapshot(sql);
         */
    } else if (snapshotType === 'retry') {
        //失败数据重传
        return iotPluginApi.newDbSnapshot("SELECT * FROM dbo.BG_Item WHERE cItemCode = " + context.snapshotRetryKeys['cItemCode']);
    } else {
        //扫描整张表读取全量数据
        return iotPluginApi.newDbSnapshot("SELECT * FROM dbo.BG_Item");
    }
}

1.2 API查询

说明:从一个API获取增量数据,不实现全量和重传功能。

/**
 *
 * @param snapshotType full=全量同步,increment=增量同步,retry=重传数据
 * @param context
 *       context.path: api路径
 *     context.snapshotOffset:记录目前偏移值,类型是数字、文本、MAP
 *     context.snapshotDiffDelete:是否计算删除增量
 *     context.snapshotMore:如果全量太多数据,需要分页,设置这个参数是true
 *     context.snapshotRetryKeys:重传的唯一标识,Map类型,只有在snapshotType=retry才有值
 */
function snapshot(snapshotType, context) {
    if (snapshotType === 'increment') { //重传,不实现无法拉增量数据
        if (context.snapshotOffset == null) {
            context.snapshotOffset = utils.currentTimeMillis(); //从当前时间戳开始
        }

        //构造请求参数
        let params = {'timestamp': context.snapshotOffset};

        //发起HTTP请求,也支持httpGet
        let body = httpPlugin.httpPostJson(context.path, utils.toJSONString(params)); //发起HTTP请求

        if (body.code !== 200) {
            return null;
        }
        //处理返回结果
        let records = iotPluginApi.newList();
        let record = JSON.parse(body.data);
        records.add(record);
        //records.add(xxx);添加更多数据

        //不需要计算增量删除
        context.snapshotDiffDelete = false;

        //更新记录
        context.snapshotOffset = utils.currentTimeMillis();

        return records;
    } else if (snapshotType === 'full') { //全量同步,不实现无法拉取全量数据
        //参考增量实现方法
        //如果需要分页:可以用context.snapshotOffset记录位置
        //           同时设置context.snapshotMore=true
        return null;
    } else if (snapshotType === 'retry') { //重传,不实现无法重传数据
        //参考增量实现方法
        //重传的标识符从context.snapshotRetryKeys读取
        return null;
    }
}

2. 转换节点

说明:转换输入节点采集到的原始数据到目标数据格式。

/**
 * 客户实现: transform()转换函数
 * 这是脚本转换器的第一个转换函数,负责将 数据来源(如数据库变换的数据) 转换成 目标数据格式,如主数据,服务模型数据,物联网数据等
 * 如下的代码是一个示例:仅供参考,需要根据您的实际业务场景转换逻辑进行编写
 * 示例代码是数据源的数据转换成iot工业数字工厂的人员主数据格式
 * @param eventType为事件类型,对于数据库事件有 insert,update,delete
 * @param msgPayload 为需要转换的目标数据,注意其格式是一个Map,对应Java的格式为Map<String, Object>
 * @context 系统上下文参数
 * @return 返回转换后的数据格式,必须也为一个Map,即数据输入为Map,转换格式后还是为Map
 */
function transform(eventType, msgPayload, context) {
    let masterData = new iotPluginApi.newMap();
  //在这里实现数据转换逻辑
    masterData["num"] = msgPayload["id"];
    masterData["name"] = msgPayload["name"];
  
  //这里也可以调用数据库操作API来查询数据

    return masterData;
}

转换节点如果需要调用【数据库操作】或者【API操作】,默认访问的数据源是输入数据源。

3. 输出节点

输出节点类似是“主数据 ”、“服务依赖”的情况不需要编写脚本。

3.1 数据库回写

不需要事务支持。

/**
 * 客户实现: distribute()分发函数
 * 负责将 transform转换后的数据写入边缘数据库
 * @param eventType 为事件类型,对于数据库事件有 insert,update,delete
 * @param transformData transform转换后的数据,是一个Map
 * @param context
 */
function distribute(eventType, transformData, context) {
    let sql = "INSERT INTO dbo.TB2(id,name,remark) VALUES(" + transformData['id'] + ",'XXX','YYY')";

    //写入边缘数据库
    let ret = iotPluginApi.iotSqlExecute(sql);
    //在这里构造服务模型返回数据
    if (ret.code === 200) {
        //成功
    } else {
        //失败
    }
    return ret;
}

方法二:事务支持。

/**
 * 客户实现: distribute()分发函数
 * 负责将 transform转换后的数据写入边缘数据库
 * @param eventType 为事件类型,对于数据库事件有 insert,update,delete
 * @param transformData transform转换后的数据,是一个Map
 * @param context
 */
function distribute(eventType, transformData, context) {
    let transaction = iotPluginApi.iotGetTransaction();

    let sql1 = "INSERT INTO dbo.TB1(id,name,remark) VALUES(" + transformData['id'] + ",'XXX','YYY')";
    let sql2 = "INSERT INTO dbo.TB2(id,name,remark) VALUES(" + transformData['id'] + ",'XXX','YYY')";

    try {
        transaction.execute(sql1);
        transaction.execute(sql2);
        let r = transaction.commit();
        console.log("commit:" + r);

        return iotPluginApi.newIoTxResult(200, "OK", r);
    } catch (e) {
        console.log("rollback:" + e);
        transaction.rollback();
        return iotPluginApi.newIoTxResult(500, "rollback");
    }
}

3.2 调用API

说明:发起HTTP请求并分析响应结果。

/**
 * 客户实现: distribute()分发函数
 * 负责将 transform转换后的返回数据通过API方式上报
 * @param eventType 为事件类型,对于数据库事件有 insert,update,delete
 * @param transformData transform转换后的数据,是一个Map
 * @param context
 *     context.path:传入来的API路径
 */
function distribute(eventType, transformData, context) {
    //发起一个HTTP请求,
    let data = httpPlugin.httpPostJson(context.path, utils.toJSONString(transformData));
    console.log("HTTP返回数据:" + data);
    //如果返回是一个JSON结构
    let jsonMap = JSON.parse(data.data);
    //返回结果
    return iotPluginApi.newIoTxResult(200, "success", jsonMap);
}

3.3 告警

说明:输出节点为告警的场景适用于监听数据变化后触发通知,注意这里的报警需要提前在数字工厂->通知规则中进行报警定义,并定义报警的标题和内容。报警触发后,会在数字工厂的报警弹窗和钉钉中通知。

/**
 * 客户实现: distribute()分发函数,用于输出节点
 * 如下的代码是一个示例:仅供参考,需要根据您的实际业务场景转换逻辑进行编写
 * @param eventType为事件类型,对于数据库事件有 insert,update,delete
 * @param transformData 这是转换节点的transform()函数返回的内容,即转换后的数据格式
 * @context 系统上下文参数
 * @return
 */
function distribute(eventType, transformData, context) {
   //提示:可以在这里对transformData进行判断,符合触发条件的才走下面的告警逻辑
   iotPluginApi.log('告警');
   let a = iotPluginApi.triggerCloudAlarm('报警的内容写在这里');
   return iotPluginApi.newIoTxResult(200, "success", null);
}

3.4 物联网设备服务

说明:需要在IoT的物联网设备上为该设备创建一个服务,并在输出时选择设备及需要调用的设备的服务。

/**
 * 客户实现: distribute()分发函数,用于输出节点为设备物模型服务调用
 * 如下的代码是一个示例:仅供参考,需要根据您的实际业务场景转换逻辑进行编写
 * @param eventType为事件类型,对于数据库事件有 insert,update,delete
 * @param transformData 这是转换节点的transform()函数返回的内容,即转换后的数据格式
 * @context 系统上下文参数
 * @return
 */
function distribute(eventType, transformData, context) {
    //注意:triggerDeviceServiceCall的参数为一个JSON结构,即设备服务调用的参数
    //如果该服务未设置任何参数,这传入空的json对象接口。
    //也可以在这里进行if判断,对数据进行过滤,只有符合条件对才触发设备调用
   let a = iotPluginApi.triggerDeviceServiceCall({});
   return a;
}

3.5 自定义脚本

说明:自定义脚本为用户自定义的输出逻辑,用户可以在该脚本中调用八爪鱼脚本的API函数实现自己的逻辑。注:该distribute函数需要返回IoTxResult 200代表函数调用成功。

/**
 * 客户实现: distribute()分发函数,用于输出节点
 * 如下的代码是一个示例:仅供参考,需要根据您的实际业务场景转换逻辑进行编写
 * @param eventType为事件类型,对于数据库事件有 insert,update,delete
 * @param transformData 这是转换节点的transform()函数返回的内容,即转换后的数据格式
 * @context 系统上下文参数
 * @return
 */
function distribute(eventType, transformData, context) {
   //这是一个自定义的脚本,您可以在这里通过调用数据集成的脚本API实现自定义的输出逻辑
   //这个示例脚本仅仅只是打印了一条语句,用于参考
   iotPluginApi.log('this is user-defined script');
   return iotPluginApi.newIoTxResult(200, "success", null);
}

4. 使用步骤

下面介绍如何使用脚本的几个步骤。

4.1. 创建数据集成和开发作业任务

创建一个数据集成,然后在该数据中创建一个作业任务,同时完成输入节点、转换节点、输出节点的配置和脚本编写。

转换节点必须实现脚本,其他的输入节点、输出节点根据不同的类型有些需要配置,

有些需要进行脚本编写,请根据实际业务场景进行配置。

4.2. 调试脚本

在完成脚本编写后,通过Mock参数和调试的方法进行脚本调试,查看调试的返回值确认脚本是否编写正确。

4.3. 下载配置

脚本编写和调试完成后,进行数据集成页面的编辑页面,点击“下载配置”,将脚本下载到边缘应用。

4.4. 查看数据集成结果

查看主数据/服务模型/API等输出节点的同步结果。

下载成功后,查看主数据的数据,或者服务模型的数据以判断是否同步成功。

也可以通过在边缘的数据库进行数据库表的修改触发增量数据,进而脚本转换查看同步的结果。

阿里云首页 工业互联网平台 相关技术圈