DataWorks提供了丰富的OpenAPI,您可以根据需要使用DataWorks的OpenAPI等开放能力实现各种业务场景。本文以使用运维中心的OpenAPI与OpenEvent为例,为您示例如何搭建运维大屏。
背景信息
本实践搭建的运维大屏可满足以下业务场景的需求:
实时任务的运行监控与节点状态监控。
非实时任务的一周调度任务数量趋势、任务完成情况、近一个月运行时长排行、近一个月任务出错排行、昨日任务类型分布、昨日运行状态分布等能力。
完成本实践后搭建的运维大屏如下所示。
最佳实践:搭建自定义运维大屏
在进行本实践之前,建议您先参考以下链接,了解DataWorks开放平台的基本能力和概念:
准备工作:开启并配置消息订阅(OpenEvent)
在进行代码开发前,您需要先在开放平台中创建一个开放事件(OpenEvent),操作步骤请参见开启并配置消息订阅(OpenEvent)。
实践1:实时任务运行监控
完成准备工作后,以下步骤将介绍如何使用开放平台来实现实时任务的运行监控。
后端代码开发。
在工程中编写代码实现任务运行消息订阅。
package com.aliyun.dataworks.demo; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.aliyun.dataworks.services.DataWorksOpenApiClient; import com.aliyun.dataworks.services.EventService; import com.aliyun.dataworks.utils.Constants; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author dataworks demo */ @RestController @RequestMapping("/eventBridge") public class OpenEventConsumer { @Autowired(required = false) private DataWorksOpenApiClient dataWorksOpenApiClient; @Autowired private EventService eventService; /** * 接收eventBridge推送过来的消息 * * @param messageContent */ @PostMapping("/consume") public void consume(@RequestBody String messageContent) { JSONObject jsonObj = JSON.parseObject(messageContent); String eventCode = jsonObj.getString(Constants.EVENT_CODE_KEY); String messageId = jsonObj.getString(Constants.EVENT_MESSAGE_ID); JSONObject jsonObject = jsonObj.getJSONObject(Constants.EVENT_DATA_KEY); System.out.println(messageId + "," + eventCode); eventService.consumeEventBridge(messageId, eventCode, jsonObject); } }
上述代码中会调用consumeEventBridge方法来缓存接收到的信息以方便前端调用,consumeEventBridge的实现如下。
package com.aliyun.dataworks.services;

import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.utils.Constants;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Counts the events received from EventBridge, one counter per event code,
 * so the dashboard front end can poll the totals.
 *
 * @author dataworks demo
 */
@Service
public class EventService {

    /**
     * Real-time dashboard data: event code to number of events seen.
     * This demo keeps it in memory; a production system should persist it.
     */
    private final Map<String, AtomicInteger> screenMap = new ConcurrentHashMap<>();

    /**
     * Records one event pushed by EventBridge.
     *
     * @param messageId   id of the message, only printed for tracing
     * @param eventCode   event code; events that are not dashboard events are ignored
     * @param messageBody event payload, only printed for tracing; may be {@code null}
     * @return {@code true} if the event was counted, {@code false} if it was ignored
     */
    public boolean consumeEventBridge(String messageId, String eventCode, JSONObject messageBody) {
        if (!isWorkbenchEvent(eventCode)) {
            return false;
        }
        // Create the counter on first sight of the code and bump it in one atomic map operation.
        // Demo only: kept in memory; a production system should persist it.
        screenMap.computeIfAbsent(eventCode, code -> new AtomicInteger(0)).incrementAndGet();

        System.out.println("messageId=" + messageId);
        // The payload may be absent from the incoming message; do not fail after counting.
        System.out.println("messageBody=" + (messageBody == null ? null : messageBody.toJSONString()));
        return true;
    }

    /**
     * Returns a snapshot of the current counters.
     *
     * @return a new map from event code to the number of events counted so far
     */
    public Map<String, Integer> getCurrentStatus() {
        Map<String, Integer> result = new HashMap<>();
        screenMap.forEach((code, counter) -> result.put(code, counter.intValue()));
        return result;
    }

    /**
     * Tells whether an event code is one the dashboard counts.
     */
    private boolean isWorkbenchEvent(String eventCode) {
        // NOTE(review): the original condition tested NODE_CHANGE_CREATED twice. The second
        // test was probably meant to be the "node updated" event code (the front end reads a
        // nodeChangeUpdated counter) -- confirm the constant name in Constants and add it here.
        return Constants.INSTANCE_STATUS_CHANGES.equals(eventCode)
                || Constants.NODE_CHANGE_CREATED.equals(eventCode)
                || Constants.NODE_CHANGE_DELETED.equals(eventCode);
    }
}
构建一个提供给前端调用以获取缓存消息的接口getRealTimeNodeChanges。
package com.aliyun.dataworks.demo;

import com.aliyun.dataworks.dto.GetInstanceStatusStatisticDto;
import com.aliyun.dataworks.services.EventService;
import com.aliyun.dataworks.services.WorkbenchOpenApiService;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Map;

/**
 * REST endpoints under {@code /screen} that feed the operations dashboard.
 *
 * @author dataworks demo
 */
@RestController
@RequestMapping("/screen")
public class WorkbenchScreenController {

    @Autowired
    private WorkbenchOpenApiService workbenchOpenApiService;

    @Autowired
    private EventService eventService;

    /**
     * Returns the real-time change counters for nodes and tasks.
     *
     * @return the map produced by {@link EventService#getCurrentStatus()}
     */
    @GetMapping("/getRealTimeNodeChanges")
    public Map<String, Integer> getRealTimeNodeChanges() {
        final Map<String, Integer> counters = eventService.getCurrentStatus();
        return counters;
    }
}
前端代码开发。
前端部分使用仪表盘来表现,将后端接口返回的instanceSuccessCount与instanceFailureCount进行运算获得任务的运行成功率。示例代码如下。
import React from 'react';
import cn from 'classnames';
import { Loading } from '@alifd/next';
import { Gauge } from '@antv/g2plot';
import * as helpers from '../helpers';
import * as services from '../services';
import classes from '../styles/app.module.css';

export interface Props {}

// Gauge bands: below 50% is poor, 50%-90% is fair, 90% and above is good.
const ticks = [0, 1 / 2, 9 / 10, 1];
const color = ['#F4664A', '#FAAD14', '#30BF78'];
// Delay between two consecutive calls to the back end.
const POLL_INTERVAL_MS = 5000;
// Value shown by the gauge until the first response arrives.
const INITIAL_PERCENT = 0.5;

/**
 * Computes the success rate rounded to two decimals.
 * Returns undefined when it cannot be computed (no instances yet, or counters missing),
 * so the caller can keep showing the previous value instead of NaN.
 */
const toSuccessRate = (success: number, failure: number): number | undefined => {
  const total = success + failure;
  if (!Number.isFinite(total) || total <= 0) {
    return undefined;
  }
  return Number((success / total).toFixed(2));
};

/**
 * Gauge showing the success rate of task instances, refreshed every POLL_INTERVAL_MS.
 */
