This topic describes how to consume Table Store data in Spark.

Accessing Table Store from Spark

Prepare a table named pet, in which name is the primary key.

name      owner   species  sex  birth       death
-------------------------------------------------
Fluffy    Harold  cat      f    1993-02-04  -
Claws     Gwen    cat      m    1994-03-17  -
Buffy     Harold  dog      f    1989-05-13  -
Fang      Benny   dog      m    1990-08-27  -
Bowser    Diane   dog      m    1979-08-31  1995-07-29
Chirpy    Gwen    bird     f    1998-09-11  -
Whistler  Gwen    bird     -    1997-12-09  -
Slim      Benny   snake    m    1996-04-29  -
Puffball  Diane   hamster  f    1999-03-30  -
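
If the pet table does not exist yet, you can create it with the Table Store Java SDK. The following is a minimal sketch, assuming the SDK's SyncClient; the endpoint, credentials, and instance name are placeholders you must replace with your own values.

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.CreateTableRequest;
import com.alicloud.openservices.tablestore.model.PrimaryKeyType;
import com.alicloud.openservices.tablestore.model.TableMeta;
import com.alicloud.openservices.tablestore.model.TableOptions;

public class CreatePetTable {
    public static void main(String[] args) {
        // Placeholders: replace with your own endpoint, credentials, and instance.
        SyncClient client = new SyncClient(
                "<your-instance-endpoint>", "<your-access-key-id>",
                "<your-access-key-secret>", "<your-instance-name>");
        try {
            // Declare only the primary key; Table Store attribute columns
            // (owner, species, sex, birth, death) are schema-free.
            TableMeta meta = new TableMeta("pet");
            meta.addPrimaryKeyColumn("name", PrimaryKeyType.STRING);
            // timeToLive = -1: data never expires; keep at most 1 version per column.
            TableOptions options = new TableOptions(-1, 1);
            client.createTable(new CreateTableRequest(meta, options));
        } finally {
            client.shutdown();
        }
    }
}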
The following example shows how to consume Table Store data in Spark.
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.alicloud.openservices.tablestore.model.PrimaryKey;
import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
import com.alicloud.openservices.tablestore.model.RangeRowQueryCriteria;
import com.aliyun.openservices.tablestore.hadoop.Credential;
import com.aliyun.openservices.tablestore.hadoop.Endpoint;
import com.aliyun.openservices.tablestore.hadoop.PrimaryKeyWritable;
import com.aliyun.openservices.tablestore.hadoop.RowWritable;
import com.aliyun.openservices.tablestore.hadoop.TableStore;
import com.aliyun.openservices.tablestore.hadoop.TableStoreInputFormat;

public class RowCounter {
    // Placeholders: replace with your own credentials and instance information.
    private static final String accessKeyId = "<your-access-key-id>";
    private static final String accessKeySecret = "<your-access-key-secret>";
    private static final String securityToken = null; // set only when using STS
    private static final String endpoint = "<your-instance-endpoint>";
    private static final String instance = "<your-instance-name>";

    // Build a scan over the whole pet table: the primary key "name" ranges
    // from INF_MIN (inclusive) to INF_MAX (exclusive), one version per column.
    private static RangeRowQueryCriteria fetchCriteria() {
        RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
        res.setMaxVersions(1);
        List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
        List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
        lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN));
        upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX));
        res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
        res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
        return res;
    }

    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
        JavaSparkContext sc = null;
        try {
            sc = new JavaSparkContext(sparkConf);
            Configuration hadoopConf = new Configuration();
            TableStore.setCredential(
                    hadoopConf,
                    new Credential(accessKeyId, accessKeySecret, securityToken));
            Endpoint ep = new Endpoint(endpoint, instance);
            TableStore.setEndpoint(hadoopConf, ep);
            TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
            // Each record is one Table Store row, keyed by its primary key.
            JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
                    hadoopConf, TableStoreInputFormat.class,
                    PrimaryKeyWritable.class, RowWritable.class);
            System.out.println(String.format("TOTAL: %d", rdd.count()));
        } finally {
            if (sc != null) {
                sc.close();
            }
        }
    }
}
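
The criteria above scan the entire table, from INF_MIN to INF_MAX. To consume only part of the table, bound the range with concrete primary-key values instead. The following is a minimal sketch reusing the same classes; the bounds "B" and "G" are illustrative.

// Variant of fetchCriteria() that reads only rows whose primary key
// "name" falls in ["B", "G"): the start is inclusive, the end exclusive.
private static RangeRowQueryCriteria fetchSubRangeCriteria() {
    RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
    res.setMaxVersions(1);
    List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
    List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
    lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.fromString("B")));
    upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.fromString("G")));
    res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
    res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
    return res;
}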

Accessing Table Store from spark-sql

A sample command is as follows.
spark-sql --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/*
Note: /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/* contains the TableStore DataSource type. If your EMR cluster uses Spark 2, replace spark3 with spark2 in the command above.
The following example creates a table and reads data from it.
create table test_tableStore
using tablestore
options (
  endpoint = "https://test.cn-hangzhou.vpc.tablestore.aliyuncs.com",
  access.key.id = "LTAI5tM85Z4s****",
  access.key.secret = "HF7P1L8PS6Eqfapiku****",
  table.name = "test_table",
  instance.name = "test_instance",
  catalog = '{"columns":{"pk":{"col":"pk","type":"string"},"data":{"col":"data","type":"string"}}}'
);

select * from test_tableStore;
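
The same DataSource can also be used from code instead of the spark-sql CLI. The following is a minimal sketch, assuming the same connector jars are on the classpath and that the DataFrame reader accepts the same options as the SQL OPTIONS clause above; the credential values are placeholders.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class TableStoreSqlExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("TableStoreSqlExample")
                .getOrCreate();

        // Options mirror the SQL example above; replace credentials with your own.
        Dataset<Row> df = spark.read()
                .format("tablestore")
                .option("endpoint", "https://test.cn-hangzhou.vpc.tablestore.aliyuncs.com")
                .option("access.key.id", "<your-access-key-id>")
                .option("access.key.secret", "<your-access-key-secret>")
                .option("table.name", "test_table")
                .option("instance.name", "test_instance")
                .option("catalog", "{\"columns\":{\"pk\":{\"col\":\"pk\",\"type\":\"string\"},\"data\":{\"col\":\"data\",\"type\":\"string\"}}}")
                .load();

        df.show();
        spark.stop();
    }
}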

Related documentation