EMR Serverless Spark与其他VPC间网络互通

通过网络连接功能,您可以访问自有VPC(Virtual Private Cloud)内的数据源。本文将以SparkSQL和Application JAR类型任务连接至您的自有VPC的HMS(Hive Metastore)服务为例,为您介绍如何配置并访问自有VPC内的数据源。

前提条件

已准备好数据源。本文以在EMR on ECS页面创建包含Hive服务,元数据内置MySQL的DataLake集群为例,详情请参见创建集群

使用限制

  • 当前仅支持创建一个网络连接。

  • 当前仅支持使用下列可用区的交换机。

    • 中国地区

      地域名称

      地域ID

      可用区名称

      华东1(杭州)

      cn-hangzhou

      • 杭州 可用区H

      • 杭州 可用区I

      • 杭州 可用区J

      华东2(上海)

      cn-shanghai

      • 上海 可用区F

      • 上海 可用区G

      华北2(北京)

      cn-beijing

      • 北京 可用区F

      • 北京 可用区G

      • 北京 可用区H

      • 北京 可用区K

      华南1(深圳)

      cn-shenzhen

      深圳 可用区E

    • 其他国家和地区

      地域名称

      地域ID

      可用区名称

      新加坡

      ap-southeast-1

      • 新加坡 可用区B

      • 新加坡 可用区C

      美国(弗吉尼亚)

      us-east-1

      • 弗吉尼亚 可用区A

      • 弗吉尼亚 可用区B

      德国(法兰克福)

      eu-central-1

      • 法兰克福 可用区A

      • 法兰克福 可用区B

步骤一:新增网络连接

  1. 进入网络连接页面。

    1. 登录E-MapReduce控制台

    2. 在左侧导航栏,选择EMR Serverless > Spark

    3. Spark页面,单击目标工作空间名称。

    4. EMR Serverless Spark页面,单击左侧导航栏中的网络连接

  2. 网络连接页面,单击新增网络连接

  3. 新增网络连接对话框中,配置以下信息,单击确定

    参数

    说明

    连接名称

    输入新增连接的名称。

    专有网络

    选择与EMR集群相同的专有网络。

    如果当前没有可选择的专有网络,请单击创建专有网络,前往专有网络控制台创建,详情请参见创建和管理专有网络

    交换机

    选择与EMR集群部署在同一专有网络下的相同交换机。

    如果当前可用区没有交换机,请单击虚拟交换机,前往专有网络控制台创建,详情请参见创建和管理交换机

    重要

    仅支持选择特定可用区下的交换机,详情请参见使用限制

    状态显示为已成功时,表示新增网络连接成功。

    image

步骤二:为EMR集群添加安全组规则

  1. 获取已创建网络连接中指定交换机的网段。

    您可以登录专有网络管理控制台,在交换机页面获取交换机的网段。

    image

  2. 添加安全组规则。

    1. 登录EMR on ECS控制台

    2. 集群管理页面,单击目标集群的集群ID。

    3. 基础信息页面,单击集群安全组后面的链接。

    4. 安全组规则页面,单击手动添加,填写端口范围授权对象,然后单击保存

      参数

      说明

      端口范围

      填写9083端口。

      授权对象

      填写前一步骤中获取的指定交换机的网段。

      重要

      为防止被外部的用户攻击导致安全问题,授权对象禁止填写为0.0.0.0/0

(可选)步骤三:连接Hive服务并查询表数据

如果您已有创建并配置好的Hive表,则可以跳过该步骤。

  1. 使用SSH方式登录集群的Master节点,详情请参见登录集群

  2. 执行以下命令,进入Hive命令行。

    hive
  3. 执行以下命令,创建表。

    CREATE TABLE my_table (id INT,name STRING);
  4. 执行以下命令,向表中插入数据。

    INSERT INTO my_table VALUES (1, 'John'); 
    INSERT INTO my_table VALUES (2, 'Jane');
  5. 执行以下命令,查询数据。

    SELECT * FROM my_table;

