本文介绍通过JDBC连接Spark Thrift Servert并成功提交Spark作业。
前提条件
连接Spark Thrift Server需要校验用户名和密码,请进行用户认证配置,请参见:用户管理
DDI集群Spark Thrift Server默认端口号为10001,请确认成功添加安全组白名单,请参见:安全组白名单
背景信息
JDBC连接Spark Thrift Server如下:
Beeline:通过HiveServer2的JDBC客户端进行连接。
Java:编写Java代码进行连接。
Python:编写Python代码进行连接。
Beeline客户端连接Spark Thrift Server
执行如下命令,进入Beeline客户端。
beeline
返回如下信息
Beeline version 2.3.7 by Apache Hive
执行如下命令,连接Spark Thrift Servert。
!connect jdbc:hive2://{ddi-header-ip}:10001/{db_name}
输入用户名和密码。
Enter username for jdbc:hive2://ip:10001/beijing_dlf_db_test: username
Enter password for jdbc:hive2://ip:10001/beijing_dlf_db_test: ********
查询数据,返回结果如下:
0: jdbc:hive2://ip:10001/beijing_d> select * from table_name limit 10;
+---------+-------------+
| action | date |
+---------+-------------+
| Close | 2016-07-27 |
| Open | 2016-07-27 |
| Close | 2016-07-27 |
| Close | 2016-07-27 |
| Open | 2016-07-27 |
| Close | 2016-07-27 |
| Open | 2016-07-27 |
| Open | 2016-07-27 |
| Open | 2016-07-27 |
| Open | 2016-07-27 |
+---------+-------------+
10 rows selected (15.853 seconds)
Java代码连接Spark Thrift Server
在执行本操作前,确保您已安装Java环境和Java编程工具,并且已配置环境变量
Java代码连接Spark Thrift Server需要下载Databricks提供的依赖包,下载路径:Databricks JDBC Driver
将项目依赖SparkJDBC42.jar添加到编程工具的Lib下,如图:
编写代码,连接Spark Thrift Server并提交作业。
代码如下:
import com.simba.spark.jdbc.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class SparkJDBC {
private static Connection connectViaDS() throws Exception {
Connection connection = null;
Class.forName("com.simba.spark.jdbc.Driver");
DataSource ds = new com.simba.spark.jdbc.DataSource();
ds.setURL("jdbc:spark://{ip}:10001/{db_name};AuthMech=3;UID=username;PWD=ps");
connection = ds.getConnection();
return connection;
}
public static void main(String[] args) throws Exception {
Connection connection = connectViaDS();
System.out.println("日志打印:"+connection.getClientInfo());
PreparedStatement tables = connection.prepareStatement("show databases ");
ResultSet res = tables.executeQuery();
while (res.next()){
System.out.println("database_name :"+ res.getString(1));
}
res.close();
tables.close();
connection.close();
}
}
本地测试是否正常执行。
注意
执行代码过程中如果出现NoSuchFileException请联系DDI运维人员。
Python代码连接Spark Thrift Server
使用PyHive连接Spark Thrift Server需要依的赖包安装如下:
pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive
编写Python代码,连接Spark Thrift Server并提交作业。
代码如下:vim sparkJDBC.py
from pyhive import hive
conn = hive.Connection(host='host',database='db_name', port=10001, username='username',password='ps',auth="LDAP")
cursor=conn.cursor()
cursor.execute('select * from table_name limit 10')
for result in cursor.fetchall():print(result)
conn.close()
执行Python代码结果如下:
python sparkJDBC.py
执行结果:
('Close', '2016-07-27')
('Open', '2016-07-27')
('Close', '2016-07-27')
('Close', '2016-07-27')
('Open', '2016-07-27')
('Close', '2016-07-27')
('Open', '2016-07-27')
('Open', '2016-07-27')
('Open', '2016-07-27')
('Open', '2016-07-27')
文档内容是否对您有帮助?