const RealtimeTaskMonitor: React.FunctionComponent<Props> = () => {
  const ref = React.useRef<HTMLDivElement>(null);
  const [visible, setVisible] = React.useState(true);

  // Create the gauge once and run a single polling loop for the lifetime of the component.
  // The loop does not depend on React state, so it keeps running when the rate is unchanged
  // and there is never more than one chain of requests.
  React.useEffect(() => {
    const container = ref.current;
    if (!container) {
      return undefined;
    }

    const gauge = new Gauge(container, {
      percent: INITIAL_PERCENT,
      range: {
        ticks,
        color,
      },
      padding: 'auto',
      indicator: {
        pointer: {
          style: {
            stroke: '#D0D0D0',
          },
        },
        pin: {
          style: {
            stroke: '#D0D0D0',
          },
        },
      },
      statistic: {
        title: {
          formatter: (datum) => {
            const nextPercent = datum?.percent;
            if (nextPercent < ticks[1]) {
              return `差(${Math.round(nextPercent * 100)}%)`;
            }
            if (nextPercent < ticks[2]) {
              return `中(${Math.round(nextPercent * 100)}%)`;
            }
            return `优(${Math.round(nextPercent * 100)}%)`;
          },
          style: ({ percent }) => {
            return {
              fontSize: '36px',
              lineHeight: 1,
              color: percent < ticks[1] ? color[0] : percent < ticks[2] ? color[1] : color[2],
            };
          },
        },
        content: {
          offsetY: 36,
          style: {
            fontSize: '24px',
            color: '#4B535E',
          },
          formatter: () => '运行成功率',
        },
      },
    });
    gauge.render();

    // Set by the cleanup function; stops the loop and any further updates.
    let cancelled = false;

    const poll = async () => {
      while (!cancelled) {
        try {
          const response = await services.workbench.getRealtimeNodeChanges();
          if (cancelled) {
            return;
          }
          const rate = toSuccessRate(response.instanceSuccessCount, response.instanceFailureCount);
          if (rate !== undefined) {
            gauge.changeData(rate);
          }
        } catch (e) {
          console.error(e);
        } finally {
          // Hide the loading mask after the first attempt, successful or not.
          if (!cancelled) {
            setVisible(false);
          }
        }
        await helpers.pause(POLL_INTERVAL_MS);
      }
    };
    poll();

    return () => {
      cancelled = true;
      gauge.destroy();
    };
  }, []);

  return (
    <Loading className={cn(classes.appLoading)} visible={visible}>
      <div ref={ref} />
    </Loading>
  );
};

export default RealtimeTaskMonitor;
上述代码使用pause函数实现轮询:每次调用后端接口获取最新的任务运行状态并刷新前端组件后,暂停5秒,再发起下一次调用。
完成上述代码开发后,您可在本地部署并运行工程代码。部署并运行的操作请参见通用操作:本地部署。
完成以上实践步骤后,您可搭建一个实时任务运行监控的大盘,如下所示。
实践2:实时节点状态监控
完成准备工作后,以下步骤将介绍如何使用开放平台来实现实时节点状态监控。
后端代码开发。
后端部分的实现过程与实时任务运行监控相同,调用的也是同一个接口,代码示例请参见后端代码开发。
前端代码开发。
前端部分使用了面积图来展现已创建、已更新与已删除三种节点状态的分布情况。示例代码如下。
import React from 'react';
import cn from 'classnames';
import moment from 'moment';
import { Loading } from '@alifd/next';
import { Area } from '@antv/g2plot';
import * as helpers from '../helpers';
import * as services from '../services';
import classes from '../styles/app.module.css';

export interface Props {}

export interface Point {
  time: string;
  value: number;
  type: string;
}

// Delay between two consecutive calls to the back end.
const POLL_INTERVAL_MS = 5000;
// Number of series drawn per sample (created / updated / deleted).
const SERIES_COUNT = 3;
// Number of most recent samples kept, so a dashboard left open does not grow without bound.
const MAX_SAMPLES = 120;

/**
 * Area chart showing the created / updated / deleted node counters over time,
 * refreshed every POLL_INTERVAL_MS.
 */
const RealtimeNodeMonitor: React.FunctionComponent<Props> = () => {
  const ref = React.useRef<HTMLDivElement>(null);
  const [visible, setVisible] = React.useState(true);

  // Create the chart once and run a single polling loop for the lifetime of the component.
  // The history lives in the loop rather than in React state, so re-renders cannot start
  // a second chain of requests.
  React.useEffect(() => {
    const container = ref.current;
    if (!container) {
      return undefined;
    }

    const area = new Area(container, {
      data: [],
      padding: 'auto',
      xField: 'time',
      yField: 'value',
      seriesField: 'type',
    });
    area.render();

    // Set by the cleanup function; stops the loop and any further updates.
    let cancelled = false;
    let points: Point[] = [];

    const poll = async () => {
      while (!cancelled) {
        try {
          const response = await services.workbench.getRealtimeNodeChanges();
          if (cancelled) {
            return;
          }
          const time = moment().format('HH:mm:ss');
          points = points
            .concat([
              { time, value: response.nodeChangeCreated, type: '已创建' },
              { time, value: response.nodeChangeUpdated, type: '已更新' },
              { time, value: response.nodeChangeDeleted, type: '已删除' },
            ])
            // Drop the oldest samples once the window is full.
            .slice(-MAX_SAMPLES * SERIES_COUNT);
          area.changeData(points);
        } catch (e) {
          console.error(e);
        } finally {
          // Hide the loading mask after the first attempt, successful or not.
          if (!cancelled) {
            setVisible(false);
          }
        }
        await helpers.pause(POLL_INTERVAL_MS);
      }
    };
    poll();

    return () => {
      cancelled = true;
      area.destroy();
    };
  }, []);

  return (
    <Loading className={cn(classes.appLoading)} visible={visible}>
      <div ref={ref} />
    </Loading>
  );
};

export default RealtimeNodeMonitor;
上述代码使用pause函数实现轮询:每次调用后端接口获取最新的节点状态并刷新前端组件后,暂停5秒,再发起下一次调用。
完成上述代码开发后,您可在本地部署并运行工程代码。部署并运行的操作请参见通用操作:本地部署。
完成以上实践步骤后,您可搭建一个实时节点状态监控的大盘,如下所示。
实践3:一周调度任务数量趋势
完成准备工作后,以下步骤将介绍如何使用开放平台来实现展示一周调度任务数量趋势。
后端代码开发。
在后端实现一个方法来调用ListInstanceAmount这个API。
一周调度任务数量趋势使用了ListInstanceAmount这个API来获取周期实例数量的趋势,因此后端代码开发时您首先需要实现一个方法来调用此API。
package com.aliyun.dataworks.services;

import com.aliyun.dataworks.dto.GetInstanceStatusStatisticDto;
import com.aliyun.dataworks.enums.DagType;
import com.aliyun.dataworks.enums.SchedulerType;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.utils.StringUtils;
import org.junit.Assert;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Objects;

/**
 * Thin wrappers around the DataWorks operations-center OpenAPI.
 *
 * @author dataworks demo
 */
@Service
public class WorkbenchOpenApiService {

    @Autowired
    private DataWorksOpenApiClient dataWorksOpenApiClient;

