本文为您介绍如何使用 JDBC 数据表。

建表语法

CREATE TABLE tbName
USING jdbc2
OPTIONS(propertyName=propertyValue[,propertyName=propertyValue]*);
注意 这里我们使用的是 “jdbc2”。

配置参数说明

参数名 说明 是否必选
url 数据库地址
driver 数据库连接 driver,可选“com.mysql.jdbc.Driver”eper.quorum":"a.b.c.d:2181"}'
dbtable 数据表名
user 连接用户名
password 连接密码
batchsize 向数据库写入数据时生效,表示每个批次更新多少条数据
isolationLevel 事务隔离级别,默认“READ_UNCOMMITTED”
  • 事务隔离级别
    事务隔离级别 脏读 不可重复读 幻读
    READ_UNCOMMITTED
    READ_COMMITTED
    REPEATABLE_READ
    SERIALIZABLE
    NONE - - -

Table Schema

创建 JDBC 表时,无须显式地定义表的字段信息,例如:

spark-sql> CREATE DATABASE IF NOT EXISTS default;
spark-sql> USE default;
spark-sql> DROP TABLE IF EXISTS rds_table_test;
spark-sql> CREATE TABLE rds_table_test
         > USING jdbc2
         > OPTIONS (
         > url="jdbc:mysql://rm-bp11*********i7w9.mysql.rds.aliyuncs.com:3306/default?useSSL=true",
         > driver="com.mysql.jdbc.Driver",
         > dbtable="test",
         > user="root",
         > password="thisisapassword",
         > batchsize="100",
         > isolationLevel="NONE");

spark-sql> DESC rds_table_test;
id  int NULL
name  string  NULL
Time taken: 0.413 seconds, Fetched 2 row(s)

写入数据

我们需要额外设置一个关联的 SQL statement 来表达如何向数据库中写入数据,通过以下语句来表达:
spark-sql> SET streaming.query.${queryName}.sql=insert into `test` (`id`,`name`) values(?, ?);
spark-sql> SET ...
spark-sql> INSERT INTO rds_table_test SELECT ...