EMR Streaming SQL节点支持使用SQL语句来开发流式分析作业。本文为您介绍如何创建EMR Streaming SQL节点并进行数据开发。

前提条件

  • 您已创建阿里云EMR集群,且集群所在的安全组中入方向的安全策略包含以下策略。
    • 授权策略:允许
    • 协议类型:自定义 TCP
    • 端口范围:8898/8898
    • 授权对象:100.104.0.0/16
  • 您在DataWorks工作空间的配置页面添加了E-MapReduce计算引擎实例后,数据开发页面才会显示EMR目录。详情请参见配置工作空间
  • 准备资源组。

    购买独享调度资源组,详情请参见新增和使用独享调度资源组

  • 您需要在EMR控制台将集群HDFS配置中的hadoop.http.authentication.simple.anonymous.allowed参数设置为true,并重启hdfs、yarn组件。mer参数配置

使用限制

  • EMR Spark Streaming节点仅支持使用独享调度资源组。
  • 如果您使用的独享调度资源组和EMR集群是6月10号之前创建的,则需要提交工单升级相关组件。

创建EMR Streaming SQL节点并进行数据开发

  1. 进入数据开发页面。
    1. 登录DataWorks控制台
    2. 在左侧导航栏,单击工作空间列表
    3. 选择工作空间所在地域后,单击相应工作空间后的进入数据开发
  2. 创建业务流程
    如果您已有业务流程,则可以忽略该步骤。
    1. 鼠标悬停至新建图标,选择业务流程
    2. 新建业务流程对话框,输入业务名称
    3. 单击新建
  3. 创建EMR Streaming SQL节点。
    1. 鼠标悬停至新建图标,单击EMR > EMR Streaming SQL
      您也可以找到相应的业务流程,右键单击目标业务流程,选择新建 > EMR > EMR Streaming SQL
    2. 新建节点对话框中,输入节点名称,并选择节点类型目标文件夹
      说明 节点名称必须是大小写字母、中文、数字、下划线(_)和小数点(.),且不能超过128个字符。
    3. 单击提交,进入EMR Streaming SQL节点编辑页面。
  4. 使用EMR Streaming SQL节点进行数据开发。
    1. 选择目标EMR引擎。
      在节点编辑页面的EMR引擎实例列表,选择需要使用的目标EMR引擎。
    2. 编写作业代码。
      在EMR Streaming SQL节点的编辑页面,输入需要执行的作业代码。作业模板示例如下。
      -- dbName:数据库名。
      CREATE DATABASE IF NOT EXISTS ${dbName};
      USE ${dbName};
      
      -- 创建Log Service数据表。
      -- slsTableName:Log Service表的名称。
      -- logProjectName:LogService的项目名。
      -- logStoreName:LogService的logstore名。
      -- accessKeyId:阿里云账号的AccessKey ID。
      -- accessKeySecret:阿里云账号的AccessKey Secret。
      -- endpoint:LogService的logstore所在的Endpoint。
      -- 当显式指定LogStore的各个字段时,必须定义为STRING类型。
      -- 保留6个系统字段:(`__logProject__` STRING, `__logStore__` STRING, `__shard__` INT, `__time__` TIMESTAMP, `__topic__` STRING, `__source__` STRING)
      CREATE TABLE IF NOT EXISTS ${slsTableName} (col1 dataType[, col2 dataType])。
      USING loghub
      OPTIONS (
      sls.project = '${logProjectName}',
      sls.store = '${logStoreName}',
      access.key.id = '${accessKeyId}',
      access.key.secret = '${accessKeySecret}',
      endpoint = '${endpoint}');
      
      -- 创建HDFS数据表,需要完成表的列字段定义。
      -- hdfsTableName:HDFS表的名称。
      -- location: 存储数据路径,支持HDFS和OSS路径。
      -- 数据格式支持:delta, csv, json, orc, parquet等,默认为delta。
      CREATE TABLE IF NOT EXISTS ${hdfsTableName} (col1 dataType[, col2 dataType])
      USING delta
      LOCATION '${location}';
      
      -- 配置读表的方式,支持STREAM和BATCH,默认为BATCH。
      CREATE SCAN tmp_read_sls_table 
      ON ${slsTableName} 
      USING STREAM;
      
      -- 创建一个流式查询作业。
      CREATE STREAM ${queryName}
      OPTIONS(
      outputMode='Append',
      triggerType='ProcessingTime',
      triggerInterval='30000',
      checkpointLocation='${checkpointLocation}')
      INSERT INTO ${hdfsTableName}
      SELECT col1, col2
      FROM tmp_read_sls_table
      WHERE ${condition};
      EMR Streaming SQL的更多信息,详情请参见EMR Streaming SQL
    3. 配置调度资源组。
      • 单击工具栏中的高级运行图标,在参数对话框中选择需要使用的调度资源组。同时,您还可以根据业务需求配置自定义参数。
      • 单击确定
    4. 保存并运行任务。
      在工具栏中,单击保存图标,保存节点任务,单击运行图标,运行节点任务。
  5. 编辑高级配置
    • "USE_GATEWAY":true ,表示任务会被提交到EMR gateway上执行,默认提交到header节点。
    • "SPARK_CONF": "--conf spark.driver.memory=2g --conf xxx=xxx" ,设置spark 任务运行参数,多个参数在该key中追加。
    • “queue”:提交作业的调度队列,默认为default队列。
    • “priority”:优先级,默认为1。
    • “FLOW_SKIP_SQL_ANALYZE”:SQL语句执行方式,参数值为false表示每次执行一条SQL语句;参数值为true表示每次执行多条SQL语句。
  6. 任务调度配置。
    如果您需要周期性执行创建的节点任务,可以单击节点编辑页面右侧的配置,根据业务需求配置该节点任务的调度信息:
  7. 提交并发布节点任务。
    1. 单击工具栏中的保存图标,保存节点。
    2. 单击工具栏中的提交图标,提交节点任务。
    3. 提交新版本对话框中,输入变更描述
    4. 单击确认
    如果您使用的是标准模式的工作空间,任务提交成功后,需要将任务发布至生产环境进行发布。请单击右上方的任务发布。具体操作请参见发布任务
  8. 查看实时计算任务。
    1. 单击编辑界面右上角的运维,进入运维中心。
    2. 查看运行的实时计算任务,详情请参见实时计算任务