    /**
     * Calls the ListInstanceAmount API.
     * Help document: https://help.aliyun.com/document_detail/212602.html
     *
     * @param beginDate start business date, day precision, formatted as
     *                  yyyy-MM-dd'T'HH:mm:ssZ (UTC); must not be {@code null}
     * @param endDate   end business date, same format as {@code beginDate}; must not be {@code null}
     * @param projectId workspace id, used by the OpenAPI for permission checks; must not be {@code null}
     * @return {@link ListInstanceAmountResponse.IntanceCounts} entries from the response,
     *         or {@code null} if the call failed (the failure is printed)
     * @throws NullPointerException if any argument is {@code null}
     */
    public List<ListInstanceAmountResponse.IntanceCounts> listInstanceAmount(String beginDate, String endDate, Long projectId) {
        // Validate with the JDK rather than JUnit's Assert, which is a test-only dependency.
        Objects.requireNonNull(beginDate, "beginDate");
        Objects.requireNonNull(endDate, "endDate");
        Objects.requireNonNull(projectId, "projectId");

        ListInstanceAmountRequest request = new ListInstanceAmountRequest();
        request.setBeginDate(beginDate);
        request.setEndDate(endDate);
        request.setProjectId(projectId);
        try {
            ListInstanceAmountResponse response = dataWorksOpenApiClient.createClient().getAcsResponse(request);
            // Print the request id to make troubleshooting easier.
            System.out.println(response.getRequestId());
            return response.getInstanceCounts();
        } catch (ClientException e) {
            // ServerException is a subclass of ClientException, so server-side failures
            // get the same diagnostics as client-side ones.
            e.printStackTrace();
            // Request id
            System.out.println(e.getRequestId());
            // Error code
            System.out.println(e.getErrCode());
            // Error message
            System.out.println(e.getErrMsg());
        }
        return null;
    }
}
实现一个入口供前端调用。
package com.aliyun.dataworks.demo;

import com.aliyun.dataworks.dto.GetInstanceStatusStatisticDto;
import com.aliyun.dataworks.services.EventService;
import com.aliyun.dataworks.services.WorkbenchOpenApiService;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Map;

/**
 * REST endpoints under {@code /screen} that feed the operations dashboard.
 *
 * @author dataworks demo
 */
@RestController
@RequestMapping("/screen")
public class WorkbenchScreenController {

    @Autowired
    private WorkbenchOpenApiService workbenchOpenApiService;

    @Autowired
    private EventService eventService;

    /**
     * Returns the instance totals of a workspace for a date range.
     *
     * @param beginDate start of the range, passed through to the service
     * @param endDate   end of the range, passed through to the service
     * @param projectId workspace id, passed through to the service
     * @return whatever {@link WorkbenchOpenApiService#listInstanceAmount} returns
     */
    @GetMapping("/listInstanceAmount")
    public List<ListInstanceAmountResponse.IntanceCounts> listInstanceAmount(String beginDate, String endDate, Long projectId) {
        final List<ListInstanceAmountResponse.IntanceCounts> counts =
                workbenchOpenApiService.listInstanceAmount(beginDate, endDate, projectId);
        return counts;
    }
}
前端代码开发。
在前端实现一个横向的柱状图并请求后端接口进行渲染。
import React from 'react';
import cn from 'classnames';
import moment from 'moment';
import { Loading } from '@alifd/next';
import { Bar } from '@antv/g2plot';
import type { InstanceAmount } from '../services/workbench';
import * as services from '../services';
import classes from '../styles/app.module.css';

export interface Props {
  projectId: number;
}

/** One bar of the chart: a day and the number of instances scheduled on it. */
interface ChartPoint {
  date: string;
  count: number;
}

// Moment format for the request parameters. Moment's four-digit year token is upper-case YYYY.
const REQUEST_DATE_FORMAT = 'YYYY-MM-DDTHH:mm:ssZZ';

/**
 * Horizontal bar chart with the number of scheduled instances per day over the last 7 days.
 */
const InstanceAmountList: React.FunctionComponent<Props> = (props) => {
  const ref = React.useRef<HTMLDivElement>(null);
  const [visible, setVisible] = React.useState(false);
  const [data, setData] = React.useState<ChartPoint[]>([]);
  const [chart, setChart] = React.useState<Bar>();

  // Loads the data from the back end. Only updates state; drawing happens in the effect below.
  const fetchData = async (signal: AbortSignal, projectId: number) => {
    try {
      setVisible(true);
      // From 7 days ago ...
      const beginDate = moment().subtract(7, 'days').format(REQUEST_DATE_FORMAT);
      // ... until now.
      const endDate = moment().format(REQUEST_DATE_FORMAT);
      const response = await services.workbench.getInstanceAmountList(signal, beginDate, endDate, projectId);
      if (signal.aborted) {
        return;
      }
      setData((response || []).map((i: InstanceAmount) => ({
        date: moment(i.date).format('YYYY-MM-DD'),
        count: i.count,
      })));
    } catch (e) {
      // A request cancelled by the cleanup below is expected, not an error.
      if (!signal.aborted) {
        console.error(e);
      }
    } finally {
      // A superseded request must not hide the loading mask of the request that replaced it.
      if (!signal.aborted) {
        setVisible(false);
      }
    }
  };

  // Fetch on first render and whenever projectId changes.
  React.useEffect(() => {
    const controller = new AbortController();
    props.projectId && fetchData(controller.signal, props.projectId);
    return () => {
      controller.abort();
    };
  }, [
    props.projectId,
  ]);

  // Create the horizontal bar chart once the container is mounted.
  React.useEffect(() => {
    if (ref.current) {
      const bar = new Bar(ref.current, {
        data: [],
        xField: 'count',
        yField: 'date',
        barBackground: {
          style: {
            fill: 'rgba(0,0,0,0.1)',
          },
        },
        height: 200,
        maxBarWidth: 10,
      });
      bar.render();
      setChart(bar);
      return () => {
        bar.destroy();
      };
    }
    return undefined;
  }, []);

  // Push the data into the chart whenever either becomes available. Doing this in an effect,
  // rather than inside fetchData, avoids using the chart value captured before it was created.
  React.useEffect(() => {
    chart?.changeData(data);
  }, [
    chart,
    data,
  ]);

  return (
    <Loading className={cn(classes.appLoading)} visible={visible}>
      <div ref={ref} />
    </Loading>
  );
};

export default InstanceAmountList;
完成上述代码开发后,您可在本地部署并运行工程代码。部署并运行的操作请参见通用操作:本地部署。
完成以上实践步骤后,您可搭建一个一周调度任务数量趋势的大盘,如下所示。
实践4:任务完成情况
完成准备工作后,以下步骤将介绍任务完成情况的整个实现过程。
后端代码开发。
在后端实现一个方法来调用ListSuccessInstanceAmount这个API。
任务完成情况使用了ListSuccessInstanceAmount,因此后端代码开发时您首先需要实现一个方法来调用此API。
package com.aliyun.dataworks.services;

import com.aliyun.dataworks.dto.GetInstanceStatusStatisticDto;
import com.aliyun.dataworks.enums.DagType;
import com.aliyun.dataworks.enums.SchedulerType;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.utils.StringUtils;
import org.junit.Assert;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Objects;

/**
 * Thin wrappers around the DataWorks operations-center OpenAPI.
 *
 * @author dataworks demo
 */
@Service
public class WorkbenchOpenApiService {

    @Autowired
    private DataWorksOpenApiClient dataWorksOpenApiClient;

