/* 请使用Spark SQL的语法编写SQL,表的引用方式为 alias.table_name */
/*创建oss t_order表引用,添加dt作为分区字段*/
CREATE TABLE oss.t_order (
id bigint COMMENT '主键',
product_id bigint COMMENT '产品id',
gmt_create timestamp COMMENT '创建时间',
gmt_modified timestamp COMMENT '修改时间',
customer_id bigint COMMENT '客户id',
price decimal(38,8) COMMENT '价格',
status string COMMENT '订单状态',
province string COMMENT '交易省份',
dt string comment '业务日期分区'
) partitioned by (dt) COMMENT '订单表';
insert overwrite oss.t_order partition(dt='${bizdate}')
select id, product_id, gmt_create, gmt_modified, customer_id, price, status, province
from demo_id.t_order o
where o.gmt_create>= '${bizdate}' and o.gmt_create< '${today}';
/*创建oss t_product表引用*/
CREATE TABLE oss.t_product (
id bigint COMMENT '主键',
name string COMMENT '产品名',
type string COMMENT '产品类别',
gmt_create timestamp COMMENT '创建时间',
gmt_modified timestamp COMMENT '修改时间'
) COMMENT '产品表';
/*全量同步商品表数据*/
insert overwrite oss.t_product
select id, name, type, gmt_create, gmt_modified
from demo_id.t_product;
/*创建oss t_order_report_daily表引用,以dt字段作为分区字段*/
CREATE TABLE oss.t_order_report_daily(
dt string comment '业务日期',
product_type string comment '商品类别',
order_cnt bigint comment '订单数',
order_amt decimal(38, 8) comment '订单金额'
) partitioned by (dt) comment '订单统计日表';
/*按分区插入数据*/
insert overwrite oss.t_order_report_daily partition(dt='${bizdate}')
select
p.type as product_type,
count(*) order_cnt,
sum(price) order_amt
from oss.t_product p join oss.t_order o on o.product_id= p.id
where o.gmt_create>= '${bizdate}'
and o.gmt_create< '${today}'
group by product_type;
OSS支持4种文件存储格式:CSV、Parquet、ORC、JSON,默认使用CSV格式。您可以在CREATE TABLE语句中通过USING
指定。
例如,将表的存储格式指定为Parquet:
CREATE TABLE oss.t_order (
id bigint COMMENT '主键',
product_id bigint COMMENT '产品id',
gmt_create timestamp COMMENT '创建时间',
gmt_modified timestamp COMMENT '修改时间',
customer_id bigint COMMENT '客户id',
price decimal(38,8) COMMENT '价格',
status string COMMENT '订单状态',
province string COMMENT '交易省份',
dt string comment '业务日期分区'
) USING PARQUET partitioned by (dt) COMMENT '订单表';