加工数据

本文为您介绍如何将同步至StarRocks的用户信息表ods_user_info_d_starrocks及访问日志数据ods_raw_log_d_starrocks,通过DataWorks的StarRocks节点加工得到目标用户画像数据,阅读本文后,您可以了解如何通过DataWorks+StarRocks产品组合来计算和分析已同步的数据,完成数仓简单数据加工场景。

前提条件

开始本实验前,请先完成同步数据中的操作。

  • 已通过数据集成将存储于MySQL的用户基本信息ods_user_info_d同步至StarRocks的ods_user_info_d_starrocks表。

  • 已通过数据集成将存储于OSS的网站访问日志user_log.txt同步至StarRocks的ods_raw_log_d_starrocks表。

章节目标

本小节将对同步到StarRocks上的ods_user_info_d_starrocksods_raw_log_d_starrocks进行加工处理,并生成基本用户画像表。

  1. 将同步至StarRocks的日志表ods_raw_log_d_starrocks将日志信息字段拆分为多个字段,并生成新的明细日志表dwd_log_info_di_starrocks

  2. 利用明细日志表dwd_log_info_di_starrocks和用户表ods_user_info_d_starrocks的uid字段进行关联,生成汇总用户日志表dws_user_info_all_di_starrocks

  3. dws_user_info_all_di_starrocks表字段较多,直接应用于数据消费,表数据较多,所以将其进一步加工为ads_user_info_1d_starrocks表。

进入数据开发

登录DataWorks控制台,切换至目标地域后,单击左侧导航栏的数据开发与治理 > 数据开发,在下拉框中选择对应工作空间后单击进入数据开发

操作步骤

步骤一:设计工作流程

步骤二:创建函数

步骤三:配置StarRocks节点

步骤四:运行任务

步骤五:调度任务

步骤六:数据回溯

步骤一:设计工作流程

同步数据阶段,已经成功将数据同步至StarRocks,接下来的流程的目标是对数据进行进一步加工,以输出基本用户画像数据。

  • 各层级节点以及工作逻辑。

    在业务流程画布中单击新建节点,创建以下节点,以供加工数据使用。

    节点分类

    节点类型

    节点名称

    (以最终产出表命名)

    代码逻辑

    数据库

    imageStarRocks

    dwd_log_info_di_starrocks

    使用内置函数,自定义函数等完成原始日志ods_raw_log_d_starrocks数据拆分写入dwd_log_info_di_starrocks表多个字段。

    数据库

    imageStarRocks

    dws_user_info_all_di_starrocks

    用户基本信息初步加工后的日志数据进行汇总,合并为一张表。

    数据库

    imageStarRocks

    ads_user_info_1d_starrocks

    进一步加工产出基本用户画像。

  • 流程DAG图。

    将节点组件拖拽至业务流程画布,并通过拉线设置节点上下游依赖的方式,设计数据加工阶段的业务流程。

    image

步骤二:创建函数

本案例通过上传函数资源,将函数资源注册为数据源中的函数,实现在流程中使用该函数对数据进行处理,详情可参见,创建资源及函数

说明

在StarRocks内使用Java UDF功能注册函数前,需要在实例配置页面,设置FE配置项enable_udfTRUE,并重启实例使配置项生效。配置详情可参见:参数配置

若未配置,则会报如下错误:

FAILED: Getting analyzing error. Detail message: UDF is not enabled in FE, please configure enable_udf=true in fe/conf/fe.conf or .

上传函数

本案例已为您提供好自定义函数所用资源ip2region-starrocks.jar,请将其下载至本地后,登录对象存储OSS控制台,上传至您的OSS Bucket路径下。

说明

创建OSS Bucket 可参见创建存储空间,在创建好的存储空间里可上传该jar包,此操作会产生OSS存储费用。