    /**
     * Calls the ListSuccessInstanceAmount API.
     * Help document: https://help.aliyun.com/document_detail/212622.html
     *
     * @param projectId workspace id, used by the OpenAPI for permission checks; must not be {@code null}
     * @return the {@link ListSuccessInstanceAmountResponse.InstanceStatusTrend} from the response,
     *         or {@code null} if the call failed (the failure is printed)
     * @throws NullPointerException if {@code projectId} is {@code null}
     */
    public ListSuccessInstanceAmountResponse.InstanceStatusTrend listSuccessInstanceAmount(Long projectId) {
        // Validate with the JDK rather than JUnit's Assert, which is a test-only dependency.
        Objects.requireNonNull(projectId, "projectId");

        ListSuccessInstanceAmountRequest request = new ListSuccessInstanceAmountRequest();
        request.setProjectId(projectId);
        try {
            ListSuccessInstanceAmountResponse response = dataWorksOpenApiClient.createClient().getAcsResponse(request);
            // Print the request id to make troubleshooting easier.
            System.out.println(response.getRequestId());
            return response.getInstanceStatusTrend();
        } catch (ClientException e) {
            // ServerException is a subclass of ClientException, so server-side failures
            // get the same diagnostics as client-side ones.
            e.printStackTrace();
            // Request id
            System.out.println(e.getRequestId());
            // Error code
            System.out.println(e.getErrCode());
            // Error message
            System.out.println(e.getErrMsg());
        }
        return null;
    }
}
实现一个入口供前端调用。
package com.aliyun.dataworks.demo;

import com.aliyun.dataworks.dto.GetInstanceStatusStatisticDto;
import com.aliyun.dataworks.services.EventService;
import com.aliyun.dataworks.services.WorkbenchOpenApiService;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Map;

/**
 * REST endpoints under {@code /screen} that feed the operations dashboard.
 *
 * @author dataworks demo
 */
@RestController
@RequestMapping("/screen")
public class WorkbenchScreenController {

    @Autowired
    private WorkbenchOpenApiService workbenchOpenApiService;

    @Autowired
    private EventService eventService;

    /**
     * Returns the trend data of successfully finished task instances.
     *
     * @param projectId workspace id, passed through to the service
     * @return the {@link ListSuccessInstanceAmountResponse.InstanceStatusTrend} returned by
     *         {@link WorkbenchOpenApiService#listSuccessInstanceAmount}
     */
    @GetMapping("/listSuccessInstanceAmount")
    public ListSuccessInstanceAmountResponse.InstanceStatusTrend listSuccessInstanceAmount(Long projectId) {
        final ListSuccessInstanceAmountResponse.InstanceStatusTrend trend =
                workbenchOpenApiService.listSuccessInstanceAmount(projectId);
        return trend;
    }
}
前端代码开发。
在前端实现一个折线图组件来获取并加载数据。
import React from 'react';
import cn from 'classnames';
import { Loading } from '@alifd/next';
import { Line } from '@antv/g2plot';
import * as services from '../services';
import type { SuccessInstanceAmount } from '../services/workbench';
import classes from '../styles/app.module.css';

export interface Props {
  projectId: number;
}

export interface Point extends SuccessInstanceAmount {
  type?: string;
}

/**
 * Line chart comparing today's, yesterday's and the average number of
 * successfully finished instances over the day.
 */
const SuccessInstanceAmountList: React.FunctionComponent<Props> = (props) => {
  const ref = React.useRef<HTMLDivElement>(null);
  const [visible, setVisible] = React.useState(false);
  const [data, setData] = React.useState<Point[]>([]);
  const [chart, setChart] = React.useState<Line>();

  // Loads the data from the back end. Only updates state; drawing happens in the effect below.
  const fetchData = async (signal: AbortSignal, projectId: number) => {
    try {
      setVisible(true);
      const response = await services.workbench.getSuccessInstanceAmountList(signal, projectId);
      if (signal.aborted) {
        return;
      }
      // Flatten the three trends into one list, tagging each point with its series name.
      // A trend missing from the response is treated as empty.
      const collection: Point[] = [
        ...(response.avgTrend || []).map(i => ({ ...i, type: '平均值' })),
        ...(response.todayTrend || []).map(i => ({ ...i, type: '今日' })),
        ...(response.yesterdayTrend || []).map(i => ({ ...i, type: '昨日' })),
      ];
      setData(collection);
    } catch (e) {
      // A request cancelled by the cleanup below is expected, not an error.
      if (!signal.aborted) {
        console.error(e);
      }
    } finally {
      // A superseded request must not hide the loading mask of the request that replaced it.
      if (!signal.aborted) {
        setVisible(false);
      }
    }
  };

  // Fetch on first render and whenever projectId changes.
  React.useEffect(() => {
    const controller = new AbortController();
    props.projectId && fetchData(controller.signal, props.projectId);
    return () => {
      controller.abort();
    };
  }, [
    props.projectId,
  ]);

  // Create the line chart once the container is mounted.
  React.useEffect(() => {
    if (ref.current) {
      const line = new Line(ref.current, {
        data: [],
        xField: 'timePoint',
        yField: 'count',
        seriesField: 'type',
        height: 300,
        yAxis: {
          tickInterval: 1,
        },
      });
      setChart(line);
      line.render();
      return () => {
        line.destroy();
      };
    }
    return undefined;
  }, []);

  // Push the data into the chart whenever either becomes available. Doing this in an effect,
  // rather than inside fetchData, avoids using the chart value captured before it was created.
  React.useEffect(() => {
    if (!chart) {
      return;
    }
    chart.changeData(data);
    // Leave one unit of head room above the highest point. Skipped for an empty data set,
    // where Math.max() would yield -Infinity.
    if (data.length > 0) {
      const max = Math.max(...data.map(i => i.count)) + 1;
      chart.update({ yAxis: { max } });
    }
  }, [
    chart,
    data,
  ]);

  return (
    <Loading className={cn(classes.appLoading)} visible={visible}>
      <div ref={ref} />
    </Loading>
  );
};

export default SuccessInstanceAmountList;
完成上述代码开发后,您可在本地部署并运行工程代码。部署并运行的操作请参见通用操作:本地部署。
完成以上实践步骤后,您可搭建一个任务完成情况的大盘,如下所示。
实践5:近一个月运行时长排行
完成准备工作后,以下步骤将介绍近一个月运行时长排行的整个实现过程。
后端代码开发。
在后端构建了一个方法来处理入参并发送请求给TopTenElapsedTimeInstance这个API。
package com.aliyun.dataworks.services;

import com.aliyun.dataworks.dto.GetInstanceStatusStatisticDto;
import com.aliyun.dataworks.enums.DagType;
import com.aliyun.dataworks.enums.SchedulerType;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.utils.StringUtils;
import org.junit.Assert;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Objects;

/**
 * Thin wrappers around the DataWorks operations-center OpenAPI.
 *
 * @author dataworks demo
 */
@Service
public class WorkbenchOpenApiService {

    @Autowired
    private DataWorksOpenApiClient dataWorksOpenApiClient;

