访问Hive数据源
云原生数据仓库 AnalyticDB MySQL 版Spark支持通过Thrift协议和JDBC协议访问Hive数据。您也可以对Hive数据源启用Kerberos认证,确保只有经过Kerberos认证的客户端能够访问集群、正常提交作业,以此保证数据安全性。本文以E-MapReduce集群的Hive服务为例,介绍访问Hive数据源的方法。
前提条件
AnalyticDB for MySQL集群的产品系列为企业版、基础版或湖仓版。
已创建数据库账号。
如果是通过阿里云账号访问,只需创建高权限账号。具体操作,请参见创建高权限账号。
如果是通过RAM用户访问,需要创建高权限账号和普通账号并且将RAM用户绑定到普通账号上。具体操作,请参见创建数据库账号和绑定或解绑RAM用户与数据库账号。
AnalyticDB for MySQL集群与OSS存储空间位于相同地域。
AnalyticDB for MySQL集群与E-MapReduce集群位于相同地域。具体操作,请参见创建集群。
E-MapReduce集群需满足以下要求:
形态为EMR on ECS。
业务场景为数据湖,需具有Hadoop-Common、HDFS、YARN和Hive服务,元数据为自建RDS或内置MySQL。
如果您访问Kerberos加密的Hive数据源,确保已开启Kerberos身份认证。
准备工作
下载AnalyticDB for MySQL Spark访问Hive依赖的JAR包。下载链接,请参见mysql-connector-java.jar。
(条件必选)如果服务端Hive版本低于Hive 2.3,且出现Hive版本不兼容时,请登录集群的Master节点,在
/opt/apps/HIVE/hive-current/lib
路径中,下载所有的Jar包。如何登录集群的Master节点,请参见登录集群。(条件必选)如果Hive集群启用了Kerberos认证,需下载Kerberos认证相关的配置文件。
登录E-MapReduce集群。具体操作,请参见登录集群主节点。
下载krb5.conf文件。具体操作,请参见Kerberos基础使用。
下载hadoop.keytab、core-site.xml和hdfs-site.xml文件。具体操作如下:
执行以下语句,查找与Hadoop相关的环境变量和配置信息。
env | grep hadoop
返回示例如下:
HADOOP_HOME=/opt/apps/HADOOP-COMMON/hadoop-common-current/ HADOOP_CONF_DIR=/etc/taihao-apps/hadoop-conf PATH=/opt/apps/JINDOSDK/jindosdk-current/bin:/opt/apps/HADOOP-COMMON/hadoop-common-current/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/opt/apps/HIVE/hive-current/bin:/opt/apps/SPARK-EXTENSION/spark-extension-current/bin:/opt/apps/SPARK3/spark-current/bin:/root/bin
跳转至HADOOP_CONF_DIR参数所对应的目录下,下载hadoop.keytab、core-site.xml和hdfs-site.xml文件。
执行以下命令查看Principal。
listprincs
返回信息中前缀为
hadoop/master
的字符串即为所需的Principal。
将步骤1~3获取的JAR包和Kerberos认证相关的配置文件上传至OSS。具体操作,请参见简单上传。
Spark Jar访问Hive数据源
编写访问Hive的示例程序(即Spark作业依赖的JAR包),并进行编译打包。本文生成的JAR包名称为
hive_test.jar
,生成后的JAR包需上传至OSS中。具体操作,请参见简单上传。示例代码如下:package com.aliyun.spark import org.apache.spark.sql.SparkSession object SparkHive { def main(args: Array[String]): Unit = { val sparkSession = SparkSession .builder() .appName("Spark HIVE TEST") .enableHiveSupport() .getOrCreate() val welcome = "hello, adb-spark" //Hive表名。 val tableName = args(0) import sparkSession.implicits._ //将只有一行一列数据的DataFrame: df存入Hive, 表名为用户传进来的tableName, 列名为welcome_col。 val df = Seq(welcome).toDF("welcome_col") df.write.format("hive").mode("overwrite").saveAsTable(tableName) //从Hive中读取表tableName。 val dfFromHive = sparkSession.sql( s""" |select * from $tableName |""".stripMargin) dfFromHive.show(10) } }
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击 。
在编辑器窗口上方,选择Job型资源组和Batch作业类型,编写Spark作业。
Spark SQL访问Hive数据源
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在企业版、基础版或湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击 。
在编辑器窗口上方,选择Spark引擎和Job型资源组,编写Spark SQL作业后,单击执行SQL。
- 本页导读 (1)
- 前提条件
- 准备工作
- Spark Jar访问Hive数据源
- Spark SQL访问Hive数据源