本文为您介绍如何 Spark SQL 开发一个流式处理作业。

作业模板

说明 :建议在EMR-3.23.0(含)后续版本中使用这个模板。
-- dbName:需要将表建在哪个数据库下
CREATE DATABASE IF NOT EXISTS ${dbName};
USE ${dbName};

-- 创建SLS数据表
-- slsTableName:SLS表的名称
-- logProjectName:LogService的项目名
-- logStoreName:LogService的logstore名
-- accessKeyId:阿里云accessKeyId
-- accessKeySecret:阿里云accessKeySecret
-- endpoint:LogService的logstore所在endpoint,可以参考访问域名和数据中心
-- 当显式指定LogStore的各个字段时,必须定义为STRING类型
-- 保留6个系统字段:(`__logProject__` STRING, `__logStore__` STRING, `__shard__` INT, `__time__` TIMESTAMP, `__topic__` STRING, `__source__` STRING)
CREATE TABLE IF NOT EXISTS ${slsTableName} (col1 dataType[, col2 dataType])
USING loghub
OPTIONS (
sls.project = '${logProjectName}',
sls.store = '${logStoreName}',
access.key.id = '${accessKeyId}',
access.key.secret = '${accessKeySecret}',
endpoint = '${endpoint}');

-- 创建HDFS数据表,需要完成表的列字段定义
-- hdfsTableName:HDFS表的名称
-- location: 存储数据路劲,支持HDFS和OSS路径
-- 数据格式支持:csv, json, orc, parquet等,默认为parquet
CREATE TABLE IF NOT EXISTS ${hdfsTableName} (col1 dataType[, col2 dataType])
USING PARQUET
LOCATION '${location}';

-- 配置读表的方式,支持STREAM和BATCH,默认为BATCH。
CREATE SCAN tmp_read_sls_table 
ON ${slsTableName} 
USING STREAM;

-- 创建一个流式查询作业
CREATE STREAM ${queryName}
OPTIONS(
outputMode='Append',
triggerType='ProcessingTime',
triggerInterval='30000',
checkpointLocation='${checkpointLocation}')
INSERT INTO ${hdfsTableName}
SELECT col1, col2
FROM tmp_read_sls_table
WHERE ${condition};