本文介绍实现方案的详细配置操作,包括创建数据表、实时流计算、离线批计算和DataV展示。
步骤一:创建数据表
使用远程登录工具登录EMR Header服务器。
执行以下命令,启动SQL客户端。
SQL客户端用于批流的SQL计算,其中emr-datasources_shaded_*.jar为准备工作中下载的EMR最新版SDK包。
streaming-sql --driver-class-path emr-datasources_shaded_*.jar --jars emr-datasources_shaded_*.jar --master yarn-client --num-executors 8 --executor-memory 2g --executor-cores 2
创建原始订单数据表(OrderSource表)的外表order_source。
外表order_source用于流批处理的SQL执行。
DROP TABLE IF EXISTS order_source; CREATE TABLE order_source USING tablestore OPTIONS( endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com", access.key.id="", access.key.secret="", instance.name="vehicle-test", table.name="OrderSource", tunnel.id="2b7bbf3d-d6c4-4cea-89fe-71998bccaf19", catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "OrderId": {"col": "OrderId", "type": "string"}, "price": {"cols": "price", "type": "double"}, "timestamp": {"cols": "timestamp", "type": "long"}}}' );
参数
说明
endpoint
表格存储实例的访问地址。
access.key.id
阿里云账号的AccessKey ID。
access.key.secret
阿里云账号的AccessKey Secret。
instance.name
表格存储的实例名。
table.name
表格存储的表名。
tunnel.id
表格存储的增量通道ID,用于实时的增量SQL,批量SQL时非必须。
catalog
表的字段Schema定义,示例代码中对应的四个列分别为UserId(主键)、OrderId(主键)、price、timestamp,数据类型分别为string、string、double、long。
步骤二:实时流计算
实时流计算将实时统计一个窗口周期时间内的订单数和订单金额统计,并将聚合结果写回表格存储的数据表中。
创建流计算的Sink外表order_stream_sink(对应表格存储中的OrderStreamSink表)。
执行实时流计算SQL命令进行实时聚合,实时得到聚合结果,并将聚合结果实时写回表格存储的OrderStreamSink表中。
Sink表的各参数含义和Source表一致,其中catalog字段的内容有所不同,Sink表的catalog中四个字段分别为有begin(开始时间,主键列,格式为2019-11-27 14:54:00)、end(结束时间,主键列)、count(订单数)、totalPrice(订单总金额),数据类型分别为string、string、long、double。
-- 创建Sink表order_stream_sink对应表格存储的OrderStreamSink表(主键为begin和end两列)。 DROP TABLE IF EXISTS order_stream_sink; CREATE TABLE order_stream_sink USING tablestore OPTIONS( endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com", access.key.id="", access.key.secret="", instance.name="vehicle-test", table.name="OrderStreamSink", catalog='{"columns": {"begin": {"col": "begin", "type": "string"}, "end": {"col": "end", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}' ); -- 在order_source表上创建视图order_source_stream_view。 CREATE SCAN order_source_stream_view ON order_source USING STREAM OPTIONS ("maxoffsetsperchannel"="10000"); -- 在视图order_source_stream_view上运行STREAM SQL作业,以下样例会按30s粒度进行订单数和订单金额的聚合。 -- 聚合结果将写回表格存储的OrderStreamSink表。 CREATE STREAM job1 options( checkpointLocation='/tmp/spark/cp/job1', outputMode='update' ) INSERT INTO order_stream_sink SELECT CAST(window.start AS String) AS begin, CAST(window.end AS String) AS end, count(*) AS count, CAST(sum(price) AS Double) AS totalPrice FROM order_source_stream_view GROUP BY window(to_timestamp(timestamp / 1000), "30 seconds");
聚合结果样例如下图所示。
步骤三:离线批计算
离线批计算将进行原始订单数据的总金额和用户维度总金额的离线聚合。
创建两张Sink表(OrderTotalSink表和OrderBatchSink表)分别存放历史总金额和用户维度总金额的聚合数据。
-- 批计算任务。 -- 用户维度结果表OrderBatchSink(主键UserId,属性列count、totalPrice)。 -- 总数据维度结果表OrderTotalSink(主键Count,属性列totalPrice)。 DROP TABLE IF EXISTS order_batch_sink; CREATE TABLE order_batch_sink USING tablestore OPTIONS( endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com", access.key.id="", access.key.secret="", instance.name="vehicle-test", table.name="OrderBatchSink", tunnel.id="", catalog='{"columns": {"UserId": {"col": "UserId", "type": "string"}, "count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}' ); DROP TABLE IF EXISTS order_total_sink; CREATE TABLE order_total_sink USING tablestore OPTIONS( endpoint="http://vehicle-test.cn-hangzhou.vpc.tablestore.aliyuncs.com", access.key.id="", access.key.secret="", instance.name="vehicle-test", table.name="OrderTotalSink", tunnel.id="", catalog='{"columns": {"count": {"col": "count", "type": "long"}, "totalPrice": {"col": "totalPrice", "type": "double"}}}' );
在Source表(order_source表)上执行批计算SQL命令,得到聚合结果,并将聚合结果分别写回表格存储的Sink表(OrderTotalSink表和OrderBatchSink表)中。
执行批计算SQL语句更新用户维度的聚合结果。
-- SQL命令。 INSERT INTO order_batch_sink SELECT UserId, count(*) AS count, sum(price) AS totalPrice FROM order_source GROUP BY UserId; -- 实际运行。 spark-sql> INSERT INTO order_batch_sink SELECT UserId, count(*) AS count, sum(price) AS totalPrice FROM order_source GROUP BY UserId; Time taken: 5.107 seconds
用户维度的聚合结果样例如下图所示。
执行批计算SQL命令更新总数据维度的结果。
-- SQL命令。 INSERT INTO order_total_sink SELECT count(*) AS count, sum(price) AS totalPrice FROM order_source; -- 实际运行。 spark-sql> INSERT INTO order_total_sink SELECT count(*) AS count, sum(price) AS totalPrice FROM order_source; Time taken: 4.272 seconds
总数据维度的聚合结果样例如下图所示。
步骤四:DataV展示
通过在DataV中添加Tablestore数据源,实现在DataV的大屏上展示实时流处理或离线批处理聚合结果。添加Tablestore数据源的详细操作请参见添加Tablestore数据源。