文档

DataWorks OpenLake一站式智能化湖仓一体数据开发

更新时间:

在本实验中,您将体验基于OpenLake House环境下的零售电子商务数据开发与分析场景,通过DataWorks实现面向多引擎协同开发、可视化工作流编排和数据目录管理等。同时实践Python编程及调试,并使用Notebook进行与AI联动的交互式数据探索与分析。

背景介绍

DataWorks简介

DataWorks是智能湖仓一体数据开发治理平台,内置阿里巴巴15年大数据建设方法论,深度适配阿里云MaxCompute、E-MapReduce、Hologres、Flink、PAI 等数十种大数据和AI计算服务,为数据仓库、数据湖、OpenLake湖仓一体数据架构提供智能化ETL开发、数据分析与主动式数据资产治理服务,助力“Data+AI”全生命周期的数据管理。自2009年起,DataWorks不断对阿里巴巴数据体系进行产品化沉淀,服务于政务、金融、零售、互联网、汽车、制造等行业,使数以万计的客户信赖并选择DataWorks进行数字化升级和价值创造。

DataWorks Copilot简介

DataWorks Copilot,DataWorks智能助手,它能够根据自然语言快速完成多种代码相关操作及DataWorks产品操作。使用DataWorks Copilot,可帮助您轻松、高效、便捷地完成数据ETL及数据分析工作,节省大量时间和精力。

DataWorks Notebook简介

DataWorks Notebook是智能化交互式数据开发和分析工具,能够面向多种数据引擎开展SQL或Python分析,即时运行或调试代码,获取可视化数据结果。同时,DataWorks Notebook能够与其他任务节点混合编排为工作流,提交至调度系统运行,助力复杂业务场景的灵活实现。

注意事项

  • 当前Data Studio产品公测地域:华东1(杭州)、华东2(上海)、华北2(北京)、华南1(深圳)。

  • 当前Data Studio对Python、Notebook的支持,需要先切换至个人开发环境。

环境准备

开通DataWorks

  1. 登录免费试用

    进入阿里云免费试用,单击页面右上方的登录/注册,根据页面提示完成账号的登录操作。

    • 账号登录:已有阿里云账号则直接登录。

    • 账号注册:若无阿里云账号,需先进行账号注册。

    • 实名认证:根据试用产品要求完成个人实名认证或企业实名认证。

  2. 成功登录后,在免费试用页面单击DataWorks Serverless 资源组免费试用套餐立即试用按钮。

  3. 在弹出的DataWorks产品购买面板选择开通地域为华东2(上海),勾选服务协议后单击确认订单并支付,按照界面指引开通DataWorks。

实验步骤

步骤一 数据目录管理

湖仓一体的数据目录管理能力,支持对DLF、MaxCompute、Hologres等进行数据目录管理及新建。

  1. Data Studio页面,单击页面左侧一级菜单image,进入数据目录功能界面。在数据目录功能界面左侧列表上找到您需要管理的元数据类型,将鼠标移至您想要了解的数据目录的名称上,单击数据目录名称右侧的image,单击级联菜单中的“打开”进入数据目录详情页功能界面。

    image

    image

  2. 数据目录详情页功能界面,单击任意表名称,可进入表详情页面。

    image

  3. Data Studio页面,单击页面左侧一级菜单image,进入数据目录功能界面。在数据目录功能界面左侧列表上找到您需要管理的元数据类型,将鼠标移至您想要了解的数据目录的名称上,单击数据目录名称右侧的image,单击级联菜单中的“新建表”进入建表功能界面。

    image

  4. 在建表功能界面,填入表名、字段名,或者直接填入建表DDL语句,单击页面顶部的发布即可。

    image

    预设代码示例

    CREATE TABLE dwd_mbr_user_info
    (
      id                BIGINT COMMENT '主键'
      ,gmt_create       DATETIME COMMENT '创建时间'
      ,gmt_modified     DATETIME COMMENT '修改时间'
      ,user_id          BIGINT COMMENT '会员数字ID'
      ,nick             STRING COMMENT '会员NICK。会员昵称'
      ,reg_fullname     STRING COMMENT '个人认证表示真实姓名,企业认证表示企业名称'
      ,reg_mobile_phone STRING COMMENT '注册时绑定手机号码'
      ,reg_email        STRING COMMENT '注册填写EMAIL(用户可以修改)'
      ,reg_gender       STRING COMMENT '注册填写性别(F女,M男,不是这两个就是未知的,说明性别保密)'
      ,reg_birthdate    DATETIME COMMENT '注册填写生日(用户可以修改)'
      ,reg_address      STRING COMMENT '注册填写地址(用户可以修改)'
      ,reg_nation_id    STRING COMMENT '注册填写国家ID(暂时为空)'
      ,reg_prov_id      STRING COMMENT '注册填写省ID'
      ,reg_city_id      STRING COMMENT '注册填写城市ID'
      ,user_regip       STRING COMMENT '注册IP'
      ,id_card_type     BIGINT COMMENT '会员认证证件类型 0:未知 1:身份证 2:企业营业执照号'
      ,id_card_number   STRING COMMENT '个人认证表示身份证号,企业认证表示企业的营业执照号,没有认证不保证准确性'
      ,user_regdate     DATETIME COMMENT '注册时间'
      ,user_active_type STRING COMMENT '用户激活方式,1 邮件;2 手机;'
      ,user_active_time DATETIME COMMENT '激活时间'
      ,vip_level        STRING COMMENT 'VIP等级'
      ,is_delete        STRING COMMENT '是否删除'
    )
    COMMENT '会员注册信息表'
    PARTITIONED BY 
    (
      ds                STRING COMMENT 'YYYYMMDD'
    )
    LIFECYCLE 7;

