全部产品
存储与CDN 数据库 安全 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 阿里云办公 培训与认证 物联网
批量计算

Python快速入门示例

更新时间:2017-06-20 14:12:52   分享:   

Python快速入门示例

本文档将介绍如何使用 Python 版 SDK 来提交一个作业,目的是统计一个日志文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出现的次数。

如果您还没开通批量计算服务,请先开通

步骤预览

  • 作业准备
    • 上传数据文件到OSS
    • 上传任务程序到OSS
  • 使用SDK创建(提交)作业
  • 查看结果

1. 作业准备

本作业是统计一个日志文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出现的次数。

该作业包含3个任务: split, count 和 merge:

  • split 任务会把日志文件分成 3 份。
  • count 任务会统计每份日志文件中“INFO”,”WARN”,”ERROR”,”DEBUG”出现的次数 (count 任务需要配置InstanceCount为3,表示同时启动3台机器运行个 count 程序)。
  • merge 任务会把 count 任务的结果统一合并起来。

DAG图例:

DAG图例

(1) 上传数据文件到OSS

下载本例子所需的数据: log-count-data.txt

将 log-count-data.txt 上传到:

oss://your-bucket/log-count/log-count-data.txt

  • your-bucket表示您自己创建的bucket,本例子假设region为: cn-shenzhen.
  • 如何上传到OSS,请参考OSS上传文档

(2) 上传任务程序到OSS

本例子的作业程序是使用python编写的, 下载本例子所需的程序: log-count.tar.gz

本例子不需要改动示例代码。直接将 log-count.tar.gz 上传到 oss,如上传到:

oss://your-bucket/log-count/log-count.tar.gz。

如何上传前面已经讲过。

  • BatchCompute 只支持以 tar.gz 为后缀的压缩包, 请注意务必用以上方式(gzip)打包, 否则将会无法解析。
  • 如果您要修改代码,可以解压后修改,然后要用下面的方法打包:

    命令如下:

    1. > cd log-count #进入目录
    2. > tar -czf log-count.tar.gz * #打包,将所有这个目录下的文件打包到 log-count.tar.gz

    可以运行这条命令查看压缩包内容:

    1. $ tar -tvf log-count.tar.gz

    可以看到以下列表:

    1. conf.py
    2. count.py
    3. merge.py
    4. split.py

2. 使用SDK创建(提交)作业

python SDK 的相关下载与安装请参阅这里

v20151111版本,提交作业需要指定集群ID或者使用匿名集群参数。本例子使用匿名集群方式进行。匿名集群需要配置2个参数, 其中:

  • 可用的镜像ID, 可以使用系统提供的Image,也可以自行制作镜像, 请看使用镜像
  • 实例规格(InstanceType,实例类型),请看 目前支持类型

在 OSS 中创建存储StdoutRedirectPath(程序输出结果)和StderrRedirectPath(错误日志)的文件路径,本例中创建的路径为

oss://your-bucket/log-count/logs/

  • 如需运行本例子,请按照上文所述的变量获取以及与上文对应的您的OSS路径对程序中注释中的变量进行修改。

Python SDK 提交程序模板如下,程序中具体参数含义请参照这里

  1. #encoding=utf-8
  2. import sys
  3. from batchcompute import Client, ClientError
  4. from batchcompute import CN_SHENZHEN as REGION
  5. from batchcompute.resources import (
  6. JobDescription, TaskDescription, DAG, AutoCluster
  7. )
  8. ACCESS_KEY_ID='' # 填写您的AK
  9. ACCESS_KEY_SECRET='' # 填写您的AK
  10. IMAGE_ID = 'img-ubuntu' #这里填写您的镜像ID
  11. INSTANCE_TYPE = 'bcs.a2.large' # 根据实际region支持的InstanceType 填写
  12. WORKER_PATH = '' # 'oss://your-bucket/log-count/log-count.tar.gz' 这里填写您上传的log-count.tar.gz的OSS存储路径
  13. LOG_PATH = '' # 'oss://your-bucket/log-count/logs/' 这里填写您创建的错误反馈和task输出的OSS存储路径
  14. OSS_MOUNT= '' # 'oss://your-bucket/log-count/' 同时挂载到/home/inputs 和 /home/outputs
  15. client = Client(REGION, ACCESS_KEY_ID, ACCESS_KEY_SECRET)
  16. def main():
  17. try:
  18. job_desc = JobDescription()
  19. # Create auto cluster.
  20. cluster = AutoCluster()
  21. cluster.InstanceType = INSTANCE_TYPE
  22. cluster.ResourceType = "OnDemand"
  23. cluster.ImageId = IMAGE_ID
  24. # Create split task.
  25. split_task = TaskDescription()
  26. split_task.Parameters.Command.CommandLine = "python split.py"
  27. split_task.Parameters.Command.PackagePath = WORKER_PATH
  28. split_task.Parameters.StdoutRedirectPath = LOG_PATH
  29. split_task.Parameters.StderrRedirectPath = LOG_PATH
  30. split_task.InstanceCount = 1
  31. split_task.AutoCluster = cluster
  32. split_task.InputMapping[OSS_MOUNT]='/home/input'
  33. split_task.OutputMapping['/home/output'] = OSS_MOUNT
  34. # Create map task.
  35. count_task = TaskDescription(split_task)
  36. count_task.Parameters.Command.CommandLine = "python count.py"
  37. count_task.InstanceCount = 3
  38. count_task.InputMapping[OSS_MOUNT] = '/home/input'
  39. count_task.OutputMapping['/home/output'] = OSS_MOUNT
  40. # Create merge task
  41. merge_task = TaskDescription(split_task)
  42. merge_task.Parameters.Command.CommandLine = "python merge.py"
  43. merge_task.InstanceCount = 1
  44. merge_task.InputMapping[OSS_MOUNT] = '/home/input'
  45. merge_task.OutputMapping['/home/output'] = OSS_MOUNT
  46. # Create task dag.
  47. task_dag = DAG()
  48. task_dag.add_task(task_name="split", task=split_task)
  49. task_dag.add_task(task_name="count", task=count_task)
  50. task_dag.add_task(task_name="merge", task=merge_task)
  51. task_dag.Dependencies = {
  52. 'split': ['count'],
  53. 'count': ['merge']
  54. }
  55. # Create job description.
  56. job_desc.DAG = task_dag
  57. job_desc.Priority = 99 # 0-1000
  58. job_desc.Name = "log-count"
  59. job_desc.Description = "PythonSDKDemo"
  60. job_desc.JobFailOnInstanceFail = True
  61. job_id = client.create_job(job_desc).Id
  62. print('job created: %s' % job_id)
  63. except ClientError, e:
  64. print (e.get_status_code(), e.get_code(), e.get_requestid(), e.get_msg())
  65. if __name__ == '__main__':
  66. sys.exit(main())

3. 查看作业状态

您可以用SDK中的 获取作业信息 方法获取作业状态:

  1. jobInfo = client.get_job(job_id)
  2. print (jobInfo.State)

State状态可能为:Waiting, Running, Finished, Failed, Stopped.

4. 查看结果

您可以登录OSS控制台 查看your-bucket 这个bucket下面的这个文件:/log-count/merge_result.json。

内容应该如下:

  1. {"INFO": 2460, "WARN": 2448, "DEBUG": 2509, "ERROR": 2583}
  • 您也可以使用OSS的SDK来获取结果。
本文导读目录
本文导读目录
以上内容是否对您有帮助?