Using EMR

Last updated: 2026-06-05 02:50:10

Connect EMR to an Object Storage Service (OSS) data lake by using the JindoFS cache mode.

Background

EMR can connect to an OSS data lake by using either the JindoFS cache mode or the JindoFS block storage mode.

  • Cache mode stores files as ordinary OSS objects, which keeps it compatible with native OSS. To speed up access from the EMR cluster, frequently accessed data is cached locally. Because files keep their original format, other OSS clients can still access them. For more information, see JindoFS cache mode usage instructions.

  • Block storage mode provides the highest performance for data reads, writes, and metadata operations. Data is stored in OSS as blocks, a local cache accelerates reads and writes, and a local Namespace service manages metadata for high-performance access. For more information, see JindoFS block storage mode usage instructions.
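
Both modes put a local cache in front of OSS, and step 1 of the procedure notes that the Jindo service clears this cache based on a high-water mark. The following Python sketch illustrates the general idea of watermark-based eviction. It is not JindoFS code: the class name WatermarkCache, the least-recently-used (LRU) eviction order, the 80% and 60% thresholds, and the separate low-water target are all assumptions made for illustration.

```python
from collections import OrderedDict
from typing import Callable


class WatermarkCache:
    """Hypothetical local block cache in front of a remote object store.

    Illustration only: the LRU policy and both thresholds are assumptions,
    not the actual JindoFS implementation.
    """

    def __init__(self, capacity_bytes: int, high_pct: int = 80, low_pct: int = 60):
        if not 0 < low_pct < high_pct <= 100:
            raise ValueError("expected 0 < low_pct < high_pct <= 100")
        self.high = capacity_bytes * high_pct // 100  # start evicting above this
        self.low = capacity_bytes * low_pct // 100    # stop evicting at or below this
        self.used = 0
        self.blocks: "OrderedDict[str, bytes]" = OrderedDict()

    def get(self, key: str, fetch_remote: Callable[[str], bytes]) -> bytes:
        if key in self.blocks:
            # Cache hit: serve locally and mark the block as most recently used.
            self.blocks.move_to_end(key)
            return self.blocks[key]
        # Cache miss: read from the remote store and keep a local copy.
        data = fetch_remote(key)
        self.blocks[key] = data
        self.used += len(data)
        if self.used > self.high:
            self._evict()
        return data

    def _evict(self) -> None:
        # Drop least recently used blocks until usage falls to the low-water mark.
        while self.used > self.low and self.blocks:
            _, data = self.blocks.popitem(last=False)
            self.used -= len(data)


if __name__ == "__main__":
    remote_reads = []

    def fetch_remote(key: str) -> bytes:
        remote_reads.append(key)  # stand-in for an OSS read
        return b"x" * 40          # every block is 40 bytes in this example

    cache = WatermarkCache(capacity_bytes=200)  # high = 160 bytes, low = 120 bytes
    for key in ["a", "b", "c", "d", "a", "e"]:
        cache.get(key, fetch_remote)
    print(list(cache.blocks), cache.used, remote_reads)
    # prints: ['d', 'a', 'e'] 120 ['a', 'b', 'c', 'd', 'e']
```

With a 200-byte capacity, the fifth distinct block pushes usage to 200 bytes, above the 160-byte high-water mark, so the two least recently used blocks (b and c) are evicted; a survives because it was read again just before. The gap between the two marks keeps the cache from evicting on every miss.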

Prerequisites

  • You have created an EMR cluster. For more information, see Create a cluster.

    When you create the cluster, take note of the following:

    • The EMR cluster and the OSS bucket must belong to the same Alibaba Cloud account. For best results, they should also be in the same region.

    • When you create the cluster, turn on Assign Public Network IP and Log on to Cluster in SSH Mode. These options give the cluster a public IP address and allow you to log on to it remotely over SSH.

    • The bigboot and smartdata services are required for subsequent configurations. If they are not selected by default, make sure to select them.

  • You have created a Data Lake Delivery task. For more information, see Quick start.

Procedure

  1. In EMR, connect to OSS in JindoFS cache mode and enable caching. For more information, see JindoFS cache mode usage instructions.

    The cache uses local disks to store frequently accessed data blocks. Caching is disabled by default, so every read goes directly to OSS. When caching is enabled, the Jindo service manages the local cache automatically and clears cached data once usage reaches a high-water mark. Set the cache ratio to suit your workload.

  2. Start Spark SQL.

    1. Use a remote login tool, such as PuTTY, to log on to the master node of the EMR cluster.

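    2. Run the following command to start Spark SQL. The command runs Spark on YARN with five executors, each with 1 GB of memory and two cores. Adjust these values to match the resources of your cluster.

      spark-sql --master yarn --num-executors 5 --executor-memory 1g --executor-cores 2
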
  3. Use a SQL statement to create an external table that points to the OSS data directory.

    Use the statement generated in the Table Store console: on the Deliver Data to OSS page of the instance, find the delivery task, click View Statement to Create Table in the Actions column, and copy the statement. The following statement is for reference only.

    -- Point LOCATION at the OSS data directory written by the delivery task.
    CREATE EXTERNAL TABLE lineitem (
      l_orderkey bigint, l_linenumber bigint, l_receiptdate string, l_returnflag string,
      l_tax double, l_shipmode string, l_suppkey bigint, l_shipdate string,
      l_commitdate string, l_partkey bigint, l_quantity double, l_comment string,
      l_linestatus string, l_extendedprice double, l_discount double, l_shipinstruct string
    )
    PARTITIONED BY (`year` int, `month` int)
    STORED AS PARQUET
    LOCATION 'jfs://test/';

  4. Run the following statement to register the partitions that exist in the OSS data directory in the table metadata. Partitions that are not registered are not visible to queries.

    In the command, lineitem is the name of the external table that you created.

    msck repair table lineitem;

    The output is similar to the following:

    spark-sql> msck repair table lineitem;
    20/09/22 15:17:20 INFO [main] AlterTableRecoverPartitionsCommand: Recover all the partitions in jfs://test/
    20/09/22 15:17:20 INFO [main] AbstractJindoFileSystem: Jboot log name is /var/log/bigboot/jboot-INFO-1600759040539-
    20/09/22 15:17:20 INFO [main] OssStore: Filesystem support for magic committers is enabled, write buffer size 1048576
    20/09/22 15:17:21 INFO [main] FsStats: cmd=listStatus, src=jfs://test/, dst=null, size=1, parameter=, time-in-ms=444, version=2.7.301
    20/09/22 15:17:21 INFO [main] FsStats: cmd=listStatus, src=jfs://test/year=2020, dst=null, size=2, parameter=, time-in-ms=151, version=2.7.301
    20/09/22 15:17:21 INFO [main] AlterTableRecoverPartitionsCommand: Found 2 partitions in jfs://test/
    20/09/22 15:17:21 INFO [main] FsStats: cmd=listStatus, src=jfs://test/year=2020/month=8, dst=null, size=21, parameter=, time-in-ms=163, version=2.7
    20/09/22 15:17:21 INFO [main] FsStats: cmd=listStatus, src=jfs://test/year=2020/month=9, dst=null, size=21, parameter=, time-in-ms=86, version=2.7
    20/09/22 15:17:21 INFO [main] AlterTableRecoverPartitionsCommand: Finished to gather the fast stats for all 2 partitions.
    20/09/22 15:17:22 INFO [main] AlterTableRecoverPartitionsCommand: Recovered all partitions (2).
    20/09/22 15:17:22 INFO [main] SparkSQLQueryListener: command is called
    20/09/22 15:17:22 INFO [main] SparkSQLQueryListener: Spark user root executed on 1600759042070 with spark sql successfully.
    20/09/22 15:17:22 INFO [main] SparkSQLQueryListener: execution is called
    20/09/22 15:17:22 INFO [main] SparkSQLQueryListener: Spark user root executed on 1600759042100 with spark sql successfully.
    Time taken: 1.693 seconds
    20/09/22 15:17:22 INFO [main] SparkSQLCLIDriver: Time taken: 1.693 seconds
    spark-sql>

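  5. Query the data to confirm that the external table can read the delivered files.

    select * from lineitem limit 1;

    The output is similar to the following:
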
    20/09/22 15:18:51 INFO [main] SparkSQLQueryListener: execution is called
    20/09/22 15:18:51 INFO [main] SparkSQLQueryListener: Spark user root executed on 1600759131254 with spark sql successfully.
    20/09/22 15:18:51 INFO [main] FsStats: cmd=getFileStatus, src=jfs://test/_index, dst=null, size=-1, parameter=null, time-in-ms=22, version=2.7.301
    20/09/22 15:18:51 INFO [main] PrunedInMemoryFileIndex: It took 1 ms to list leaf files for 2 paths.
    20/09/22 15:18:51 INFO [main] SparkSQLQueryListenerHelper: Partitioned table:default.lineitem;cols:l_orderkey,l_linenumber,l_receiptdate,l_returnflag,l_tax,l_shipmode,l_suppkey,l_shipdate,l_commitdate,l_partkey,l_quantity,l_comment,l_linestatus,l_extendedprice,l_discount,l_shipinstruct;parts:year=2020/month=8,year=2020/month=9;paths:jfs://test/year=2020/month=8,jfs://test/year=2020/month=9.
    20/09/22 15:18:51 INFO [main] NativeClient: JindoTable put 2 records.
    44095908	1	1996-09-19	N	0.03	SHIP	5928453	1996-08-28	1996-06-19	145353442	10.0	lly ironic theo	O	14881.8	0.08	TAKE BACK RETURN	2020	8
    Time taken: 6.22 seconds, Fetched 1 row(s)
    20/09/22 15:18:51 INFO [main] SparkSQLCLIDriver: Time taken: 6.22 seconds, Fetched 1 row(s)
    spark-sql>
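
The msck repair log in step 4 shows Spark listing jfs://test/, finding the year=2020/month=8 and year=2020/month=9 directories, and recovering them as two partitions. The following Python sketch shows how Hive-style key=value directory names map to partition values. It is a simplified illustration, not Spark's implementation: the function names parse_partition_path and discover_partitions are hypothetical, and the sketch takes an already-listed set of leaf directories instead of walking OSS.

```python
from typing import Dict, List


def parse_partition_path(table_location: str, path: str) -> Dict[str, str]:
    """Map a Hive-style directory such as .../year=2020/month=8 to its partition values."""
    prefix = table_location.rstrip("/") + "/"
    if not path.startswith(prefix):
        raise ValueError(f"{path!r} is not under {table_location!r}")
    spec: Dict[str, str] = {}
    for segment in path[len(prefix):].strip("/").split("/"):
        key, sep, value = segment.partition("=")
        if not sep or not key:
            raise ValueError(f"not a key=value directory: {segment!r}")
        spec[key] = value
    return spec


def discover_partitions(table_location: str, leaf_dirs: List[str],
                        partition_columns: List[str]) -> List[Dict[str, str]]:
    """Keep directories whose keys match the PARTITIONED BY columns, in the same order."""
    found = []
    for path in leaf_dirs:
        spec = parse_partition_path(table_location, path)
        if list(spec) == partition_columns:
            found.append(spec)
    return found


if __name__ == "__main__":
    # The two directories reported in the msck repair log in step 4.
    dirs = ["jfs://test/year=2020/month=8", "jfs://test/year=2020/month=9"]
    print(discover_partitions("jfs://test/", dirs, ["year", "month"]))
    # prints: [{'year': '2020', 'month': '8'}, {'year': '2020', 'month': '9'}]
```

The values stay strings in this sketch; a real metastore would store them against the types declared in PARTITIONED BY (int for year and month).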