步骤二 工作流编排

工作流(Workflow)支持以业务视角通过可视化拖拽的方式编排多种不同类型的数据开发节点,调度时间等通用参数无需单独配置,可以帮助您轻松管理复杂的任务工程。

  1. Data Studio页面,单击页面左侧一级菜单image,进入数据开发功能界面。在数据开发功能界面左侧列表上找到项目目录,单击项目目录右侧的image,单击级联菜单中的“新建工作流”进入工作流编辑功能界面。

    image

  2. 在进入工作流编辑功能界面前,请先输入工作流名称,按Enter键,等待即可。

    预设工作流名称零售电子商务业务总览

    image

  3. 在进入工作流编辑功能界面后,单击画布中央的拖拽或点击添加节点,在弹框中输入节点名称,指定节点类型,单击确认

    • 预设节点名称零售电子商务总览

    • 预设节点类型虚拟节点

    image

  4. 从工作流编辑功能界面左侧的节点类型列表中找到自己需要的节点类型,并将其拖至画布中后,松开,在弹框中输入节点名称,单击确认。

    预设节点名称及类型

    节点类型

    节点名称

    数据集成-离线同步

    ods_mbr_user_info

    MaxCompute-MaxCompute SQL

    dim_ec_mbr_user_info

    MaxCompute-MaxCompute SQL

    dws_ec_mbr_cnt_nd

    Notebook

    ads_ec_kpi_report

    image

    image

  5. 从工作流编辑功能界面右侧的画布上,找到需要建立依赖关系的两个节点,鼠标hover到其中一个节点下边缘的中间位置,当出现 + 后,开始拖动鼠标,将箭头拖动至另外一个节点后松开。

    image

    image

  6. 依次创建所需节点,并完成依赖建立后,单击保存。

    image

  7. 保存成功后,可按需单击画布上方的布局方式,对画布进行自动布局。

    image

  8. 从工作流画布右侧找到并单击调度配置,在调度配置面板中,依次配置工作流的调度参数及节点依赖。单击调度参数中的添加参数,参数名输入框中输入bizdate,在参数值下拉列表中选择$[yyyymmdd-1]

    image

  9. 单击新增依赖,在依赖对象中输入ads_ec_ec360_gmv_kpi_overview后,回车,等待结果返回,在返回的依赖对象list中选中所需依赖的对象,单击保存即可。

    image

  10. 单击工作流画布上方的发布,页面右下方会出现发布操作界面,按照上线发布操作界面中步骤依次进行检查和确认即可。

    image

步骤三 多引擎协同开发

