文档

通过函数计算节点实现GitHub实时数据分析与结果发送

更新时间:

开发人员在基于GitHub开源项目进行开发时会产生海量事件,GitHub会记录每次事件的类型、详情、开发者和代码仓库等信息,并开放其中的公开事件。DataWorks提供“Github十大热门编程语言”模板,通过对GitHub中公开数据集进行加工和分析,并将分析结果以邮箱的方式发送给指定用户。运行本案例后,您将得到Github中Top10编程语言每小时被提交的次数与排行。

说明

手动配置与实际应用中的ETL模板在工作流细节上存在一定差异,但两者实现的案例效果大体相同。

案例说明

DataWorks为您提供了一个公共的MySQL数据源,存储来自Github的公开实时数据,本案例将此数据进行同步分析,最终将分析结果通过邮件发送至指定邮箱。主要业务过程如下:

  1. 通过DataWorks的数据集成功能,将MySQL上的Github实时数据同步至MaxCompute。

  2. 将同步至MaxCompute的数据进行分析处理,查询获取过去1小时Github中Top10的代码语言及提交次数,并将处理结果存储于阿里云OSS。

  3. 在函数计算中开发一个Python函数,函数逻辑为将OSS中的处理结果发送至指定邮箱。

  4. 通过DataWorks的任务调度能力,实现过去1小时Github热门编程语言数据自动更新,并将数据处理结果发送至指定邮箱。

操作步骤

ETL模板配置

本实验中的,任务代码可以通过ETL工作流模板一键导入,直接体验。在导入模板后,您可以前往目标工作空间,并自行完成任务运维等后续操作。

前置准备

进行本案例前,请务必确保已完成以下操作:

说明

步骤一:登录并进入指定工作区间

  1. 登录DataWorks控制台

  2. 在顶部左上角根据实际情况选择地域。

  3. 在左侧导航栏选择工作空间列表,单击指定工作空间名称,进入工作空间详情页。

步骤二:创建MaxCompute数据源并绑定到工作空间

  1. 在左侧导航栏选择管理中心,进入管理中心配置页面。

  2. 在左侧导航栏选择数据源

  3. 单击新增数据源,选择数据源类型为MaxCompute,根据界面提示创建MaxCompute数据源

  4. 在管理中心配置页面的左侧导航栏单击工作空间,选择指定工作空间名称。

  5. 计算引擎信息下的MaxCompute页签下单击数据开发-数据源,选择已创建的MaxCompute数据源,进行绑定操作。

步骤三:创建OSS数据源

  1. 在左侧导航栏选择管理中心,进入管理中心配置页面。

  2. 在左侧导航栏选择数据源

  3. 单击新增数据源,选择数据源类型为OSS,创建OSS数据源。

    参数

    说明

    数据源名称

    输入数据源名称,以字母开头,由大小写字母、数字、下划线(_)组成,最多60个字符。

    数据源描述

    对数据源进行简单描述,最多80个字符。

    适用环境

    本文以使用标准模式工作空间为例,此处选中开发生产

    Endpoint

    本文以上海地域为例,此处输入http://oss-cn-shanghai.aliyuncs.com。其他地域Endpoint请参见访问域名和数据中心

    Bucket

    输入OSS Bucket。如果没有可用的Bucket,请创建Bucket

    访问模式

    • RAM角色授权模式

    • Access Key模式

    选择角色

    当访问模式为RAM角色授权模式,选择RAM角色,详情请参见通过RAM角色授权模式配置数据源

    AccessKey ID

    当访问模式为Access Key模式,需输入AccessKey信息,详情请参见查看RAM用户的AccessKey信息

    AccessKey Secret

    资源组连通性

    数据集成页签下,单击已绑定的数据集成资源组操作测试连通性,等待界面提示测试完成,连通状态列显示为可连通

步骤四:配置案例

DataWorks控制台左侧导航栏选择大数据体验 > ETL工作流模板,单击Github十大热门编程语言模板,单击载入模板,配置模板参数。

参数

说明

模板名称

显示当前模板名称,即“Github十大热门编程语言”。

工作空间

选择前置准备中创建的DataWorks工作空间。

服务开通

服务开通

本文涉及以下产品服务,请确保已全部开通。如果显示未开通,请参见前置准备开通。

  • 阿里云原生大数据计算MaxCompute

  • 阿里云对象存储OSS

  • 阿里云函数计算FC

MaxCompute配置

数据源类型

显示当前数据源类型,即MaxCompute。

数据源名称

选择步骤二中创建的MaxCompute数据源。

OSS配置

