文档

通过函数计算节点实现GitHub实时数据分析与结果发送

更新时间:

本实践联合使用DataWorks、函数计算、MaxCompute、OSS等阿里云产品,以同步公共Github数据并分析处理,通过函数计算节点的周期性调度实现发送处理结果到指定邮箱为例,为您介绍如何通过函数计算将数据分析结果发送至指定邮箱。

注意事项

本实验中的,任务代码可以通过ETL工作流模板一键导入,直接体验。在导入模板后,您可以前往目标工作空间,并自行完成任务运维等后续操作。

说明

本文中所示案例与实际应用中的ETL模板在工作流细节上存在一定差异,但两者实现的案例效果大体相同。建议您在参考本文案例的同时,结合ETL工作流模板进行综合理解。

实践场景介绍

DataWorks为您提供了一个公共的MySQL数据源,其中存储了来自公开Github的实时数据,本实践将此数据进行同步分析,最终将分析结果通过邮件发送至指定邮箱。主要业务过程如下。

  • 通过DataWorks的数据集成功能,将MySQL上的Github实时数据同步至MaxCompute。

  • 将同步至MaxCompute的数据进行分析处理,查询获取过去1小时Github中Top10的代码语言及提交次数,并将处理结果存储于阿里云OSS。

  • 在函数计算中开发一个Python函数,函数逻辑为将OSS中的处理结果发送至指定邮箱。

  • 通过DataWorks的调度能力,周期性触发函数计算的函数调度,实现数据处理结果周期性发送至邮箱。

资源准备

进行本实践前,您需先开通涉及的阿里云产品并完成以下准备工作。

说明

OSS侧操作:创建OSS Bucket

登录OSS控制台,在Bucket列表页面单击创建Bucket,配置Bucket名称地域后单击确定,创建OSS Bucket。

image.png