Data Studio支持数据集成、MaxCompute、Hologres、EMR、Flink、Python、Notebook、ADB等数十种不同引擎类型的节点的数仓开发,支持复杂的调度依赖,提供开发、生产环境隔离的研发模式。本实验以创建Flink SQL Stream节点为例。

  1. Data Studio页面,单击页面左侧一级菜单image,进入数据开发功能界面。在数据开发功能界面左侧列表上找到项目目录,单击项目目录右侧的image,单击级联菜单中的“Flink SQL Stream”进入节点编辑功能界面。在进入节点编辑功能界面前,请先输入节点名称,键盘敲击回车键,等待即可。

    预设节点名称ads_ec_page_visit_log

    image

  2. 在节点编辑功能界面,将预设Flink SQL Stream代码粘贴到代码编辑器中。

    image

    预设Flink SQL Stream代码

    CREATE TEMPORARY VIEW log_ri_base
    AS 
    SELECT 
      visit_time
      ,substring(visit_time,1,8) as stat_date
      ,substring(visit_time,9,2) as stat_hour
      ,visitor_id
      ,item_id
      ,cate_id
      ,ext_pv
    FROM vvp_ec_ads.dws_log_all_itm_ipv_ri
    WHERE
      bc_type IN ('b', 'z')
      AND coalesce(cate_id, '') <> ''
      AND visitor_type = 'uid'
      and coalesce(item_id, '') <> ''
      AND substring(visit_time,1,8) >= '${bizdate}'
    ;
    
    
    CREATE TEMPORARY VIEW itm_log_day
    AS
    SELECT
      sum(ext_pv) as pv
      ,count(DISTINCT visitor_id) FILTER (WHERE ext_pv>0) as uv
      ,stat_date
      ,cate_id
      ,item_id
    FROM log_ri_base
    GROUP BY stat_date
      ,cate_id
      ,item_id
    ;
    
    
    CREATE TEMPORARY VIEW itm_log_hh_00
    AS
    SELECT
      sum(ext_pv) as pv
      ,count(DISTINCT visitor_id) FILTER (WHERE ext_pv>0) as uv
      ,stat_date
      ,stat_hour
      ,item_id
      ,cate_id
    FROM log_ri_base
    GROUP BY stat_date
      ,stat_hour
      ,cate_id
      ,item_id
    ;
    
    BEGIN STATEMENT SET;
    
    INSERT INTO vvp_ec_ads.ads_ec_log
    SELECT
      a.stat_date
      ,cast(a.item_id as varchar) as item_id
      ,a.cate_id
      ,b.cate_name
      ,cast(b.industry_id as varchar) as industry_id
      ,cast(b.xcat1_id as varchar) as xcat1_id
      ,cast(b.xcat2_id as varchar) as xcat2_id
      ,cast(b.xcat3_id as varchar) as xcat3_id
      ,cast(b.cate_level1_id as varchar) as cate_level1_id
      ,cast(b.cate_level2_id as varchar) as cate_level2_id
      ,cast(b.is_sw as varchar) as is_sw
      ,a.pv as mbr_ipv_1d
      ,a.uv as mbr_ipv_uv_1d
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as log_gmt_modify
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as gmt_modify
    FROM itm_log_day a
    JOIN ec.dim_tm_cate_360_2pt_ri FOR SYSTEM_TIME AS OF PROCTIME() AS b
    ON vvp_dt_rtcdm.DateAddOrSub(a.stat_date, -2) = b.stat_date 
        AND a.cate_id = b.cate_id
    ;
    
    --写入
    INSERT INTO vvp_ec_ads.ads_ec_log_hh
    
    SELECT
      a.stat_date
      ,a.stat_hour
      ,cast(a.item_id as varchar) as item_id
      ,a.cate_id
      ,b.cate_name
      ,cast(b.industry_id as varchar) as industry_id
      ,cast(b.xcat1_id as varchar) as xcat1_id
      ,cast(b.xcat2_id as varchar) as xcat2_id
      ,cast(b.xcat3_id as varchar) as xcat3_id
      ,cast(b.cate_level1_id as varchar) as cate_level1_id
      ,cast(b.cate_level2_id as varchar) as cate_level2_id
      ,cast(b.is_sw as varchar) as is_sw
      ,a.pv as mbr_ipv_1h
      ,a.uv as mbr_ipv_uv_1h
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as log_gmt_modify
      ,DATE_FORMAT(CURRENT_TIMESTAMP,'yyyy-MM-dd HH:mm:ss') as gmt_modify
    FROM itm_log_hh_00 a
    JOIN ec.dim_tm_cate_360_2pt_ri FOR SYSTEM_TIME AS OF PROCTIME() AS b
    ON vvp_ec_ads.DateAddOrSub(a.stat_date, -2) = b.stat_date 
        AND a.cate_id = b.cate_id
    ;
    
    END;
  3. 在节点编辑功能界面,单击代码编辑器右侧的实时配置,配置Flink资源信息、脚本参数及Flink运行参数,具体值见下图。

    image

    image

    预设Flink SQL Stream实时配置-专家模式代码

    {
      "nodes": [
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 1,
          "type": "StreamExecTableSourceScan",
          "desc": "Source: vvp_dt_rtcdm_dwd_tb_trd_ord_pay_nrt_ri[71980]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 2,
          "type": "StreamExecCalc",
          "desc": "Calc[71981]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 3,
          "type": "StreamExecLookupJoin",
          "desc": "LookupJoin[71982]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "0"
          },
          "id": 4,
          "type": "StreamExecCalc",
          "desc": "Calc[71983]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "1"
          },
          "id": 6,
          "state": [
            {
              "userDefined": false,
              "name": "groupAggregateState",
              "index": 0,
              "ttl": "36 h"
            }
          ],
          "type": "StreamExecGroupAggregate",
          "desc": "GroupAggregate[71985]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "1"
          },
          "id": 7,
          "type": "StreamExecCalc",
          "desc": "Calc[71986]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "1"
          },
          "id": 8,
          "type": "StreamExecSink",
          "desc": "ConstraintEnforcer[71987]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "2"
          },
          "id": 10,
          "state": [
            {
              "userDefined": false,
              "name": "sinkMaterializeState",
              "index": 0,
              "ttl": "36 h"
            }
          ],
          "type": "StreamExecSink",
          "desc": "SinkMaterializer[71987]"
        },
        {
          "profile": {
            "parallelism": 256,
            "maxParallelism": 32768,
            "minParallelism": 1,
            "group": "2"
          },
          "id": 11,
          "type": "StreamExecSink",
          "desc": "Sink: vvp_dt_ads_tb_dev_ads_tb_idec_seckill_cate_bc_trd_flow_htr_000[71987]"
        }
      ],
      "vertices": {
        "2d95a2974e3b3137fd533ecfd3490bc5": [
          10,
          11
        ],
        "717c7b8afebbfb7137f6f0f99beb2a94": [
          1,
          2,
          3,
          4
        ],
        "44b79c13fdb45883c7f21ee510155f4d": [
          6,
          7,
          8
        ]
      },
      "edges": [
        {
          "mode": "PIPELINED",
          "source": 1,
          "strategy": "FORWARD",
          "target": 2
        },
        {
          "mode": "PIPELINED",
          "source": 2,
          "strategy": "FORWARD",
          "target": 3
        },
        {
          "mode": "PIPELINED",
          "source": 3,
          "strategy": "FORWARD",
          "target": 4
        },
        {
          "mode": "PIPELINED",
          "source": 4,
          "strategy": "HASH",
          "target": 6
        },
        {
          "mode": "PIPELINED",
          "source": 6,
          "strategy": "FORWARD",
          "target": 7
        },
        {
          "mode": "PIPELINED",
          "source": 7,
          "strategy": "FORWARD",
          "target": 8
        },
        {
          "mode": "PIPELINED",
          "source": 8,
          "strategy": "HASH",
          "target": 10
        },
        {
          "mode": "PIPELINED",
          "source": 10,
          "strategy": "FORWARD",
          "target": 11
        }
      ],
      "ssgProfiles": [
        {
          "managed": {},
          "name": "0",
          "cpu": 0.25,
          "offHeap": "32 mb",
          "heap": "992 mb",
          "extended": {}
        },
        {
          "managed": {
            "STATE_BACKEND": "512 mb"
          },
          "name": "1",
          "cpu": 0.25,
          "offHeap": "32 mb",
          "heap": "480 mb",
          "extended": {}
        },
        {
          "managed": {
            "STATE_BACKEND": "512 mb"
          },
          "name": "2",
          "cpu": 0.25,
          "offHeap": "32 mb",
          "heap": "480 mb",
          "extended": {}
        }
      ]
    

    预设Flink SQL Stream实时配置-Flink运行参数-其他配置

    blob.fetch.backlog: 1000
    taskmanager.debug.memory.log-interval: 5000
  4. 完成实时配置后,单击代码编辑器上方的保存,单击代码编辑器上方的发布,页面右下方会出现发布操作界面,按照上线发布操作界面中步骤依次进行检查和确认即可。

    image