本案例OSS环境存储路径如下:

  • Bucket名称:test(OSS Bucket名称不允许重名,请自行定义该Bucket名称)。

  • 资源存储路径:dataworks_starrocks/ip2region-starrocks.jar

  • 完整路径:https://test.oss-cn-shanghai-internal.aliyuncs.com/dataworks_starrocks/ip2region-starrocks.jar请参考如下格式获得完整资源存储路径。

    https://${Bucket名称}.oss-cn-shanghai-internal.aliyuncs.com/${资源存储路径}

    说明
    • 自定义函数所在的Bucket网络地址使用ECS的经典网络访问(内网)地址。

    • 使用内网地址时需要OSS Bucket地域和DataWorks工作空间所在地域相同,本案例均在华东2(上海)地域。

    image

注册函数

  1. 新建临时查询文件。

    数据开发页面的左侧导航栏,单击image,进入临时查询面板。右键单击临时查询,选择新建节点 > StarRocks

  2. 编辑并运行代码。

    CREATE FUNCTION getregion(string)
    RETURNS string
    PROPERTIES ( 
        "symbol" = "com.starrocks.udf.sample.Ip2Region", 
        "type" = "StarrocksJar",
        "file" = "此处请替换文件存储在oss的完整路径,文件路径获取请参见步骤上传资源"
    );

    确认函数是否注册成功。

    select getregion('您本机ip');

步骤三:配置StarRocks节点

配置dwd_log_info_di_starrocks节点

在业务流程面板,双击StarRocks数据库dwd_log_info_di_starrocks节点,进入dwd_log_info_di_starrocks节点的编辑页面,编写处理上游表ods_raw_log_d_starrocks字段的SQL代码,并将其写入dwd_log_info_di_starrocks表中。

1. 配置代码

双击dwd_log_info_di_starrocks节点,进入节点配置页面,编写如下语句。

CREATE TABLE IF NOT EXISTS dwd_log_info_di_starrocks (
    uid STRING COMMENT '用户ID',
    ip STRING COMMENT 'ip地址',
    TIME STRING COMMENT '时间yyyymmddhh:mi:ss',
    status STRING COMMENT '服务器返回状态码',
    bytes STRING COMMENT '返回给客户端的字节数',
    region STRING COMMENT '地域,根据ip得到',
    method STRING COMMENT 'http请求类型',
    url STRING COMMENT 'url',
    protocol STRING COMMENT 'http协议版本号',
    referer STRING COMMENT '来源url',
    device STRING COMMENT '终端类型 ',
    identity STRING COMMENT '访问类型 crawler feed user unknown',
    dt DATE NOT NULL COMMENT '时间'
) DUPLICATE KEY(uid) 
COMMENT '用户行为分析案例-网站访问日志明细表' 
PARTITION BY(dt) 
PROPERTIES ("replication_num" = "1");

-- 本示例按字段dt动态分区,为避免节点重跑数据重复写入,通过以下命令实现每次加工前删除已有目标分区。
ALTER TABLE dwd_log_info_di_starrocks DROP PARTITION IF EXISTS p${var} FORCE;

--场景:以下SQL使用函数getregion对原始日志数据中的ip进行解析,并通过正则等方式,将原始数据拆解为可分析字段写入并写入dwd_log_info_di_starrocks表。
--补充:
--     1. 在DataWorks节点中使用自定义函数前,您需要先注册函数。
--     2. DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应业务分区。
--        在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过为变量赋值调度参数的方式,实现调度场景下代码动态入参。
INSERT INTO dwd_log_info_di_starrocks 
SELECT 
    uid
    , ip  
    , time
    , status
    , bytes 
    , getregion(ip) as region--使用自定义UDF通过IP得到地域
    ,REGEXP_EXTRACT(request, '([^ ]+)', 1) AS method
    ,REGEXP_EXTRACT(request, '^[^ ]+ (.*) [^ ]+$', 1) AS url
    ,REGEXP_EXTRACT(request, '([^ ]+)$', 1) AS protocol
    ,REGEXP_EXTRACT(referer, '^[^/]+://([^/]+)', 1) AS referer
  , CASE
    WHEN LOWER(agent) REGEXP 'android' THEN 'android'
    WHEN LOWER(agent) REGEXP 'iphone' THEN 'iphone'
    WHEN LOWER(agent) REGEXP 'ipad' THEN 'ipad'
    WHEN LOWER(agent) REGEXP 'macintosh' THEN 'macintosh'
    WHEN LOWER(agent) REGEXP 'windows phone' THEN 'windows_phone'
    WHEN LOWER(agent) REGEXP 'windows' THEN 'windows_pc'
    ELSE 'unknown'