函数计算侧操作:创建并开发函数

  1. 登录函数计算控制台创建服务并为服务添加OSS权限。

    由于后续开发的函数代码逻辑需要读取OSS Bucket中的数据并将数据发送至指定邮箱,因此需给函数计算的服务授予OSS的权限。

    1. 服务及函数页面的左上角切换地域,并单击创建服务,配置服务名称后单击确定

    2. 单击创建好的服务,单击左侧服务详情页签,在角色配置区域单击服务角色AliyunFcDefaultRole,进入RAM访问控制的角色页面。

    3. 单击新增授权,选择系统策略中的AliyunOSSReadOnlyAccess权限,根据界面提示进行添加。完成后即给函数计算服务授予了OSS的读权限。

  2. 创建函数并开发函数逻辑。

    1. 创建函数。

      回到函数计算中创建的服务页面,单击左侧函数管理页签,单击创建函数,配置函数名称,并选择运行环境Python3.9,其他参数可保持默认值,完成后单击创建

    2. 为函数环境安装相关依赖包。

      说明

      本实践需使用oss2阿里云二方包和pandas开源三方包。其中oss2包 python3.9 runtime内置支持无需手动安装,您需参考以下步骤手动安装panadas包。

      单击创建好的函数,在函数页面的函数配置页签中,单击配置区域后的编辑,单击添加层,选择添加官方公共层后,选择Pandas1.x,完成后单击确定

      image.png

    3. 回到函数页面后,单击函数代码页签,当WebIDE的Python环境加载完成后,复制以下代码至Index.py文件中,并修改其中的OSS内网Endpoint参数、邮箱相关参数。

      # -*- coding: utf-8 -*-
      import logging
      import json
      import smtplib
      import oss2
      import pandas as pd
      from email.mime.text import MIMEText
      from email.mime.multipart import MIMEMultipart
      from email.mime.base import MIMEBase
      from email.mime.text import MIMEText
      from email.utils import COMMASPACE
      from email import encoders
      
      def handler(event, context):
        evts = json.loads(event)
        bucket_name = evts["bucketName"]
        file_path = evts["filePath"]
      
        auth = oss2.StsAuth(context.credentials.access_key_id, context.credentials.access_key_secret, context.credentials.security_token)
        endpoint = 'https://oss-{}-internal.aliyuncs.com'.format(context.region)
        bucket = oss2.Bucket(auth, endpoint, bucket_name)
        file_name = file_path
        for obj in oss2.ObjectIteratorV2(bucket, prefix=file_path):
          if not obj.key.endswith('/'):
            file_name = obj.key
      
        csv_file = bucket.get_object(file_name)
      
        logger = logging.getLogger()
        logger.info('event: %s', evts)
        mail_host = 'smtp.***.com'                 ## 邮箱服务地址
        mail_port = '465';                         ## 邮箱smtp协议端口号
        mail_username = 'sender_****@163.com'      ## 身份认证用户名:填完整的邮箱名
        mail_password = 'EWEL******KRU'            ## 身份认证密码:填邮箱 SMTP 授权码
        mail_sender = 'sender_****@163.com'        ## 发件人邮箱地址
        mail_receivers = ['receiver_****@163.com'] ## 收件人邮箱地址
      
        message = MIMEMultipart('alternative')
        message['Subject'] = 'Github数据加工结果'
        message['From'] = mail_sender
        message['To'] = mail_receivers[0]
      
        html_message = generate_mail_content(evts, csv_file)
        message.attach(html_message)
      
        # Send email
        smtpObj = smtplib.SMTP_SSL(mail_host + ':' + mail_port)
        smtpObj.login(mail_username,mail_password)
        smtpObj.sendmail(mail_sender,mail_receivers,message.as_string())
        smtpObj.quit()
        return 'mail send success'
      
      def generate_mail_title(evt):
        mail_title=''
        if 'mailTitle' in evt.keys():
            mail_content=evt['mailTitle']
        else:
            logger = logging.getLogger()
            logger.error('msg not present in event')
        return mail_title
      
      def generate_mail_content(evts, csv_file):
        headerList = ['Github Repos', 'Stars']
         # Read csv file content
        dumped_file = pd.read_csv(csv_file, names=headerList)
        # Convert DataFrame to HTML table
        table_html = dumped_file.to_html(header=headerList,index=False)
        # Convert DataFrame to HTML table
        table_html = dumped_file.to_html(index=False)
        mail_title=generate_mail_title(evts)
      
        # Email body
        html = f"""
        <html>
        <body>
        <h2>{mail_title}</h2>
        <p>Here are the top 10 languages on GitHub in the past hour:</p>
        {table_html}
        </body>
        </html>
        """
      
        # Attach HTML message
        html_message = MIMEText(html, 'html')
        return html_message
      说明

      示例代码中使用到了bucketNamefilePathmailTitle这三个变量,此三个变量的取值后续通过DataWorks的函数计算节点同步取值,无需在代码中修改。

      待修改参数

      配置指导

      OSS内网Endpoint

      (第20行)

      根据您当前操作的地域,将其中的'https://oss-{}-internal.aliyuncs.com'替换为OSS的内网Endpoint取值。

      以上海地域为例,需修改参数为'https://oss-cn-shanghai-internal.aliyuncs.com'

      各地域的OSS内网Endpoint信息请参见访问域名和数据中心

      邮箱相关参数

      (31~36行)

      根据实际业务需要:

      • 修改31~35行为后续发送邮件邮箱服务地址、smtp协议端口号、邮箱用户名及密码等信息。

      • 修改36行为后续接收邮件的邮箱地址。

      说明

      您可在您使用的邮箱帮助文档中查看如何获取相关取值。以163邮箱为例,您可以参考SMTP服务器是什么如何使用授权码获取相关信息。

    4. 完成代码开发后,单击部署代码