数据源类型

显示当前数据源类型,即OSS。

数据源名称

选择步骤三中创建的OSS数据源。

Bucket名称

显示步骤三创建OSS数据源时配置的Bucket。

选择文件夹

选择上述Bucket下的目录,用于存放加工后的数据。

  • 如果选择文件夹为空,则会在上述Bucket下创建与DataWorks工作空间同名的文件夹,用于存放加工后的数据。

  • 如果选择文件夹不为空,则会在选择文件夹的路径下创建与DataWorks工作空间同名的文件夹,用于存放加工后的数据。

完整路径

显示存放加工后数据的OSS路径。

函数计算配置

函数计算配置

首次配置时,单击一键创建应用,本案例将在以下产品中创建对应的数据:

  • 函数计算:在函数计算中创建名为“DataWorks_cn-shanghai_Application_**********”的应用,名为“DataWorks_cn-shanghai_Service_**********”的服务以及名为SendMail的函数。

  • DataWorks:在数据集成模块中创建名为“github_events_share”的MySQL数据源,在数据开发模块中创建名为“*****_Github十大热门编程语言”的业务流程。

案例参数配置

服务器地址

发送端服务器,格式为smtp.***.com,例如:smtp.163.com。

说明

以163邮箱为例,获取服务器地址和端口号,请参见客户端设置

端口号

发送端服务器端口,例如:上述发送邮件服务器端口号为465。

用户名

发送端用户名,例如:发送邮箱为dw***_sender@163.com,则用户名为dw***_sender。

密码

发送端密码,即发送端邮箱密码。

邮件发送地址

发送端邮箱,例如:dw***_sender@163.com。

邮件接收地址

接收端邮箱,例如:dw***_receiver@126.com。

载入方式

首次配置时,本文将为您创建“Github十大热门编程语言”业务流程,如果后续配置且该业务流程已经存在,则按以下方式载入:

  • 同名覆盖:如果已存在同名对象(任务、资源、函数、数据源、物理表),则覆盖并更新其配置内容。

  • 同名跳过:如果已存在同名对象(任务、资源、函数、数据源、物理表),则取消创建。

参数配置完成后,单击确认,进入业务流程页面。

步骤五:调试工作流

  1. 单击业务流程页面顶部的运行按钮,调试运行整个业务流程。

  2. 当界面提示运行完成后,您可登录收取数据处理结果的邮箱查看邮件。

手动配置

资源准备

进行本实践前,您需先开通涉及的阿里云产品并完成以下准备工作。

说明

OSS侧操作:创建OSS Bucket

登录OSS控制台,在Bucket列表页面单击创建Bucket,配置Bucket名称地域后单击确定,创建OSS Bucket。

image.png

