本文通过案例为您介绍如何使用实时计算Flink版完成时态势感知和订单地理分布。
背景信息
实时态势感知和订单地理分布有助于企业及时的优化产品品类的分配和发布。以下是某食品电商企业通过实时计算Flink版完成产品的实时态势感知和订单地理分布的案例。案例

说明 为了聚焦核心逻辑,订单数据格式做了大量精简,只保留了与案例有关的属性。
- 创建数据存储
在电商系统里,订单与订单地址是分开存储(下单人可以给多个下单地址),所以在订单创建时并没有收货地址,只有在订单提交时才真正的知道收货地址。订单地址里保存的是城市的id(city_id),为了获取地理信息,还需要一张城市表(存储城市地理信息)。目标是按日统计不同地域订单(总销售额)的分布情况。
- 订单
CREATE TABLE source_order ( id VARCHAR,-- 订单ID。 seller_id VARCHAR, --卖家ID。 account_id VARCHAR,--买家ID。 receive_address_id VARCHAR,--收货地址ID。 total_price VARCHAR,--订单金额。 pay_time VARCHAR --订单支付时间。 ) WITH ( type='datahub', endPoint='http://dh-cn-hangzhou.aliyun-inc.com', project='yourProjectName',--您的project。 topic='yourTopicName',--您的topic。 roleArn='yourRoleArn',--您的roleArn。 batchReadSize='500' );
- 订单地址
CREATE TABLE source_order_receive_address ( id VARCHAR,--收货地址ID。 full_name VARCHAR,--收货人全名。 mobile_number VARCHAR,--收货人手机号。 detail_address VARCHAR,--收货详细地址。 province VARCHAR,--收货省份。 city_id VARCHAR,--收货城市。 create_time VARCHAR --创建时间。 ) WITH ( type='datahub', endPoint='http://dh-cn-hangzhou.aliyun-inc.com', project='yourProjectName',--您的project。 topic='yourTopicName',--您的topic。 roleArn='yourRoleArn',--您的roleArn。 batchReadSize='500' );
- 城市表
CREATE TABLE dim_city ( city_id varchar, city_name varchar,--城市名。 province_id varchar,--所属省份ID。 zip_code varchar,--邮编。 lng varchar,--经度。 lat varchar,--纬度。 PRIMARY KEY (city_id), PERIOD FOR SYSTEM_TIME --定义为维表。 ) WITH ( type= 'rds', url = 'yourDatabaseURL',--您的数据库URL。 tableName = 'yourTableName',--您的表名。 userName = 'yourDatabaseName',--您的用户名。 password = 'yourDatabasePassword'--您的密码。 );
- 按日统计不同地域订单(总销售额)的分布。
CREATE TABLE result_order_city_distribution ( summary_date varchar,--统计日期。 city_id bigint,--城市ID。 city_name varchar,--城市名。 province_id bigint,--所属省份ID。 gmv double,--总销售额。 lng varchar,--经度。 lat varchar,--纬度。 primary key (summary_date,city_id) ) WITH ( type= 'rds', url = 'yourDatabaseURL',--您的数据库URL。 tableName = 'yourTableName',--您的表名。 userName = 'yourDatabaseName',--您的用户名。 password = 'yourDatabasePassword'--您的密码。 );
- 订单
- 编辑业务逻辑
INSERT INTO result_order_city_distribution SELECT d.summary_date, cast(d.city_id as BIGINT), e.city_name , cast(e.province_id as BIGINT), d.gmv, e.lng, e.lat from ( SELECT DATE_FORMAT(a.pay_time,'yyyyMMdd') as summary_date , b.city_id as city_id , round(sum(cast(a.total_price as double)),2) as gmv from source_order as a join source_order_receive_address as b on a.receive_address_id =b.id group by DATE_FORMAT(a.pay_time,'yyyyMMdd'),b.city_id --双流JOIN,并根据日期和城市ID得到销售额分。 )d join dim_city FOR SYSTEM_TIME AS OF PROCTIME() as e on d.city_id = e.city_id -- JOIN维表,补齐城市地理信息,得到最终结果。 ;
DEMO示例以及源代码
根据上文介绍的实时态势感知和订单地理分布解决方案,阿里云为您创建了一个包含完整链路的DEMO示例,您可以参考DEMO示例,注册上下游数据,制定自己的解决方案。单击DEMO代码进行下载完整示例。使用示例时,上下游存储请注意以下两点:
- DataHub作为源表
- RDS作为结果表
在文档使用中是否遇到以下问题
更多建议
匿名提交