统计MaxCompute TOPN费用账号及耗时作业

数据开发者在使用MaxCompute开发过程中,需要统计MaxCompute项目中账号的费用以及作业的耗时情况,助力合理规划和调整作业。本文为您介绍如何通过MaxCompute元数据(Information Schema)统计TOP费用账号及耗时作业,同时通过钉钉推送到客户群。

背景信息

通常,数据开发者会通过DataWorks标准模式使用MaxCompute,MaxCompute会在Information Schema中记录所有作业的执行账号为同一个主账号,只有小部分的作业执行账号为RAM用户。此时数据开发者会关注如何统计各个账号的费用和耗时作业。MaxCompute提供如下方案解决这两个问题:

  • 账号费用:您可以通过账单详情中的用量明细来查询,但是这种方式无法将用量明细归属到对应的RAM用户。Information Schema视图中的TASKS_HISTORY会记录MaxCompute项目内已完成的作业详情,且保留近14天数据。您可以将TASKS_HISTORY中的数据备份到指定MaxCompute项目中,基于该数据统计TOP费用账号。

  • 耗时作业:您可以通过TASKS_HISTORY中的数据统计TOP耗时作业。

更多关于Information Schema的功能及使用限制,请参见项目级别Information Schema

步骤一:获取Information Schema服务

自2024年03月01日开始,MaxCompute停止对新增项目自动安装项目级别Information Schema,即新增的项目默认没有项目级别Information Schema的Package。若您有查元数据的业务,您可以查询租户级别的Information Schema,以便获取更全的信息。租户级别Information Schema的具体使用说明请参见租户级别Information Schema

对于存量MaxCompute项目,在您开始使用Information Schema服务前,需要以项目所有者(Project Owner)或具备Super_Administrator管理角色的RAM用户身份安装Information Schema权限包,获得访问项目元数据的权限。更多为用户授权管理角色操作信息,请参见将角色赋予用户。安装方式有如下两种:

说明

如果统计多个MaxCompute项目的元数据,您需要分别对各个MaxCompute项目安装Information Schema权限包。然后把各个MaxCompute项目的元数据的备份数据插入到同一个表中做集中统计分析。

(可选)步骤二:对除Project Owner外的用户授权

Information Schema的视图包含了项目级别的所有用户数据,默认项目所有者可以查看。如果项目内其他用户或角色需要查看,需要进行授权,请参见基于Package跨项目访问资源

授权语法如下。

grant <actions> on package Information_Schema.systables to user <user_name>;
grant <actions> on package Information_Schema.systables to role <role_name>;
  • actions:待授予的操作权限,取值为Read。

  • user_name:已添加至项目中的阿里云账号或RAM用户。

    您可以通过MaxCompute客户端执行list users;命令获取用户账号。

  • role_name:已添加至项目中的角色。

    您可以通过MaxCompute客户端执行list roles;命令获取角色名称。

授权示例如下。

grant read on package Information_Schema.systables to user RAM$Bob@aliyun.com:user01;

步骤三:下载并备份元数据