函数计算侧操作:创建并开发函数

  1. 登录函数计算控制台创建服务并为服务添加OSS权限。

    由于后续开发的函数代码逻辑需要读取OSS Bucket中的数据并将数据发送至指定邮箱,因此需给函数计算的服务授予OSS的权限。

    1. 服务及函数页面的左上角切换地域,并单击创建服务,配置服务名称后单击确定

    2. 单击创建好的服务,单击左侧服务详情页签,在角色配置区域单击编辑,选择服务角色AliyunFcDefaultRole,单击保存,回到服务详情页签,在角色配置区域单击服务角色AliyunFcDefaultRole,进入RAM访问控制的角色页面。

    3. 单击新增授权,选择系统策略中的AliyunOSSReadOnlyAccess权限,根据界面提示进行添加。完成后即给函数计算服务授予了OSS的读权限。

  2. 创建函数并开发函数逻辑。

    1. 创建函数。

      回到函数计算中创建的服务页面,单击左侧函数管理页签,单击创建函数,配置函数名称,并选择运行环境Python3.9,其他参数可保持默认值,完成后单击创建

    2. 为函数环境安装相关依赖包。

      说明

      本实践需使用oss2阿里云二方包和pandas开源三方包。其中oss2包 python3.9 runtime内置支持无需手动安装,您需参考以下步骤手动安装panadas包。

      单击创建好的函数,在函数页面的函数配置页签中,单击配置区域后的编辑,单击添加层,选择添加官方公共层后,选择Pandas1.x,完成后单击确定

      image.png

    3. 回到函数页面后,单击函数代码页签,当WebIDE的Python环境加载完成后,复制以下代码至Index.py文件中,并修改其中的OSS内网Endpoint参数、邮箱相关参数。

      # -*- coding: utf-8 -*-
      import logging
      import json
      import smtplib
      import oss2
      import pandas as pd
      from email.mime.text import MIMEText
      from email.mime.multipart import MIMEMultipart
      from email.mime.base import MIMEBase
      from email.mime.text import MIMEText
      from email.utils import COMMASPACE
      from email import encoders
      
      def handler(event, context):
        evts = json.loads(event)
        bucket_name = evts["bucketName"]
        file_path = evts["filePath"]
      
        auth = oss2.StsAuth(context.credentials.access_key_id, context.credentials.access_key_secret, context.credentials.security_token)
        endpoint = 'https://oss-{}-internal.aliyuncs.com'.format(context.region)
        bucket = oss2.Bucket(auth, endpoint, bucket_name)
        file_name = file_path
        for obj in oss2.ObjectIteratorV2(bucket, prefix=file_path):
          if not obj.key.endswith('/'):
            file_name = obj.key
      
        csv_file = bucket.get_object(file_name)
      
        logger = logging.getLogger()
        logger.info('event: %s', evts)
        mail_host = 'smtp.***.com'                 ## 邮箱服务地址
        mail_port = '465';                         ## 邮箱smtp协议端口号
        mail_username = 'sender_****@163.com'      ## 身份认证用户名:填完整的邮箱名
        mail_password = 'EWEL******KRU'            ## 身份认证密码:填邮箱 SMTP 授权码
        mail_sender = 'sender_****@163.com'        ## 发件人邮箱地址
        mail_receivers = ['receiver_****@163.com'] ## 收件人邮箱地址
      
        message = MIMEMultipart('alternative')
        message['Subject'] = 'Github数据加工结果'
        message['From'] = mail_sender
        message['To'] = mail_receivers[0]
      
        html_message = generate_mail_content(evts, csv_file)
        message.attach(html_message)
      
        # Send email
        smtpObj = smtplib.SMTP_SSL(mail_host + ':' + mail_port)
        smtpObj.login(mail_username,mail_password)
        smtpObj.sendmail(mail_sender,mail_receivers,message.as_string())
        smtpObj.quit()
        return 'mail send success'
      
      def generate_mail_title(evt):
        mail_title=''
        if 'mailTitle' in evt.keys():
            mail_content=evt['mailTitle']
        else:
            logger = logging.getLogger()
            logger.error('msg not present in event')
        return mail_title
      
      def generate_mail_content(evts, csv_file):
        headerList = ['Github Repos', 'Stars']
         # Read csv file content
        dumped_file = pd.read_csv(csv_file, names=headerList)
        # Convert DataFrame to HTML table
        table_html = dumped_file.to_html(header=headerList,index=False)
        # Convert DataFrame to HTML table
        table_html = dumped_file.to_html(index=False)
        mail_title=generate_mail_title(evts)
      
        # Email body
        html = f"""
        <html>
        <body>
        <h2>{mail_title}</h2>
        <p>Here are the top 10 languages on GitHub in the past hour:</p>
        {table_html}
        </body>
        </html>
        """
      
        # Attach HTML message
        html_message = MIMEText(html, 'html')
        return html_message
      说明

      示例代码中使用到了bucketNamefilePathmailTitle这三个变量,此三个变量的取值后续通过DataWorks的函数计算节点同步取值,无需在代码中修改。

      待修改参数

      配置指导

      OSS内网Endpoint

      (第20行)

      根据您当前操作的地域,将其中的'https://oss-{}-internal.aliyuncs.com'替换为OSS的内网Endpoint取值。

      以上海地域为例,需修改参数为'https://oss-cn-shanghai-internal.aliyuncs.com'

      各地域的OSS内网Endpoint信息请参见访问域名和数据中心

      邮箱相关参数

      (31~36行)

      根据实际业务需要:

      • 修改31~35行为后续发送邮件邮箱服务地址、smtp协议端口号、邮箱用户名及密码等信息。

      • 修改36行为后续接收邮件的邮箱地址。

      说明

      您可在您使用的邮箱帮助文档中查看如何获取相关取值。以163邮箱为例,您可以参考SMTP服务器是什么如何使用授权码获取相关信息。

    4. 完成代码开发后,单击部署代码

