云数据库HBase是面向大数据领域的一站式NoSQL服务,适用于GB至PB级的大规模吞吐、检索、分析工作负载,是为淘宝推荐、支付宝账单、花呗风控等众多阿里巴巴核心服务提供支撑的数据库。本文主要介绍如何通过DLA Serverless Spark 对接云数据库HBase标准版2.0版本的HBase SQL服务Phoenix。

前提条件

  • 已开通HBase SQL服务。具体操作请参见HBase SQL(Phoenix) 5.x 使用说明
  • 已经开通对象存储OSS(Object Storage Service)服务。具体操作请参见开通OSS服务
  • 准备DLA Spark访问HBase集群SQL服务所需的安全组ID和交换机ID。具体操作请参见配置数据源网络
  • 前往HBase管理控制台,把要访问的HBase集群VPC网段加入到访问控制白名单中。具体操作请参见设置白名单和安全组
  • 在HBase SQL服务中已创建表并插入数据。假设本文档创建的表名为us_population。参考命令样例如下:
    #建表语句:
    CREATE TABLE IF NOT EXISTS us_population (
    state CHAR(2) NOT NULL,
    city VARCHAR NOT NULL,
    population BIGINT
    CONSTRAINT my_pk PRIMARY KEY (state, city));
    #插入数据语句:
    UPSERT INTO us_population VALUES('NY','New York',8143197);
    UPSERT INTO us_population VALUES('CA','Los Angeles',3844829);
    UPSERT INTO us_population VALUES('IL','Chicago',2842518);
    UPSERT INTO us_population VALUES('TX','Houston',2016582);
    UPSERT INTO us_population VALUES('PA','Philadelphia',1463281);
    UPSERT INTO us_population VALUES('AZ','Phoenix',1461575);
    UPSERT INTO us_population VALUES('TX','San Antonio',1256509);
    UPSERT INTO us_population VALUES('CA','San Diego',1255540);
    UPSERT INTO us_population VALUES('TX','Dallas',1213825);
    UPSERT INTO us_population VALUES('CA','San Jose',912332);

操作步骤

  1. 准备以下测试代码和依赖包来访问HBase SQL,并将此测试代码和依赖包分别编译打包生成jar包上传至您的OSS。
    测试代码示例:
    package com.aliyun.spark
    
    import org.apache.spark.sql.SparkSession
    
    object SparkOnHBase2xForPhoenix {
    
      def main(args: Array[String]): Unit = {
        //queryServerAddress为HBase集群SQL服务访问地址,格式为:http://xxx:8765
        val queryServerAddress = args(0)
        //Phoenix侧的表名,需要在Phoenix侧提前创建。
        val phoenixTableName = args(1)
        //Spark侧的表名。
        val sparkTableName = args(2)
    
        val sparkSession = SparkSession
          .builder()
          .appName("scala spark on Phoenix5.x test")
          .getOrCreate()
    
        //如果存在的话就删除表
        sparkSession.sql(s"drop table if exists $sparkTableName")
    
        val driver = "org.apache.phoenix.queryserver.client.Driver"
        val url = "jdbc:phoenix:thin:url=" + queryServerAddress + ";serialization=PROTOBUF"
        val createCmd = "CREATE TABLE " +
          sparkTableName +
          " USING org.apache.spark.sql.jdbc\n" +
          "OPTIONS (\n" +
          "  'driver' '" + driver + "',\n" +
          "  'url' '" + url + "',\n" +
          "  'dbtable' '" + phoenixTableName + "',\n" +
          "  'fetchsize' '" + 100 + "'\n" +
          ")"
        println(" createCmd: \n" + createCmd)
        sparkSession.sql(createCmd)
        val querySql = "select * from " + sparkTableName + " limit 1"
        sparkSession.sql(querySql).show
        sparkSession.stop()
      }
    }
    HBase SQL依赖的pom文件:
            <dependency>
                <groupId>com.aliyun.phoenix</groupId>
                <artifactId>ali-phoenix-shaded-thin-client</artifactId>
                <version>5.2.2-HBase-2.x-SNAPSHOT</version>
            </dependency>
  2. 登录Data Lake Analytics管理控制台
  3. 在页面左上角,选择HBase集群HBase SQL服务所在的地域。
  4. 单击左侧导航栏中的Serverless Spark > 作业管理
  5. 作业编辑页面,单击创建作业
  6. 创建作业页面,按照页面提示进行参数配置后,单击确定创建Spark作业。
    3
  7. 单击Spark作业名,在Spark作业编辑框中输入以下作业内容,并按照以下参数说明进行参数值替换。保存并提交Spark作业。
    {
        "args": [
            "http://hb-xxx-proxy-phoenix.hbase.rds.aliyuncs.com:8765",  #HBase SQL服务的客户端访问地址,可以是“负载均衡连接”或者“单点 QueryServer连”。
            "us_population",  #Phoenix的表名。
            "spark_on_hbase2x_phoenix"  #Spark中创建映射Phoenix表的表名。
        ],
        "file": "oss://spark_test/jars/hbase2x/spark-examples-0.0.1-SNAPSHOT.jar",  #测试代码的OSS路径。
        "name": "hbase2x-for-phoenix-test",
        "jars": [
            "oss://spark_test/jars/hbase2x/ali-phoenix-shaded-thin-client-5.2.2-HBase-2.x-SNAPSHOT.jar"  #测试代码依赖包的OSS路径。
        ],
        "className": "com.aliyun.spark.SparkOnHBase2xForPhoenix",
        "conf": {
            "spark.driver.resourceSpec": "small",  #表示driver的规格,有small、medium、large、xlarge之分。
            "spark.executor.instances": 2,  #表示executor的个数。
            "spark.executor.resourceSpec": "small",  #表示executor的规格,有small、medium、large、xlarge之分。
            "spark.dla.eni.enable": "true",  #开启访问用户VPC网络的权限。当您需要访问用户VPC网络内的数据时,需要开启此选项。
            "spark.dla.eni.vswitch.id": "vsw-xxx",  #可访问HBase 的交换机id。
            "spark.dla.eni.security.group.id": "sg-xxx"  #可访问HBase的安全组id。
        }
    }

执行结果

作业运行成功后,在任务列表中单击操作 > 日志,查看作业日志。出现如下日志说明作业运行成功:作业日志