脚本使用指南
在数据集成作业中,输入、转换、输出节点都可以通过自定义脚本来实现功能。
支持的语法
脚本使用JavaScript语言进行编写,语法上支持ES5以及ES6的const和let语法。
ES5的语法及常见API可以参考地址: http://www.ecma-international.org/ecma-262/5.1/index.html
ES6语法
const
const声明一个只读的常量,一旦声明,常量的值就就不能改变;
const的作用域和let一样,只在声明所在的块级作用域内有效。
let
在ES5中,只有两种作用域,一种是函数作用域,另一种是全局作用域,这个导致在写脚本的时候容易出现变量覆盖的情况;
所以ES6增加了let语法,用于声明变量,用法类似于var,但是所声明的变量只在let所在的代码块内有效;
建议在脚本开发的时候函数内部尽量使用let来声明变量,全局的采用var。
基础扩展API
我们对标准的JavaScript API进行了相应的扩展,以提供访问数据库,调用IoT服务模型, 同步数据到主数据的能力。
如下:
1. 数据库操作
说明:数据库操作API的使用依赖标准扩展插件包
var iotPluginApi = require('./iotPlugin.js'); //(默认已经引用)
1.1 SQL查询接口 - iotSqlSelect
API: iotSqlSelect(statement)
用途:用于在JS脚本中执行sql select语句
参数 | 类型 | 说明 |
statement | String | 需要执行的select sql语句,底层已经适配了数据库,所以这里只需要填写对应的sql语句 |
返回值
参数 | 类型 | 说明 |
code | Integer | 200代表sql语句执行成功 其他代表sql执执行失败, 在使用时需要判断该返回值 |
data | RowValueObject[] 其中RowValueObject的类型为: Map<String,Object>; | 一个数组对象,数组成员为一个Map<String,Object>对象。 对应sql select出来的每一行内容 Map的Key为列名,Object为该列对应的值。 使用的时候需要判断该数组的长度。 示例: [ {"key1":1,"key2":2},{"key1":3,"key2":4} ] |
使用示例
let sqlselect = "select num, name from dbo.TestTable where num < 10";
let sqlResult = iotPluginApi.iotSqlSelect(sqlselect);
if (sqlResult.code == 200 && sqlResult.data.length > 0 ) {
var packingList = new Array();
for (var i = 0; i < sqlResult.data.length; i++) {
var itemData = new Map();
//每一行的数据
itemData["id"] = (sqlResult.data)[i]["num"];
itemData["name"] = (sqlResult.data)[i]["name];
}
}
1.2 SQL执行接口 - iotSqlExecute
API: iotSqlExecute(statement)
用途:用于在JS脚本中执行sql update 或者 delete语句
参数 | 类型 | 说明 |
statement | String | 需要执行的update或delete sql语句,底层已经适配了数据库,所以这里只需要填写对应的sql语句 |
返回值
参数 | 类型 | 说明 |
code | Integer | 200代表sql语句执行成功 其他代表sql执执行失败, 在使用时需要判断该返回值 |
data | Integer | 影响的行数 |
使用示例
let sqlUpdate = 'update dbo.TestTable set name = "abc" where num = 1';
let sqlResult = iotPluginApi.iotSqlExecute(sqlUpdate);
if (sqlResult.code == 200) {
console.log(sqlResult.data);
}
1.3 事务处理接口 - iotGetTransaction
API: iotGetTransaction()
用途:用于在JS脚本中创建一个事务
参数 | 类型 | 说明 |
返回值:事务对象transaction
方法 | 作用 | 说明 |
transaction.select(statement) | 查询数据 | 参考iotSqlSelect的调用方式 |
transaction.execute(statement) | 修改或删除数据 | 参考iotSqlExecute的调用方式 |
transaction.commit() | 事务提交 | |
transaction.rollback() | 事务回滚 |
使用示例
let transaction = iotPluginApi.iotGetTransaction();
let sql1 = "INSERT INTO dbo.TB1(id,name,remark) VALUES("1,'XXX1','YYY1')";
let sql2 = "INSERT INTO dbo.TB2(id,name,remark) VALUES("1,'XXX2','YYY2')";
try {
transaction.execute(sql1);
transaction.execute(sql2);
//事务提交
let r = transaction.commit();
console.log("commit:" + r);
return iotPluginApi.newIoTxResult(200, "OK", r);
} catch (e) {
console.log("rollback:" + e);
//事务回滚
transaction.rollback();
return iotPluginApi.newIoTxResult(500, "rollback");
}
1.4 数据快照接口 - newDbSnapshot
API: newDbSnapshot()
用途:一般用于【数据库查询】输入节点的脚本编辑
参数 | 类型 | 说明 |
statement | String | 需要执行的select sql语句,底层已经适配了数据库。所以这里只需要填写对应的sql语句 |
返回值:快照对象snapshot
使用示例
function snapshot(snapshotType, context) {
if (snapshotType === 'increment') {
//扫描整张表来计算增量
return iotPluginApi.newDbSnapshot("SELECT * FROM dbo.TB");
}
}
2. API操作
说明:API操作的使用依赖HTTP扩展插件包
var iotPluginApi = require('./httpPlugin.js'); //(默认已经引用)
2.1 HTTP GET
API: httpGet()
用途:一般用于【API查询】输入节点或者【API依赖】输出节点的脚本编辑
参数 | 类型 | 说明 |
apiPath | String | HTTP路径,例如 /api/test |
headers | Map<String,String> | 需要扩展的HTTP Header |
返回值:
参数 | 类型 | 说明 |
code | Integer | 200代表HTTP请求成功 其他代表HTTP请求失败, 在使用时需要判断该返回值 |
data | String | 返回的HTTP BODY |
使用示例
let response = httpPlugin.httpGet('/api/test');
if (response.code === 200) {
//这里需要对response.data的数据处理
console.log(response.data);
}
2.2 HTTP POST
API: httpPost()
用途: 一般用于【API查询】输入节点或者【API依赖】输出节点的脚本编辑
参数 | 类型 | 说明 |
apiPath | String | HTTP路径,例如 /api/test |
jsonBody | String | POST的JSON数据 |
headers | Map<String,String> | 需要扩展的HTTP Header |
返回值:
参数 | 类型 | 说明 |
code | Integer | 200代表HTTP请求成功 其他代表HTTP请求失败, 在使用时需要判断该返回值 |
data | String | 返回的HTTP BODY |
使用示例
let response = httpPlugin.httpPostJson('/api/test', '{"name":"abc"}');
if (response.code === 200) {
//这里需要对response.data的数据处理
console.log(response.data);
}
3. 日志打印
3.1 API: log(format,..args)
用途:格式化日志打印
参数 | 类型 | 说明 |
format | String | 支持%s,%d,%c,%f等通用的格式转换符号 |
args | Object... | 需要打印的参数,可以为多个 |
返回值
参数 | 类型 | 说明 |
无 |
使用示例
var iotPluginApi = require('./iotPlugin.js');
function transform(eventType,msgPayLoadList){
iotPluginApi.log("toJSONString %s",utils.toJSONString(msgPayLoadList));
}
3.2 API: log(object)
用途: 日志打印
参数 | 类型 | 说明 |
object | Object | 需要打印的对象,可以为String,也可以为其他类型,如果是其他类型,则默认转成相应的String打印出来 |
返回值
参数 | 类型 | 说明 |
无 |
使用示例
var iotPluginApi = require('./iotPlugin.js');
function transform(eventType,msgPayLoadList){
iotPluginApi.log(eventType);
iotPluginApi.log(msgPayLoadList);
iotPluginApi.log('this is msgPlayLoad:' +msgPayLoadList);
}
4. 触发报警通知
API: triggerAlarm(param)
用途: 用于在JS脚本中触发一条报警消息到云端的数字工厂,可以指定接收报警的人员,角色和组织。
参数 | 类型 | 说明 |
param | Map<String,Object> | param为一个Map结构 |
param的具体定义如下 | ||
level | Integer | 报警级别 0:提醒 1: 故障 2:报警 |
receiverList | List<String> | 消息接收者的code码集合。 如果receiverType为role,则为角色码,如果receiverType为user,则为identityId,如果receiverType为organization,则为组织ID。 |
receiverType | String | role:角色 user:人员 organization:组织 说明:管理员的特殊角色码为ADMINISTRATOR |
title | String | 报警的标题 |
content | String | 报警的内容 |
参数 | 类型 | 说明 |
code | Integer | 200代表执行成功 其他代表执行失败, 在使用时需要判断该返回值 |
data | Object | 无返回,为null |
使用示例
var iotPluginApi = require('./iotPlugin.js');
var Map = Java.type('java.util.HashMap');
function transform(eventType,msgPayLoadList){
var transformResultList = new Array();
var receveiverList = ['ADMINISTRATOR'];
// 这里填入报警的相关参数
let triggerParam = new Map();
triggerParam.put("level",1);
triggerParam.put("receiverList",receveiverList);
triggerParam.put("receiverType","role")
triggerParam.put("title","边缘数据集成报警内容自定义标题");
triggerParam.put('content',"边缘数据集成报警内容:产线异常");
// 这里为调用扩展API触发报警
iotPluginApi.triggerAlarm(triggerParam);
return transformResultList;
}
5. 获取设备的属性
API: cloudThingPropertiesGet(param)
用途:获取物联网设备的属性
入参列表
入参名称 | 数据类型 | 是否必须 | 入参描述 |
param | Map<String,Object> | param | param |
param的具体定义如下 | |||
iotId | 字符串 | 否 | 设备ID,生活物联网平台为设备颁发的ID,设备的唯一标识符。productKey和deviceName为空时,该入参不允许为空。 |
productKey | 字符串 | 否 | 产品的Key,设备证书信息之一。创建产品时,生活物联网平台为该产品颁发的全局唯一标识。当iotId为空时,该参数不允许为空。 |
deviceName | 字符串 | 否 | 设备的名称,设备证书信息之一。在注册设备时,自定义的或系统生成的设备名称,具备产品维度内的唯一性。当iotId为空时,该参数不允许为空。 |
返回值
参数 | 类型 | 说明 |
code | Integer | 200代表执行成功 其他代表执行失败, 在使用时需要判断该返回值 |
data | Object | 无返回,为null |
使用示例
var iotPluginApi = require('./iotPlugin.js');
var Map = Java.type('java.util.HashMap');
function transform(eventType,msgPayLoadList){
let getParam = new Map();
getParam.put("productKey","a17nEF9qLYJ");
getParam.put("deviceName","mIwfRzXY9wLpRFz5RfJJ")
let result = iotPluginApi.cloudThingPropertiesGet(getParam);
if (result.code == 200) {
iotPluginApi.log("获取物的属性成功")
iotPluginApi.log("属性值为 %s",utils.toJSONString(result.data));
}
return;
}
6. 设置设备的属性
API: cloudThingPropertiesSet(param)
入参列表
入参名称 | 数据类型 | 是否必须 | 入参示例 | 入参描述 |
iotId | 字符串 | 否 | 设备ID,物联网平台为设备颁发的ID,设备的唯一标识符。productKey和deviceName为空时,该入参不允许为空 | |
items | JSON | 是 | {} | set参数 |
productKey | 字符串 | 否 | 产品的Key,设备证书信息之一。创建产品时,生活物联网平台为该产品颁发的全局唯一标识。当iotId为空时,该参数不允许为空 | |
deviceName | 字符串 | 否 | 设备的名称,设备证书信息之一。在注册设备时,自定义的或系统生成的设备名称,具备产品维度内的唯一性。当iotId为空时,该参数不允许为空 |
返回值
参数 | 类型 | 说明 |
code | Integer | 200代表执行成功 其他代表执行失败, 在使用时需要判断该返回值 |
data | Object | 无返回,为null |
使用示例
var iotPluginApi = require('./iotPlugin.js');
var Map = Java.type('java.util.HashMap');
function transform(eventType,msgPayLoadList){
let param = new Map();
let serviceParam = new Map();
param.put("productKey","a17nEF9qLYJ");
param.put("deviceName","mIwfRzXY9wLpRFz5RfJJ");
serviceParam.put("wendu",1);
serviceParam.put("shidu",2);
param.put("items",serviceParam);
let result = iotPluginApi.cloudThingPropertiesSet(param);
if (result.code == 200) {
iotPluginApi.log("设置物的属性成功")
}
return;
}
7. 调用设备的服务
API: cloudThingServiceInvoke
入参列表
入参名称 | 数据类型 | 是否必须 | 入参示例 | 入参描述 |
iotId | 字符串 | 否 | xxxxxxxx | 设备ID,物联网平台为设备颁发的ID,设备的唯一标识符。productKey和deviceName为空时,该入参不允许为空 |
identifier | 字符串 | 是 | xxxxxxxx | 服务的标识符 |
args | JSON | 是 | {} | 服务入参 |
productKey | 字符串 | 是 | 产品的Key,设备证书信息之一。创建产品时,生活物联网平台为该产品颁发的全局唯一标识。当iotId为空时,该参数不允许为空 | |
deviceName | 字符串 | 是 | 设备的名称,设备证书信息之一。在注册设备时,自定义的或系统生成的设备名称,具备产品维度内的唯一性。当iotId为空时,该参数不允许为空 |
返回值
参数 | 类型 | 说明 |
code | Integer | 200代表执行成功 其他代表执行失败, 在使用时需要判断该返回值 |
data | Object | 无返回,为null |
使用示例
var iotPluginApi = require('./iotPlugin.js');
var Map = Java.type('java.util.HashMap');
function transform(eventType,msgPayLoadList){
let param = new Map();
let args =new Map();
param.put("productKey","a17nEF9qLYJ");
param.put("deviceName","mIwfRzXY9wLpRFz5RfJJ");
param.put("identifier","start");
args.put("param1",1);
args.put("param2",2);
param.put("args",args);
let result = iotPluginApi.cloudThingServiceInvoke(param);
if (result.code == 200) {
iotPluginApi.log("调用物的服务成功")
iotPluginApi.log("返回值为 %s",utils.toJSONString(result.data));
}
return;
}
8. 工具函数utils
8.1 API: sleep(ms)
用途: 用于在JS脚本中休眠sleep
参数 | 类型 | 说明 |
ms | Integer | 需要休眠的毫秒数字 |
返回值
参数 | 类型 | 说明 |
无 |
使用示例
//引用iot扩展Api模块,必须引用
var iotPluginApi = require('./iotPlugin.js');
function transform(eventType,msgPayLoadList){
utils.sleep(1000);
iotPluginApi.log("sleep 1 seconds");
}
8.2 API: toJSONString(param);
用途: 用于在JS中将一个JS对象转成JSON字符串5.1 API: sleep(ms);
参数 | 类型 | 说明 |
param | Object | 需要转换的JS对象,比如为Map,List可以JSON序列化的对象 |
返回值
参数 | 类型 | 说明 |
String | 返回一个JSON字符串 |
使用示例
//引用iot扩展Api模块,必须引用
var iotPluginApi = require('./iotPlugin.js');
function transform(eventType,msgPayLoadList){
iotPluginApi.log("toJSONString %s",utils.toJSONString(msgPayLoadList));
}
8.3 全局变量
读取、存储、删除全局变量,在JS中调用。
方法 | 说明 |
utils.getVar(key) | 读取一个变量 |
utils.putVar(key) | 存储一个变量 |
utils.removeVar(key) | 删除一个变量 |
8.4 API: 时间格式化
(1)formatDate(timestamp, unitName, pattern)
参数 | 类型 | 说明 |
timestamp | Long | 时间戳 |
unitName | String | 时间单位 NANOSECONDS - 纳秒 MICROSECONDS - 微秒 MILLISECONDS - 毫秒 SECONDS - 秒 DAYS - 天 |
pattern | String | yyyy-MM-dd hh:mm:ss |
返回值
参数 | 类型 | 说明 |
String | 格式化时间字符串 |
返回值
//引用iot扩展Api模块,必须引用
var iotPluginApi = require('./iotPlugin.js');
function transform(eventType,msgPayLoadList){
masterData["DatetimeX"] = utils.formatDate(msgPayload["DateTimeX"], "NANOSECONDS", "yyyy-MM-dd HH:mm:ss");
masterData["TimestampX"] = utils.formatDate(msgPayload["TsX"], "NANOSECONDS", "yyyy-MM-dd HH:mm:ss");
masterData["DateX"] = utils.formatDate(msgPayload["DateX"], "DAYS", "yyyy-MM-dd");
masterData["TimeX"] = utils.formatDate(msgPayload["TimeX"], "MICROSECONDS", "HH:mm:ss");
}
(2)formatDate(datetime, pattern)
参数 | 类型 | 说明 |
timestamp | Long | 时间戳 |
pattern | String | yyyy-MM-dd hh:mm:ss |
返回值
参数 | 类型 | 说明 |
String | 格式化时间字符串 |
返回值
//引用iot扩展Api模块,必须引用
var iotPluginApi = require('./iotPlugin.js');
function transform(eventType,msgPayLoadList){
let datestr = utils.formatDate(datetime, "yyyy-MM-dd HH:mm:ss");
}
9 环境变量
在输入/输出/转换节点的脚本函数中,都用一个context记录作业的上下文,context.envConfig.get("参数名") 可以读取到边缘数据集成中已经配置的参数值。
10 JS插件
写好一个插件,上传到边缘数据集成配置中,可以被这个配置中的任务一个作业节点脚本引用。
典型例子:写好一个API登录检测的包,在输入输出节点脚本中调用。
var httpPlugin = require('./../httpPlugin.js');
var token = null;
//上次使用token的时间
var token_time = 0;
function getToken(name, password) {
if (new Date().getTime() - token_time > 1000*30) {
//模拟获取Token的过程
let response = httpPlugin.httpPostJson("/api/getToken", {"name":name, "pwd": password});
if (response.code === 200) {
token = response.data;
} else {
console.log("GetToken error:" + response.code);
return null;
}
}
token_time = new Date().getTime();
return token;
}
module.exports.getToken = getToken;
常见问题
JS的变量提升问题
说明:在脚本中经常需要在函数中实现较多的转换逻辑,而这里需要注意的是JavaScript(ES5)中的var变量的作用域跟Java和C的作用域不同,否则会出现函数中的变量被覆盖的情况,导致各种问题,例如死循环。
在大多数编程语言中,会用花括号{}
来形成一个作用域,俗称“块作用域”,例如C语言、C++等。但是在JS中{}
并不能产生块作用域,JS中的作用域是依靠函数形成的。
在ECMAScript5中,JS只有两类作用域:全局作用域、函数作用域。
全局作用域:全局对象的作用域,在代码的任何地方都可访问,但有时会被函数作用域覆盖
函数作用域:作用于整个函数范围内,不管到底是在函数中的何处进行声明
在最新的数字工厂脚本引擎版本中,已经支持了ES6的let和const语法,建议在编写脚本的时候使用let和const以避免该问题的产生。
调试方法
1. 调试转换节点的脚本函数
在创建一个数据集成任务后,需要在作业任务的转换节点编写转换函数(即transform函数,默认有参考代码),
在编写完该函数后,可以通过mock数据的方法来运行该函数,并进行日志打印和结果输出。
步骤
1) 登录oc.supet.com,进入平台管理-》边缘计算-》选择一个数据集成任务-》编辑-》任务列表-》
点击转换节点的“配置”按钮;