DataWorks侧操作:创建数据源并绑定计算引擎

  1. 创建MySQL数据源。

    本实践使用的公共Github数据存储在公共的MySQL数据库中,您需要先创建一个MySQL数据源,用于后续同步数据至MaxCompute时对接MySQL数据库。

    1. 进入数据源页面。

      1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的管理中心,在下拉框中选择对应工作空间后单击进入管理中心

      2. 进入工作空间管理中心页面后,单击左侧导航栏的数据源,进入数据源页面。

    2. 单击新增数据源,选择数据源类型为MySQL,根据界面提示配置数据源名称等参数,核心参数如下。

      参数

      说明

      数据源类型

      选择连接串模式

      数据源名称

      自定义。本文以github_events_share为例。

      JDBC URL

      配置为:jdbc:mysql://rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com:3306/github_events_share

      重要

      该数据源仅支持数据同步场景去读取使用,其他模块不支持。

      用户名

      配置为:workshop

      密码

      配置为:workshop#2017

      此密码仅为本教程示例,请勿在实际业务中使用。

      认证选项

      无认证。

      资源组连通性

      单击数据集成公共资源组后的测试连通性,等待界面提示测试完成,连通状态为可连通

  2. 创建MaxCompute数据源。

    后续需将Github数据同步至MaxCompute,因此您需创建一个MaxCompute数据源。

    1. 进入数据源页面。

      1. 登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的管理中心,在下拉框中选择对应工作空间后单击进入管理中心

      2. 进入工作空间管理中心页面后,单击左侧导航栏的数据源,进入数据源页面。

    2. 单击新增数据源,选择数据源类型为MaxCompute,根据界面提示配置数据源名称、对应的MaxCompute项目等参数,详细请参见创建MaxCompute数据源

  3. 绑定MaxCompute数据源为计算引擎。

    后续需创建一个MaxCompute的SQL任务进行数据处理,因此您需要将MaxCompute数据源绑定为DataWorks的计算引擎,便于后续创建ODPS SQL节点进行SQL任务开发。

    1. 进入管理中心页面。

      登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的管理中心,在下拉框中选择对应工作空间后单击进入管理中心。

    2. 单击计算引擎信息页签,在MaxCompute页签下单击数据开发-数据源,选择上述步骤中创建好的MaxCompute数据源。进行绑定操作。绑定后,才能基于数据源的连接信息读取该数据源的数据,进行后续操作。

      说明

      当数据源信息发生变更时,若当前界面数据更新不及时,请刷新当前页面更新缓存数据。