DataWorks侧操作:创建数据源并绑定计算引擎

  1. 创建MySQL数据源。

    本实践使用的公共Github数据存储在公共的MySQL数据库中,您需要先创建一个MySQL数据源,用于后续同步数据至MaxCompute时对接MySQL数据库。

    1. 进入数据源页面。

      1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的管理中心,在下拉框中选择对应工作空间后单击进入管理中心

      2. 进入工作空间管理中心页面后,单击左侧导航栏的数据源,进入数据源页面。

    2. 单击新建数据源,选择数据源类型为MySQL,根据界面提示配置数据源名称等参数,核心参数如下。

      参数

      说明

      数据源类型

      选择连接串模式

      数据源名称

      自定义。本文以github_events_share为例。

      JDBC URL

      配置为:jdbc:mysql://rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com:3306/github_events_share

      重要

      该数据源仅支持数据同步场景去读取使用,其他模块不支持。

      用户名

      配置为:workshop

      密码

      配置为:workshop#2017

      此密码仅为本教程示例,请勿在实际业务中使用。

      认证选项

      无认证。

  2. 创建MaxCompute数据源。

    后续需将Github数据同步至MaxCompute,因此您需创建一个MaxCompute数据源。

    1. 进入数据源页面。

      1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的管理中心,在下拉框中选择对应工作空间后单击进入管理中心

      2. 进入工作空间管理中心页面后,单击左侧导航栏的数据源,进入数据源页面。

    2. 单击新建数据源,选择数据源类型为MaxCompute,根据界面提示配置数据源名称、对应的MaxCompute项目等参数,详细请参见创建MaxCompute数据源

  3. 绑定MaxCompute数据源为计算引擎。

    后续需创建一个MaxCompute的SQL任务进行数据处理,因此您需要将MaxCompute数据源绑定为DataWorks的计算引擎,便于后续创建ODPS SQL节点进行SQL任务开发。

    1. 进入管理中心页面。

      登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的管理中心,在下拉框中选择对应工作空间后单击进入管理中心

    2. 单击计算引擎信息页签,在MaxCompute页签中单击增加实例,选择上述步骤中创建好的MaxCompute数据源。