在MaxCompute项目上创建元数据备份表,并定时将元数据写入备份表中。以MaxCompute客户端为例,操作流程如下:

  1. 登录MaxCompute客户端,执行如下命令创建元数据备份表。

    --project_name为MaxCompute项目名称。
    create table if not exists <project_name>.information_history
    (
        task_catalog STRING
        ,task_schema STRING
        ,task_name STRING
        ,task_type STRING
        ,inst_id STRING
        ,`status` STRING
        ,owner_id STRING
        ,owner_name STRING
        ,result STRING
        ,start_time DATETIME
        ,end_time DATETIME
        ,input_records BIGINT
        ,output_records BIGINT
        ,input_bytes BIGINT
        ,output_bytes BIGINT
        ,input_tables STRING
        ,output_tables STRING
        ,operation_text STRING
        ,signature STRING
        ,complexity DOUBLE
        ,cost_cpu DOUBLE
        ,cost_mem DOUBLE
        ,settings STRING
        ,ds STRING
    );
  2. 进入DataWorks数据开发界面,创建ODPS SQL节点(information_history)并配置定时调度,用于定时将数据写入备份表information_history。完成后单击左上角保存图标保存。

    创建ODPS SQL节点操作,请参见开发ODPS SQL任务

    ODPS SQL节点运行的命令示例如下:

    --project_name为MaxCompute项目名称。
    use <project_name>;
    insert into table <project_name>.information_history select * from information_schema.tasks_history where ds ='datetime1';

    ${datetime1}为DataWorks的调度参数,您需要在ODPS SQL节点右侧,单击调度配置,在基础属性区域配置参数值为datetime1=${yyyymmdd}

    说明

    如果需要同时对多个MaxCompute项目的元数据进行统计分析,您可以创建多个ODPS SQL节点,将这些MaxCompute项目的元数据写入到同一张数据备份表中。

步骤四:创建统计TOPN费用账号及耗时作业

TASKS_HISTORY视图中的settings会记录上层调度或用户传入的信息,以JSON格式存储。包含的具体信息有:useragent、bizid、skynet_id和skynet_nodename。您可以通过settings字段定位到创建作业的RAM用户信息。因此您可以基于备份数据表计算TOPN费用账号及耗时作业。操作流程如下:

  1. 登录MaxCompute客户端,创建一张RAM用户明细表user_ram,记录需要统计的账号及账号ID。

    命令示例如下:

    create table if not exists <project_name>.user_ram
    (
        user_id STRING
        ,user_name STRING
    );
  2. 创建一张统计账号费用的明细表cost_topn,记录TOPN费用账号明细。

    命令示例如下:

    create table if not exists <project_name>.cost_topn
    (
        cost_sum DECIMAL(38,5)
        ,task_owner STRING
    )
    partitioned by 
    (
        ds STRING
    );
  3. 建一张统计耗时作业的明细表time_topn,记录TOPN耗时作业明细。

    命令示例如下:

    create table if not exists <project_name>.time_topn
    (
        inst_id STRING
        ,cost_time BIGINT
        ,task_owner STRING
    )
    partitioned by 
    (
        ds STRING
    );
  4. 进入DataWorks数据开发界面,创建ODPS SQL节点(topn)并配置定时调度,用于定时将cost_topn表中统计的数据写入user_ram表。完成后单击左上角保存图标保存。

    创建ODPS SQL节点操作,请参见开发ODPS SQL任务

    ODPS SQL节点运行的命令示例如下:

    --开启2.0数据类型开关。2.0数据类型详情,请参见2.0数据类型版本。
    set odps.sql.decimal.odps2=true;
    --将元数据写入cost_topn、time_topn表。user_id为账号ID。您可以在个人信息页面查看账号ID。
    insert into table <project_name>.cost_topn partition (ds = '${datetime1}')
    select   
    nvl(cost_sum,0) cost_sum
            ,case when a.task_owner='<user_id>' or a.task_owner='<user_id>' or a.task_owner='<user_id>' then b.user_name 
                  else a.task_owner 
             end task_owner 
    from    (
                select  inst_id
                        ,owner_name
                        ,task_type
                        ,a.input_bytes
                        ,a.cost_cpu
                        ,a.status
                        ,case    when a.task_type = 'SQL' then cast(a.input_bytes/1024/1024/1024 * a.complexity * 0.3 as DECIMAL(18,5) )
                                 when a.task_type = 'SQLRT' then cast(a.input_bytes/1024/1024/1024 * a.complexity * 0.3 as DECIMAL(18,5) )
                                 when a.task_type = 'CUPID' and a.status='Terminated'then cast(a.cost_cpu/100/3600 * 0.66 as DECIMAL(18,5) ) 
                                 else 0 
                         end cost_sum
                        ,a.settings
                        ,get_json_object(settings, "$.SKYNET_ONDUTY") owner
                        ,case    when get_json_object(a.settings, "$.SKYNET_ONDUTY") is null then owner_name 
                                 else get_json_object(a.settings, "$.SKYNET_ONDUTY") 
                         end task_owner
                from    information_history
                where   ds = '${datetime1}'
            ) a
    left join <project_name>.user_ram b
    on      a.task_owner = b.user_id;
    
    insert into table <project_name>.time_topn partition(ds = '${datetime1}')
    select  inst_id
            ,cost_time
            ,case    when a.task_owner='<user_id>' or a.task_owner='<user_id>' or a.task_owner='<user_id>' then b.user_name 
                     else a.task_owner 
             end task_owner
    from    (
                select  inst_id
                        ,task_type
                        ,status
                        ,datediff(a.end_time, a.start_time, 'ss') AS cost_time
                        ,case    when get_json_object(a.settings, "$.SKYNET_ONDUTY") is null then owner_name 
                                 else get_json_object(a.settings, "$.SKYNET_ONDUTY") 
                         end task_owner
                from    <project_name>.information_history a
                where   ds = '${datetime1}'
            ) a
    left join <project_name>.user_ram b
    on      a.task_owner = b.user_id
    ;
    说明

    示例中的task_type = 'SQL'表示SQL作业,task_type = 'SQLRT'表示查询加速作业,task_type = 'CUPID'表示Spark作业。如果需要统计其他计费作业,例如MapReduce、Lightning(交互式分析)、Mars,您可以按照计费公式添加相应代码行。计费详情,请参见计算费用(按量计费)

    ${datetime1}为DataWorks的调度参数,您需要在ODPS SQL节点右侧,单击调度配置,在基础属性区域配置参数值为datetime1=${yyyymmdd}

