This topic describes how to consume Table Store data in Spark.
Use Spark to access Table Store
Prepare a table named pet, in which name is the primary key.
| name | owner | species | sex | birth | death |
| --- | --- | --- | --- | --- | --- |
| Fluffy | Harold | cat | f | 1993-02-04 | - |
| Claws | Gwen | cat | m | 1994-03-17 | - |
| Buffy | Harold | dog | f | 1989-05-13 | - |
| Fang | Benny | dog | m | 1990-08-27 | - |
| Bowser | Diane | dog | m | 1979-08-31 | 1995-07-29 |
| Chirpy | Gwen | bird | f | 1998-09-11 | - |
| Whistler | Gwen | bird | - | 1997-12-09 | - |
| Slim | Benny | snake | m | 1996-04-29 | - |
| Puffball | Diane | hamster | f | 1999-03-30 | - |
The following sample code shows how to consume Table Store data in Spark.
import java.util.ArrayList;
import java.util.Formatter;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

// The model classes come from the Table Store Java SDK; the
// com.aliyun.openservices.tablestore.hadoop classes come from the
// emr-tablestore Hadoop/Spark connector.
import com.alicloud.openservices.tablestore.model.PrimaryKey;
import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
import com.alicloud.openservices.tablestore.model.RangeRowQueryCriteria;
import com.aliyun.openservices.tablestore.hadoop.Credential;
import com.aliyun.openservices.tablestore.hadoop.Endpoint;
import com.aliyun.openservices.tablestore.hadoop.PrimaryKeyWritable;
import com.aliyun.openservices.tablestore.hadoop.RowWritable;
import com.aliyun.openservices.tablestore.hadoop.TableStore;
import com.aliyun.openservices.tablestore.hadoop.TableStoreInputFormat;

// Builds a criteria that scans the full primary-key range of the pet table
// and reads only the latest version of each column.
private static RangeRowQueryCriteria fetchCriteria() {
    RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
    res.setMaxVersions(1);
    List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
    List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
    lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN));
    upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX));
    res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
    res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
    return res;
}

public static void main(String[] args) {
    // accessKeyId, accessKeySecret, securityToken, endpoint, and instance
    // are user-supplied strings that identify your Table Store instance.
    SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
    JavaSparkContext sc = null;
    try {
        sc = new JavaSparkContext(sparkConf);
        Configuration hadoopConf = new Configuration();
        TableStore.setCredential(
                hadoopConf,
                new Credential(accessKeyId, accessKeySecret, securityToken));
        Endpoint ep = new Endpoint(endpoint, instance);
        TableStore.setEndpoint(hadoopConf, ep);
        TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
        JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
                hadoopConf, TableStoreInputFormat.class,
                PrimaryKeyWritable.class, RowWritable.class);
        System.out.println(
                new Formatter().format("TOTAL: %d", rdd.count()).toString());
    } finally {
        if (sc != null) {
            sc.close();
        }
    }
}
Use spark-sql to access Table Store
The sample command is as follows.
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
Note
The path /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* contains the TableStore DataSource type. If your EMR cluster runs Spark 2, replace spark3 in the preceding command with spark2.
The following example shows how to create a table and read data.
create table test_tableStore
using tablestore
options(endpoint = "https://test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
access.key.id = "LTAI5tM85Z4s****",
access.key.secret = "HF7P1L8PS6Eqfapiku****",
table.name = "test_table",
instance.name = "test_instance",
catalog = '{"columns":{"pk":{"col":"pk","type":"string"},"data":{"col":"data","type":"string"}}}'
);
select * from test_tableStore;
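The catalog option above is a JSON string that maps each Spark column name to a Table Store column and type, in the form {"columns":{"<name>":{"col":"<name>","type":"<type>"}}}. As a minimal sketch (the CatalogBuilder class below is illustrative helper code, not part of any SDK), the same string can be assembled programmatically instead of hand-written, which helps avoid quoting mistakes for wide tables:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CatalogBuilder {
    // Builds the JSON string expected by the `catalog` option:
    // {"columns":{"<name>":{"col":"<name>","type":"<type>"}, ...}}
    // A LinkedHashMap preserves the column order given by the caller.
    public static String buildCatalog(Map<String, String> columns) {
        StringBuilder sb = new StringBuilder("{\"columns\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : columns.entrySet()) {
            if (!first) {
                sb.append(",");
            }
            first = false;
            sb.append("\"").append(e.getKey()).append("\":{\"col\":\"")
              .append(e.getKey()).append("\",\"type\":\"")
              .append(e.getValue()).append("\"}");
        }
        return sb.append("}}").toString();
    }

    public static void main(String[] args) {
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put("pk", "string");
        cols.put("data", "string");
        // Prints the same catalog string used in the create table example.
        System.out.println(buildCatalog(cols));
    }
}
```

The resulting string matches the catalog value in the create table statement above and can be pasted into the options clause.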
Related documentation
- For the complete sample code, see Use Spark to access Table Store.
- For more examples, see Batch compute.