(可选)步骤四:准备并上传资源文件

如果您后续使用JAR任务类型,则需提前准备好资源文件。如果使用本文的SparkSQL任务类型,则可以跳过该步骤。

  1. 在本地新建一个Maven工程。

    工程内容如下

    package com.example;
    
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    
    public class DataFrameExample {
        public static void main(String[] args) {
            // 创建SparkSession。
            SparkSession spark = SparkSession.builder()
                    .appName("HMSQueryExample")
                    .enableHiveSupport()
                    .getOrCreate();
    
            // 执行查询。
            Dataset<Row> result = spark.sql("SELECT * FROM default.my_table");
    
            // 打印查询结果。
            result.show();
    
            // 关闭SparkSession。
            spark.stop();
        }
    }

    pom文件内容如下

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>sparkDataFrame</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <spark.version>3.3.1</spark.version>
            <scala.binary.version>2.12</scala.binary.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_${scala.binary.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
        </dependencies>
    </project>
  2. 使用mvn package命令打包,编译打包后生成sparkDataFrame-1.0-SNAPSHOT.jar文件。

  3. 在EMR Serverless Spark页面的目标工作空间下,单击左侧的文件管理

  4. 文件管理页面,单击上传文件

  5. 上传本地打包好的sparkDataFrame-1.0-SNAPSHOT.jar文件。

步骤五:新建并运行任务

JAR任务

  1. 在EMR Serverless Spark页面,单击左侧的数据开发

  2. 单击新建

  3. 输入名称,类型选择Application(批任务) > JAR,单击确定

  4. 在新建的任务开发中,配置以下信息,其余参数无需配置,然后单击运行

    参数

    说明

    主jar资源

    选择前一步骤中上传的资源文件。例如,sparkDataFrame-1.0-SNAPSHOT.jar。

    Main Class

    提交Spark任务时所指定的主类。本文示例填写为com.example.DataFrameExample

    Spark配置

    配置以下信息。

    spark.hadoop.hive.metastore.uris thrift://*.*.*.*:9083
    spark.hadoop.hive.imetastoreclient.factory.class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClientFactory
    spark.emr.serverless.network.service.name <yourConnectionName>

    其中,以下信息请您根据实际情况替换:

    • *.*.*.*为HMS服务的内网IP地址。本示例为EMR集群Master节点的内网IP,您可以在EMR集群的节点管理页面中查看。

    • <yourConnectionName>为您在步骤一中新增的网络连接的名称。

  5. 运行任务后,在下方的运行记录区域,单击任务操作列的详情

  6. 任务历史开发任务页面的日志探查页签,您可以查看相关的日志信息。

SparkSQL任务

  1. 创建并启动SQL会话,详情请参见管理SQL会话

    Spark配置参数需要配置以下信息。

    spark.hadoop.hive.metastore.uris thrift://*.*.*.*:9083
    spark.hadoop.hive.imetastoreclient.factory.class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClientFactory
    spark.emr.serverless.network.service.name <yourConnectionName>

    其中,以下信息请您根据实际情况替换:

    • *.*.*.*为HSM服务的内网IP地址。本示例为EMR集群Master节点的内网IP,您可以在EMR集群的节点管理页面中查看。

    • <yourConnectionName>为您在步骤一中新增的网络连接的名称。

  2. 在EMR Serverless Spark页面,单击左侧的数据开发

  3. 单击新建

  4. 输入名称,类型选择SQL > SparkSQL,单击确定

  5. 在新建的任务开发中,选择Catalog、数据库和已启动的SQL会话实例,输入以下命令,并单击运行

    SELECT * FROM default.my_table;
    说明

    当您计划将基于外部Metastore的SQL代码部署到工作流时,请确保您的SQL语句以db.table_name的形式指定表名,并且务必在界面右上方“Catalog”选项中选取一个默认库,其格式应为catalog_id.default

    下方的运行结果区域会向您展示返回信息。

    image