    /**
     * Calls the TopTenElapsedTimeInstance API.
     * Help document: https://help.aliyun.com/document_detail/212579.html
     *
     * @param projectId workspace id, used by the OpenAPI for permission checks; must not be {@code null}
     * @return the {@link TopTenElapsedTimeInstanceResponse.InstanceConsumeTimeRank} from the response,
     *         or {@code null} if the call failed (the failure is printed)
     * @throws NullPointerException if {@code projectId} is {@code null}
     */
    public TopTenElapsedTimeInstanceResponse.InstanceConsumeTimeRank topTenElapsedTimeInstance(Long projectId) {
        // Validate with the JDK rather than JUnit's Assert, which is a test-only dependency.
        Objects.requireNonNull(projectId, "projectId");

        TopTenElapsedTimeInstanceRequest request = new TopTenElapsedTimeInstanceRequest();
        request.setProjectId(projectId);
        try {
            TopTenElapsedTimeInstanceResponse response = dataWorksOpenApiClient.createClient().getAcsResponse(request);
            // Print the request id to make troubleshooting easier.
            System.out.println(response.getRequestId());
            return response.getInstanceConsumeTimeRank();
        } catch (ClientException e) {
            // ServerException is a subclass of ClientException, so server-side failures
            // get the same diagnostics as client-side ones.
            e.printStackTrace();
            // Request id
            System.out.println(e.getRequestId());
            // Error code
            System.out.println(e.getErrCode());
            // Error message
            System.out.println(e.getErrMsg());
        }
        return null;
    }
}
实现一个入口供前端调用。
package com.aliyun.dataworks.demo;

import com.aliyun.dataworks.dto.GetInstanceStatusStatisticDto;
import com.aliyun.dataworks.services.EventService;
import com.aliyun.dataworks.services.WorkbenchOpenApiService;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Map;

/**
 * REST endpoints under {@code /screen} that feed the operations dashboard.
 *
 * @author dataworks demo
 */
@RestController
@RequestMapping("/screen")
public class WorkbenchScreenController {

    @Autowired
    private WorkbenchOpenApiService workbenchOpenApiService;

    @Autowired
    private EventService eventService;

    /**
     * Returns the top-N ranking of instance run time over the last month.
     *
     * @param projectId workspace id, passed through to the service
     * @return whatever {@link WorkbenchOpenApiService#topTenElapsedTimeInstance} returns
     */
    @GetMapping("/topTenElapsedTimeInstance")
    public TopTenElapsedTimeInstanceResponse.InstanceConsumeTimeRank topTenElapsedTimeInstance(Long projectId) {
        final TopTenElapsedTimeInstanceResponse.InstanceConsumeTimeRank ranking =
                workbenchOpenApiService.topTenElapsedTimeInstance(projectId);
        return ranking;
    }
}
前端代码开发。
在前端选择表格组件来展示数据。
import React from 'react';
import moment from 'moment';
import { Table } from '@alifd/next';
import * as services from '../services';
import type { ConsumeTimeRank } from '../services/workbench';

export interface Props {
  projectId: number;
}

const { Column } = Table;

/**
 * Table listing the instances with the longest run time over the last month.
 */
const TopNInstanceElapsedTime: React.FunctionComponent<Props> = (props) => {
  const [visible, setVisible] = React.useState(false);
  const [dataSource, setDataSource] = React.useState<ConsumeTimeRank[]>([]);

  // Loads the ranking from the back end.
  const fetchData = async (signal: AbortSignal, projectId: number) => {
    try {
      setVisible(true);
      const response = await services.workbench.getTopNInstanceElapsedTime(signal, projectId);
      if (signal.aborted) {
        return;
      }
      // A response without a ranking is shown as an empty table.
      setDataSource(response.consumeTimeRank || []);
    } catch (e) {
      // A request cancelled by the cleanup below is expected, not an error.
      if (!signal.aborted) {
        console.error(e);
      }
    } finally {
      // A superseded request must not clear the loading state of the request that replaced it.
      if (!signal.aborted) {
        setVisible(false);
      }
    }
  };

  // Fetch on first render and whenever projectId changes.
  React.useEffect(() => {
    const controller = new AbortController();
    props.projectId && fetchData(controller.signal, props.projectId);
    return () => {
      controller.abort();
    };
  }, [
    props.projectId,
  ]);

  return (
    <Table
      loading={visible}
      dataSource={dataSource}
      maxBodyHeight={400}
      fixedHeader
    >
      <Column dataIndex="nodeId" title="节点ID" />
      <Column dataIndex="nodeName" title="节点名称" />
      <Column dataIndex="instanceId" title="实例ID" />
      <Column dataIndex="owner" title="负责人" />
      <Column
        dataIndex="businessDate"
        cell={value => moment(value).format('YYYY-MM-DD')}
        title="业务日期"
      />
      <Column dataIndex="consumed" title="运行时长" />
    </Table>
  );
};

export default TopNInstanceElapsedTime;
完成上述代码开发后,您可在本地部署并运行工程代码。部署并运行的操作请参见通用操作:本地部署。
完成以上实践步骤后,您可搭建一个近一个月运行时长排行的大盘,如下所示。
实践6:近一个月任务出错排行
完成准备工作后,以下步骤将介绍近一个月任务出错排行的整个实现过程。
后端代码开发。
在后端构建了一个方法来处理入参并发送请求给TopTenErrorTimesInstance这个API。
package com.aliyun.dataworks.services;

import com.aliyun.dataworks.dto.GetInstanceStatusStatisticDto;
import com.aliyun.dataworks.enums.DagType;
import com.aliyun.dataworks.enums.SchedulerType;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.utils.StringUtils;
import org.junit.Assert;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Objects;

/**
 * Thin wrappers around the DataWorks operations-center OpenAPI.
 *
 * @author dataworks demo
 */
@Service
public class WorkbenchOpenApiService {

    @Autowired
    private DataWorksOpenApiClient dataWorksOpenApiClient;

    /**
     * Calls the TopTenErrorTimesInstance API.
     * Help document: https://help.aliyun.com/document_detail/212587.html
     *
     * @param projectId workspace id set on the request; must not be {@code null}
     * @return the {@link TopTenErrorTimesInstanceResponse.InstanceErrorRank} from the response,
     *         or {@code null} if the call failed (the failure is printed)
     * @throws NullPointerException if {@code projectId} is {@code null}
     */
    public TopTenErrorTimesInstanceResponse.InstanceErrorRank topTenErrorTimesInstance(Long projectId) {
        // Reject a missing workspace id up front, like the other methods of this service do.
        Objects.requireNonNull(projectId, "projectId");

        TopTenErrorTimesInstanceRequest request = new TopTenErrorTimesInstanceRequest();
        request.setProjectId(projectId);
        try {
            TopTenErrorTimesInstanceResponse response = dataWorksOpenApiClient.createClient().getAcsResponse(request);
            // Print the request id to make troubleshooting easier.
            System.out.println(response.getRequestId());
            return response.getInstanceErrorRank();
        } catch (ClientException e) {
            // ServerException is a subclass of ClientException, so server-side failures
            // get the same diagnostics as client-side ones.
            e.printStackTrace();
            // Request id
            System.out.println(e.getRequestId());
            // Error code
            System.out.println(e.getErrCode());
            // Error message
            System.out.println(e.getErrMsg());
        }
        return null;
    }
}
实现一个入口供前端调用。
package com.aliyun.dataworks.demo;

import com.aliyun.dataworks.dto.GetInstanceStatusStatisticDto;
import com.aliyun.dataworks.services.EventService;
import com.aliyun.dataworks.services.WorkbenchOpenApiService;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Map;