步骤四 进入个人开发环境

个人开发环境,支持自定义容器镜像,支持对接用户NAS,支持对接Git,支持Python编程与Notebook。

Data Studio页面,单击页面顶部image,在下拉菜单中选中您需要进入的个人开发环境,等待页面返回即可。

image

步骤五 Python编程与调试

DataWorks深度集成DSW,在进入个人开发环境后,Data Studio支持Python语言的编写、调试、运行及调度。

  1. Data Studio页面,且已进入个人开发环境,单击个人目录右侧的image,在左侧列表上会新增一个未命名的文件,输入预设文件名称,鼠标敲击回车键,等待文件生成即可。

    预设文件名称ec_item_rec.py

    image

  2. 在Python文件编辑页面的代码编辑器中,先输入预设的Python代码,再单击代码编辑器上方的运行Python文件,在页面下方的终端中查询运行结果。

    image

    image

    预设Python代码

    import pandas as pd
    from surprise import Dataset, Reader, SVD
    from surprise.model_selection import train_test_split
    from surprise import accuracy
    
    # 创建示例数据
    data_dict = {
        'user_id': [1, 1, 1, 2, 2, 2, 3, 3, 4],
        'item_id': [101, 102, 103, 101, 104, 105, 102, 105, 101],
        'rating': [5, 3, 4, 2, 4, 5, 4, 5, 3]
    }
    
    # 将数据转换为DataFrame
    df = pd.DataFrame(data_dict)
    
    # 使用Surprise库准备数据集
    reader = Reader(rating_scale=(1, 5))
    data = Dataset.load_from_df(df[['user_id', 'item_id', 'rating']], reader)
    
    # 分割数据集为训练集和测试集
    trainset, testset = train_test_split(data, test_size=0.2)
    
    # 使用SVD算法进行推荐
    model = SVD()
    model.fit(trainset)
    
    # 进行预测
    predictions = model.test(testset)
    
    # 计算RMSE
    rmse = accuracy.rmse(predictions)
    print(f'RMSE: {rmse:.2f}')
    
    # 获取某个用户的推荐商品
    def get_recommendations(user_id, model, all_items, n=3):
        item_ids = all_items['item_id'].unique()
        user_item_col = all_items[(all_items['user_id'] == user_id)]['item_id']
        unseen_items = [item for item in item_ids if item not in user_item_col.values]
    
        # 预测未见商品的评分
        predictions = []
        for item in unseen_items:
            pred = model.predict(user_id, item)
            predictions.append((item, pred.est))
    
        # 按照预测评分进行排序
        predictions.sort(key=lambda x: x[1], reverse=True)
        return predictions[:n]
    
    # 获取商品推荐
    all_items = df
    user_id = 1  # 需要获取推荐的用户ID
    recommendations = get_recommendations(user_id, model, all_items)
    
    print(f'推荐给用户 {user_id} 的商品:')
    for item_id, score in recommendations:
        print(f'商品ID: {item_id}, 预测评分: {score:.2f}')

    Python环境安装

    pip install pandas scikit-surprise
  3. 单击Python文件编辑页面的代码编辑器上方的调试Python文件,在代码编辑器中代码行号左侧可以单击生成断点,代码编辑器左侧面板上方单击image进行代码调试。

    image

