Spark Access to Table Store

This topic describes how Spark consumes data from Table Store.

Connect Spark to Table Store

Prepare a data table named pet, in which name is the primary key.

name     | owner  | species | sex | birth      | death
---------|--------|---------|-----|------------|-----------
Fluffy   | Harold | cat     | f   | 1993-02-04 | -
Claws    | Gwen   | cat     | m   | 1994-03-17 | -
Buffy    | Harold | dog     | f   | 1989-05-13 | -
Fang     | Benny  | dog     | m   | 1990-08-27 | -
Bowser   | Diane  | dog     | m   | 1979-08-31 | 1995-07-29
Chirpy   | Gwen   | bird    | f   | 1998-09-11 | -
Whistler | Gwen   | bird    | -   | 1997-12-09 | -
Slim     | Benny  | snake   | m   | 1996-04-29 | -
Puffball | Diane  | hamster | f   | 1999-03-30 | -
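If the pet table does not already exist, it can be created with the Table Store Java SDK. The following is a minimal sketch and not part of the Spark job itself; the class name CreatePetTable and the angle-bracket placeholders are illustrative, and it assumes the com.alicloud.openservices.tablestore SDK is on the classpath.

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.CreateTableRequest;
import com.alicloud.openservices.tablestore.model.PrimaryKeySchema;
import com.alicloud.openservices.tablestore.model.PrimaryKeyType;
import com.alicloud.openservices.tablestore.model.TableMeta;
import com.alicloud.openservices.tablestore.model.TableOptions;

public class CreatePetTable {
    public static void main(String[] args) {
        // Replace the placeholders with your own endpoint, credentials and instance.
        SyncClient client = new SyncClient(
                "<endpoint>", "<accessKeyId>", "<accessKeySecret>", "<instanceName>");
        try {
            TableMeta meta = new TableMeta("pet");
            // name is the single primary-key column, matching the sample data above.
            meta.addPrimaryKeyColumn(new PrimaryKeySchema("name", PrimaryKeyType.STRING));
            // TTL -1 means data never expires; keep at most one version per cell.
            TableOptions options = new TableOptions(-1, 1);
            client.createTable(new CreateTableRequest(meta, options));
        } finally {
            client.shutdown();
        }
    }
}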

The following example demonstrates how to consume Table Store data in Spark.

import java.util.ArrayList;
import java.util.Formatter;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.alicloud.openservices.tablestore.model.PrimaryKey;
import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
import com.alicloud.openservices.tablestore.model.RangeRowQueryCriteria;

import com.aliyun.openservices.tablestore.hadoop.Credential;
import com.aliyun.openservices.tablestore.hadoop.Endpoint;
import com.aliyun.openservices.tablestore.hadoop.PrimaryKeyWritable;
import com.aliyun.openservices.tablestore.hadoop.RowWritable;
import com.aliyun.openservices.tablestore.hadoop.TableStore;
import com.aliyun.openservices.tablestore.hadoop.TableStoreInputFormat;

public class RowCounter {
    // Placeholders: replace with your own credentials and instance information.
    private static String accessKeyId = "<accessKeyId>";
    private static String accessKeySecret = "<accessKeySecret>";
    private static String securityToken = null; // optional STS token
    private static String endpoint = "<endpoint>";
    private static String instance = "<instance>";

    private static RangeRowQueryCriteria fetchCriteria() {
        // Scan the full primary-key range of the pet table, reading at most
        // one version of each cell.
        RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
        res.setMaxVersions(1);
        List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
        List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
        lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN));
        upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX));
        res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
        res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
        return res;
    }

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
        JavaSparkContext sc = null;
        try {
            sc = new JavaSparkContext(sparkConf);
            Configuration hadoopConf = new Configuration();
            TableStore.setCredential(
                    hadoopConf,
                    new Credential(accessKeyId, accessKeySecret, securityToken));
            Endpoint ep = new Endpoint(endpoint, instance);
            TableStore.setEndpoint(hadoopConf, ep);
            TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
            JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
                    hadoopConf, TableStoreInputFormat.class,
                    PrimaryKeyWritable.class, RowWritable.class);
            System.out.println(
                new Formatter().format("TOTAL: %d", rdd.count()).toString());
        } finally {
            if (sc != null) {
                sc.close();
            }
        }
    }
}
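To run the example on an EMR cluster, package it and submit it with spark-submit. The command below is only a sketch: the jar name row-counter.jar is a hypothetical build artifact, and the --jars path mirrors the emrsdk location used later in this topic; adjust both to your environment.

spark-submit --class RowCounter \
  --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
  row-counter.jar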

Access Table Store from spark-sql

A sample command is as follows.

spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* \
  --hiveconf accessKeyId=$ALIBABA_CLOUD_ACCESS_KEY_ID \
  --hiveconf accessKeySecret=$ALIBABA_CLOUD_ACCESS_KEY_SECRET
Note
  • Before running the sample code, you must configure environment variables. For details on how to configure environment variables, see Configure environment variables.

  • /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* contains the TableStore DataSource type. If your EMR cluster uses Spark 2, replace spark3 in the command above with spark2.

  • For Spark 3, if querying Table Store fails with java.lang.ClassNotFoundException: org.apache.commons.net.util.Base64, you must add the commons-net dependency; the --jars value can be changed to /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*,/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar.

  • If you do not want to add the --jars parameter every time you run a command, you can modify the configuration items spark.driver.extraClassPath and spark.executor.extraClassPath on the Configure tab of the Spark service and append /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*:/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar to their values, as shown in the sketch after this list.
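As an illustration of the last note, the corresponding spark-defaults.conf entries would look like the following sketch; it assumes the two properties are currently empty (if they already hold values, append the paths with a colon separator instead).

spark.driver.extraClassPath   /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*:/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar
spark.executor.extraClassPath /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*:/opt/apps/HADOOP-COMMON/hadoop-common-current/share/hadoop/common/lib/commons-net-3.6.jar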

The following example shows how to create a table and read data.

create table test_tableStore
using tablestore
  options(endpoint = 'https://test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
  access.key.id = '${hiveconf:accessKeyId}',
  access.key.secret = '${hiveconf:accessKeySecret}',
  table.name = 'test_table',
  instance.name = 'test_instance',
  catalog = '{"columns":{"pk":{"col":"pk","type":"string"},"data":{"col":"data","type":"string"}}}'
);

select * from test_tableStore;
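Once the mapping table exists, ordinary SQL can be run against it. A hypothetical follow-up query, assuming the underlying test_table holds a row whose pk value is 'pk1':

select pk, data from test_tableStore where pk = 'pk1';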

Related documentation