/**
 * REST endpoints under {@code /screen} that feed the operations dashboard.
 *
 * @author dataworks demo
 */
@RestController
@RequestMapping("/screen")
public class WorkbenchScreenController {

    @Autowired
    private WorkbenchOpenApiService workbenchOpenApiService;

    @Autowired
    private EventService eventService;

    /**
     * Returns the top-N ranking of tasks by number of failed runs.
     *
     * @param projectId workspace id, passed through to the service
     * @return whatever {@link WorkbenchOpenApiService#topTenErrorTimesInstance} returns
     */
    @GetMapping("/topTenErrorTimesInstance")
    public TopTenErrorTimesInstanceResponse.InstanceErrorRank topTenErrorTimesInstance(Long projectId) {
        final TopTenErrorTimesInstanceResponse.InstanceErrorRank ranking =
                workbenchOpenApiService.topTenErrorTimesInstance(projectId);
        return ranking;
    }
}
前端代码开发。
在前端选择表格组件来展示数据。
import React from 'react';
import { Table } from '@alifd/next';
import * as services from '../services';
import type { ErrorInstance } from '../services/workbench';

export interface Props {
  projectId: number;
}

const { Column } = Table;

// Display name for each program type code returned by the back end.
const PROGRAM_TYPE_LABELS: Record<number, string> = {
  6: 'Shell',
  10: 'ODPS SQL',
  11: 'ODPS MR',
  23: '数据集成',
  24: 'ODPS Script',
  99: '虚拟节点',
  221: 'PyODPS 2',
  225: 'ODPS Spark',
  227: 'EMR Hive',
  228: 'EMR Spark',
  229: 'EMR Spark SQL',
  230: 'EMR MR',
  239: 'OSS对象检查',
  257: 'EMR Shell',
  258: 'EMR Spark Shell',
  259: 'EMR Presto',
  260: 'EMR Impala',
  900: '实时同步',
  1089: '跨租户节点',
  1091: 'Hologres开发',
  1093: 'Hologres SQL',
  1100: '赋值节点',
  1221: 'PyODPS 3',
};

/**
 * Renders a program type code. Codes missing from the table above are shown as the raw
 * number instead of an empty cell, so new node types remain identifiable.
 */
const renderProgramType = (value: number) => {
  const label = PROGRAM_TYPE_LABELS[value];
  if (label !== undefined) {
    return label;
  }
  return value === undefined || value === null ? '' : String(value);
};

/**
 * Table listing the tasks that failed most often over the last month.
 */
const TopNErrorInstance: React.FunctionComponent<Props> = (props) => {
  const [visible, setVisible] = React.useState(false);
  const [dataSource, setDataSource] = React.useState<ErrorInstance[]>([]);

  // Loads the ranking from the back end.
  const fetchData = async (signal: AbortSignal, projectId: number) => {
    try {
      setVisible(true);
      const response = await services.workbench.getTopNErrorInstance(signal, projectId);
      if (signal.aborted) {
        return;
      }
      // A response without a ranking is shown as an empty table.
      setDataSource(response.errorRank || []);
    } catch (e) {
      // A request cancelled by the cleanup below is expected, not an error.
      if (!signal.aborted) {
        console.error(e);
      }
    } finally {
      // A superseded request must not clear the loading state of the request that replaced it.
      if (!signal.aborted) {
        setVisible(false);
      }
    }
  };

  // Fetch on first render and whenever projectId changes.
  React.useEffect(() => {
    const controller = new AbortController();
    props.projectId && fetchData(controller.signal, props.projectId);
    return () => {
      controller.abort();
    };
  }, [
    props.projectId,
  ]);

  return (
    <Table
      loading={visible}
      dataSource={dataSource}
      maxBodyHeight={400}
      fixedHeader
    >
      <Column dataIndex="nodeId" title="节点ID" />
      <Column dataIndex="nodeName" title="节点名称" />
      <Column dataIndex="owner" title="负责人" />
      <Column dataIndex="programType" title="任务类型" cell={renderProgramType} />
      <Column dataIndex="count" title="出错次数" />
    </Table>
  );
};

export default TopNErrorInstance;
完成上述代码开发后,您可在本地部署并运行工程代码。部署并运行的操作请参见通用操作:本地部署。
完成以上实践步骤后,您可搭建一个近一个月任务出错排行的大盘,如下所示。
实践7:昨日任务类型分布
完成准备工作后,以下步骤将介绍昨日任务类型分布的整个实现过程。
后端代码开发。
在后端构建了一个方法来处理入参并发送请求给GetFileTypeStatistic这个API。
package com.aliyun.dataworks.services;

import com.aliyun.dataworks.dto.GetInstanceStatusStatisticDto;
import com.aliyun.dataworks.enums.DagType;
import com.aliyun.dataworks.enums.SchedulerType;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.utils.StringUtils;
import org.junit.Assert;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Objects;

/**
 * Thin wrappers around the DataWorks operations-center OpenAPI.
 *
 * @author dataworks demo
 */
@Service
public class WorkbenchOpenApiService {

    /** Environment used when the caller does not specify one. */
    private static final String DEFAULT_PROJECT_ENV = "PROD";

    @Autowired
    private DataWorksOpenApiClient dataWorksOpenApiClient;

    /**
     * Calls the GetFileTypeStatistic API.
     * Help document: https://help.aliyun.com/document_detail/212717.html
     *
     * @param projectId  workspace id set on the request; must not be {@code null}
     * @param projectEnv workspace environment; {@code "PROD"} is used when empty
     * @return the {@link GetFileTypeStatisticResponse.ProgramTypeAndCount} entries from the response,
     *         or {@code null} if the call failed (the failure is printed)
     * @throws NullPointerException if {@code projectId} is {@code null}
     */
    public List<GetFileTypeStatisticResponse.ProgramTypeAndCount> getFileTypeStatistic(Long projectId, String projectEnv) {
        // Validate with the JDK rather than JUnit's Assert, which is a test-only dependency.
        Objects.requireNonNull(projectId, "projectId");
        String env = StringUtils.isEmpty(projectEnv) ? DEFAULT_PROJECT_ENV : projectEnv;

        GetFileTypeStatisticRequest request = new GetFileTypeStatisticRequest();
        request.setProjectId(projectId);
        request.setProjectEnv(env);
        try {
            GetFileTypeStatisticResponse response = dataWorksOpenApiClient.createClient().getAcsResponse(request);
            // Print the request id to make troubleshooting easier.
            System.out.println(response.getRequestId());
            return response.getProgramTypeAndCounts();
        } catch (ClientException e) {
            // ServerException is a subclass of ClientException, so server-side failures
            // get the same diagnostics as client-side ones.
            e.printStackTrace();
            // Request id
            System.out.println(e.getRequestId());
            // Error code
            System.out.println(e.getErrCode());
            // Error message
            System.out.println(e.getErrMsg());
        }
        return null;
    }
}
实现一个入口供前端调用。
package com.aliyun.dataworks.demo;

import com.aliyun.dataworks.dto.GetInstanceStatusStatisticDto;
import com.aliyun.dataworks.services.EventService;
import com.aliyun.dataworks.services.WorkbenchOpenApiService;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Map;