END AS device
  , CASE
    WHEN LOWER(agent) REGEXP '(bot|spider|crawler|slurp)' THEN 'crawler'
    WHEN LOWER(agent) REGEXP 'feed' OR REGEXP_EXTRACT(request, '^[^ ]+ (.*) [^ ]+$', 0) REGEXP 'feed' THEN 'feed'
    WHEN NOT (LOWER(agent) REGEXP '(bot|spider|crawler|feed|slurp)') 
         AND agent REGEXP '^(Mozilla|Opera)' 
         AND NOT (REGEXP_EXTRACT(request, '^[^ ]+ (.*) [^ ]+$', 0) REGEXP 'feed') THEN 'user'
    ELSE 'unknown'
END AS identity,
 cast('${var}' as DATE )as dt
  FROM (
    SELECT
      SPLIT_PART(CAST(col AS VARCHAR(65533)), '##@@', 1)  AS ip
    , SPLIT_PART(CAST(col AS VARCHAR(65533)), '##@@', 2)  AS uid
    , SPLIT_PART(CAST(col AS VARCHAR(65533)), '##@@', 3)  AS time
    , SPLIT_PART(CAST(col AS VARCHAR(65533)), '##@@', 4)  AS request
    , SPLIT_PART(CAST(col AS VARCHAR(65533)), '##@@', 5)  AS status
    , SPLIT_PART(CAST(col AS VARCHAR(65533)), '##@@', 6)  AS bytes
    , SPLIT_PART(CAST(col AS VARCHAR(65533)), '##@@', 7)  AS referer
    , SPLIT_PART(CAST(col AS VARCHAR(65533)), '##@@', 8)  AS agent
FROM
    ods_raw_log_d_starrocks
WHERE
    dt = '${var}'
) a;

2. 配置调度

单击dwd_log_info_di_starrocks节点编辑页右侧调度配置。配置详情如下:

配置项

配置内容

图示

调度参数

调度参数项中单击新增参数,添加:

  • 参数名:var

  • 参数值:$[yyyymmdd-1]

image

调度依赖

调度依赖单击从代码解析输入输出,通过代码血缘快速为节点设置依赖关系。

详情请参见调度依赖配置指引

image

说明

实践属性的配置,配置调度周期为日,无需单独配置当前节点定时调度时间,当前节点每日调起时间由业务流程虚拟节点workshop_start_starrocks的定时调度时间控制,即每日00:30后才会调度。

3. 保存配置

本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的image.png按钮,保存当前配置。

配置dws_user_info_all_di_starrocks节点

在业务流程面板中,双击StarRocks数据库dws_user_info_all_di_starrocks节点,进入dws_user_info_all_di_starrocks节点的编辑页面,编写合并上游表dwd_log_info_di_starrocksods_user_info_d_starrocks的SQL代码,写入dws_user_info_all_di_starrocks表中。

编辑代码

双击dws_user_info_all_di_starrocks节点,进入节点配置页面,编写如下语句。

CREATE TABLE IF NOT EXISTS dws_user_info_all_di_starrocks (
    uid STRING COMMENT '用户ID',
    gender STRING COMMENT '性别',
    age_range STRING COMMENT '年龄段',
    zodiac STRING COMMENT '星座',
    region STRING COMMENT '地域,根据ip得到',
    device STRING COMMENT '终端类型 ',
    identity STRING COMMENT '访问类型 crawler feed user unknown',
    method STRING COMMENT 'http请求类型',
    url STRING COMMENT 'url',
    referer STRING COMMENT '来源url',
    TIME STRING COMMENT '时间yyyymmddhh:mi:ss',
    dt DATE NOT NULL COMMENT '时间'
) DUPLICATE KEY(uid) 
COMMENT '用户行为分析案例-用户网站访问信息宽表' 
PARTITION BY(dt) 
PROPERTIES ("replication_num" = "1");