步骤六 Notebook数据探索

新建Notebook

  1. 进入Data Studio > 数据开发

  2. 个人目录中,选择目标文件夹,单击按钮。

  3. 输入Notebook名称,单击回车键 页面空白位置,使Notebook名称生效。

  4. 在个人目录中单击Notebook名称,即可打开并进入Notebook编辑页面。

Notebook使用

说明

如下内容为独立操作步骤,不分先后次序,可以按需体验。

  • Notebook多引擎开发

    EMR Spark SQL

    1. 在DataWorks Notebook中单击image按钮,新建SQL Cell

    2. 在SQL Cell中,输入以下语句,完成dim_ec_mbr_user_info 表的查询。

      dim_ec_mbr_user_info

      -- 说明:基于「会员信息源表」和「地区源表」,查询某电商平台的会员基础信息。
      USE openlake_win.default;
      SELECT  user.user_id AS user_id
              ,user.nick AS nick
              ,user.gmt_create AS gmt_modified
              ,user.gmt_modified AS gmt_create
              ,user.reg_fullname AS reg_fullname
              ,user.reg_mobile_phone AS reg_mobile_phone
              ,user.reg_email AS reg_email
              ,user.reg_gender AS reg_gender
              ,user.reg_gender_name AS reg_gender_name
              ,user.reg_birthdate AS reg_birthdate
              ,user.reg_address AS reg_address
              ,user.reg_nation_id AS reg_nation_id
              ,user.reg_nation_name AS reg_nation_name
              ,user.reg_prov_id AS reg_prov_id
              ,area_prov.name AS reg_prov_name
              ,user.reg_city_id AS reg_city_id
              ,area_city.name AS reg_city_name
              ,user.user_regip AS user_regip
              ,user.id_card_type AS id_card_type
              ,user.id_card_type_name AS id_card_type_name
              ,user.id_card_number AS id_card_number
              ,null as id_gender
              ,null as id_bday
              ,(2024 - CAST(SUBSTR(user.id_card_number,7,4) AS INT)) AS id_age
              ,user.user_regdate AS user_regdate
              ,user.user_active_type AS user_active_type
              ,user.user_active_name AS user_active_name
              ,user.user_active_time AS user_active_time
              ,user.vip_level AS vip_level
              ,user.vip_level_name AS vip_level_name
              ,user.is_delete AS is_delete
      FROM    (
                  SELECT  id    -- 主键
                          ,gmt_create    -- 创建时间
                          ,gmt_modified    -- 修改时间
                          ,user_id    -- 会员数字ID
                          ,nick    -- 会员NICK。会员昵称
                          ,reg_fullname    -- 个人认证表示真实姓名,企业认证表示企业名称
                          ,reg_mobile_phone    -- 注册时绑定手机号码
                          ,reg_email    -- 注册填写EMAIL(用户可以修改)
                          ,reg_gender    -- 注册填写性别(F女,M男,不是这两个就是未知的,说明性别保密)
                          ,CASE    WHEN reg_gender='F' THEN '女'
                                   WHEN reg_gender='M' THEN '男' 
                                   ELSE '未知' 
                           END AS reg_gender_name    -- 注册填写性别(F女,M男,不是这两个就是未知的,说明性别保密)
                          ,reg_birthdate    -- 注册填写生日(用户可以修改)
                          ,reg_address    -- 注册填写地址(用户可以修改)
                          ,reg_nation_id    -- 注册填写国家ID(暂时为空)
                          ,CASE    WHEN reg_nation_id='cn' THEN '中国' 
                                   ELSE '海外' 
                           END AS reg_nation_name
                          ,reg_prov_id    -- 注册填写省ID
                          ,reg_city_id    -- 注册填写城市ID
                          ,user_regip    -- 注册IP
                          ,id_card_type    -- 会员认证证件类型 0:未知 1:身份证 2:企业营业执照号
                          ,CASE    WHEN id_card_type=0 THEN '未知'
                                   WHEN id_card_type=1 THEN '身份证'
                                   WHEN id_card_type=2 THEN '企业营业执照号' 
                                   ELSE '异常' 
                           END AS id_card_type_name
                          ,id_card_number    -- 个人认证表示身份证号,企业认证表示企业的营业执照号,没有认证不保证准确性
                          ,user_regdate    -- 注册时间
                          ,user_active_type    -- 用户激活方式
                          ,CASE    WHEN user_active_type='email' THEN '邮箱'
                                   WHEN user_active_type='mobile_phone' THEN '手机' 
                                   ELSE '异常' 
                           END AS user_active_name    -- 用户激活方式
                          ,user_active_time    -- 激活时间
                          ,cast(vip_level AS BIGINT) AS vip_level    -- VIP等级
                          ,CASE    WHEN vip_level>0 AND vip_level<=3 THEN '初级'
                                   WHEN vip_level>3 AND vip_level<=6 THEN '中级'
                                   WHEN vip_level>6 AND vip_level<=10 THEN '高级' 
                                   WHEN vip_level>10  THEN '特级' 
                           ELSE '异常'
                           END  AS vip_level_name
                          ,is_delete    -- 是否删除
                  FROM    ods_mbr_user_info
              ) AS USER
      LEFT JOIN (
                    SELECT  id,pid,name,shortname,longitude,latitude,level,sort
                    FROM    ods_t_area
                ) AS area_prov
      ON      user.reg_prov_id = area_prov.id 
      LEFT JOIN    (
                  SELECT  id,pid,name,shortname,longitude,latitude,level,sort
                  FROM    ods_t_area
              ) AS area_city
      ON      user.reg_city_id = area_city.id
      ;
    3. 在SQL Cell右下角,选择SQL Cell类型为EMR Spark SQL,选择计算资源为openlake_serverless_spark。

      image

    4. 单击运行按钮,等待运行完成,查看数据结果。

    StarRocks SQL

    1. 在DataWorks Notebook中单击image按钮,新建SQL Cell,如下图:

    2. 在SQL Cell中,输入以下语句,完成dws_ec_trd_cate_commodity_gmv_kpi_fy 表的查询。

      dws_ec_trd_cate_commodity_gmv_kpi_fy

      -- 说明:基于「交易下单事实明细表」和「商品基础信息维度表」查询"财年_订单支付成功金额、财年_成交金额完成度"等数据指标
      USE `openlake_win`.`default`;
      select   t1.cate_id, t1.cate_name, t1.commodity_id, t1.commodity_name, round(10*sum(t1.total_fee),4) as pay_ord_amt_fy, round((10*sum(t1.total_fee)/30000000),4) as kpi_gmv_rate_fy
      from    (
                  select  DATE_FORMAT(a.gmt_create,'yyyymmdd') as stat_date
                          ,a.sub_order_id, a.buyer_id, a.item_id, a.biz_type, a.pay_status, a.total_fee/100 as total_fee, b.cate_id, b.cate_name, b.commodity_id, b.commodity_name 
                  from    `openlake_win`.`default`.dwd_ec_trd_create_ord_di a
                  left outer join (
                                      select  distinct item_id, cate_id, cate_name, commodity_id, commodity_name, shop_id, shop_nick
                                      from    `openlake_win`.`default`.dim_ec_itm_item_info
                                  ) b
                  on      a.item_id = b.item_id
                  and     a.shop_id = b.shop_id
              ) t1
      where   t1.pay_status in ('2')
      and     t1.biz_type in ('2','3','4')
      group by   t1.cate_id, t1.cate_name, t1.commodity_id, t1.commodity_name
      ;
    3. 在SQL Cell右下角,选择SQL Cell类型为StarRocks SQL,选择计算资源为openlake_starrocks。

      image

    4. 单击运行按钮,等待运行完成,查看数据结果。

    Hologres SQL

    1. 在DataWorks Notebook中单击image按钮,新建SQL Cell

    2. 在SQL Cell中,输入以下语句,完成dws_ec_mbr_cnt_std 表的查询。

      dws_ec_mbr_cnt_std

      -- 说明:「会员基础信息维度表」的数据进行计算转换得到“存量会员数”等数据指标,获得历史截至当日_存量会员数_cube统计情况
      SELECT    IF(grouping(reg_prov_id) = 0, reg_prov_id, '-9999') as reg_prov_id
              , IF(grouping(reg_prov_name) = 0, reg_prov_name, '全部') as reg_prov_name
              , IF(grouping(reg_gender) = 0, reg_gender, '-9999') as reg_gender
              , IF(grouping(reg_gender_name) = 0, reg_gender_name, '全部') as reg_gender_name
              , IF(grouping(age_tag) = 0, age_tag, '-9999') as age_tag
              , IF(grouping(user_active_type) = 0, user_active_type, '-9999') as user_active_type
              , IF(grouping(user_active_name) = 0, user_active_name, '全部') as user_active_name
              , IF(grouping(vip_level) = 0, vip_level, '-9999') as vip_level
              , IF(grouping(vip_level_name) = 0, vip_level_name, '全部') as vip_level_name 
              , count(distinct user_id) as mbr_cnt
      from (
          select    reg_prov_id
                  , reg_prov_name
                  , reg_gender
                  , reg_gender_name
                  , case when cast(substr(reg_birthdate,1,4) as int)>=2010 and cast(substr(reg_birthdate,1,4) as int)<2020 then '10后' 
                          when cast(substr(reg_birthdate,1,4) as int)>=2000 and cast(substr(reg_birthdate,1,4) as int)<2010 then '00后' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1990 and cast(substr(reg_birthdate,1,4) as int)<2000 then '90后' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1980 and cast(substr(reg_birthdate,1,4) as int)<1990 then '80后' 
                          when cast(substr(reg_birthdate,1,4) as int)>=1970 and cast(substr(reg_birthdate,1,4) as int)<1980 then '70后' 
                          else '其他' 
                    end as age_tag
                  , user_active_type
                  , user_active_name
                  , vip_level
                  , vip_level_name 
                  , user_id
          from    openlake_win.default.dim_ec_mbr_user_info
      ) _main       
      group by 
      grouping sets(
          (reg_prov_id, reg_prov_name)
         ,(reg_gender, reg_gender_name)
         ,(age_tag)
         ,(user_active_type, user_active_name)
         ,(vip_level, vip_level_name)
         ,()
      );
    3. 在SQL Cell右下角,选择SQL Cell类型为Hologres SQL,选择计算资源为openlake_hologres。

      image

    4. 单击运行按钮,等待运行完成,查看数据结果。

    MaxCompute SQL

    1. 在DataWorks Notebook中单击image按钮,新建SQL Cell

    2. 在SQL Cell中,输入以下语句,完成dws_ec_mbr_cnt_std 表的查询。

      dws_ec_mbr_cnt_std

      -- 说明:查询轻度汇总层「历史截至当日_存量会员数_cube统计表」
      set odps.task.major.version=flighting;
      set odps.namespace.schema=true;
      set odps.sql.allow.namespace.schema=true;
      set odps.service.mode=only;
      set odps.sql.unstructured.data.split.size=1;
      
      SELECT * 
      FROM openlake_win.default.dws_ec_mbr_cnt_std 
      LIMIT 200;
    3. 在SQL Cell右下角,选择SQL Cell类型为MaxCompute SQL,选择计算资源为openlake_maxcompute。

      image

    4. 单击运行按钮,等待运行完成,查看数据结果。

  • Notebook交互式数据
    1. 在DataWorks Notebook中单击image按钮,新建Python Cell

    2. 在Python Cell右上角,单击image按钮,呼出DataWorks Copilot智能编程助手。

    3. 在DataWorks Copilot输入框中,输入以下需求,用于生成一个查询会员年龄的ipywidgets交互组件。

      说明

      需求描述:使用Python,生成一个会员年龄的滑动条组件,取值范围从1到100,默认值为20,实时监测组件取值的变化,并将值保存到全局变量query_age中。

    4. 查看DataWorks Copilot生成的Python代码,单击接受按钮。

      image

    5. 单击Python Cell的运行按钮,等待运行完成,查看交互组件的生成(运行Copilot生成的代码,或预设代码);同时,能够在交互组件中滑动选择目标年龄。

      image

    6. 在DataWorks Notebook中单击按钮,新建SQL Cell

    7. 在SQL Cell中,输入以下查询语句,包含Python中定义的会员年龄变量${query_age}。

      SELECT * FROM openlake_win.default.dim_ec_mbr_user_info
      WHERE CAST(id_age AS INT) >= ${query_age};
    8. 在SQL Cell右下角,选择SQL Cell类型为Hologres SQL,选择计算资源为openlake_hologres。

      image

    9. 单击运行按钮,等待运行完成,查看数据结果。

    10. 在运行结果中,单击image按钮,生成可视化图表。

  • Notebook模型开发与训练
    1. 在DataWorks Notebook中单击image按钮,新建SQL Cell

    2. 在SQL Cell中,输入以下语句,完成ods_trade_order表的查询。

      SELECT * FROM openlake_win.default.ods_trade_order;
    3. 将SQL查询结果写入DataFrame变量中,单击df位置,自定义DataFrame变量名称(例如:df_ml)。

      image

    4. 单击SQL Cell的运行按钮,等待运行完成,查看数据结果。

    5. 在DataWorks Notebook中单击image按钮,新建Python Cell

    6. 在Python Cell中,输入以下语句,使用Pandas完成数据清洗和处理,并存入DataFrame的新变量df_ml_clean中。

      import pandas as pd
      
      def clean_data(df_ml):
          # 生成新的一列:预估订单总额 = 商品单价 * 商品数量
          df_ml['predict_total_fee'] = df_ml['item_price'].astype(float).values * df_ml['buy_amount'].astype(float).values
          # 将列 'total_fee' 重命名为 'actual_total_fee'
          df_ml = df_ml.rename(columns={'total_fee': 'actual_total_fee'})
          return df_ml
      
      df_ml_clean = clean_data(df_ml.copy())
      df_ml_clean.head()
    7. 单击Python Cell的运行按钮,等待运行完成,查看数据清理结果。

    8. 在DataWorks Notebook中单击image按钮,再次新建Python Cell

    9. 在Python Cell中,输入以下语句,构建一个线性回归的机器学习模型,并进行训练和测试。

      import pandas as pd  
      from sklearn.model_selection import train_test_split  
      from sklearn.linear_model import LinearRegression  
      from sklearn.metrics import mean_squared_error  
        
      # 获取商品价格及总费用
      X = df_ml_clean[['predict_total_fee']].values  
      y = df_ml_clean['actual_total_fee'].astype(float).values  
      
      # 准备数据  
      X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05, random_state=42)  
      
      # 创建并训练模型  
      model = LinearRegression()  
      model.fit(X_train, y_train)  
        
      # 预测和评估  
      y_pred = model.predict(X_test)  
      for index, (x_t, y_pre, y_t) in enumerate(zip(X_test, y_pred, y_test)):
          print("[{:>2}] input: {:<10} prediction:{:<10} gt: {:<10}".format(str(index+1), f"{x_t[0]:.3f}", f"{y_pre:.3f}", f"{y_t:.3f}"))
      
      # 计算均方误差MSE
      mse = mean_squared_error(y_test, y_pred)  
      print("均方误差(MSE):", mse)
    10. 单击运行按钮,等待运行完成,查看模型训练的测试结果。