/**
 * REST entry points under "/screen" consumed by the dashboard front end.
 *
 * @author dataworks demo
 */
@RestController
@RequestMapping("/screen")
public class WorkbenchScreenController {

    @Autowired
    private WorkbenchOpenApiService workbenchOpenApiService;

    @Autowired
    private EventService eventService;

    /**
     * Returns the file type distribution of a workspace.
     *
     * Delegates directly to {@link WorkbenchOpenApiService#getFileTypeStatistic}.
     *
     * @param projectId  workspace id
     * @param projectEnv workspace environment
     * @return whatever the service returns for the given workspace
     */
    @GetMapping("/getFileTypeStatistic")
    public List<GetFileTypeStatisticResponse.ProgramTypeAndCount> getFileTypeStatistic(Long projectId, String projectEnv) {
        final List<GetFileTypeStatisticResponse.ProgramTypeAndCount> typeCounts =
                workbenchOpenApiService.getFileTypeStatistic(projectId, projectEnv);
        return typeCounts;
    }
}
前端代码开发。
在前端选用饼图来展示数据。
import React from 'react';
import { Loading } from '@alifd/next';
import { Pie } from '@antv/g2plot';
import cn from 'classnames';
import * as services from '../services';
import type { FileStatus } from '../services/workbench';
import classes from '../styles/app.module.css';

export interface Props {
  projectId: number;
}

/**
 * Pie chart of the data returned by `getFileTypeStatistic` for a project,
 * one slice per `programType`, sized by `count`.
 */
const TaskStatus: React.FunctionComponent<Props> = (props) => {
  const containerRef = React.useRef<HTMLDivElement>(null);
  // The chart instance lives in a ref so that effects and async callbacks always
  // see the current instance instead of a value captured by an older render.
  const chartRef = React.useRef<Pie>();
  const [visible, setVisible] = React.useState(false);
  const [data, setData] = React.useState<FileStatus[]>([]);

  /**
   * Loads the distribution for `projectId` into state.
   *
   * State updates are skipped once `signal` is aborted, so a cancelled request
   * cannot overwrite newer data or hide the loading indicator of the request
   * that replaced it.
   */
  const fetchData = async (signal: AbortSignal, projectId: number) => {
    try {
      setVisible(true);
      const response = await services.workbench.getFileTypeStatistic(signal, projectId);
      if (!signal.aborted) {
        setData(response ?? []);
      }
    } catch (e) {
      // An abort is an expected cancellation, not an error worth reporting.
      if (!signal.aborted) {
        console.error(e);
      }
    } finally {
      if (!signal.aborted) {
        setVisible(false);
      }
    }
  };

  // Fetch on first render and whenever projectId changes; cancel on cleanup.
  React.useEffect(() => {
    if (!props.projectId) {
      return undefined;
    }
    const controller = new AbortController();
    fetchData(controller.signal, props.projectId);
    return () => {
      controller.abort();
    };
  }, [
    props.projectId,
  ]);

  // Create the pie chart once the container is mounted and destroy it on unmount.
  React.useEffect(() => {
    if (!containerRef.current) {
      return undefined;
    }
    const pie = new Pie(containerRef.current, {
      data: [],
      angleField: 'count',
      colorField: 'programType',
      label: {
        type: 'inner',
        offset: '-50%',
        content: '{value}',
        style: {
          textAlign: 'center',
          fontSize: 18,
        },
      },
      statistic: {
        title: false,
      },
    });
    pie.render();
    chartRef.current = pie;
    return () => {
      chartRef.current = undefined;
      pie.destroy();
    };
  }, []);

  // Push every data change into the chart. This effect is declared after the
  // creation effect, so on mount the chart already exists when it runs.
  React.useEffect(() => {
    chartRef.current?.changeData(data);
  }, [
    data,
  ]);

  // Render the component.
  return (
    <Loading className={cn(classes.appLoading)} visible={visible}>
      <div ref={containerRef} />
    </Loading>
  );
};

export default TaskStatus;
完成上述代码开发后,您可在本地部署并运行工程代码。部署并运行的操作请参见通用操作:本地部署。
完成以上实践步骤后,您可搭建一个昨日任务类型分布的大盘,如下所示。
实践8:昨日运行状态分布
完成准备工作后,以下步骤将介绍昨日运行状态分布的整个实现过程。
后端代码开发。
在后端构建了一个方法来处理入参并发送请求给GetInstanceStatusStatistic这个API。
package com.aliyun.dataworks.services;

import com.aliyun.dataworks.dto.GetInstanceStatusStatisticDto;
import com.aliyun.dataworks.enums.DagType;
import com.aliyun.dataworks.enums.SchedulerType;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.utils.StringUtils;
import org.junit.Assert;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Objects;

/**
 * Service wrapping the DataWorks OpenAPI calls used by the operations dashboard.
 *
 * @author dataworks demo
 */
@Service
public class WorkbenchOpenApiService {

    @Autowired
    private DataWorksOpenApiClient dataWorksOpenApiClient;

    /**
     * Calls the GetInstanceStatusStatistic API and returns the instance status counts.
     * API reference: https://help.aliyun.com/document_detail/212637.html
     *
     * @param getInstanceStatusStatisticDto query parameters; {@code bizDate} and
     *                                      {@code projectId} are required, {@code projectEnv}
     *                                      defaults to "PROD" when empty, {@code dagType} and
     *                                      {@code schedulerType} are optional
     * @return the status counts of the response, or {@code null} when the API call fails
     * @throws NullPointerException if the DTO, its bizDate or its projectId is null
     */
    public GetInstanceStatusStatisticResponse.StatusCount getInstanceStatusStatistic(
            GetInstanceStatusStatisticDto getInstanceStatusStatisticDto) {
        // Validate with the JDK instead of JUnit's Assert, which is a test-only utility.
        Objects.requireNonNull(getInstanceStatusStatisticDto, "getInstanceStatusStatisticDto must not be null");
        Objects.requireNonNull(getInstanceStatusStatisticDto.getBizDate(), "bizDate must not be null");
        Objects.requireNonNull(getInstanceStatusStatisticDto.getProjectId(), "projectId must not be null");

        String projectEnv = StringUtils.isEmpty(getInstanceStatusStatisticDto.getProjectEnv())
                ? "PROD"
                : getInstanceStatusStatisticDto.getProjectEnv();

        GetInstanceStatusStatisticRequest request = new GetInstanceStatusStatisticRequest();
        request.setBizDate(getInstanceStatusStatisticDto.getBizDate());
        if (getInstanceStatusStatisticDto.getDagType() != null) {
            // NOTE(review): if DagType is a plain enum and the DTO field is a String,
            // valueOf throws IllegalArgumentException for unknown names and never
            // returns null, making the inner check a pure safeguard - confirm.
            DagType dagType = DagType.valueOf(getInstanceStatusStatisticDto.getDagType());
            if (dagType != null) {
                request.setDagType(dagType.name());
            }
        }
        request.setProjectEnv(projectEnv);
        if (getInstanceStatusStatisticDto.getSchedulerType() != null) {
            // NOTE(review): same remark as for DagType above - confirm.
            SchedulerType schedulerType = SchedulerType.valueOf(getInstanceStatusStatisticDto.getSchedulerType());
            if (schedulerType != null) {
                request.setSchedulerType(schedulerType.name());
            }
        }
        request.setProjectId(getInstanceStatusStatisticDto.getProjectId());

        try {
            GetInstanceStatusStatisticResponse response = dataWorksOpenApiClient.createClient().getAcsResponse(request);
            // Print the requestId to make troubleshooting easier.
            System.out.println(response.getRequestId());
            return response.getStatusCount();
        } catch (ClientException e) {
            // ServerException is a subclass of ClientException in the SDK, so this
            // single handler reports the same diagnostics for both failure kinds.
            e.printStackTrace();
            // Request id
            System.out.println(e.getRequestId());
            // Error code
            System.out.println(e.getErrCode());
            // Error message
            System.out.println(e.getErrMsg());
        }
        // Best effort: failures are logged above and reported to the caller as null.
        return null;
    }
}
实现一个入口供前端调用。
package com.aliyun.dataworks.demo;

