数据开发者在使用MaxCompute开发过程中,需要统计MaxCompute项目中账号的费用以及作业的耗时情况,助力合理规划和调整作业。本文为您介绍如何通过MaxCompute元数据(Information Schema)统计TOP费用账号及耗时作业,同时通过钉钉推送到客户群。
背景信息
通常,数据开发者会通过DataWorks标准模式使用MaxCompute,MaxCompute会在Information Schema中记录所有作业的执行账号为同一个主账号,只有小部分的作业执行账号为RAM用户。此时数据开发者会关注如何统计各个账号的费用和耗时作业。MaxCompute提供如下方案解决这两个问题:
账号费用:您可以通过账单详情中的用量明细来查询,但是这种方式无法将用量明细归属到对应的RAM用户。Information Schema视图中的TASKS_HISTORY会记录MaxCompute项目内已完成的作业详情,且保留近14天数据。您可以将TASKS_HISTORY中的数据备份到指定MaxCompute项目中,基于该数据统计TOP费用账号。
耗时作业:您可以通过TASKS_HISTORY中的数据统计TOP耗时作业。
更多关于Information Schema的功能及使用限制,请参见项目级别Information Schema。
统计MaxCompute TOPN费用账号及耗时作业的流程如下:
步骤一:获取Information Schema服务
自2024年03月01日开始,MaxCompute停止对新增项目自动安装项目级别Information Schema,即新增的项目默认没有项目级别Information Schema的Package。若您有查元数据的业务,您可以查询租户级别的Information Schema,以便获取更全的信息。租户级别Information Schema的具体使用说明请参见租户级别Information Schema。
对于存量MaxCompute项目,在您开始使用Information Schema服务前,需要以项目所有者(Project Owner)或具备Super_Administrator管理角色的RAM用户身份安装Information Schema权限包,获得访问项目元数据的权限。更多为用户授权管理角色操作信息,请参见将角色赋予用户。安装方式有如下两种:
登录MaxCompute客户端,执行如下命令:
install package Information_Schema.systables;
登录DataWorks控制台,进入临时查询界面。更多临时查询操作详情,请参见使用临时查询运行SQL语句(可选)。执行如下命令:
install package Information_Schema.systables;
执行示例如下。
如果统计多个MaxCompute项目的元数据,您需要分别对各个MaxCompute项目安装Information Schema权限包。然后把各个MaxCompute项目的元数据的备份数据插入到同一个表中做集中统计分析。
(可选)步骤二:对除Project Owner外的用户授权
Information Schema的视图包含了项目级别的所有用户数据,默认项目所有者可以查看。如果项目内其他用户或角色需要查看,需要进行授权,请参见基于Package跨项目访问资源。
授权语法如下。
grant <actions> on package Information_Schema.systables to user <user_name>;
grant <actions> on package Information_Schema.systables to role <role_name>;
actions:待授予的操作权限,取值为Read。
user_name:已添加至项目中的阿里云账号或RAM用户。
您可以通过MaxCompute客户端执行
list users;
命令获取用户账号。role_name:已添加至项目中的角色。
您可以通过MaxCompute客户端执行
list roles;
命令获取角色名称。
授权示例如下。
grant read on package Information_Schema.systables to user RAM$Bob@aliyun.com:user01;
步骤三:下载并备份元数据
在MaxCompute项目上创建元数据备份表,并定时将元数据写入备份表中。以MaxCompute客户端为例,操作流程如下:
登录MaxCompute客户端,执行如下命令创建元数据备份表。
--project_name为MaxCompute项目名称。 create table if not exists <project_name>.information_history ( task_catalog STRING ,task_schema STRING ,task_name STRING ,task_type STRING ,inst_id STRING ,`status` STRING ,owner_id STRING ,owner_name STRING ,result STRING ,start_time DATETIME ,end_time DATETIME ,input_records BIGINT ,output_records BIGINT ,input_bytes BIGINT ,output_bytes BIGINT ,input_tables STRING ,output_tables STRING ,operation_text STRING ,signature STRING ,complexity DOUBLE ,cost_cpu DOUBLE ,cost_mem DOUBLE ,settings STRING ,ds STRING );
进入DataWorks数据开发界面,创建ODPS SQL节点(information_history)并配置定时调度,用于定时将数据写入备份表information_history。完成后单击左上角图标保存。
创建ODPS SQL节点操作,请参见开发ODPS SQL任务。
ODPS SQL节点运行的命令示例如下:
--project_name为MaxCompute项目名称。 use <project_name>; insert into table <project_name>.information_history select * from information_schema.tasks_history where ds ='datetime1';
${datetime1}
为DataWorks的调度参数,您需要在ODPS SQL节点右侧,单击调度配置,在基础属性区域配置参数值为datetime1=${yyyymmdd}
。说明如果需要同时对多个MaxCompute项目的元数据进行统计分析,您可以创建多个ODPS SQL节点,将这些MaxCompute项目的元数据写入到同一张数据备份表中。
步骤四:创建统计TOPN费用账号及耗时作业
TASKS_HISTORY视图中的settings会记录上层调度或用户传入的信息,以JSON格式存储。包含的具体信息有:useragent、bizid、skynet_id和skynet_nodename。您可以通过settings字段定位到创建作业的RAM用户信息。因此您可以基于备份数据表计算TOPN费用账号及耗时作业。操作流程如下:
登录MaxCompute客户端,创建一张RAM用户明细表user_ram,记录需要统计的账号及账号ID。
命令示例如下:
create table if not exists <project_name>.user_ram ( user_id STRING ,user_name STRING );
创建一张统计账号费用的明细表cost_topn,记录TOPN费用账号明细。
命令示例如下:
create table if not exists <project_name>.cost_topn ( cost_sum DECIMAL(38,5) ,task_owner STRING ) partitioned by ( ds STRING );
建一张统计耗时作业的明细表time_topn,记录TOPN耗时作业明细。
命令示例如下:
create table if not exists <project_name>.time_topn ( inst_id STRING ,cost_time BIGINT ,task_owner STRING ) partitioned by ( ds STRING );
进入DataWorks数据开发界面,创建ODPS SQL节点(topn)并配置定时调度,用于定时将cost_topn表中统计的数据写入user_ram表。完成后单击左上角图标保存。
创建ODPS SQL节点操作,请参见开发ODPS SQL任务。
ODPS SQL节点运行的命令示例如下:
--开启2.0数据类型开关。2.0数据类型详情,请参见2.0数据类型版本。 set odps.sql.decimal.odps2=true; --将元数据写入cost_topn、time_topn表。user_id为账号ID。您可以在个人信息页面查看账号ID。 insert into table <project_name>.cost_topn partition (ds = '${datetime1}') select nvl(cost_sum,0) cost_sum ,case when a.task_owner='<user_id>' or a.task_owner='<user_id>' or a.task_owner='<user_id>' then b.user_name else a.task_owner end task_owner from ( select inst_id ,owner_name ,task_type ,a.input_bytes ,a.cost_cpu ,a.status ,case when a.task_type = 'SQL' then cast(a.input_bytes/1024/1024/1024 * a.complexity * 0.3 as DECIMAL(18,5) ) when a.task_type = 'SQLRT' then cast(a.input_bytes/1024/1024/1024 * a.complexity * 0.3 as DECIMAL(18,5) ) when a.task_type = 'CUPID' and a.status='Terminated'then cast(a.cost_cpu/100/3600 * 0.66 as DECIMAL(18,5) ) else 0 end cost_sum ,a.settings ,get_json_object(settings, "$.SKYNET_ONDUTY") owner ,case when get_json_object(a.settings, "$.SKYNET_ONDUTY") is null then owner_name else get_json_object(a.settings, "$.SKYNET_ONDUTY") end task_owner from information_history where ds = '${datetime1}' ) a left join <project_name>.user_ram b on a.task_owner = b.user_id; insert into table <project_name>.time_topn partition(ds = '${datetime1}') select inst_id ,cost_time ,case when a.task_owner='<user_id>' or a.task_owner='<user_id>' or a.task_owner='<user_id>' then b.user_name else a.task_owner end task_owner from ( select inst_id ,task_type ,status ,datediff(a.end_time, a.start_time, 'ss') AS cost_time ,case when get_json_object(a.settings, "$.SKYNET_ONDUTY") is null then owner_name else get_json_object(a.settings, "$.SKYNET_ONDUTY") end task_owner from <project_name>.information_history a where ds = '${datetime1}' ) a left join <project_name>.user_ram b on a.task_owner = b.user_id ;
说明示例中的
task_type = 'SQL'
表示SQL作业,task_type = 'SQLRT'
表示查询加速作业,task_type = 'CUPID'
表示Spark作业。如果需要统计其他计费作业,例如MapReduce、Lightning(交互式分析)、Mars,您可以按照计费公式添加相应代码行。计费详情,请参见计算费用(按量计费)。${datetime1}
为DataWorks的调度参数,您需要在ODPS SQL节点右侧,单击调度配置,在基础属性区域配置参数值为datetime1=${yyyymmdd}
。
步骤五:创建钉钉群机器人并推送TOPN费用账号及耗时作业信息
以PC端为例,创建钉钉群机器人并推送TOPN费用账号及耗时作业信息的操作流程如下:
创建钉钉群机器人。
选择目标钉钉群,单击右上角的图标。
在群设置面板,单击智能群助手。
在智能群助手面板,单击添加机器人。
在群机器人对话框的添加机器人区域,单击图标。
在群机器人对话框,单击自定义机器人。
在机器人详情对话框,单击添加。
在添加机器人对话框,编辑机器人信息。
属性名称
设置规则
头像
单击头像右下角的图标来编辑头像。
机器人名字
输入机器人名字。
安全设置
完成必要的安全设置(至少选择1种),勾选我已阅读并同意《自定义机器人服务及免责条款》,单击完成。
安全设置有3种方式:
自定义关键词:最多可以设置10个关键词。
加签:勾选加签可以获取到机器人的密钥。
IP地址(段):只有来自IP地址范围内的请求才会被正常处理。
在添加机器人对话框,复制生成的Webhook地址。单击完成。
重要请保管好此Webhook地址,不要公布在外部网站上,泄露后会有安全风险。
通过IntelliJ IDEA创建Maven项目并编译推送钉钉群消息的Java程序,编译完成后生成JAR包。
IntelliJ IDEA操作详情,请单击IntelliJ IDEA工具界面右上角的Help获取。
配置Pom依赖。
Pom依赖如下。
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>DingTalk_Information</groupId> <artifactId>DingTalk_Information</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-sdk-core</artifactId> <version>0.35.5-public</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.15</version> <exclusions> <exclusion> <groupId>com.sun.jmx</groupId> <artifactId>jmxri</artifactId> </exclusion> <exclusion> <groupId>com.sun.jdmk</groupId> <artifactId>jmxtools</artifactId> </exclusion> <exclusion> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>alibaba-dingtalk-service-sdk</artifactId> <version>1.0.1</version> </dependency> <dependency> <groupId>com.aliyun.odps</groupId> <artifactId>odps-jdbc</artifactId> <version>3.0.1</version> <classifier>jar-with-dependencies</classifier> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.4.1</version> <configuration> <!-- get all project dependencies --> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <!-- MainClass in mainfest make a executable jar --> <archive> <manifest> <mainClass>com.alibaba.sgri.message.test</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <!-- bind to the packaging phase --> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
开发Java程序并生成JAR包topn_new.jar。
Java代码示例如下:
package com.alibaba.sgri.message; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; import com.aliyun.odps.Instance; import com.aliyun.odps.Odps; import com.aliyun.odps.OdpsException; import com.aliyun.odps.account.Account; import com.aliyun.odps.account.AliyunAccount; import com.aliyun.odps.data.ResultSet; import com.aliyun.odps.task.SQLTask; import com.dingtalk.api.DefaultDingTalkClient; import com.dingtalk.api.DingTalkClient; import com.dingtalk.api.request.OapiRobotSendRequest; import com.dingtalk.api.response.OapiRobotSendResponse; import com.taobao.api.ApiException; public class test { public static void main(String[] args) throws ApiException { if (args.length < 1) { System.out.println("请输入日期参数"); System.exit(0); } System.out.println("开始读取数据"); DingTalkClient client = new DefaultDingTalkClient( "https://oapi.dingtalk" + ".com/robot/send?access_token=<机器人Webhook地址>\n"); OapiRobotSendRequest request = new OapiRobotSendRequest(); request.setMsgtype("markdown"); OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown(); //这里的日期作为参数 markdown.setText(getContent(args[0])); markdown.setTitle("作业消费TOPN"); request.setMarkdown(markdown); OapiRobotSendResponse response = client.execute(request); System.out.println("消息发送成功"); } /** * 读取ODPS,获取要发送的数据 */ public static String getContent(String day) { Odps odps = createOdps(); StringBuilder sb = new StringBuilder(); try { //==================这是费用账号===================== String costTopnSql = "select sum(cost_sum)cost_sum,task_owner from cost_topn where ds='" + day + "' " + "group by task_owner order by cost_sum desc limit 5;"; Instance costInstance = SQLTask.run(odps, costTopnSql); costInstance.waitForSuccess(); ResultSet costTopnRecords = SQLTask.getResultSet(costInstance); sb.append("<font color=#FF0000 size=4>").append("费用账号TOPN(").append(day).append( ")[按照阿里云按量付费计算]").append("</font>").append("\n\n"); AtomicInteger costIndex = new AtomicInteger(1); costTopnRecords.forEach(item -> { sb.append(costIndex.getAndIncrement()).append(".").append("账号:"); sb.append("<font color=#2E64FE>").append(item.getString("task_owner")).append("\n\n").append("</font>"); sb.append(" ").append(" ").append("消费:").append("<font color=#2E64FE>").append(item.get("cost_sum")) .append("元").append( "</font>").append("\n\n") .append("</font>"); }); //==================这是耗时作业===================== String timeTopnSql = "select * from time_topn where ds='" + day + "' ORDER BY cost_time DESC limit 5;"; Instance timeInstance = SQLTask.run(odps, timeTopnSql); timeInstance.waitForSuccess(); ResultSet timeTopnRecords = SQLTask.getResultSet(timeInstance); sb.append("<font color=#FF8C00 size=4>").append("耗时作业TOPN(").append(day).append(")") .append("\n\n").append("</font>"); AtomicInteger timeIndex = new AtomicInteger(1); timeTopnRecords.forEach(item -> { sb.append(timeIndex.getAndIncrement()).append(".").append("作业:"); sb.append("<font color=#2E64FE>").append(item.getString("inst_id")).append("\n\n").append("</font>"); sb.append(" ").append("账号:").append("<font color=#2E64FE>").append(item.getString("task_owner")).append("\n\n").append("</font>"); sb.append(" ").append("耗时:).append("<font color=#2E64FE>").append(item.get("cost_time")) .append("秒").append( "</font>").append("\n\n"); }); } catch (OdpsException | IOException e) { e.printStackTrace(); } return sb.toString(); } /** * 创建ODPS */ public static Odps createOdps() { String project = "<project_name>"; // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户 // 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您也可以根据业务需要,保存到配置文件里 // 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险 private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); String endPoint = "http://service.odps.aliyun.com/api"; Account account = new AliyunAccount(accessId, accessKey); Odps odps = new Odps(account); odps.setEndpoint(endPoint); odps.setDefaultProject(project); return odps; } }
说明自定义钉钉群机器人开发API,请参见机器人开发。
上传生成的topn_new.jar包为MaxCompute资源。
上传MaxCompute资源操作,请参见创建并使用MaxCompute资源。
创建Shell节点(dingsend),引用topn_new.jar包并配置定时调度。
创建Shell节点操作,请参见Shell节点。
Shell节点运行的命令示例如下:
java -jar topn_new.jar $1
$1
为DataWorks的调度参数,您需要在Shell节点右侧,单击调度配置,在基础属性区域配置参数值为${yyyymmdd}
。
步骤六:配置上下游节点调度属性并运行节点
在业务流程面板将information_history、topn和dingsend节点连线形成依赖关系,并配置每个节点的重跑属性和依赖的上游节点。配置完成后在节点上单击右键,选择运行节点即可。
依赖关系配置,请参见配置同周期调度依赖。
节点上下游配置,请参见配置节点上下文。
效果展示
钉钉群推送内容效果如下,仅供参考。
相关文档
在线支持
如果您在使用MaxCompute的过程中有任何疑问或建议,欢迎填写钉钉群申请表单加入钉钉群进行反馈。