mock参数说明:mock参数是在点击调试的时候,transform函数的入参:msgPayLoad 其类型为一个Map<String,Object>类型。
2. 查看JS的调试日志
在调试阶段,可以在云端直接针对某个集成任务进行实时调试,即脚本在边缘运行时,日志可以实时输出到云端的调试窗口中,该功能主要帮助在开发、调试阶段查看脚本的运行结果和日志情况。
2.1 在边缘数据集成中选择一个需要调试的任务,点击调试。

2.2 通过修改数据库或者其他方法触发数据变化,然后查看脚本的日志打印
3. 查看JS的运行日志
当调试完一个脚本后,需要将该脚本正式下发到边缘应用,以正式数据集成。
数据集成时可能会产生全量数据,增量数据,进而出发脚本运行,如果需要查看运行中的JS日志,可以通过如下方法查看:
进入边缘数据集成-》点击查看日志
输入阿里云账号和密码:

打开SLS日志窗口后,输入关键字“JsScriptLog”进行搜索查看。

作业节点脚本函数说明
1. 输入节点
输入节点类型是“数据库增量”、“服务提供”、“API上传”的情况不需要编写脚本。
1.1 数据库查询
方法一:获取整张表的全量数据,增量数据由系统计算两次SELECT之间的差别。
/**
*
* @param snapshotType full=全量同步,increment=增量同步,retry=重传数据
* @param context
* context.snapshotOffset:记录目前偏移值,类型是数字、文本、MAP
* context.snapshotDiffDelete:是否计算删除增量
* context.snapshotRetryKeys:重传的唯一标识,Map类型,只有在snapshotType=retry才有值
*/
function snapshot(snapshotType, context) {
//扫描整张表读取全量数据
return iotPluginApi.newDbSnapshot("SELECT * FROM dbo.BG_Item");
}
方法二:完整的实现全量、增量、重传
/**
*
* @param snapshotType full=全量同步,increment=增量同步,retry=重传数据
* @param context
* context.snapshotOffset:记录目前偏移值,类型是数字、文本、MAP
* context.snapshotDiffDelete:是否计算删除增量
* context.snapshotRetryKeys:重传的唯一标识,Map类型,只有在snapshotType=retry才有值
*/
function snapshot(snapshotType, context) {
if (snapshotType === 'increment') {
//扫描整张表来计算增量
return iotPluginApi.newDbSnapshot("SELECT * FROM dbo.BG_Item");
/* 高级用法:
//自定义增量
if (context.snapshotOffset == null) {
context.snapshotOffset = utils.currentTimeMillis();
}
let sql = "SELECT * FROM dbo.TB WHERE timestamp > " + context.snapshotOffset;
context.snapshotOffset = utils.currentTimeMillis();
context.snapshotDiffDelete = false;
return iotPluginApi.newDbSnapshot(sql);
*/
} else if (snapshotType === 'retry') {
//失败数据重传
return iotPluginApi.newDbSnapshot("SELECT * FROM dbo.BG_Item WHERE cItemCode = " + context.snapshotRetryKeys['cItemCode']);
} else {
//扫描整张表读取全量数据
return iotPluginApi.newDbSnapshot("SELECT * FROM dbo.BG_Item");
}
}
1.2 API查询
说明:从一个API获取增量数据,不实现全量和重传功能。
/**
*
* @param snapshotType full=全量同步,increment=增量同步,retry=重传数据
* @param context
* context.path: api路径
* context.snapshotOffset:记录目前偏移值,类型是数字、文本、MAP
* context.snapshotDiffDelete:是否计算删除增量
* context.snapshotMore:如果全量太多数据,需要分页,设置这个参数是true
* context.snapshotRetryKeys:重传的唯一标识,Map类型,只有在snapshotType=retry才有值
*/
function snapshot(snapshotType, context) {
if (snapshotType === 'increment') { //重传,不实现无法拉增量数据
if (context.snapshotOffset == null) {
context.snapshotOffset = utils.currentTimeMillis(); //从当前时间戳开始
}
//构造请求参数
let params = {'timestamp': context.snapshotOffset};
//发起HTTP请求,也支持httpGet
let body = httpPlugin.httpPostJson(context.path, utils.toJSONString(params)); //发起HTTP请求
if (body.code !== 200) {
return null;
}
//处理返回结果
let records = iotPluginApi.newList();
let record = JSON.parse(body.data);
records.add(record);
//records.add(xxx);添加更多数据
//不需要计算增量删除
context.snapshotDiffDelete = false;
//更新记录
context.snapshotOffset = utils.currentTimeMillis();
return records;
} else if (snapshotType === 'full') { //全量同步,不实现无法拉取全量数据
//参考增量实现方法
//如果需要分页:可以用context.snapshotOffset记录位置
// 同时设置context.snapshotMore=true
return null;
} else if (snapshotType === 'retry') { //重传,不实现无法重传数据
//参考增量实现方法
//重传的标识符从context.snapshotRetryKeys读取
return null;
}
}
2. 转换节点
说明:转换输入节点采集到的原始数据到目标数据格式。
/**
* 客户实现: transform()转换函数
* 这是脚本转换器的第一个转换函数,负责将 数据来源(如数据库变换的数据) 转换成 目标数据格式,如主数据,服务模型数据,物联网数据等
* 如下的代码是一个示例:仅供参考,需要根据您的实际业务场景转换逻辑进行编写
* 示例代码是数据源的数据转换成iot工业数字工厂的人员主数据格式
* @param eventType为事件类型,对于数据库事件有 insert,update,delete
* @param msgPayload 为需要转换的目标数据,注意其格式是一个Map,对应Java的格式为Map<String, Object>
* @context 系统上下文参数
* @return 返回转换后的数据格式,必须也为一个Map,即数据输入为Map,转换格式后还是为Map
*/
function transform(eventType, msgPayload, context) {
let masterData = new iotPluginApi.newMap();
//在这里实现数据转换逻辑
masterData["num"] = msgPayload["id"];
masterData["name"] = msgPayload["name"];
//这里也可以调用数据库操作API来查询数据
return masterData;
}
注:转换节点如果需要调用【数据库操作】或者【API操作】,默认访问的数据源是输入数据源。
3. 输出节点
输出节点类似是“主数据 ”、“服务依赖”的情况不需要编写脚本。
3.1 数据库回写
不需要事务支持。
/**
* 客户实现: distribute()分发函数
* 负责将 transform转换后的数据写入边缘数据库
* @param eventType 为事件类型,对于数据库事件有 insert,update,delete
* @param transformData transform转换后的数据,是一个Map
* @param context
*/
function distribute(eventType, transformData, context) {
let sql = "INSERT INTO dbo.TB2(id,name,remark) VALUES(" + transformData['id'] + ",'XXX','YYY')";
//写入边缘数据库
let ret = iotPluginApi.iotSqlExecute(sql);
//在这里构造服务模型返回数据
if (ret.code === 200) {
//成功
} else {
//失败
}
return ret;
}
方法二:事务支持。
/**
* 客户实现: distribute()分发函数
* 负责将 transform转换后的数据写入边缘数据库
* @param eventType 为事件类型,对于数据库事件有 insert,update,delete
* @param transformData transform转换后的数据,是一个Map
* @param context
*/
function distribute(eventType, transformData, context) {
let transaction = iotPluginApi.iotGetTransaction();
let sql1 = "INSERT INTO dbo.TB1(id,name,remark) VALUES(" + transformData['id'] + ",'XXX','YYY')";
let sql2 = "INSERT INTO dbo.TB2(id,name,remark) VALUES(" + transformData['id'] + ",'XXX','YYY')";
try {
transaction.execute(sql1);
transaction.execute(sql2);
let r = transaction.commit();
console.log("commit:" + r);
return iotPluginApi.newIoTxResult(200, "OK", r);
} catch (e) {
console.log("rollback:" + e);
transaction.rollback();
return iotPluginApi.newIoTxResult(500, "rollback");
}
}
3.2 调用API
说明:发起HTTP请求并分析响应结果。
/**
* 客户实现: distribute()分发函数
* 负责将 transform转换后的返回数据通过API方式上报
* @param eventType 为事件类型,对于数据库事件有 insert,update,delete
* @param transformData transform转换后的数据,是一个Map
* @param context
* context.path:传入来的API路径
*/
function distribute(eventType, transformData, context) {
//发起一个HTTP请求,
let data = httpPlugin.httpPostJson(context.path, utils.toJSONString(transformData));
console.log("HTTP返回数据:" + data);
//如果返回是一个JSON结构
let jsonMap = JSON.parse(data.data);
//返回结果
return iotPluginApi.newIoTxResult(200, "success", jsonMap);
}
3.3 告警
说明:输出节点为告警的场景适用于监听数据变化后触发通知,注意这里的报警需要提前在数字工厂->通知规则中进行报警定义,并定义报警的标题和内容。报警触发后,会在数字工厂的报警弹窗和钉钉中通知。
/**
* 客户实现: distribute()分发函数,用于输出节点
* 如下的代码是一个示例:仅供参考,需要根据您的实际业务场景转换逻辑进行编写
* @param eventType为事件类型,对于数据库事件有 insert,update,delete
* @param transformData 这是转换节点的transform()函数返回的内容,即转换后的数据格式
* @context 系统上下文参数
* @return
*/
function distribute(eventType, transformData, context) {
//提示:可以在这里对transformData进行判断,符合触发条件的才走下面的告警逻辑
iotPluginApi.log('告警');
let a = iotPluginApi.triggerCloudAlarm('报警的内容写在这里');
return iotPluginApi.newIoTxResult(200, "success", null);
}
3.4 物联网设备服务
说明:需要在IoT的物联网设备上为该设备创建一个服务,并在输出时选择设备及需要调用的设备的服务。
/**
* 客户实现: distribute()分发函数,用于输出节点为设备物模型服务调用
* 如下的代码是一个示例:仅供参考,需要根据您的实际业务场景转换逻辑进行编写
* @param eventType为事件类型,对于数据库事件有 insert,update,delete
* @param transformData 这是转换节点的transform()函数返回的内容,即转换后的数据格式
* @context 系统上下文参数
* @return
*/
function distribute(eventType, transformData, context) {
//注意:triggerDeviceServiceCall的参数为一个JSON结构,即设备服务调用的参数
//如果该服务未设置任何参数,这传入空的json对象接口。
//也可以在这里进行if判断,对数据进行过滤,只有符合条件对才触发设备调用
let a = iotPluginApi.triggerDeviceServiceCall({});
return a;
}
3.5 自定义脚本
说明:自定义脚本为用户自定义的输出逻辑,用户可以在该脚本中调用八爪鱼脚本的API函数实现自己的逻辑。注:该distribute函数需要返回IoTxResult 200代表函数调用成功。
/**
* 客户实现: distribute()分发函数,用于输出节点
* 如下的代码是一个示例:仅供参考,需要根据您的实际业务场景转换逻辑进行编写
* @param eventType为事件类型,对于数据库事件有 insert,update,delete
* @param transformData 这是转换节点的transform()函数返回的内容,即转换后的数据格式
* @context 系统上下文参数
* @return
*/
function distribute(eventType, transformData, context) {
//这是一个自定义的脚本,您可以在这里通过调用数据集成的脚本API实现自定义的输出逻辑
//这个示例脚本仅仅只是打印了一条语句,用于参考
iotPluginApi.log('this is user-defined script');
return iotPluginApi.newIoTxResult(200, "success", null);
}
4. 使用步骤
下面介绍如何使用脚本的几个步骤。
4.1. 创建数据集成和开发作业任务
创建一个数据集成,然后在该数据中创建一个作业任务,同时完成输入节点、转换节点、输出节点的配置和脚本编写。
转换节点必须实现脚本,其他的输入节点、输出节点根据不同的类型有些需要配置,
有些需要进行脚本编写,请根据实际业务场景进行配置。
4.2. 调试脚本
在完成脚本编写后,通过Mock参数和调试的方法进行脚本调试,查看调试的返回值确认脚本是否编写正确。
4.3. 下载配置
脚本编写和调试完成后,进行数据集成页面的编辑页面,点击“下载配置”,将脚本下载到边缘应用。
4.4. 查看数据集成结果
查看主数据/服务模型/API等输出节点的同步结果。
下载成功后,查看主数据的数据,或者服务模型的数据以判断是否同步成功。
也可以通过在边缘的数据库进行数据库表的修改触发增量数据,进而脚本转换查看同步的结果。