本文将结合电商场景下的用户流失预测任务,介绍在AnalyticDB PostgreSQL 7.0版中如何使用pgml搭建离线和实时预测任务流。
前提条件
内核版本为v7.1.1.0及以上的AnalyticDB PostgreSQL 7.0版实例。
实例资源类型为存储弹性模式。
已经安装pgml插件。
说明pgml暂不支持白屏化安装,如有需要请提交工单联系工作人员协助安装。如有卸载插件需求,也请提交工单联系工作人员协助卸载。
背景介绍
pgml的设计理念是让模型更靠近数据。AI/ML功能将模型加载到PostgreSQL后端进程,以UDF的形式对其进行训练、微调、推理。训练、微调和推理后的模型存储在heap表中,无需单独设计高可用或高可靠方案,运维简单,方便用户使用。pgml得益于存计算一体化,减少数据传输损耗,能够高效完成模型的训练及服务部署。AI/ML训练和推理流程如下。
数据介绍和分析
数据集介绍
本文所使用的数据集为Ecommerce Customer Churn Analysis and Prediction,其中包含若干客户历史行为数据,且已标注客户是否在未来流失的标签。分析和预测客户流失可以帮助企业制定更有效的策略以提升客户留存率。数据集中包含了以下字段。
字段名称 | 描述 |
CustomerID | 客户唯一标识ID。 |
Churn | 客户流失标志。 |
Tenure | 客户使用时长。 |
PreferredLoginDevice | 客户的首选登录设备。 |
CityTier | 客户所在城市类别。 |
WarehouseToHome | 从仓库到客户家的距离。 |
PreferredPaymentMode | 客户的首选付款方式。 |
Gender | 客户性别。 |
HourSpendOnApp | 客户在移动应用或网站上花费的时间(单位:小时)。 |
NumberOfDeviceRegistered | 注册在客户名下的设备总数。 |
PreferedOrderCat | 上个月客户的首选订单类别。 |
SatisfactionScore | 客户对服务的满意度评分。 |
MaritalStatus | 客户婚姻状况。 |
NumberOfAddress | 特定客户添加的地址总数。 |
Complain | 客户上月是否投诉。 |
OrderAmountHikeFromlastYear | 与去年相比,客户订单金额的增长率。 |
CouponUsed | 客户上个月使用的优惠券总数。 |
OrderCount | 客户上个月的订单总数。 |
DaySinceLastOrder | 客户最后一次订单距离今天的天数。 |
CashbackAmount | 客户上个月的返现金额。 |
数据导入
创建数据表。
CREATE TABLE raw_data_table ( CustomerID INTEGER, Churn INTEGER, Tenure FLOAT, PreferredLoginDevice TEXT, CityTier INTEGER, WarehouseToHome FLOAT, PreferredPaymentMode TEXT, Gender TEXT, HourSpendOnApp FLOAT, NumberOfDeviceRegistered INTEGER, PreferedOrderCat TEXT, SatisfactionScore INTEGER, MaritalStatus TEXT, NumberOfAddress INTEGER, Complain INTEGER, OrderAmountHikeFromlastYear FLOAT, CouponUsed FLOAT, OrderCount FLOAT, DaySinceLastOrder FLOAT, CashbackAmount FLOAT );
下载数据集,并使用
COPY
命令导入CSV格式的数据源,请根据实际情况填写路径。COPY raw_data_table FROM '/path/to/dataset.csv' DELIMITER ',' CSV HEADER;
说明此处使用psql工具进行数据导入。若您使用其他SDK,可参照相应文档使用
COPY
或INSERT
方式进行导入。
数据分析
检查数据集空值分布。
DO $$
DECLARE
r RECORD;
SQL TEXT := '';
BEGIN
FOR r IN
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'raw_data_table'
LOOP
SQL := SQL ||
'SELECT ''' || r.column_name || ''' AS column_name, COUNT(*) FILTER (WHERE ' || r.column_name || ' IS NULL) AS null_count FROM raw_data_table UNION ALL ';
END LOOP;
SQL := LEFT(SQL, length(SQL) - 11);
FOR r IN EXECUTE SQL LOOP
RAISE NOTICE 'Column: %, Null Count: %', r.column_name, r.null_count;
END LOOP;
END $$;
示例结果如下。
NOTICE: Column: customerid, Null Count: 0
NOTICE: Column: churn, Null Count: 0
NOTICE: Column: tenure, Null Count: 264
NOTICE: Column: preferredlogindevice, Null Count: 0
NOTICE: Column: citytier, Null Count: 0
NOTICE: Column: warehousetohome, Null Count: 251
NOTICE: Column: preferredpaymentmode, Null Count: 0
NOTICE: Column: gender, Null Count: 0
NOTICE: Column: hourspendonapp, Null Count: 255
NOTICE: Column: numberofdeviceregistered, Null Count: 0
NOTICE: Column: preferedordercat, Null Count: 0
NOTICE: Column: satisfactionscore, Null Count: 0
NOTICE: Column: maritalstatus, Null Count: 0
NOTICE: Column: numberofaddress, Null Count: 0
NOTICE: Column: complain, Null Count: 0
NOTICE: Column: orderamounthikefromlastyear, Null Count: 265
NOTICE: Column: couponused, Null Count: 256
NOTICE: Column: ordercount, Null Count: 258
NOTICE: Column: daysincelastorder, Null Count: 307
NOTICE: Column: cashbackamount, Null Count: 0
对于含空值的字段,需检查其语义及数据分布,以便在后续训练时确认预处理填充策略和特征工程。检查详情如下。
创建分析函数以检查字段是否含空值。
CREATE OR REPLACE FUNCTION print_column_statistics(table_name TEXT, column_name TEXT) RETURNS VOID AS $$ DECLARE SQL TEXT; distinct_count INTEGER; min_value NUMERIC; max_value NUMERIC; avg_value NUMERIC; median_value NUMERIC; r RECORD; BEGIN SQL := 'SELECT COUNT(DISTINCT ' || column_name || ') AS distinct_count, MIN(' || column_name || ') AS min_value, MAX(' || column_name || ') AS max_value, AVG(' || column_name || ') AS avg_value, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY ' || column_name || ') AS median_value FROM ' || table_name; EXECUTE SQL INTO r; distinct_count := r.distinct_count; min_value := r.min_value; max_value := r.max_value; avg_value := r.avg_value; median_value := r.median_value; RAISE NOTICE 'Distinct Count: %', distinct_count; IF distinct_count < 40 THEN SQL := 'SELECT ' || column_name || ' AS col, COUNT(*) AS count FROM ' || table_name || ' GROUP BY ' || column_name || ' ORDER BY count DESC'; FOR r IN EXECUTE SQL LOOP RAISE NOTICE '%: %', r.col, r.count; END LOOP; END IF; RAISE NOTICE 'Min Value: %, Max Value: %, Avg Value: %, Median Value: %', min_value, max_value, avg_value, median_value; END; $$ LANGUAGE plpgsql;
使用该函数检查字段中是否含空值。
SELECT print_column_statistics('raw_data_table', 'tenure');
检查结果如下。
NOTICE: Distinct Count: 36 NOTICE: 1: 690 NOTICE: 0: 508 NOTICE: <NULL>: 264 NOTICE: 8: 263 NOTICE: 9: 247 NOTICE: 7: 221 NOTICE: 10: 213 NOTICE: 5: 204 ... NOTICE: Min Value: 0, Max Value: 61, Avg Value: 10.1898993663809, Median Value: 9
模型训练
数据预处理
从上述分析步骤可知,本次数据集中Tenure
、WareHousetohome
、HourSpendOnApp
、OrderAmountHikeFromLastYear
、CouponUsed
、OrderCount
及DaySinceLastOrder
字段存在空值。为了避免影响模型的性能,需对空值进行数据处理。以下是各字段中的数据分析及空值的处理方法。
Tenure
字段中,正常值为正偏分布,使用中位数填充空值。WareHousetohome
字段中,距离的分布存在极端值(例如,距离仓库比较远的客户),使用中位数填充使数据分布更集中。HourSpendOnApp
字段中,分布相对对称,可使用均值填充。OrderAmountHikeFromLastYear
字段中,总体分布平稳,可使用均值填充。CouponUsed
字段中,推测缺失部分为未使用过优惠券,使用零值填充。OrderCount
字段中,推测缺失部分为未下单,使用零值填充。DaySinceLastOrder
中,空值往往表示长时间未活跃,使用该字段中的最大值进行填充。
结合机器学习使用文档,可以得到预处理参数如下。
{
"tenure": {"impute": "median"},
"warehousetohome": {"impute": "median"},
"hourspendonapp": {"impute": "mean"},
"orderamounthikefromlastyear": {"impute": "mean"},
"couponused": {"impute": "zero"},
"ordercount": {"impute": "zero"},
"daysincelastorder": {"impute": "max"}
}
CityTier
和Complain
字段类型为INTEGER,但实际上为标签类型,数值并无大小之分。可以考虑将其转换为TEXT类型,并使用one-hot等方式进行编码。
创建训练视图
根据上述的数据处理结果,可创建对应的视图以避免对原始数据表的物理修改,同时方便后续添加更多的特征。
CREATE OR REPLACE VIEW train_data_view AS
SELECT
Churn::TEXT,
Tenure,
PreferredLoginDevice,
CityTier::TEXT,
WarehouseToHome,
PreferredPaymentMode,
Gender,
HourSpendOnApp,
NumberOfDeviceRegistered,
PreferedOrderCat,
SatisfactionScore,
MaritalStatus,
NumberOfAddress,
Complain::TEXT,
OrderAmountHikeFromlastYear,
CouponUsed,
OrderCount,
DaySinceLastOrder,
CashbackAmount
FROM
raw_data_table;
特征工程
特征工程是机器学习和数据挖掘中一项关键的步骤。该步骤可对原始数据进行处理和转换、提供额外的信息输入、实现模型的收敛、提升整体性能等。
特征名称 | 描述 |
AvgCashbkPerOrder | 订单平均返现金额。计算方式:CashbackAmount/OrderCount。 |
AvgHourSpendPerOrder | 订单平均浏览时间。计算方式:HourSpendOnApp/OrderCount。 |
CouponUsedPerOrder | 订单平均优惠券使用数。计算方式:CouponUsed/OrderCount。 |
LogCashbackAmount | 返现金额对数化处理。计算方式:log(1+LogCashbackAmount)。 |
根据上述新增特征,可重新创建视图。
CREATE OR REPLACE VIEW train_data_view AS
SELECT
Churn::TEXT,
Tenure,
PreferredLoginDevice,
CityTier::TEXT,
WarehouseToHome,
PreferredPaymentMode,
Gender,
HourSpendOnApp,
NumberOfDeviceRegistered,
PreferedOrderCat,
SatisfactionScore,
MaritalStatus,
NumberOfAddress,
Complain::TEXT,
OrderAmountHikeFromlastYear,
CouponUsed,
OrderCount,
DaySinceLastOrder,
CashbackAmount,
CashbackAmount/OrderCount AS AvgCashbkPerOrder,
HourSpendOnApp/OrderCount AS AvgHourSpendPerOrder,
CouponUsed/OrderCount AS CouponUsedPerOrder,
log(1+CashbackAmount) AS LogCashbackAmount
FROM
raw_data_table;
模型训练与选择
通过
pgml.train
接口,可以使用不同模型快速拟合数据,并验证其效果。以下列出XGBoost模型和bagging模型的拟合过程和输出结果,其余的模型由于篇幅限制在此省略。使用XGBoost模型拟合数据。
SELECT * FROM pgml.train( project_name => 'Customer Churn Prediction Project', --训练名称 task => 'classification', --任务类型 relation_name => 'train_data_view', -- 数据源 y_column_name => 'churn', -- 预测类别字段名 preprocess => '{ "tenure": {"impute": "median"}, "warehousetohome": {"impute": "median"}, "hourspendonapp": {"impute": "mean"}, "orderamounthikefromlastyear": {"impute": "mean"}, "couponused": {"impute": "zero"}, "ordercount": {"impute": "zero"}, "daysincelastorder": {"impute": "max"}, "avgcashbkperorder": {"impute": "zero"}, "avghourspendperorder": {"impute": "zero"}, "couponusedperorder": {"impute": "zero"}, "logcashbackamount": {"impute": "min"} }', -- 预处理方法 algorithm => 'xgboost', -- 模型类型 runtime => 'python', -- 指定python运行环境 test_size => 0.2 -- 测试集比例 );
输出拟合指标如下。
-- 输出拟合指标: -- {f1": 0.9543147, "precision": 0.96907216, "recall": 0.94, "accuracy": 0.9840142, ...}
使用bagging模型拟合数据。
-- bagging regression SELECT * FROM pgml.train( project_name => 'Customer Churn Prediction Project', --训练名称 task => 'classification', --任务类型 relation_name => 'train_data_view', -- 数据源 y_column_name => 'churn', -- 预测类别字段名 preprocess => '{ "tenure": {"impute": "median"}, "warehousetohome": {"impute": "median"}, "hourspendonapp": {"impute": "mean"}, "orderamounthikefromlastyear": {"impute": "mean"}, "couponused": {"impute": "zero"}, "ordercount": {"impute": "zero"}, "daysincelastorder": {"impute": "max"}, "avgcashbkperorder": {"impute": "zero"}, "avghourspendperorder": {"impute": "zero"}, "couponusedperorder": {"impute": "zero"}, "logcashbackamount": {"impute": "min"} }', -- 预处理方法 algorithm => 'bagging', -- 模型类型 runtime => 'python', -- 指定python运行环境 test_size => 0.2 -- 测试集比例 );
输出拟合指标如下。
-- 输出拟合指标: -- {"f1": 0.9270833, "precision": 0.96216214, "recall": 0.89447236}
通过替换
algorithm
参数,可验证不同模型在该数据集上的拟合能力,支持的模型列表请参见参数列表。通过分析测试集的F1指标可知,XGBoost模型表现优于其他模型,因此本次选择该模型作为后续操作的基准。使用网格参数搜索策略筛选出最合适的模型超参数,并使用5折交叉验证增强结论可信度,搜索策略的超参数有以下三个。
超参数名称
描述
n_estimators
要构建树的数量。增加树的数量通常可以提高模型性能,但也会增加计算成本。取值范围不定,例如设置范围从100到500五个取值,以寻找最佳的平衡点。
eta
学习率。决定每棵树对最终预测的贡献程度。较小的学习率会使训练过程更稳定,但可能需要更多的树(
n_estimators
)来收敛。取值范围不定,例如可设置0.05、0.1 和 0.2,以在训练稳定性和效率之间找到合适的平衡点。max_depth
每棵树的最大深度。较高的深度能够捕捉更多的特征交互,但也容易导致过度拟合。取值范围不定,例如设置为深度16和32。
SELECT * FROM pgml.train( project_name => 'Customer Churn Prediction Project', --训练名称 task => 'classification', --任务类型 relation_name => 'train_data_view', -- 数据源 y_column_name => 'churn', -- 预测类别字段名 preprocess => '{ "tenure": {"impute": "median"}, "warehousetohome": {"impute": "median"}, "hourspendonapp": {"impute": "mean"}, "orderamounthikefromlastyear": {"impute": "mean"}, "couponused": {"impute": "zero"}, "ordercount": {"impute": "zero"}, "daysincelastorder": {"impute": "max"}, "avgcashbkperorder": {"impute": "zero"}, "avghourspendperorder": {"impute": "zero"}, "couponusedperorder": {"impute": "zero"}, "logcashbackamount": {"impute": "min"} }', -- 预处理方法 algorithm => 'xgboost', -- 模型类型 search_args => '{ "cv": 5 }', -- 启用5折交叉验证 SEARCH => 'grid', -- 网格搜索策略 search_params => '{ "max_depth": [4, 6, 8, 16], "n_estimators": [100, 200, 300, 400, 500, 1000, 2000], "eta": [0.05, 0.1, 0.2] }', hyperparams => '{ "nthread": 16, "alpha": 0, "lambda": 1 }', runtime => 'python', -- 指定python运行环境 test_size => 0.2 -- 测试集比例 );
示例结果如下。
-- 搜索结果 -- ... (省略详细输出) INFO: Best Hyperparams: { "alpha": 0, "lambda": 1, "nthread": 16, "eta": 0.1, "max_depth": 6, "n_estimators": 1000 } INFO: Best f1 Metrics: Number(0.9874088168144226)
从结果可得出,当超参数取值为
{"eta": 0.2, "max_depth": 16, "n_estimators": 400}
时,模型拟合能力最强。由于验证时使用了K-fold交叉验证(即search_args => '{ "cv": 5 }
),因此模型所使用的训练数据为数据集中的80%。使用最佳超参数在全量数据上训练模型并验证结果。
SELECT * FROM pgml.train( project_name => 'Customer Churn Prediction Project', --训练名称 task => 'classification', --任务类型 relation_name => 'train_data_view', -- 数据源 y_column_name => 'churn', -- 预测类别字段名 preprocess => '{ "tenure": {"impute": "median"}, "warehousetohome": {"impute": "median"}, "hourspendonapp": {"impute": "mean"}, "orderamounthikefromlastyear": {"impute": "mean"}, "couponused": {"impute": "zero"}, "ordercount": {"impute": "zero"}, "daysincelastorder": {"impute": "max"}, "avgcashbkperorder": {"impute": "zero"}, "avghourspendperorder": {"impute": "zero"}, "couponusedperorder": {"impute": "zero"}, "logcashbackamount": {"impute": "min"} }', -- 预处理方法 algorithm => 'xgboost', -- 模型类型 hyperparams => '{ "max_depth": 6, "n_estimators": 1000, "eta": 0.1, "nthread": 16, "alpha": 0, "lambda": 1 }', runtime => 'python', -- 指定python运行环境 test_size => 0.2 -- 测试集比例 );
示例结果如下。
-- 输出结果 INFO: Training Model { id: 170, task: classification, algorithm: xgboost, runtime: python } INFO: Hyperparameter searches: 1, cross validation folds: 1 INFO: Hyperparams: { "eta": 0.1, "alpha": 0, "lambda": 1, "nthread": 16, "max_depth": 6, "n_estimators": 1000 } INFO: Metrics: {"roc_auc": 0.9751001, "log_loss": 0.19821791, "f1": 0.99258476, "precision": 0.9936373, "recall": 0.9915344, "accuracy": 0.9875666, "mcc": 0.95414394, "fit_time": 0.9980099, "score_time": 0.0085158} INFO: Comparing to deployed model f1: Some(0.9874088168144226) INFO: Deploying model id: 170 project | task | algorithm | deployed -----------------------------------+----------------+-----------+---------- Customer Churn Prediction Project | classification | xgboost | t
在测试集上模型性能可达到F1=0.99258476。
模型部署
选取部署模型
默认情况下,pgml会自动部署project内训练过程中F1值最高的模型(针对分类任务),可以使用pgml.deployments
表检查当前的部署情况。
SELECT d.id, d.project_id, d.model_id, p.name, p.task FROM pgml.deployments d
JOIN pgml.projects p on d.project_id = p.id;
示例结果如下。
id | project_id | model_id | name | task
----+------------+----------+-----------------------------------+----------------
61 | 2 | 170 | Customer Churn Prediction Project | classification
如果您需要指定部署其他模型,请参见部署。
模型使用
在线推理
在线推理适用于需要实时交互式响应的场景。例如数据分析师进行个案分析时,需要根据用户的历史行为立即返回预测结果。
-- 单条在线推理
SELECT pgml.predict('Customer Churn Prediction Project',
( 4, 'Mobile Phone'::TEXT, 3, 6,
'Debit Card'::TEXT, 'Female'::TEXT, 3, 3,
'Laptop & Accessory'::TEXT, 2,
'Single'::TEXT, 9 ,
'1'::TEXT, 11, 1, 1, 5, 159.93,
159.93, 3, 1, 2.206637011283536
));
示例结果如下。
-- 预测输出
predict
---------
0
(1 row)
离线推理
离线推理适合需要处理数据量较大,吞吐率优先于RT的场景,能够提高计算资源的使用率。
-- 创建视图
CREATE OR REPLACE VIEW predict_data_view AS
SELECT
CustomerID,
Churn::TEXT,
Tenure,
PreferredLoginDevice,
CityTier::TEXT,
WarehouseToHome,
PreferredPaymentMode,
Gender,
HourSpendOnApp,
NumberOfDeviceRegistered,
PreferedOrderCat,
SatisfactionScore,
MaritalStatus,
NumberOfAddress,
Complain::TEXT,
OrderAmountHikeFromlastYear,
CouponUsed,
OrderCount,
DaySinceLastOrder,
CashbackAmount,
CashbackAmount/OrderCount AS AvgCashbkPerOrder,
HourSpendOnApp/OrderCount AS AvgHourSpendPerOrder,
CouponUsed/OrderCount AS CouponUsedPerOrder,
log(1+CashbackAmount) AS LogCashbackAmount
FROM
raw_data_table;
-- ====================================
-- 批量在线推理
-- 创建批量推理视图
SELECT CustomerID, pgml.predict('Customer Churn Prediction Project', (
"tenure",
"preferredlogindevice",
"citytier",
"warehousetohome",
"preferredpaymentmode",
"gender",
"hourspendonapp",
"numberofdeviceregistered",
"preferedordercat",
"satisfactionscore",
"maritalstatus",
"numberofaddress",
"complain",
"orderamounthikefromlastyear",
"couponused",
"ordercount",
"daysincelastorder",
"cashbackamount",
"avgcashbkperorder",
"avghourspendperorder",
"couponusedperorder",
"logcashbackamount"
)) FROM predict_data_view limit 20;
示例结果如下。
-- 预测结果
customerid | predict
------------+---------
50005 | 0
50009 | 0
50012 | 0
50013 | 0
50019 | 0
50020 | 0
50022 | 0
50023 | 0
50026 | 0
50031 | 1
50039 | 1
50040 | 0
50043 | 1
50045 | 1
50047 | 0
50048 | 1
50050 | 1
50051 | 1
50052 | 1
50053 | 0
(20 rows)