Tablestore

本文介绍如何使用Databricks 读写Tablestore数据。

前提条件

使用Databricks 读写Tablestore数据

1.通过创建表的方式读取Tablestore数据;

%sql
--创建数据库
CREATE DATABASE IF NOT EXISTS table_store;
USE table_store;
--创建表
DROP TABLE IF EXISTS delta_order_source;
CREATE TABLE delta_order_source
USING tablestore
-- 配置项信息链接tablestore,定义schema
OPTIONS(
  endpoint="your endpoint",
  access.key.id="your akId",
  access.key.secret="your ads",
  instance.name="your instanceName",
  table.name="your tableName",
catalog='{"columns": {"user_id": {"type": "string"}, "order_id": {"type": "string"},"price": {"type": "double"}, "name": {"type": "string"}}}'
);
-- 数据查询
SELECT * FROM delta_order_source;
data

2.使用spark API读取Tablestore数据;

%spark
//读取配置
val df = spark.read.format("tablestore")
    .option("endpoint", "your endpoint")
    .option("access.key.id", "your akId")
    .option("access.key.secret", "your ads")
    .option("instance.name", "your instanceName")
    .option("table.name", "your tableName")
    .option("catalog", """{"columns": {"user_id": {"type": "string"}, "order_id": {"type": "string"},"price": {"type": "double"}, "name": {"type": "string"}}}""")
    .load()
df.show()
data
重要
  • catalog为Tablestore中表的schema,当catalog声明的列名在Tablestore表中不存在时,Tablestore表会自动增加一列,默认为NULL

  • 如果报Tablestore的DataSource找不到的错误,说明依赖的数据源jar包还没有安装生效

  • 注意明确元数据库的Location,推荐使用oss,`desc database default;`命令查看

3.将数据写入到Tablestore

%spark
//定义将要插入Tablestore的DataFrame;
val add_df = spark.createDataFrame(
    Seq(
        ("1086", "20191118-10", 2250.0, "jack"),
        ("1010", "20191118-11", 2200.0, "rose")
    )
).toDF("user_id", "order_id", "price", "name")
// 将定义好的DF写入到创建的数据表中,同时也会写入到Tablestore中;
add_df.write.format("Tablestore").mode("append").saveAsTable("delta_order_source")
//数据查询
spark.table("delta_order_source").where("user_id == '1086'").show()
data
说明

结果已经写入Tablestore

data