DataWorks侧操作:创建业务流程并开发数据处理任务

  1. 进入数据开发页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据建模与开发 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 创建业务流程。

    单击左上角的新建 > 新建业务流程,配置业务名称后单击新建

  3. 创建业务节点并配置依赖关系。

    image.png

    1. 双击创建的业务名称,打开业务流程页面。

    2. 在业务流程页面拖拽离线同步节点进业务流程页面,配置节点名称后单击确认,创建一个离线同步节点。

    3. 重复上述步骤,再创建一个ODPS SQL节点、函数计算节点。

  4. 配置离线同步节点。

    1. 双击业务流程中创建的离线同步节点,进入离线同步节点页面。

    2. 配置离线同步任务的网络与资源。

      image.png

      配置项

      配置说明

      数据来源

      选择数据来源为MySQL,数据源选择上述步骤创建的MySQL数据源。

      数据去向

      选择数据去向为MaxCompute,数据源选择默认的odps_first。

      我的资源

      选择右下角的更多选项 > 公共资源组(调试资源组)

      完成后单击下一步,根据界面提示完成网络连通测试。

    3. 配置离线同步任务,核心参数如下,其他参数可保持默认。

      配置项

      配置说明

      数据来源

      • :在下拉框中选择github_public_event。

      • 数据过滤:配置为

        created_at >'${day1} ${hour1}' and created_at<'${day2} ${hour2}' 

      数据去向

      • :单击一键生成目标表结构,在弹框中单击新建表

      • 分区信息:配置为pt=${day_hour}

    4. 单击页面右侧的调度配置,配置调度参数,核心参数如下,其他参数可保持默认。

      配置项

      配置说明

      调度参数

      单击加载代码中的参数,新增以下五个参数并配置参数的取值逻辑如下:

      • day1:$[yyyy-mm-dd-1/24]

      • hour1:$[hh24-1/24]

      • day2:$[yyyy-mm-dd]

      • hour2:$[hh24]

      • day_hour:$[yyyymmddhh24]

      时间属性

      • 调度周期:配置为小时

      • 重跑属性:配置为运行成功或失败后皆可重跑

      调度依赖

      勾选使用工作空间根节点

    5. 单击右上角的保存按钮,保存节点配置。

  5. 配置ODPS SQL节点。

    1. 双击业务流程中创建的ODPS SQL节点,进入ODPS SQL节点页面。

    2. 将以下示例代码贴入节点中。

      重要

      以下示例代码创建了一个OSS外部表,用于存储处理后的数据。如果您是首次使用OSS外部表,您还需对当前操作账号进行授权,否则后续业务流程运行会报错,授权操作请参见OSS的STS模式授权

      -- 1. 创建odps的oss外部表用于接收Github公共数据集数据加工结果。
      -- 本案例创建的oss外表为odps_external,存放于在步骤1创建的OSS Bucket,本案例OSS Bucket名为xc-bucket-demo2,您需要根据实际情况进行修改。
      
      CREATE EXTERNAL TABLE IF NOT EXISTS odps_external(
      	language STRING COMMENT 'repo全名:owner/Repository_name',
      	num STRING COMMENT '提交次数'
      ) 
      partitioned by (
      direction string
      )
      STORED BY  'com.aliyun.odps.CsvStorageHandler' 
      WITH SERDEPROPERTIES(
      	'odps.text.option.header.lines.count'='0',
      	 'odps.text.option.encoding'='UTF-8',
      	 'odps.text.option.ignore.empty.lines'='false',
      	 'odps.text.option.null.indicator'='') 
      LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/${YOUR_BUCKET_NAME}/odps_external/';
      
      -- 2. 对同步MaxCompute的GitHub数据加工后写入MaxCompute的oss外表。
      -- 查询获取过去1小时Github中Top10的代码语言及提交次数
      SET odps.sql.unstructured.oss.commit.mode=true;
      INSERT INTO TABLE odps_external partition (direction='${day_hour}') 
      SELECT
          language,
          COUNT(*) AS num
      FROM
          github_public_event
      WHERE language IS NOT NULL AND pt='${day_hour}' 
      GROUP BY
          language
      ORDER BY num DESC
      limit 10; 
    3. 单击页面右侧的调度配置,配置调度参数,核心参数如下,其他参数可保持默认。

      配置项

      配置说明

      调度参数

      单击加载代码中的参数,新增以下几个参数并配置参数的取值逻辑:

      • YOUR_BUCKET_NAME:参数值为上述步骤中创建的OSS Bucket名称。

      • day_hour:$[yyyymmddhh24]

      时间属性

      • 调度周期:配置为小时

      • 重跑属性:配置为运行成功或失败后皆可重跑

    4. 单击右上角的保存按钮,保存节点配置。

  6. 配置函数计算节点。

    1. 双击业务流程中创建的函数计算节点,进入函数计算节点页面。

    2. 配置函数计算节点任务。

      配置项

      配置说明

      选择服务

      选择上述步骤在函数计算控制台中创建的服务。

      选择函数

      选择上述步骤在函数计算控制台中创建的函数。

      调用方式

      选择同步

      变量

      配置为以下内容。

      {
          "bucketName": "${YOUR_BUCKET_NAME}",
          "filePath": "odps_external/direction=${day_hour}/",
          "mailTitle":"过去1小时Github中Top10的代码语言及其提交次数"
      }
    3. 单击页面右侧的调度配置,配置调度参数,核心参数如下,其他参数可保持默认。

      配置项

      配置说明

      调度参数

      单击新增参数,新增以下几个参数并配置参数的取值逻辑如下:

      • YOUR_BUCKET_NAME:参数值为上述步骤中创建的OSS Bucket名称。

      • day_hour:$[yyyymmddhh24]

      时间属性

      • 调度周期:配置为小时

      • 重跑属性:配置为运行成功或失败后皆可重跑

    4. 单击右上角的保存按钮,保存节点配置。

DataWorks侧:调试工作流

  1. 在DataWorks数据开发页面双击创建的业务名称,打开业务流程页面。

  2. 单击顶部的运行按钮,调试运行整个业务流程。

  3. 当界面提示运行完成后,您可登录收取数据处理结果的邮箱查看邮件。

DataWorks侧:提交发布工作流

(可选)后续如果您希望周期性同步数据至MaxCompute进行处理,并周期性发送处理结果到指定邮箱,您需要将业务流程提交发布至DataWorks的运维中心。

  1. 在数据开发页面,双击创建的业务名称,打开业务流程页面。

  2. 单击业务流程页面的提交按钮,根据界面提示将业务流程提交发布至运维中心,操作详情请参见发布任务

    后续业务流程即会根据配置的调度周期,周期性运行。

后续步骤:释放资源

如果您使用的是免费试用资源,或后续您不需要继续使用此实践的云产品,可释放对应的云产品资源,避免产生额外费用。

  • 本页导读 (1)
文档反馈