-- 本示例按字段dt动态分区,为避免节点重跑数据重复写入,通过以下命令实现每次加工前删除已有目标分区。
ALTER TABLE dws_user_info_all_di_starrocks DROP PARTITION IF EXISTS p${var} FORCE;


-- 场景:将加工后的日志数据dwd_log_info_di_starrocks 与用户基本信息数据ods_user_info_d_starrocks汇总写入dws_user_info_all_di_starrocks表。
-- 补充:DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应业务分区。
--      在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过为变量赋值调度参数的方式,实现调度场景下代码动态入参。
INSERT INTO dws_user_info_all_di_starrocks 
SELECT 
    IFNULL(a.uid, b.uid) AS uid,
    b.gender,
    b.age_range,
    b.zodiac,
    a.region,
    a.device,
    a.identity,
    a.method,
    a.url,
    a.referer,
    a.time,
    a.dt
FROM dwd_log_info_di_starrocks a
LEFT JOIN ods_user_info_d_starrocks b
ON a.uid = b.uid
WHERE a.dt = '${var}';

配置调度

单击dws_user_info_all_di_starrocks节点编辑页面右侧的调度配置。配置详情如下:

配置项

配置内容

图示

调度参数

调度参数项中单击新增参数,添加:

  • 参数名:var

  • 参数值:$[yyyymmdd-1]

image

调度依赖

调度依赖单击从代码解析输入输出,通过代码血缘快速为节点设置依赖关系。

详情请参见调度依赖配置指引

image

说明

实践属性的配置,配置调度周期为日,无需单独配置当前节点定时调度时间,当前节点每日调起时间由业务流程虚拟节点workshop_start_starrocks的定时调度时间控制,即每日00:30后才会调度。

3. 保存配置

本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的image.png按钮,保存当前配置。

配置ads_user_info_1d_starrocks节点

在业务流程面板中,双击StarRocks数据库中的ads_user_info_1d_starrocks节点,以进入该节点的编辑页面。在此编写SQL代码,对上游的dws_user_info_all_di_starrocks表进行加工处理,并将结果写入ads_user_info_1d_starrocks表中。

1. 编辑代码

CREATE TABLE IF NOT EXISTS ads_user_info_1d_starrocks (
uid STRING COMMENT '用户ID',
region STRING COMMENT '地域,根据ip得到',
device STRING COMMENT '终端类型 ',
pv BIGINT COMMENT 'pv',
gender STRING COMMENT '性别',
age_range STRING COMMENT '年龄段',
zodiac STRING COMMENT '星座',
dt DATE NOT NULL COMMENT '时间'
) DUPLICATE KEY(uid) 
COMMENT '用户行为分析案例-用户画像数据' 
PARTITION BY(dt) 
PROPERTIES ("replication_num" = "1");

-- 本示例按字段dt动态分区,为避免节点重跑数据重复写入,通过以下命令实现每次加工前删除已有目标分区。
ALTER TABLE ads_user_info_1d_starrocks DROP PARTITION IF EXISTS p${var} FORCE;

-- 场景:以下SQL用于对用户访问信息宽表dws_user_info_d_all_di_starrocks 进一步加工产出基本的用户画像数据写入ads_user_info_1d_starrocks 表。
-- 补充:DataWorks提供调度参数,可实现调度场景下,将每日增量数据写入目标表对应业务分区。
-- 在实际开发场景下,您可通过${变量名}格式定义代码变量,并在调度配置页面通过为变量赋值调度参数的方式,实现调度场景下代码动态入参。
INSERT INTO ads_user_info_1d_starrocks 
SELECT 
uid,
MAX(region) AS region,
MAX(device) AS device,
COUNT(*) AS pv,
MAX(gender) AS gender,
MAX(age_range) AS age_range,
MAX(zodiac) AS zodiac,
dt
FROM dws_user_info_all_di_starrocks
WHERE dt = '${var}'
GROUP BY uid, dt;


select * FROM dws_user_info_all_di_starrocks
WHERE dt = '${var}';

