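This topic describes how Spark consumes data from Table Store.
Access Table Store from Spark
Prepare a table named pet whose primary key is name. A hyphen (-) in the table below means that the row has no value in that column.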
name | owner | species | sex | birth | death |
Fluffy | Harold | cat | f | 1993-02-04 | - |
Claws | Gwen | cat | m | 1994-03-17 | - |
Buffy | Harold | dog | f | 1989-05-13 | - |
Fang | Benny | dog | m | 1990-08-27 | - |
Bowser | Diane | dog | m | 1979-08-31 | 1995-07-29 |
Chirpy | Gwen | bird | f | 1998-09-11 | - |
Whistler | Gwen | bird | - | 1997-12-09 | - |
Slim | Benny | snake | m | 1996-04-29 | - |
Puffball | Diane | hamster | f | 1999-03-30 | - |
The following example shows how to consume Table Store data in Spark.
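// Table Store SDK and Table Store Hadoop connector classes
import com.alicloud.openservices.tablestore.model.PrimaryKey;
import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
import com.alicloud.openservices.tablestore.model.RangeRowQueryCriteria;
import com.aliyun.openservices.tablestore.hadoop.Credential;
import com.aliyun.openservices.tablestore.hadoop.Endpoint;
import com.aliyun.openservices.tablestore.hadoop.PrimaryKeyWritable;
import com.aliyun.openservices.tablestore.hadoop.RowWritable;
import com.aliyun.openservices.tablestore.hadoop.TableStore;
import com.aliyun.openservices.tablestore.hadoop.TableStoreInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.ArrayList;
import java.util.List;
// Scans the whole pet table: from INF_MIN (inclusive) to INF_MAX (exclusive) on the name primary key.
private static RangeRowQueryCriteria fetchCriteria() {
    RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
    res.setMaxVersions(1); // read only the latest version of each column
    List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
    List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
    lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN));
    upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX));
    res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
    res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
    return res;
}
public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
    JavaSparkContext sc = null;
    try {
        sc = new JavaSparkContext(sparkConf);
        // accessKeyId, accessKeySecret, securityToken, endpoint, and instance are not
        // defined in this excerpt; supply them yourself (see the complete sample code).
        Configuration hadoopConf = new Configuration();
        TableStore.setCredential(
            hadoopConf,
            new Credential(accessKeyId, accessKeySecret, securityToken));
        Endpoint ep = new Endpoint(endpoint, instance);
        TableStore.setEndpoint(hadoopConf, ep);
        TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
        // Each record is one Table Store row: key = primary key, value = row.
        JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
            hadoopConf, TableStoreInputFormat.class,
            PrimaryKeyWritable.class, RowWritable.class);
        System.out.println(String.format("TOTAL: %d", rdd.count()));
    } finally {
        if (sc != null) {
            sc.close();
        }
    }
}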
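fetchCriteria requests every row of pet by scanning the name primary key from PrimaryKeyValue.INF_MIN (inclusive) to PrimaryKeyValue.INF_MAX (exclusive), and rdd.count() then counts the rows returned. The stdlib-only Java sketch below models that range over the sample table with a TreeMap, assuming rows are ordered by name. It does not call Table Store; the class and method names are hypothetical, and a null bound stands in for INF_MIN or INF_MAX.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class PetRangeScanSketch {
    // Hypothetical in-memory stand-in for the pet table: primary key (name) -> owner.
    // A TreeMap keeps the keys sorted, so range views behave like a primary-key scan.
    static TreeMap<String, String> petTable() {
        TreeMap<String, String> rows = new TreeMap<>();
        String[][] data = {
            {"Fluffy", "Harold"}, {"Claws", "Gwen"}, {"Buffy", "Harold"},
            {"Fang", "Benny"}, {"Bowser", "Diane"}, {"Chirpy", "Gwen"},
            {"Whistler", "Gwen"}, {"Slim", "Benny"}, {"Puffball", "Diane"}
        };
        for (String[] row : data) {
            rows.put(row[0], row[1]);
        }
        return rows;
    }

    // Models the range [start, end): start is inclusive, end is exclusive.
    // A null bound means "no limit on that side" (the role of INF_MIN / INF_MAX).
    static NavigableMap<String, String> scan(TreeMap<String, String> table, String start, String end) {
        NavigableMap<String, String> view = table;
        if (start != null) {
            view = view.tailMap(start, true);
        }
        if (end != null) {
            view = view.headMap(end, false);
        }
        return view;
    }

    public static void main(String[] args) {
        TreeMap<String, String> table = petTable();
        // Equivalent of fetchCriteria(): INF_MIN .. INF_MAX covers every row.
        System.out.println("TOTAL: " + scan(table, null, null).size());
        // A narrower range for comparison: names from "B" (inclusive) to "D" (exclusive).
        System.out.println(scan(table, "B", "D").keySet());
    }
}
```

Running main prints TOTAL: 9 followed by [Bowser, Buffy, Chirpy, Claws], which is what the full-range criteria should yield for the nine-row pet table above.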
Access Table Store from spark-sql
Example command:
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
--hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
--hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET
You must configure the environment variables before you run the sample code. For more information, see Configure environment variables.
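The Java example above uses accessKeyId and accessKeySecret without defining them. One way to supply them without hard-coding credentials is to read the same environment variables that the spark-sql command uses. The sketch below assumes ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set; the class and helper names are hypothetical, and the environment is passed in as a Map so the lookup can be checked without a real environment.

```java
import java.util.Map;

public class CredentialsFromEnv {
    // Hypothetical holder for the two values the examples need.
    static final class AccessKey {
        final String id;
        final String secret;

        AccessKey(String id, String secret) {
            this.id = id;
            this.secret = secret;
        }
    }

    // Returns the value of a required variable, failing fast with a clear message
    // instead of letting a null credential reach the Table Store client.
    static String requireEnv(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    static AccessKey fromEnv(Map<String, String> env) {
        return new AccessKey(
            requireEnv(env, "ALIBABA_CLOUD_ACCESS_KEY_ID"),
            requireEnv(env, "ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    }

    public static void main(String[] args) {
        try {
            AccessKey key = fromEnv(System.getenv());
            // Avoid printing the secret; the ID length is enough to confirm it was loaded.
            System.out.println("AccessKey ID loaded (" + key.id.length() + " characters)");
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

In a Spark driver you would call fromEnv(System.getenv()) and pass key.id and key.secret to new Credential(...); failing early makes a missing variable obvious before any job is submitted.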
The /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* directory contains the TableStore DataSource type. If your EMR cluster uses Spark 2, replace spark3 in the preceding command with spark2.
For Spark 3, if querying Table Store fails with java.lang.ClassNotFoundException: org.apache.commons.net.util.Base64, add the commons-net dependency by changing the value of the --jars parameter to /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*,/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar.
If you do not want to add the --jars parameter every time you run the command, go to the Configure tab of the Spark service and append /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*:/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar to the values of the spark.driver.extraClassPath and spark.executor.extraClassPath configuration items.
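The two settings for the commons-net workaround join the same two paths differently: the --jars value is comma-separated, while spark.driver.extraClassPath and spark.executor.extraClassPath take a JVM classpath, whose entries are separated by a colon on Linux. The stdlib-only sketch below builds both strings from one list of paths so they stay in sync; the class and method names are hypothetical, and the paths are copied from the commands in this topic.

```java
import java.util.List;

public class SparkJarSettings {
    // Paths from this topic; replace spark3 with spark2 on a Spark 2 cluster.
    static final List<String> PATHS = List.of(
        "/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*",
        "/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar");

    // Value for the spark-sql --jars option: entries are separated by commas.
    static String jarsOption(List<String> paths) {
        return String.join(",", paths);
    }

    // Text to append to spark.driver.extraClassPath / spark.executor.extraClassPath:
    // a JVM classpath, whose entries are separated by ':' on Linux.
    static String extraClassPath(List<String> paths) {
        return String.join(":", paths);
    }

    public static void main(String[] args) {
        System.out.println("--jars " + jarsOption(PATHS));
        System.out.println("spark.driver.extraClassPath += " + extraClassPath(PATHS));
        System.out.println("spark.executor.extraClassPath += " + extraClassPath(PATHS));
    }
}
```

Mixing up the two separators is a common reason the class is still missing after the configuration change, which is why both strings are derived from one list here.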
The following example creates a table and reads data from it.
create table test_tableStore
using tablestore
options(endpoint = 'https://test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
access.key.id = '${hiveconf:accessKeyId}',
access.key.secret = '${hiveconf:accessKeySecret}',
table.name = 'test_table',
instance.name = 'test_instance',
catalog = '{"columns":{"pk":{"col":"pk","type":"string"},"data":{"col":"data","type":"string"}}}'
);
select * from test_tableStore;
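The catalog option maps each Spark column to a Table Store column and type as a JSON string, which becomes error-prone to write by hand as the column list grows. The sketch below generates that string from an ordered column list using only the JDK. It reproduces the catalog shape shown in the example; the class name is hypothetical, it assumes each Spark column maps to a Table Store column of the same name, the only type name it is checked against is "string" (the one used above), and it does no JSON escaping, so column names must be plain identifiers.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class TableStoreCatalogBuilder {
    // Builds {"columns":{"<name>":{"col":"<name>","type":"<type>"},...}}.
    // No JSON escaping is done, so names and types must not contain quotes or backslashes.
    static String catalog(Map<String, String> columnTypes) {
        StringJoiner columns = new StringJoiner(",", "{\"columns\":{", "}}");
        for (Map.Entry<String, String> e : columnTypes.entrySet()) {
            columns.add("\"" + e.getKey() + "\":{\"col\":\"" + e.getKey()
                + "\",\"type\":\"" + e.getValue() + "\"}");
        }
        return columns.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves the declared column order in the output.
        Map<String, String> cols = new LinkedHashMap<>();
        cols.put("pk", "string");
        cols.put("data", "string");
        System.out.println(catalog(cols));
    }
}
```

Running main prints exactly the catalog value used in the create table statement above, so the generated string can be pasted into the options clause.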
Related documents
For the complete sample code, see Access Table Store from Spark.
For more examples, see Batch computing.