本文介绍如何使用Databricks 读写Tablestore数据。
前提条件
已创建Tablestore实例
通过主账号登录阿里云 Databricks控制台。
已创建DDI集群,具体请参见DDI集群创建。
创建集群并通过knox账号访问NoteBook。
使用Databricks 读写Tablestore数据
1.通过创建表的方式读取Tablestore数据;
%sql
--创建数据库
CREATE DATABASE IF NOT EXISTS table_store;
USE table_store;
--创建表
DROP TABLE IF EXISTS delta_order_source;
CREATE TABLE delta_order_source
USING tablestore
-- 配置项信息链接tablestore,定义schema
OPTIONS(
endpoint="your endpoint",
access.key.id="your akId",
access.key.secret="your ads",
instance.name="your instanceName",
table.name="your tableName",
catalog='{"columns": {"user_id": {"type": "string"}, "order_id": {"type": "string"},"price": {"type": "double"}, "name": {"type": "string"}}}'
);
-- 数据查询
SELECT * FROM delta_order_source;
2.使用spark API读取Tablestore数据;
%spark
//读取配置
val df = spark.read.format("tablestore")
.option("endpoint", "your endpoint")
.option("access.key.id", "your akId")
.option("access.key.secret", "your ads")
.option("instance.name", "your instanceName")
.option("table.name", "your tableName")
.option("catalog", """{"columns": {"user_id": {"type": "string"}, "order_id": {"type": "string"},"price": {"type": "double"}, "name": {"type": "string"}}}""")
.load()
df.show()
重要
catalog为Tablestore中表的schema,当catalog声明的列名在Tablestore表中不存在时,Tablestore表会自动增加一列,默认为NULL
如果报Tablestore的DataSource找不到的错误,说明依赖的数据源jar包还没有安装生效
注意明确元数据库的Location,推荐使用oss,`desc database default;`命令查看
3.将数据写入到Tablestore
%spark
//定义将要插入Tablestore的DataFrame;
val add_df = spark.createDataFrame(
Seq(
("1086", "20191118-10", 2250.0, "jack"),
("1010", "20191118-11", 2200.0, "rose")
)
).toDF("user_id", "order_id", "price", "name")
// 将定义好的DF写入到创建的数据表中,同时也会写入到Tablestore中;
add_df.write.format("Tablestore").mode("append").saveAsTable("delta_order_source")
//数据查询
spark.table("delta_order_source").where("user_id == '1086'").show()
说明
结果已经写入Tablestore
说明
Tablestore结合spark 参考文档:Tablestore结合spark的流批一体SQL实战
Jar包Java方式参考文档:https://github.com/aliyun/aliyun-emapreduce-datasources/blob/main/docs/Spark-on-TableStore.md
文档内容是否对您有帮助?