本文介绍实现方案的详细配置操作,包括在RDS MySQL中创建源表、在表格存储中创建结果表、在实时计算Flink中创建作业并启动和在表格存储中分析数据。
步骤一:在RDS MySQL中创建源表
- 通过DMS登录RDS MySQL。具体操作,请参见通过DMS登录RDS MySQL。
- 创建数据表consume_record。consume_record数据表作为流计算任务源表,用于接入实时计算Flink。
- 编写创数据表的SQL语句。本方案中SQL示例如下:
CREATE TABLE `consume_record` ( `consume_id` varchar(20) NOT NULL, `product_id` varchar(20) NOT NULL, `consume_time` bigint(20) NOT NULL, `consume_name` varchar(20) NOT NULL, `consume_phone` varchar(20) NOT NULL, PRIMARY KEY (`consume_id`) ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8 COMMENT='消费记录数据源表';
- 单击执行(F8)。
- 编写创数据表的SQL语句。
步骤二:在表格存储中创建维表和结果表
- 登录表格存储控制台。
- 创建数据表consume_product和product。具体操作,请参见创建数据表。说明 由于表格存储的数据表是schema free的,只需要定义主键,无需定义属性列。
consume_product数据表作为流计算任务结果表,用于保存商品消费信息与商品元数据信息。表结构请参见下表。
字段名称 数据类型 是否主键 描述 consume_id STRING 是 消费ID(主键)。 product_id STRING 是 商品ID。 price DOUBLE 否 商品单价。 consume_time BIGINT(20) 否 消费时间。 consume_name STRING 否 消费者名称。 consume_phone STRING 否 消费者联系方式。 product数据表流计算任务维表,用于保存商品元数据信息。表结构请参见下表。
字段名称 数据类型 是否主键 描述 product_id STRING 是 商品ID。 price DOUBLE 否 商品单价。 product_type STRING 否 商品类别。
步骤三:在实时计算Flink中创建作业并启动
- 登录实时计算管理控制台。
- 创建作业gmv_post_aggregation。
- 在Flink全托管页签,单击目标工作空间操作列下的控制台。
- 在左侧导航栏,单击作业开发。
- 单击新建。
- 在新建文件对话框,填写作业配置信息。
作业参数 示例 说明 文件名称 flink-test 作业的名称。 说明 作业名称在当前项目中必须保持唯一。文件类型 流作业/SQL 选择流作业中的SQL。 部署目标 vvp-workload 选择作业需要部署的集群名称。Flink全托管支持Per-Job集群和Session集群两种集群模式。两种集群模式的区别说明,请参见配置开发测试环境(Session集群)。 存储位置 作业开发 指定该作业的代码文件所属的文件夹。默认存放在作业开发目录。 您还可以在现有文件夹右侧,单击
图标,新建子文件夹。
- 单击确认。
- 配置作业并上线。
- 将以下作业代码示例拷贝到作业文本编辑区并根据实际修改配置信息。
-- mysql-cdc源表。 CREATE TEMPORARY TABLE consume_record ( `consume_id` VARCHAR(20), `product_id` VARCHAR(20), `consume_time` BIGINT, `consume_name` VARCHAR(20), `consume_phone` VARCHAR(20), PRIMARY KEY(consume_id) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '<hostname>', 'port' = '3306', 'username' = '', 'password' = '', 'database-name' = '', 'table-name' = '' ); -- tablestore维表。 CREATE TEMPORARY TABLE product ( product_id STRING, price DOUBLE, product_type STRING, PRIMARY KEY (product_id) NOT ENFORCED ) WITH ( 'connector' = 'ots', 'endPoint' = '', 'instanceName' = '', 'tableName' = '', 'accessId' = '', 'accessKey' = '' ); -- tablestore结果表。 CREATE TEMPORARY TABLE consume_product ( consume_id STRING, product_id STRING, price DOUBLE, consume_time BIGINT, consume_name STRING, consume_phone STRING, PRIMARY KEY (consume_id,product_id) NOT ENFORCED ) WITH ( 'connector' = 'ots', 'endPoint' = '', 'instanceName' = '', 'tableName' = '', 'accessId' = '', 'accessKey' = '', 'valueColumns' = 'price,consume_time,consume_name,consume_phone' ); insert into consume_product select s.consume_id,d.product_ID as product_id,d.price, UNIX_TIMESTAMP(s.consume_time,'yy-MM-dd') as consume_time, s.consume_name,s.consume_phone from `consume_record` as s join `product` for system_time as of proctime() as d on s.product_id = d.product_ID
配置项说明请参见下表。分类 配置项 示例 描述 mysql-cdc源表配置 connector mysql-cdc MySQL连接器,必须设置为mysql-cdc。 hostname myhost 主机名称。 port 3306 主机端口。 username myflinktest 数据库账号。 password ******** 数据库密码。 database-name flinktest 数据库名称。 table-name consume_record 数据表名称。 tablestore结果表 connector ots 表格存储连接器,必须设置为ots。 endPoint https://myinstance.cn-hangzhou.ots.aliyuncs.com 实例的服务地址。更多信息,请参见服务地址。 instanceName myinstance 实例名称。 tableName consume_product 数据表名称。 accessId ************ 阿里云账号或者RAM用户的AccessKey ID和AccessKey Secret。关于获取AccessKey ID和AccessKey Secret的具体操作,请参见获取AccessKey。 accessKey **************** valueColumns price,consume_time 保存到结果表中的列以列表形式表示,多个列之间用半角逗号(,)分隔。 只有当表格存储作为结果表时才需要配置此项。
- 单击验证,进行语法检查。
- 验证通过后,单击上线。
- 将以下作业代码示例拷贝到作业文本编辑区并根据实际修改配置信息。
- 启动作业。
- 在左侧导航栏,单击作业运维。
- 单击目标作业名称操作列中的启动。
- 在作业启动配置对话框,单击确认启动。
步骤四:在表格存储中分析数据
您可以通过多元索引或者SQL查询功能进行数据分析。
通过多元索引进行数据分析的具体步骤如下:
- 为数据表consume_product创建多元索引。具体操作,请参见创建多元索引。
- 通过多元索引的统计聚合功能实现数据分析。关于统计聚合功能的更多信息,请参见统计聚合、
public void agg(SyncClient client) { //构建查询语句。 SearchRequest searchRequest = SearchRequest.newBuilder() .tableName("consume_product") //设置数据表名称。 .indexName("consume_product_index") //设置多元索引名称。 .searchQuery(SearchQuery.newBuilder() .query(QueryBuilders.matchAll()) .addGroupBy(GroupByBuilders.groupByField("groupByProductID","product_id").addSubAggregation( AggregationBuilders.sum("sumagg","price") )) .build()) .build(); SearchResponse searchResponse = syncClient.search(searchRequest); for(GroupByFieldResultItem item : searchResponse.getGroupByResults().getAsGroupByFieldResult("groupByProductID").getGroupByFieldResultItems()){ System.out.println("商品ID:"+item.getKey()+",交易总额:"+item.getSubAggregationResults().getAsSumAggregationResult("sumagg").getValue()); } }
结果示例如下:商品ID:A001,交易总额:20.0 商品ID:A002,交易总额:40.0 商品ID:A004,交易总额:20.0 商品ID:A003,交易总额:5.0 商品ID:A005,交易总额:15.0 商品ID:A006,交易总额:5.0 商品ID:A008,交易总额:5.0
通过SQL查询进行数据分析的具体步骤如下:
- 为数据表consume_product创建映射关系。具体操作,请参见创建映射关系。创建映射关系的SQL语句示例如下:
CREATE TABLE consume_product ( consume_id VARCHAR(1024), product_id VARCHAR(1024), price DOUBLE, consume_time BIGINT, consume_name MEDIUMTEXT, consume_phone MEDIUMTEXT, PRIMARY KEY (consume_id,product_id) )
- 执行以下SQL语句进行数据分析。
select product_id, sum(price) from consume_product group by product_id
结果示例如下图所示。