步骤五:创建钉钉群机器人并推送TOPN费用账号及耗时作业信息

以PC端为例,创建钉钉群机器人并推送TOPN费用账号及耗时作业信息的操作流程如下:

  1. 创建钉钉群机器人。

    1. 选择目标钉钉群,单击右上角的1图标。

    2. 群设置面板,单击智能群助手

    3. 智能群助手面板,单击添加机器人

    4. 群机器人对话框的添加机器人区域,单击添加图标。

    5. 群机器人对话框,单击自定义机器人。

    6. 机器人详情对话框,单击添加

    7. 添加机器人对话框,编辑机器人信息。

      属性名称

      设置规则

      头像

      单击头像右下角的编辑图标来编辑头像。

      机器人名字

      输入机器人名字。

      安全设置

      完成必要的安全设置(至少选择1种),勾选我已阅读并同意《自定义机器人服务及免责条款》,单击完成

      安全设置有3种方式:

      • 自定义关键词:最多可以设置10个关键词。

      • 加签:勾选加签可以获取到机器人的密钥。

      • IP地址(段):只有来自IP地址范围内的请求才会被正常处理。

    8. 添加机器人对话框,复制生成的Webhook地址。单击完成

      重要

      请保管好此Webhook地址,不要公布在外部网站上,泄露后会有安全风险。

  2. 通过IntelliJ IDEA创建Maven项目并编译推送钉钉群消息的Java程序,编译完成后生成JAR包。

    IntelliJ IDEA操作详情,请单击IntelliJ IDEA工具界面右上角的Help获取。

    1. 配置Pom依赖。

      Pom依赖如下。

      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
      
          <groupId>DingTalk_Information</groupId>
          <artifactId>DingTalk_Information</artifactId>
          <version>1.0-SNAPSHOT</version>
          <dependencies>
              <dependency>
                  <groupId>com.aliyun.odps</groupId>
                  <artifactId>odps-sdk-core</artifactId>
                  <version>0.35.5-public</version>
              </dependency>
              <dependency>
                  <groupId>log4j</groupId>
                  <artifactId>log4j</artifactId>
                  <version>1.2.15</version>
                  <exclusions>
                      <exclusion>
                          <groupId>com.sun.jmx</groupId>
                          <artifactId>jmxri</artifactId>
                      </exclusion>
                      <exclusion>
                          <groupId>com.sun.jdmk</groupId>
                          <artifactId>jmxtools</artifactId>
                      </exclusion>
                      <exclusion>
                          <groupId>javax.jms</groupId>
                          <artifactId>jms</artifactId>
                      </exclusion>
                  </exclusions>
              </dependency>
              <dependency>
                  <groupId>com.aliyun</groupId>
                  <artifactId>alibaba-dingtalk-service-sdk</artifactId>
                  <version>1.0.1</version>
              </dependency>
              <dependency>
                  <groupId>com.aliyun.odps</groupId>
                  <artifactId>odps-jdbc</artifactId>
                  <version>3.0.1</version>
                  <classifier>jar-with-dependencies</classifier>
              </dependency>
          </dependencies>
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-assembly-plugin</artifactId>
                      <version>2.4.1</version>
                      <configuration>
                          <!-- get all project dependencies -->
                          <descriptorRefs>
                              <descriptorRef>jar-with-dependencies</descriptorRef>
                          </descriptorRefs>
                          <!-- MainClass in mainfest make a executable jar -->
                          <archive>
                              <manifest>
                                  <mainClass>com.alibaba.sgri.message.test</mainClass>
                              </manifest>
                          </archive>
                      </configuration>
                      <executions>
                          <execution>
                              <id>make-assembly</id>
                              <!-- bind to the packaging phase -->
                              <phase>package</phase>
                              <goals>
                                  <goal>single</goal>
                              </goals>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
      </project>
    2. 开发Java程序并生成JAR包topn_new.jar。

      Java代码示例如下:

      package com.alibaba.sgri.message;
      import java.io.IOException;
      import java.util.concurrent.atomic.AtomicInteger;
      import com.aliyun.odps.Instance;
      import com.aliyun.odps.Odps;
      import com.aliyun.odps.OdpsException;
      import com.aliyun.odps.account.Account;
      import com.aliyun.odps.account.AliyunAccount;
      import com.aliyun.odps.data.ResultSet;
      import com.aliyun.odps.task.SQLTask;
      import com.dingtalk.api.DefaultDingTalkClient;
      import com.dingtalk.api.DingTalkClient;
      import com.dingtalk.api.request.OapiRobotSendRequest;
      import com.dingtalk.api.response.OapiRobotSendResponse;
      import com.taobao.api.ApiException;
      
      public class test {
      
          public static void main(String[] args) throws ApiException {
              if (args.length < 1) {
                  System.out.println("请输入日期参数");
                  System.exit(0);
              }
              System.out.println("开始读取数据");
              DingTalkClient client = new DefaultDingTalkClient(
                      "https://oapi.dingtalk"
                              + ".com/robot/send?access_token=<机器人Webhook地址>\n");
              OapiRobotSendRequest request = new OapiRobotSendRequest();
              request.setMsgtype("markdown");
              OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown();
              //这里的日期作为参数
              markdown.setText(getContent(args[0]));
              markdown.setTitle("作业消费TOPN");
              request.setMarkdown(markdown);
              OapiRobotSendResponse response = client.execute(request);
              System.out.println("消息发送成功");
          }
      
          /**
           * 读取ODPS,获取要发送的数据
           */
      
          public static String getContent(String day) {
              Odps odps = createOdps();
              StringBuilder sb = new StringBuilder();
              try {
                  //==================这是费用账号=====================
                  String costTopnSql = "select sum(cost_sum)cost_sum,task_owner from cost_topn where ds='" + day + "' " + "group by task_owner order by cost_sum desc limit 5;";
                  Instance costInstance = SQLTask.run(odps, costTopnSql);
                  costInstance.waitForSuccess();
                  ResultSet costTopnRecords = SQLTask.getResultSet(costInstance);
                  sb.append("<font color=#FF0000 size=4>").append("费用账号TOPN(").append(day).append(
                          ")[按照阿里云按量付费计算]").append("</font>").append("\n\n");
                  AtomicInteger costIndex = new AtomicInteger(1);
                  costTopnRecords.forEach(item -> {
                      sb.append(costIndex.getAndIncrement()).append(".").append("账号:");
                      sb.append("<font color=#2E64FE>").append(item.getString("task_owner")).append("\n\n").append("</font>");
                      sb.append("  ").append(" ").append("消费:").append("<font color=#2E64FE>").append(item.get("cost_sum"))
                              .append("元").append(
                              "</font>").append("\n\n")
                              .append("</font>");
                  });
                  //==================这是耗时作业=====================
                  String timeTopnSql = "select * from time_topn where ds='" + day + "' ORDER BY cost_time DESC limit 5;";
                  Instance timeInstance = SQLTask.run(odps, timeTopnSql);
                  timeInstance.waitForSuccess();
                  ResultSet timeTopnRecords = SQLTask.getResultSet(timeInstance);
                  sb.append("<font color=#FF8C00 size=4>").append("耗时作业TOPN(").append(day).append(")")
                          .append("\n\n").append("</font>");
                  AtomicInteger timeIndex = new AtomicInteger(1);
                  timeTopnRecords.forEach(item -> {
                      sb.append(timeIndex.getAndIncrement()).append(".").append("作业:");
                      sb.append("<font color=#2E64FE>").append(item.getString("inst_id")).append("\n\n").append("</font>");
                      sb.append("   ").append("账号:").append("<font color=#2E64FE>").append(item.getString("task_owner")).append("\n\n").append("</font>");
                      sb.append("   ").append("耗时:).append("<font color=#2E64FE>").append(item.get("cost_time"))
                              .append("秒").append(
                              "</font>").append("\n\n");
                  });
              } catch (OdpsException | IOException e) {
                  e.printStackTrace();
              }
              return sb.toString();
          }
      
          /**
           * 创建ODPS
           */
          public static Odps createOdps() {
              String project = "<project_name>";
              // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户
      				// 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您也可以根据业务需要,保存到配置文件里
      				// 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
      				private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
      				private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
              String endPoint = "http://service.odps.aliyun.com/api";
              Account account = new AliyunAccount(accessId, accessKey);
              Odps odps = new Odps(account);
              odps.setEndpoint(endPoint);
              odps.setDefaultProject(project);
              return odps;
          }
      }
      说明

      自定义钉钉群机器人开发API,请参见机器人开发

    3. 上传生成的topn_new.jar包为MaxCompute资源。

      上传MaxCompute资源操作,请参见创建并使用MaxCompute资源

  3. 创建Shell节点(dingsend),引用topn_new.jar包并配置定时调度。

    创建Shell节点操作,请参见Shell节点

    Shell节点运行的命令示例如下:

    java -jar  topn_new.jar  $1

    $1为DataWorks的调度参数,您需要在Shell节点右侧,单击调度配置,在基础属性区域配置参数值为${yyyymmdd}

步骤六:配置上下游节点调度属性并运行节点

在业务流程面板将information_history、topn和dingsend节点连线形成依赖关系,并配置每个节点的重跑属性和依赖的上游节点。配置完成后在节点上单击右键,选择运行节点即可。

依赖关系配置,请参见配置同周期调度依赖

节点上下游配置,请参见配置节点上下文

效果展示

钉钉群推送内容效果如下,仅供参考。

推送

相关文档

在线支持

如果您在使用MaxCompute的过程中有任何疑问或建议,欢迎填写钉钉群申请表单加入钉钉群进行反馈。