通过JDBC连接Spark Thrift Server提交Spark作业

本文介绍通过JDBC连接Spark Thrift Servert并成功提交Spark作业。

前提条件

  • 连接Spark Thrift Server需要校验用户名和密码,请进行用户认证配置,请参见:用户管理

  • DDI集群Spark Thrift Server默认端口号为10001,请确认成功添加安全组白名单,请参见:安全组白名单

背景信息

JDBC连接Spark Thrift Server如下:

  • Beeline:通过HiveServer2的JDBC客户端进行连接。

  • Java:编写Java代码进行连接。

  • Python:编写Python代码进行连接。

Beeline客户端连接Spark Thrift Server

  1. 执行如下命令,进入Beeline客户端。

beeline

返回如下信息

Beeline version 2.3.7 by Apache Hive
  1. 执行如下命令,连接Spark Thrift Servert。

!connect jdbc:hive2://{ddi-header-ip}:10001/{db_name}
  1. 输入用户名和密码。

Enter username for jdbc:hive2://ip:10001/beijing_dlf_db_test: username
Enter password for jdbc:hive2://ip:10001/beijing_dlf_db_test: ********
  1. 查询数据,返回结果如下:

0: jdbc:hive2://ip:10001/beijing_d> select * from table_name limit 10;
+---------+-------------+
| action  |    date     |
+---------+-------------+
| Close   | 2016-07-27  |
| Open    | 2016-07-27  |
| Close   | 2016-07-27  |
| Close   | 2016-07-27  |
| Open    | 2016-07-27  |
| Close   | 2016-07-27  |
| Open    | 2016-07-27  |
| Open    | 2016-07-27  |
| Open    | 2016-07-27  |
| Open    | 2016-07-27  |
+---------+-------------+
10 rows selected (15.853 seconds)

Java代码连接Spark Thrift Server

  • 在执行本操作前,确保您已安装Java环境和Java编程工具,并且已配置环境变量

  • Java代码连接Spark Thrift Server需要下载Databricks提供的依赖包,下载路径:Databricks JDBC Driver

  1. 将项目依赖SparkJDBC42.jar添加到编程工具的Lib下,如图:

data
  1. 编写代码,连接Spark Thrift Server并提交作业。

代码如下:

import com.simba.spark.jdbc.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class SparkJDBC {
    private static Connection connectViaDS() throws Exception {
        Connection connection = null;
        Class.forName("com.simba.spark.jdbc.Driver");
        DataSource ds = new com.simba.spark.jdbc.DataSource();
        ds.setURL("jdbc:spark://{ip}:10001/{db_name};AuthMech=3;UID=username;PWD=ps");
        connection = ds.getConnection();
        return connection;
    }

    public static void main(String[] args) throws Exception {
        Connection connection = connectViaDS();
        System.out.println("日志打印:"+connection.getClientInfo());
        PreparedStatement tables = connection.prepareStatement("show databases ");
        ResultSet res = tables.executeQuery();
        while (res.next()){
            System.out.println("database_name :"+ res.getString(1));
        }
        res.close();
        tables.close();
        connection.close();
    }
}

本地测试是否正常执行。

注意

执行代码过程中如果出现NoSuchFileException请联系DDI运维人员。

Python代码连接Spark Thrift Server

  1. 使用PyHive连接Spark Thrift Server需要依的赖包安装如下:

pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive
  1. 编写Python代码,连接Spark Thrift Server并提交作业。

代码如下:vim sparkJDBC.py

from pyhive import hive
conn = hive.Connection(host='host',database='db_name', port=10001, username='username',password='ps',auth="LDAP")
cursor=conn.cursor()
cursor.execute('select * from table_name limit 10')
for result in cursor.fetchall():print(result)                  
conn.close()
  1. 执行Python代码结果如下:

python sparkJDBC.py

执行结果:

('Close', '2016-07-27')
('Open', '2016-07-27')
('Close', '2016-07-27')
('Close', '2016-07-27')
('Open', '2016-07-27')
('Close', '2016-07-27')
('Open', '2016-07-27')
('Open', '2016-07-27')
('Open', '2016-07-27')
('Open', '2016-07-27')