使用EMR

更新时间:
复制 MD 格式

使用EMRJindoFS缓存模式连接OSS数据湖。

背景信息

您可以使用EMRJindoFS缓存模式或者JindoFS块模式连接OSS数据湖。

  • 缓存模式(Cache)主要兼容原生OSS存储方式,文件以对象的形式存储在OSS上,每个文件根据实际访问情况会在本地进行缓存,提升EMR集群内访问OSS的效率,同时兼容了OSS原有文件形式,数据访问上能够与其他OSS客户端完全兼容。详情请参见JindoFS缓存模式使用说明

  • 块存储模式(Block)提供了最为高效的数据读写能力和元数据访问能力。数据以Block形式存储在后端存储OSS上,本地提供缓存加速,元数据则由本地Namespace服务维护,提供高效的元数据访问性能。详情请参见JindoFS块存储模式使用说明

前提条件

  • 已创建EMR集群,详情请参见创建集群

    创建集群时,请注意如下事项:

    • 创建EMR集群和OSS属于同一个阿里云账号,且建议EMR集群和OSS Bucket处于同一地域。

    • 创建集群时,请打开挂载公网远程登录开关,将集群挂载到公网,用于Shell远程登录服务器。

    • bigbootsmartdata为后续配置相关服务,如果默认未选中,请选中bigbootsmartdata服务。

  • 已创建数据投递任务,详情请参见快速入门

操作步骤

  1. 使用EMRjindoFS缓存模式连接OSS和启用缓存,详情请参见JindoFS缓存模式使用说明

    启用缓存会利用本地磁盘对访问的热数据块进行缓存,默认状态为禁用,即所有OSS读取都直接访问OSS上的数据。缓存启用后,Jindo服务会自动管理本地缓存备份,通过水位清理本地缓存,请根据需求配置一定的比例用于缓存。

  2. 启动spark SQL。

    1. 通过远程登录工具(例如PuTTY)登录EMR Header服务器。

    2. 执行如下命令运行Spark SQL。

      spark-sql --master yarn --num-executors 5 --executor-memory 1g --executor-cores 2
  3. 使用SQL语句创建指向OSS数据目录的外表。

    请使用通过表格存储控制台获取的SQL语句,如下SQL语句示例仅供参考。

    CREATE EXTERNAL TABLE  lineitem (l_orderkey bigint,l_linenumber bigint,l_receiptdate string,l_returnflag string,l_tax double,l_shipmode string,l_suppkey bigint,l_shipdate string,l_commitdate string,l_partkey bigint,l_quantity double,l_comment string,l_linestatus string,l_extendedprice double,l_discount double,l_shipinstruct string) PARTITIONED BY (`year` int, `month` int) STORED AS PARQUET  LOCATION  'jfs://test/' ;

    在实例的数据湖投递页面,单击投递任务操作列的建表语句,可以查看和复制SQL语句。

  4. 执行如下SQL语句,加载OSS数据源中实际的数据分区。

    其中lineitem为创建的外表名称。

    msck repair table lineitem;
    20/09/22 15:17:04 INFO [main] SparkSQLQueryListener: execution is called
    20/09/22 15:17:04 INFO [main] SparkSQLQueryListener: Spark user root executed on 1600759024916 with spark sql successfully.
    Time taken: 1.377 seconds
    20/09/22 15:17:04 INFO [main] SparkSQLCLIDriver: Time taken: 1.377 seconds
    spark-sql> msck repair table lineitem;
    20/09/22 15:17:20 INFO [main] AlterTableRecoverPartitionsCommand: Recover all the partitions in jfs://test/
    20/09/22 15:17:20 INFO [main] AbstractJindoFileSystem: Jboot log name is /var/log/bigboot/jboot-INFO-1600759040539-
    20/09/22 15:17:20 INFO [main] OssStore: Filesystem support for magic committers is enabled, write buffer size 1048576
    20/09/22 15:17:21 INFO [main] FsStats: cmd=listStatus, src=jfs://test/, dst=null, size=1, parameter=, time-in-ms=444, version=2.7.301
    20/09/22 15:17:21 INFO [main] FsStats: cmd=listStatus, src=jfs://test/year=2020, dst=null, size=2, parameter=, time-in-ms=151, version=2.7.301
    20/09/22 15:17:21 INFO [main] AlterTableRecoverPartitionsCommand: Found 2 partitions in jfs://test/
    20/09/22 15:17:21 INFO [main] FsStats: cmd=listStatus, src=jfs://test/year=2020/month=8, dst=null, size=21, parameter=, time-in-ms=163, version=2.7
    20/09/22 15:17:21 INFO [main] FsStats: cmd=listStatus, src=jfs://test/year=2020/month=9, dst=null, size=21, parameter=, time-in-ms=86, version=2.7
    20/09/22 15:17:21 INFO [main] AlterTableRecoverPartitionsCommand: Finished to gather the fast stats for all 2 partitions.
    20/09/22 15:17:22 INFO [main] AlterTableRecoverPartitionsCommand: Recovered all partitions (2).
    20/09/22 15:17:22 INFO [main] SparkSQLQueryListener: command is called
    20/09/22 15:17:22 INFO [main] SparkSQLQueryListener: Spark user root executed on 1600759042070 with spark sql successfully.
    20/09/22 15:17:22 INFO [main] SparkSQLQueryListener: execution is called
    20/09/22 15:17:22 INFO [main] SparkSQLQueryListener: Spark user root executed on 1600759042100 with spark sql successfully.
    Time taken: 1.693 seconds
    20/09/22 15:17:22 INFO [main] SparkSQLCLIDriver: Time taken: 1.693 seconds
    spark-sql>
  5. 查询数据。

    select * from lineitem limit 1;
    20/09/22 15:18:51 INFO [main] SparkSQLQueryListener: execution is called
    20/09/22 15:18:51 INFO [main] SparkSQLQueryListener: Spark user root executed on 1600759131254 with spark sql successfully.
    20/09/22 15:18:51 INFO [main] FsStats: cmd=getFileStatus, src=jfs://test/_index, dst=null, size=-1, parameter=null, time-in-ms=22, version=2.7.301
    20/09/22 15:18:51 INFO [main] PrunedInMemoryFileIndex: It took 1 ms to list leaf files for 2 paths.
    20/09/22 15:18:51 INFO [main] SparkSQLQueryListenerHelper: Partitioned table:default.lineitem;cols:l_orderkey,l_linenumber,l_receiptdate,l_returnflag,l_tax,l_shipmode,l_suppkey,l_shipdate,_commitdate,l_partkey,l_quantity,l_comment,l_linestatus,l_extendedprice,l_discount,l_shipinstruct;parts:year=2020/month=8,year=2020/month=9;paths:jfs://test/year=2020/month=8,jfs://test/year=2020/month=9.
    20/09/22 15:18:51 INFO [main] NativeClient: JindoTable put 2 records.
    44095908	1	1996-09-19	N	0.03	SHIP	5928453 1996-08-28	1996-06-19	145353442	10.0	lly ironic theo O	14881.8 0.08	TAKE BACK RETURN
    020	8
    Time taken: 6.22 seconds, Fetched 1 row(s)
    20/09/22 15:18:51 INFO [main] SparkSQLCLIDriver: Time taken: 6.22 seconds, Fetched 1 row(s)
    spark-sql>