import com.aliyun.dataworks.dto.GetInstanceStatusStatisticDto;
import com.aliyun.dataworks.services.EventService;
import com.aliyun.dataworks.services.WorkbenchOpenApiService;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Map;

/**
 * REST entry points under "/screen" consumed by the dashboard front end.
 *
 * @author dataworks demo
 */
@RestController
@RequestMapping("/screen")
public class WorkbenchScreenController {

    @Autowired
    private WorkbenchOpenApiService workbenchOpenApiService;

    @Autowired
    private EventService eventService;

    /**
     * Returns the instance status distribution of a workspace.
     *
     * Delegates directly to {@link WorkbenchOpenApiService#getInstanceStatusStatistic}.
     *
     * @param getInstanceStatusStatisticDto query parameters bound from the request
     * @return whatever the service returns for the given query
     */
    @GetMapping("/getInstanceStatusStatistic")
    public GetInstanceStatusStatisticResponse.StatusCount getInstanceStatusStatistic(
            GetInstanceStatusStatisticDto getInstanceStatusStatisticDto) {
        final GetInstanceStatusStatisticResponse.StatusCount statusCount =
                workbenchOpenApiService.getInstanceStatusStatistic(getInstanceStatusStatisticDto);
        return statusCount;
    }
}
前端代码开发。
在前端选用饼图来展示数据。
import React from 'react';
import { Loading } from '@alifd/next';
import { Pie } from '@antv/g2plot';
import cn from 'classnames';
import * as services from '../services';
import classes from '../styles/app.module.css';

export interface Props {
  projectId: number;
}

export interface Point {
  type: string;
  value: number;
}

/**
 * Donut chart of the instance status counts returned by `getInstanceStatus`
 * for a project, one slice per status group.
 */
const InstanceStatus: React.FunctionComponent<Props> = (props) => {
  const containerRef = React.useRef<HTMLDivElement>(null);
  // The chart instance lives in a ref so that effects and async callbacks always
  // see the current instance instead of a value captured by an older render.
  const chartRef = React.useRef<Pie>();
  const [visible, setVisible] = React.useState(false);
  const [data, setData] = React.useState<Point[]>([]);

  /**
   * Loads the status counts for `projectId` and maps them to chart points.
   *
   * State updates are skipped once `signal` is aborted, so a cancelled request
   * cannot overwrite newer data or hide the loading indicator of the request
   * that replaced it.
   */
  const fetchData = async (signal: AbortSignal, projectId: number) => {
    try {
      setVisible(true);
      const response = await services.workbench.getInstanceStatus(signal, projectId);
      if (signal.aborted) {
        return;
      }
      const nextData: Point[] = [
        { type: '未运行', value: response.notRunCount },
        // Both waiting states (time and resources) are shown as one slice.
        { type: '等待中', value: response.waitTimeCount + response.waitResCount },
        { type: '运行中', value: response.runningCount },
        { type: '运行成功', value: response.successCount },
        { type: '运行失败', value: response.failureCount },
      ];
      setData(nextData);
    } catch (e) {
      // An abort is an expected cancellation, not an error worth reporting.
      if (!signal.aborted) {
        console.error(e);
      }
    } finally {
      if (!signal.aborted) {
        setVisible(false);
      }
    }
  };

  // Fetch on first render and whenever projectId changes; cancel on cleanup.
  React.useEffect(() => {
    if (!props.projectId) {
      return undefined;
    }
    const controller = new AbortController();
    fetchData(controller.signal, props.projectId);
    return () => {
      controller.abort();
    };
  }, [
    props.projectId,
  ]);

  // Create the pie chart once the container is mounted and destroy it on unmount.
  React.useEffect(() => {
    if (!containerRef.current) {
      return undefined;
    }
    const pie = new Pie(containerRef.current, {
      data: [],
      angleField: 'value',
      colorField: 'type',
      innerRadius: 0.6,
      label: {
        type: 'inner',
        offset: '-50%',
        content: '{value}',
        style: {
          textAlign: 'center',
          fontSize: 18,
        },
      },
      statistic: {
        title: false,
      },
    });
    pie.render();
    chartRef.current = pie;
    return () => {
      chartRef.current = undefined;
      pie.destroy();
    };
  }, []);

  // Push every data change into the chart. This effect is declared after the
  // creation effect, so on mount the chart already exists when it runs.
  React.useEffect(() => {
    chartRef.current?.changeData(data);
  }, [
    data,
  ]);

  // Render the component.
  return (
    <Loading className={cn(classes.appLoading)} visible={visible}>
      <div ref={containerRef} />
    </Loading>
  );
};

export default InstanceStatus;
完成上述代码开发后,您可在本地部署并运行工程代码。部署并运行的操作请参见通用操作:本地部署。
完成以上实践步骤后,您可搭建一个昨日运行状态分布的大盘,如下所示。
通用操作:本地部署
准备依赖环境。
您需准备好以下依赖环境:java8及以上、maven构建工具、node环境、pnpm工具。您可以执行以下命令来确定是否具备上述环境:
npm -v # 如果已安装成功,执行命令将展示版本号,否则会报没有命令错误
java -version # 如果已安装成功,执行命令将展示版本号,否则会报没有命令错误
mvn -v # 如果已安装成功,执行命令将展示版本号,否则会报没有命令错误
pnpm -v # 如果已安装成功,执行命令将展示版本号,否则会报没有命令错误
下载工程。
工程下载链接:workbench-screen-demo.zip。下载工程后,进入工程中并执行以下命令:
pnpm i
在范例工程中的backend/src/main/resources路径下找到application.properties文件,修改文件中的核心参数。
api.access-key-id与api.access-key-secret为您阿里云账号的AccessKey ID与AccessKey Secret。
您可以在AccessKey 管理页面获取阿里云账号的相关信息。
api.region-id、api.endpoint需修改为待调用OpenAPI的地域信息。
其他参数可根据实际情况修改,修改后的填写示例如下。
在工程根目录下执行以下命令,运行示例实践代码。
npm run dev
- 本页导读 (1)
- 背景信息
- 准备工作:开启并配置消息订阅(OpenEvent)
- 实践1:实时任务运行监控
- 实践2:实时节点状态监控
- 实践3:一周调度任务数量趋势
- 实践4:任务完成情况
- 实践5:近一个月运行时长排行
- 实践6:近一个月任务出错排行
- 实践7:昨日任务类型分布
- 实践8:昨日运行状态分布
- 通用操作:本地部署