全部产品
云市场

获取批量计算Metrics

更新时间:2019-09-10 10:34:41

本文主要介绍批量计算metrics统计项以及获取方式。批量计算对外的资源主要表现在集群和作业两个维度,因此metrics统计也从以上2个维度展现。

集群 Metrics

Metrics 项目

集群对外提供的 Metrics 统计项包括如下:

统计项 名称 单位 聚合统计方法
cls_dataVfsFsSizePused 数据盘利用率 % Average,Maximum,Minimum
cls_systemCpuLoad CPU负载 % Average,Maximum,Minimum
cls_systemCpuUtilIdle CPU空闲率 % Average,Maximum,Minimum
cls_systemCpuUtilUsed CPU利用率 % Average,Maximum,Minimum
cls_vfsFsSizePused 系统盘利用率 % Average,Maximum,Minimum
cls_vmMemorySizePused 内存利用率 % Average,Maximum,Minimum
  • 以上统计项目以实例为单位进行上报,也即每个实例都有以上统计项;
  • 每个metrics项目上报的数据以clusterId、GroupId、InstanceId等维度组织;
  • 每条记录都包括,当前统计项在过去1分钟内的平均值、最大值以及最小值;
  • 批量计算默认10秒钟推送一次数据;
  • 获取 Metrics 时可以设置聚合周期(“Period”),默认为1分钟。

数据记录示例

clsdata

控制台获取方法

clsinfoinstanceInfoinstanceMetrics

作业 Metrics

Metrics 项目

作业对外提供的 Metrics 统计项包括如下:

统计项 名称 单位 聚合统计方法
job_dataVfsFsSizePused 数据盘利用率 % Average,Maximum,Minimum
job_systemCpuLoad CPU负载 % Average,Maximum,Minimum
job_systemCpuUtilIdle CPU空闲率 % Average,Maximum,Minimum
job_systemCpuUtilUsed CPU利用率 % Average,Maximum,Minimum
job_vfsFsSizePused 系统盘利用率 % Average,Maximum,Minimum
job_vmMemorySizePused 内存利用率 % Average,Maximum,Minimum

数据记录示例

jobdata

控制台获取方式

jobInfojobMetrics

相关 API

批量计算所有的Metrics统计信息都推送到阿里云云监控服务中。所有 Metrics 获取依赖云监控API。

DescribeMetricMetaList

查询 Metrics 项,通过该接口可以获取批量计算对外提供的统计项。

DescribeMetricData

查询 Metrics 统计数据,通过该接口可以获取指定的集群或者作业的各个统计项的数据

Demo 示例代码

  1. #!/usr/bin/env python
  2. #coding=utf-8
  3. # https://help.aliyun.com/document_detail/51936.html?spm=a2c4g.11186623.6.692.347048d34VV7RU
  4. import os
  5. import json
  6. import time
  7. import sys
  8. import datetime
  9. from functools import wraps
  10. from aliyunsdkcore.client import AcsClient
  11. from aliyunsdkcore.acs_exception.exceptions import ClientException
  12. from aliyunsdkcore.acs_exception.exceptions import ServerException
  13. from aliyunsdkcms.request.v20190101.DescribeMetricListRequest import DescribeMetricListRequest
  14. from aliyunsdkcms.request.v20190101.DescribeMetricMetaListRequest import DescribeMetricMetaListRequest
  15. akId = 'your key Id'
  16. akKey = 'your key'
  17. region = 'cn-hangzhou'
  18. # jobId = "job-000000005D16F74B00006883000303E9"
  19. jobId = "job-000000005D16F74B00006883000332DD"
  20. def retryWrapper(func):
  21. @wraps(func)
  22. def wrapper(*args,**kwargs):
  23. index = 0
  24. while True:
  25. try:
  26. res = func(*args,**kwargs)
  27. break
  28. except Exception,e:
  29. if index > 6:
  30. raise Exception(str(e))
  31. else:
  32. time.sleep(0.5 * pow(2,index))
  33. index += 1
  34. return res
  35. return wrapper
  36. @retryWrapper
  37. def listBatchMetricMeta(client, objId):
  38. metrics = []
  39. request = DescribeMetricMetaListRequest()
  40. request.set_accept_format('json')
  41. request.set_Namespace("acs_batchcomputenew")
  42. response = client.do_action_with_exception(request)
  43. res = json.loads(response)
  44. prefix = objId.strip().split("-")[0]
  45. for metric in res["Resources"]["Resource"]:
  46. if prefix not in metric["MetricName"]:
  47. continue
  48. metrics.append(metric["MetricName"])
  49. return metrics
  50. @retryWrapper
  51. def getSpecJobMetricsInfo(client, objId, metrics, startTime = None):
  52. nextToken = None
  53. request = DescribeMetricListRequest()
  54. request.set_accept_format('json')
  55. request.set_Period("60")
  56. request.set_Length("1000")
  57. request.set_EndTime(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))
  58. # 默认查询7天的记录
  59. if not startTime:
  60. sevenDayAgo = (datetime.datetime.now() - datetime.timedelta(days = 7))
  61. startTime = sevenDayAgo.strftime("%Y-%m-%d %H:%M:%S")
  62. request.set_StartTime(startTime)
  63. prefix = objId.strip().split("-")[0]
  64. if "job" in prefix:
  65. dimensionInfo = [{"jobId":objId}]
  66. else:
  67. dimensionInfo = [{"clusterId":objId}]
  68. request.set_Dimensions(json.dumps(dimensionInfo))
  69. request.set_MetricName(metrics)
  70. request.set_Namespace("acs_batchcomputenew")
  71. metricsInfo = []
  72. while True:
  73. if nextToken:
  74. request.set_NextToken(nextToken)
  75. response = client.do_action_with_exception(request)
  76. res = json.loads(response)
  77. if res.has_key("Datapoints") and len(res["Datapoints"]):
  78. metricsInfo.extend(json.loads(res["Datapoints"]))
  79. else:
  80. print res
  81. if res.has_key("NextToken") and res["NextToken"]:
  82. nextToken = res["NextToken"]
  83. continue
  84. else:
  85. break
  86. return metricsInfo
  87. if __name__ == "__main__":
  88. client = AcsClient(akId, akKey, region)
  89. # metricsName = ['job_systemCpuUtilIdle', 'job_systemCpuLoad', 'job_vmMemorySizePused', 'job_vfsFsSizePused', 'job_dataVfsFsSizePused']
  90. metricsName = listBatchMetricMeta(client, jobId)
  91. for metrics in metricsName:
  92. try:
  93. ret = getSpecJobMetricsInfo(client, jobId, metrics)
  94. except Exception,e:
  95. print "get metrics info failed, %s" % str(e)
  96. sys.exit(1)
  97. if not len(ret):
  98. continue
  99. # 可以对返回的数据进行二次聚合
  100. print ret
  • 执行示例之前,安装阿里云 SDK 库:
  • pip install aliyun_python_sdk_cms
  • pip install aliyun_python_sdk_core
  • 设置的 AK 账号必须要有 ‘AliyunCloudMonitorReadOnlyAccess’ 权限,添加权限的方式参考文档 5.2 章节;

OpenAPI 获取方式

通过 OpenAPI 可以更快的更简单的获取数据信息,只需要输入基本信息自动生成相关脚本。

openAPI_1