DataWorks侧操作:创建业务流程并开发数据处理任务

  1. 进入数据开发页面。

    登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据建模与开发 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

  2. 创建业务流程。

    单击左上角的新建 > 新建业务流程,配置业务名称后单击新建

  3. 创建业务节点并配置依赖关系。

    image.png

    1. 双击创建的业务名称,打开业务流程页面。

    2. 在业务流程页面单击新建节点,拖拽离线同步节点进业务流程页面,配置节点名称后单击确认,创建一个离线同步节点。

    3. 重复上述步骤,再创建一个ODPS SQL节点、函数计算节点。

  4. 配置离线同步节点。

    1. 双击业务流程中创建的离线同步节点,进入离线同步节点页面。

    2. 配置离线同步任务的网络与资源。

      image.png

      配置项

      配置说明

      数据来源

      选择数据来源为MySQL,数据源选择上述步骤创建的MySQL数据源。

      数据去向

      选择数据去向为MaxCompute,数据源选择已创建的MaxCompute数据源。

      我的资源

      选择右下角的更多选项 > 公共资源组(调试资源组)

      完成后单击下一步,根据界面提示完成网络连通测试。

    3. 配置离线同步任务,核心参数如下,其他参数可保持默认。

      配置项

      配置说明

      数据来源

      • :在下拉框中选择github_public_event。

      • 数据过滤:配置为

        created_at >'${day1} ${hour1}' and created_at<'${day2} ${hour2}' 

      数据去向

      • :单击一键生成目标表结构,在弹框中单击新建表

      • 分区信息:配置为pt=${day_hour}

    4. 单击页面右侧的调度配置,配置调度参数,核心参数如下,其他参数可保持默认。

      配置项

      配置说明

      调度参数

      单击加载代码中的参数,新增以下五个参数并配置参数的取值逻辑如下:

      • day1:$[yyyy-mm-dd-1/24]

      • hour1:$[hh24-1/24]

      • day2:$[yyyy-mm-dd]

      • hour2:$[hh24]

      • day_hour:$[yyyymmddhh24]

      时间属性

      • 调度周期:配置为小时

      • 重跑属性:配置为运行成功或失败后皆可重跑

      调度依赖

      勾选使用工作空间根节点

    5. 单击右上角的保存按钮,保存节点配置。

  5. 配置ODPS SQL节点。

    1. 双击业务流程中创建的ODPS SQL节点,进入ODPS SQL节点页面。

    2. 将以下示例代码贴入节点中。

      重要

      以下示例代码创建了一个OSS外部表,用于存储处理后的数据。如果您是首次使用OSS外部表,您还需对当前操作账号进行授权,否则后续业务流程运行会报错,授权操作请参见OSS的STS模式授权

      -- 1. 创建odps的oss外部表用于接收Github公共数据集数据加工结果。
      -- 本案例创建的oss外表为odps_external,存放于在步骤1创建的OSS Bucket,本案例OSS Bucket名为xc-bucket-demo2,您需要根据实际情况进行修改。
      
      CREATE EXTERNAL TABLE IF NOT EXISTS odps_external(
      	language STRING COMMENT 'repo全名:owner/Repository_name',
      	num STRING COMMENT '提交次数'
      ) 
      partitioned by (
      direction string
      )
      STORED BY  'com.aliyun.odps.CsvStorageHandler' 
      WITH SERDEPROPERTIES(
      	'odps.text.option.header.lines.count'='0',
      	 'odps.text.option.encoding'='UTF-8',
      	 'odps.text.option.ignore.empty.lines'='false',
      	 'odps.text.option.null.indicator'='') 
      LOCATION 'oss://oss-cn-shanghai-internal.aliyuncs.com/${YOUR_BUCKET_NAME}/odps_external/';
      
      -- 2. 对同步MaxCompute的GitHub数据加工后写入MaxCompute的oss外表。
      -- 查询获取过去1小时Github中Top10的代码语言及提交次数
      SET odps.sql.unstructured.oss.commit.mode=true;
      INSERT INTO TABLE odps_external partition (direction='${day_hour}') 
      SELECT
          language,
          COUNT(*) AS num
      FROM
          github_public_event
      WHERE language IS NOT NULL AND pt='${day_hour}' 
      GROUP BY
          language
      ORDER BY num DESC
      limit 10; 
    3. 单击页面右侧的调度配置,配置调度参数,核心参数如下,其他参数可保持默认。

      配置项

      配置说明

      调度参数

      单击加载代码中的参数,新增以下几个参数并配置参数的取值逻辑:

      • YOUR_BUCKET_NAME:参数值为上述步骤中创建的OSS Bucket名称。

      • day_hour:$[yyyymmddhh24]

      时间属性

      • 调度周期:配置为小时

      • 重跑属性:配置为运行成功或失败后皆可重跑

    4. 单击右上角的保存按钮,保存节点配置。

  6. 配置函数计算节点。

    1. 双击业务流程中创建的函数计算节点,进入函数计算节点页面。

    2. 配置函数计算节点任务。

      配置项

      配置说明

      选择服务

      选择上述步骤在函数计算控制台中创建的服务。

      选择函数

      选择上述步骤在函数计算控制台中创建的函数。

      调用方式

      选择同步

      变量

      配置为以下内容。

      {
          "bucketName": "${YOUR_BUCKET_NAME}",
          "filePath": "odps_external/direction=${day_hour}/",
          "mailTitle":"过去1小时Github中Top10的代码语言及其提交次数"
      }
    3. 单击页面右侧的调度配置,配置调度参数,核心参数如下,其他参数可保持默认。

      配置项

      配置说明

      调度参数

      单击新增参数,新增以下几个参数并配置参数的取值逻辑如下:

      • YOUR_BUCKET_NAME:参数值为上述步骤中创建的OSS Bucket名称。

      • day_hour:$[yyyymmddhh24]

      时间属性

      • 调度周期:配置为小时

      • 重跑属性:配置为运行成功或失败后皆可重跑

    4. 单击右上角的保存按钮,保存节点配置。

DataWorks侧:调试工作流

  1. 在DataWorks数据开发页面双击创建的业务名称,打开业务流程页面。

  2. 单击顶部的运行按钮,调试运行整个业务流程。

  3. 当界面提示运行完成后,您可登录收取数据处理结果的邮箱查看邮件。

DataWorks侧:提交发布工作流

(可选)后续如果您希望周期性同步数据至MaxCompute进行处理,并周期性发送处理结果到指定邮箱,您需要将业务流程提交发布至DataWorks的运维中心。

  1. 在数据开发页面,双击创建的业务名称,打开业务流程页面。

  2. 单击业务流程页面的提交按钮,根据界面提示将业务流程提交发布至运维中心,操作详情请参见发布任务

    后续业务流程即会根据配置的调度周期,周期性运行。

后续步骤:释放资源

如果您使用的是免费试用资源,或后续您不需要继续使用此实践的云产品,可释放对应的云产品资源,避免产生额外费用。

  • 本页导读 (1)
文档反馈