2. 配置调度

单击ads_user_info_1d_starrocks节点编辑页右侧的调度配置。配置详情如下:

配置项

配置内容

图示

调度参数

调度参数项中单击新增参数,添加:

  • 参数名:var

  • 参数值:$[yyyymmdd-1]

image

调度依赖

调度依赖单击从代码解析输入输出,通过代码血缘快速为节点设置依赖关系。

详情请参见调度依赖配置指引

image

说明

实践属性的配置,配置调度周期为日,无需单独配置当前节点定时调度时间,当前节点每日调起时间由业务流程虚拟节点workshop_start_starrocks的定时调度时间控制,即每日00:30后才会调度。

3. 保存配置

本案例其他必填配置项,您可按需自行配置,配置完成后,在节点代码编辑页面,单击工具栏中的image.png按钮,保存当前配置。

步骤四:运行任务

运行业务流程

  1. 进入业务流程面板。

    双击业务流程下的用户画像分析_StarRocks版,进入业务流程画布。image

  2. 运行业务流程。

    在业务流程画布,单击工具栏中的image图标,将按照上下游依赖关系运行数据继承阶段的业务流程。

  3. 查看任务运行状态。

    节点处于image状态,即代表同步执行过程无问题。

  4. 查看任务执行日志。

    右键画布中的dwd_log_info_di_starrocks节点、dws_user_info_all_di_starrocks节点、ads_user_info_1d_starrocks节点,选择查看日志,即可查看详细的同步过程。image

查看流程结果

  1. 新建临时查询文件,详情可参见创建临时查询

    在数据开发页面的左侧导航栏,单击image,进入临时查询面板。右键单击临时查询,选择新建节点 > StarRocks

  2. 查询加工结果表。

    执行如下SQL语句,确认用户基本信息数据,以及用户网站访问日志数据是否已经从测试库同步至您的StarRocks。

    --查询语句中的分区列需要更新为业务日期。例如,任务运行的日期为20240102,则业务日期为20240101,即任务运行日期的前一天。
    SELECT * from ads_user_info_1d_starrocks  where dt=业务日期; 

步骤五:调度任务

在完成前几步的数据处理并确定任务可正常运行后,需要将任务提交并发布到生产环境,详情请参见发布中心

提交至开发环境

在业务流程面板工具栏中,单击image按钮,提交整个业务流程中的任务,并单击确认

发布至生产环境

提交后任务将进入开发环境。由于开发环境的任务不会自动调度,您需要发布任务至生产环境。

在业务流程页面,单击工具栏中的image图标,进入发布页面,或单击菜单数据开发与运维 > 任务发布进入创建发布包页面,在创建发布包页面,有以下两种发布方式可选。

  • 单个条目发布:单击单个任务条目右侧的发布按钮进行发布。image

  • 批量打包发布:

    • 选择多个条目,并单击下方发布选中项进行发布。

    • 选择多个条目,单击添加到待发布,再单击右上角的待发布列表,批量将待发布任务发布至生产环境。image

说明

批量发布目标任务,您需要同步发布该业务流程涉及的资源与函数。

步骤六:数据回溯

在实际开发场景下,您可以通过生产环境执行补数据并查看补数据实例(新版)操作实现历史数据回刷,具体操作如下。

  1. 进入运维中心。

    任务发布成功后,单击右上角的运维中心,或单击菜单全部产品 > 数据开发与运维 > 运维中心(工作流)进入运维页面。

  2. 针对周期任务执行补数据操作。

    1. 在左侧导航栏,单击周期任务运维 > 周期任务,进入周期任务页面,单击进入起始根虚拟workshop_start_starrocks,进入DAG图内。image

    2. 右键单击workshop_start_starrocks节点,选择补数据 > 当前节点及下游节点image

    3. 选中workshop_start_starrocks节点的所有下游节点,输入业务日期,单击确定,自动跳转至补数据实例页面。image

后续步骤

在完成任务发布后,整个业务流程已经完成,即可查看已创建的表详情或消费对应数据表。详情可参见管理数据API数据服务