Spark Connector


With MaxCompute open storage, Spark can use a connector to call the Storage API and read data directly from MaxCompute. This simplifies the read path and improves access performance, so you can process and analyze MaxCompute data with Spark.

Scope

  • When a third-party engine accesses MaxCompute:

    • You can read data from standard tables, partitioned tables, clustered tables, Delta Tables, and materialized views.

    • You cannot read data from MaxCompute foreign tables or logical views.

  • The connector does not support reading the JSON data type.

  • For open storage (pay-as-you-go), the default request concurrency limit is 1,000 per tenant. The transmission rate for each concurrent request is 10 MB/s.
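The two pay-as-you-go defaults above imply a best-case aggregate read rate, which is useful when estimating how long a large scan might take. The following is a rough sketch only: the function names and the 1 TB example are illustrative, and it assumes every concurrent request sustains the full 10 MB/s, which real workloads may not reach.

```shell
# Upper bound on aggregate throughput in MB/s.
# Defaults are the per-tenant limits stated above: 1,000 requests at 10 MB/s each.
max_throughput_mb_s() {
  concurrency="${1:-1000}"
  rate_mb_s="${2:-10}"
  echo $(( concurrency * rate_mb_s ))
}

# Best-case number of seconds to read size_mb megabytes,
# rounded up with integer ceiling division.
min_read_seconds() {
  size_mb="$1"
  throughput=$(max_throughput_mb_s "${2:-1000}" "${3:-10}")
  echo $(( (size_mb + throughput - 1) / throughput ))
}

max_throughput_mb_s            # prints 10000
min_read_seconds 1000000 200   # 1 TB (1,000,000 MB) over 200 requests: prints 500
```

A job that uses only 200 of the 1,000 allowed requests is therefore bounded at 2,000 MB/s, so the example scan cannot finish in less than about 500 seconds.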

Procedure

  1. Activate MaxCompute and create a MaxCompute project.

  2. Install Git.

  3. Purchase an exclusive resource group for Data Transmission Service (subscription) or activate open storage (pay-as-you-go) resources.

  4. Deploy a Spark development environment.

    Download a Spark package for a version from Spark 3.2.x to Spark 3.5.x, and then decompress the package to a local folder.

    1. To build the Spark development environment on a Linux operating system, see Build a Linux development environment.

    2. To build the Spark development environment on a Windows operating system, see Build a Windows development environment.

  5. Download and compile the Spark connector. Currently, only Spark versions from 3.2.x to 3.5.x are supported. This topic uses Spark 3.3.1 as an example.

    Use the git clone command to download the Spark connector package. Ensure that Git is installed in your environment. Otherwise, an error occurs when you run the command.

    ## Download the Spark connector.
    git clone https://github.com/aliyun/aliyun-maxcompute-data-collectors.git
    
    ## Switch to the spark-connector folder.
    cd aliyun-maxcompute-data-collectors/spark-connector 
    
    ## Compile the connector.
    mvn clean package
    
    ## Confirm that the build produced the datasource JAR package.
    ls datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar
    
    ## Copy the datasource JAR package to the $SPARK_HOME/jars/ folder.
    cp datasource/target/spark-odps-datasource-3.3.1-odps0.43.0.jar "$SPARK_HOME/jars/"
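    The JAR file name embeds the Spark and connector versions, so it changes from build to build. The following sketch copies whichever datasource JAR the build produced instead of hard-coding the name. The function name `install_connector_jar` is hypothetical, and the sketch assumes the build output follows the `spark-odps-datasource-*.jar` naming pattern shown above; if several files match, only the first is copied.

    ```shell
    # Copy the first spark-odps-datasource JAR found in target_dir into jars_dir.
    # Returns 1 and prints a message to stderr if no matching JAR exists.
    install_connector_jar() {
      target_dir="$1"
      jars_dir="$2"
      for jar in "$target_dir"/spark-odps-datasource-*.jar; do
        # An unmatched glob stays as a literal pattern, so skip non-files.
        [ -f "$jar" ] || continue
        cp "$jar" "$jars_dir"/ && echo "installed $(basename "$jar")"
        return $?
      done
      echo "no spark-odps-datasource JAR found in $target_dir" >&2
      return 1
    }

    # Example call, run from the spark-connector folder after the build:
    #   install_connector_jar datasource/target "$SPARK_HOME/jars"
    ```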
  6. Configure MaxCompute account access information.

    In the conf folder of your Spark installation, create a spark-defaults.conf file:

    cd $SPARK_HOME/conf
    vim spark-defaults.conf

    Add the following account information to the spark-defaults.conf file:

    ## Configure the account in spark-defaults.conf.
    spark.hadoop.odps.project.name=doc_test
    spark.hadoop.odps.access.id=L********************
    spark.hadoop.odps.access.key=*******************
    spark.hadoop.odps.end.point=http://service.cn-beijing.maxcompute.aliyun.com/api
    spark.hadoop.odps.tunnel.quota.name=ot_xxxx_p#ot_xxxx
    ## Configure the MaxCompute catalog.
    spark.sql.catalog.odps=org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog 
    spark.sql.extensions=org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions
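    A missing or misspelled key in spark-defaults.conf typically surfaces only when the first query fails, so a quick check before starting the client can save time. The following sketch verifies that each key from the sample configuration above is present. The function name `check_odps_conf` is hypothetical, and treating all seven keys as mandatory is an assumption based on the sample, not a statement of what the connector requires.

    ```shell
    # Report every expected key that is absent from the given properties file.
    # Returns 0 if all keys are present, 1 if any is missing, 2 if the file is not found.
    check_odps_conf() {
      conf="$1"
      missing=0
      [ -f "$conf" ] || { echo "not found: $conf" >&2; return 2; }
      for key in \
        spark.hadoop.odps.project.name \
        spark.hadoop.odps.access.id \
        spark.hadoop.odps.access.key \
        spark.hadoop.odps.end.point \
        spark.hadoop.odps.tunnel.quota.name \
        spark.sql.catalog.odps \
        spark.sql.extensions
      do
        # Accept "key=value" or "key value"; comment lines never match
        # because the key must start the line after optional whitespace.
        if ! awk -v k="$key" '
          { line = $0; sub(/^[ \t]+/, "", line) }
          index(line, k "=") == 1 || index(line, k " ") == 1 || index(line, k "\t") == 1 { found = 1 }
          END { exit !found }
        ' "$conf"; then
          echo "missing: $key" >&2
          missing=1
        fi
      done
      return "$missing"
    }

    # Example call:
    #   check_odps_conf "$SPARK_HOME/conf/spark-defaults.conf"
    ```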
  7. Access MaxCompute through the Spark connector.

    1. Run the following commands to go to the bin folder of your Spark installation and start the Spark SQL client:

      cd $SPARK_HOME/bin
      spark-sql
    2. Query the tables in the MaxCompute project:

      SHOW TABLES IN odps.doc_test;

      doc_test is a sample MaxCompute project name. Replace it with the name of your MaxCompute project.

    3. Create a table:

      CREATE TABLE odps.doc_test.mc_test_table (name STRING, num BIGINT);
    4. Read data from the table:

      SELECT * FROM odps.doc_test.mc_test_table;
    5. Create a partitioned table:

      CREATE TABLE odps.doc_test.mc_test_table_pt (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING);
    6. Read data from the partitioned table:

      SELECT * FROM odps.doc_test.mc_test_table_pt;

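      A newly created table returns no rows. After two rows have been written to the partition pt1=2018, pt2=0601, output similar to the following is returned: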

      test1   1       2018    0601
      test2   2       2018    0601
      Time taken: 1.312 seconds, Fetched 2 row(s)
    7. Delete the table:

      DROP TABLE IF EXISTS odps.doc_test.mc_test_table;
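The partitioned-table statements above can also be run without the interactive client by writing them to a file and passing the file to `spark-sql -f`. The sketch below generates such a file. The function name `write_demo_sql` and the output path are hypothetical. The `INSERT` statement is an assumption: this topic does not show a write step, so the statement is reconstructed from the sample output, and it relies on the connector accepting writes through the `odps` catalog.

```shell
# Write the partitioned-table walkthrough for the given project into a SQL file.
write_demo_sql() {
  project="$1"
  out="$2"
  {
    printf 'SHOW TABLES IN odps.%s;\n' "$project"
    printf 'CREATE TABLE IF NOT EXISTS odps.%s.mc_test_table_pt (name STRING, num BIGINT) PARTITIONED BY (pt1 STRING, pt2 STRING);\n' "$project"
    # Assumed step: the values mirror the two rows in the sample output.
    printf "INSERT INTO odps.%s.mc_test_table_pt VALUES ('test1', 1, '2018', '0601'), ('test2', 2, '2018', '0601');\n" "$project"
    printf 'SELECT * FROM odps.%s.mc_test_table_pt;\n' "$project"
  } > "$out"
}

# Example call, replacing doc_test with your project name:
#   write_demo_sql doc_test /tmp/mc_demo.sql
#   "$SPARK_HOME/bin/spark-sql" -f /tmp/mc_demo.sql
```

Keeping the statements in a file makes the walkthrough repeatable, and `IF NOT EXISTS` lets the script run again after the table has been created.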