更新时间:2019-09-18 15:38
大概框架如上图所示
在Studio里面创建一个服务型机器人应用,编写代码并配置好 参数面板
,然后发布到企业应用市场。
参数的获取,按照正常的应用写法获取参数。 rpa.params[“myInput”]参数的返回,调用提供的API,把应用的结果返回。rpa.workbench.task_result(resStr)
应用调起者登陆控制台,并在企业应用市场里找到已发布的应用,并点击 获取应用
应用调起者登陆控制台,并在企业应用市场里找到已发布的应用,并点击 查看接口
接口url中的rpahost用控制台的域名替换
详情说明请查看服务型机器人使用说明
用Postman调用接口,说明一下服务型机器人应用调用的过程,具体步骤如下:
查询任务运行状态和任务的运行结果都是GET请求,记得在Headers中填写AK信息。如果请求时写了回调地址且回调地址可用时,运行结果返回给回调地址,过后再主动查询是不会得到运行结果的,意思是主动查询和写回调地址只能二选一,目前是这样的。
python实现服务型机器人应用调用的示例:
from rpa.core import *
from rpa.utils import *
import rpa
import requests
from urllib3 import encode_multipart_formdata
import json
from datetime import datetime
RPAHOST = "rpa-demo.allinbots.com"
ACCESSKEY = "54f7261d92756a85"
ACCESSKEYSECRET = "b485d705-91e9-426d-b8d2-3f0b3c19551e"
def start():
"""
1、从控制台获取服务型机器人的接口,记得修改rpahost
2、从控制台的系统设置中获取AK信息
3、向接口连接发一个post请求,将AK信息添加到头参数中,body中可以传参数和调起制定电脑的服务型机器人
4、从返回的json数据中提取resultUrl,向这个resultUrl发一个get请求,获取服务型机器人执行后的结果
"""
rpa.datatable.clear()
# url为服务型机器人的接口
url = "http://{rpahost}/rpa/api/ak/server/call/40e1d35a-1967-409a-a67f-8077329d7adb".format(rpahost=RPAHOST)
# headers里添加ak信息
headers = {"accessKey": ACCESSKEY,
"accessKeySecret": ACCESSKEYSECRET,
"Content-Type":"application/x-www-form-urlencoded"}
# body里appParams对于服务型机器人应用中的数据面板中的参数,robotName对应开启服务型机器人电脑的名字
body = {"config":r'{"appParams":"{\"myInput\":\"码栈\"}","robotName":"GUOJING622E","callbackUrl": "http://129.204.89.219/rpa/received"}'}
write_datatable("body:{}".format(json.dumps(body)))
# 发送一个port请求,调用服务型机器人应用
r = requests.post(url, headers=headers,data=body)
write_datatable("r.headers: {}".format(r.headers))
write_datatable("r.status_code: {}".format(r.status_code))
write_datatable("r.text: {}".format(r.text))
if "没有空闲的服务机器人" in json.loads(r.text)["msg"]:
rpa.alert.msgbox("提示", "没有空闲的服务机器人")
# 获取返回的json数据,数据中包含参看服务型机器人的状态taskId,和获取服务型机器人应用结果的请求的接口resultUrl
body = json.loads(r.text)
write_datatable('body["data"]: {}'.format(body["data"]))
result_url = "http://{rpahost}".format(rpahost=RPAHOST) + body["data"]['resultUrl']
write_datatable("resultUrl: {}".format(result_url))
robot_status_url = "http://{rpahost}/rpa/api/ak/server/task/status/".format(rpahost=RPAHOST) + body["data"]["taskId"]
write_datatable("robot_status_url: {}".format(robot_status_url))
while True:
# 查看服务型机器人的状态
r_robot_status = requests.get(robot_status_url, headers=headers)
write_datatable("r_robot_status.text: {}".format(r_robot_status.text))
r_robot_status_data = json.loads(r_robot_status.text)
write_datatable("{}".format(r_robot_status_data["data"]))
robot_status = r_robot_status_data["data"]["status"]
write_datatable("robot status:{}".format(robot_status))
# 当机器人运行状态为completion时,任务运行完成,可以查看任务运行结果
if "completion" in robot_status:
r_result = requests.get(result_url, headers=headers)
write_datatable("r_result.text:{}".format(r_result.text))
r_result_data = json.loads(r_result.text)['data']
if r_result_data != None:
write_datatable("运行结果: {}".format(r_result_data))
break
# 在数据视图中添加记录
def write_datatable(str):
now = datetime.today().strftime("%Y-%m-%d %H:%M:%S")
rpa.datatable.addrow([now,str])
rpa.datatable.scrollindex(len(rpa.datatable))
调用地址:http://rpa-demo.allinbots.com/rpa/api/ak/server/uploadFile
请求方式:multipart/form-data
请求头记得添加AK信息记得修改成自己的RPA服务端的域名
参数key | **类 | **请求方 | **是否必 | ** |
---|---|---|---|---|
file | File | FormParam | 是 | 上传的文件 |
用Postman调用文件上传接口的示例:
python实现文件上传的示例:
from rpa.core import *
from rpa.utils import *
import rpa
import requests
from urllib3 import encode_multipart_formdata
import json
from datetime import datetime
def upload():
rpa.datatable.clear()
filepath = r'C:\temp\test.xlsx' # 待上传的文件
rpahost= "http://rpa-demo.allinbots.com" # 服务器的域名,用于与文件的url拼接
request_url = "http://rpa-demo.allinbots.com/rpa/api/ak/server/uploadFile"
#ak信息从控制台的 系统设置 中获取
headers = {"accessKey":"54f7261d92756a85",
"accessKeySecret":"b485d705-91e9-426d-b8d2-3f0b3c19551e",
"Content-Type":"application/x-www-form-urlencoded"}
_data = {}
filename = "test.xls"
url = rpahost + post_files(request_url, headers, _data, filename, filepath)
write_datatable(url)
def post_files(url,header,data,filename,filepath):
"""
url: 请求地址
header: 请求头
data: 请求body
filename: 上传文件的文件名
filepath:上传文件的完整路径
return: 文件上传成功后的下载链接
"""
data['file']= (filename,open(filepath,'rb').read())
encode_data = encode_multipart_formdata(data)
data = encode_data[0]
header['Content-Type'] = encode_data[1]
r = requests.post(url, headers=header, data=data)
write_datatable("r.headers: {}".format(r.headers))
write_datatable("r.status_code: {}".format(r.status_code))
write_datatable("r.text: {}".format(r.text))
result_data = json.loads(r.text)["data"]
if result_data:
return result_data["url"]
else:
return None
# 在数据视图中添加记录
def write_datatable(str):
now = datetime.today().strftime("%Y-%m-%d %H:%M:%S")
rpa.datatable.addrow([now,str])
rpa.datatable.scrollindex(len(rpa.datatable))
在文档使用中是否遇到以下问题
更多建议
匿名提交