Lindorm宽表引擎是面向海量半结构化、结构化数据设计的分布式存储,适用于元数据、订单、账单、画像、社交、feed流、日志等场景,兼容HBase、Phoenix(SQL)。本文主要介绍如何通过DLA Serverless Spark访问Lindorm的宽表SQL(Phoenix)。

前提条件

  • 已开通宽表SQL服务。具体操作请参见开通宽表SQL
  • 已经开通对象存储OSS(Object Storage Service)服务。具体操作请参见开通OSS服务
  • 准备DLA Spark访问Lindorm实例宽表引擎所需的安全组ID和交换机ID。具体操作请参见配置数据源网络
  • 前往Lindorm控制台,把要访问的Lindorm实例VPC网段加入到访问控制白名单中。具体操作请参见设置白名单
  • 在宽表SQL服务中已创建表并插入数据。假设本文档创建的表名为us_population。参考命令样例如下:
    #建表语句:
    CREATE TABLE IF NOT EXISTS us_population (
    state CHAR(2) NOT NULL,
    city VARCHAR NOT NULL,
    population BIGINT
    CONSTRAINT my_pk PRIMARY KEY (state, city));
    #插入数据语句:
    UPSERT INTO us_population VALUES('NY','New York',8143197);
    UPSERT INTO us_population VALUES('CA','Los Angeles',3844829);
    UPSERT INTO us_population VALUES('IL','Chicago',2842518);
    UPSERT INTO us_population VALUES('TX','Houston',2016582);
    UPSERT INTO us_population VALUES('PA','Philadelphia',1463281);
    UPSERT INTO us_population VALUES('AZ','Phoenix',1461575);
    UPSERT INTO us_population VALUES('TX','San Antonio',1256509);
    UPSERT INTO us_population VALUES('CA','San Diego',1255540);
    UPSERT INTO us_population VALUES('TX','Dallas',1213825);
    UPSERT INTO us_population VALUES('CA','San Jose',912332);

操作步骤

  1. 准备以下测试代码和依赖包来访问宽表SQL,并将此测试代码和依赖包分别编译打包生成jar包上传至您的OSS。
    测试代码示例:
    package com.aliyun.spark
    
    import org.apache.spark.sql.SparkSession
    
    object SparkOnLindormForPhoenix {
    
      def main(args: Array[String]): Unit = {
        //queryServerAddress为Lindorm集群宽表SQL服务访问地址,格式为:http://xxx:8765。
        val queryServerAddress = args(0)
        //宽表引擎的用户名和密码。
        val user = args(1)
        val password = args(2)
        //Phoenix侧的表名,需要在Phoenix侧提前创建。
        val phoenixTableName = args(3)
        //Spark侧的表名。
        val sparkTableName = args(4)
    
        val sparkSession = SparkSession
          .builder()
          .appName("scala spark on Phoenix5.x test")
          .getOrCreate()
    
        //如果存在的话就删除表。
        sparkSession.sql(s"drop table if exists $sparkTableName")
    
        val driver = "org.apache.phoenix.queryserver.client.Driver"
        val url = "jdbc:phoenix:thin:url=" + queryServerAddress + ";serialization=PROTOBUF"
        val createCmd = "CREATE TABLE " +
          sparkTableName +
          " USING org.apache.spark.sql.jdbc\n" +
          "OPTIONS (\n" +
          "  'driver' '" + driver + "',\n" +
          "  'url' '" + url + "',\n" +
          "  'user' '" + user + "',\n" +
          "  'password' '" + password + "',\n" +
          "  'dbtable' '" + phoenixTableName + "',\n" +
          "  'fetchsize' '" + 100 + "'\n" +
          ")"
        println(" createCmd: \n" + createCmd)
        sparkSession.sql(createCmd)
        val querySql = "select * from " + sparkTableName + " limit 1"
        sparkSession.sql(querySql).show
        sparkSession.stop()
      }
    }
    宽表SQL依赖的pom文件:
            <dependency>
                <groupId>com.aliyun.phoenix</groupId>
                <artifactId>ali-phoenix-shaded-thin-client</artifactId>
                <version>5.2.2-HBase-2.x-SNAPSHOT</version>
            </dependency>
  2. 登录Data Lake Analytics管理控制台
  3. 在页面左上角,选择Lindorm宽表SQL所在实例的相同地域。
  4. 单击左侧导航栏中的Serverless Spark > 作业管理
  5. 作业编辑页面,单击创建作业
  6. 创建作业页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
    3
  7. 单击Spark作业名,在Spark作业编辑框中输入以下作业内容,并按照以下参数说明进行参数值替换。保存并提交Spark作业。
    {
        "args": [
            "http://ld-xxx-proxy-phoenix.lindorm.rds.aliyuncs.com:8765",  #宽表SQL的“客户端访问地址”-> “私网”。注意地址要以:“http://”开头。
            "xxx1",  #访问Lindorm的用户名。
            "xxx2",  #访问Lindorm的密码。
            "us_population",  #Phoenix的表名。
            "spark_on_lindorm_phoenix"  #Spark中创建映射Phoenix表的表名。
        ],
        "file": "oss://spark_test/jars/lindorm/spark-examples-0.0.1-SNAPSHOT.jar",  ##测试代码的OSS路径。
        "name": "lindorm-for-phoenix-test",
        "jars": [
            "oss://spark_test/jars/lindorm/ali-phoenix-shaded-thin-client-5.2.2-HBase-2.x-SNAPSHOT.jar"  ##测试代码依赖包的OSS路径。
        ],
        "className": "com.aliyun.spark.SparkOnLindormForPhoenix",
        "conf": {
            "spark.driver.resourceSpec": "small",  #表示driver的规格,有small、medium、large、xlarge之分。
            "spark.executor.instances": 2,  #表示executor的个数。
            "spark.executor.resourceSpec": "small",  ##表示executor的规格,有small、medium、large、xlarge之分。
            "spark.dla.eni.enable": "true",  #开启访问用户VPC网络的权限。当您需要访问用户VPC网络内的数据时,需要开启此选项。
            "spark.dla.eni.vswitch.id": "vsw-xxx",  #可访问Lindorm的交换机id。
            "spark.dla.eni.security.group.id": "sg-xxx"  #可访问Lindorm的安全组id。
        }
    }

执行结果

作业运行成功后,在任务列表中单击操作 > 日志,查看作业日志。出现如下日志说明作业运行成功:作业日志