E-MapReduce

Spark + TableStore

Last updated: 2017-06-07 13:26:11


Connecting Spark to TableStore

  • Prepare a data table

Create a table named pet whose primary key is name, and load the following rows.

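| name     | owner  | species | sex | birth      | death      |
|----------|--------|---------|-----|------------|------------|
| Fluffy   | Harold | cat     | f   | 1993-02-04 |            |
| Claws    | Gwen   | cat     | m   | 1994-03-17 |            |
| Buffy    | Harold | dog     | f   | 1989-05-13 |            |
| Fang     | Benny  | dog     | m   | 1990-08-27 |            |
| Bowser   | Diane  | dog     | m   | 1979-08-31 | 1995-07-29 |
| Chirpy   | Gwen   | bird    | f   | 1998-09-11 |            |
| Whistler | Gwen   | bird    |     | 1997-12-09 |            |
| Slim     | Benny  | snake   | m   | 1996-04-29 |            |
| Puffball | Diane  | hamster | f   | 1999-03-30 |            |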
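In TableStore, only the primary key (here name) is declared when the table is created. Attribute columns such as owner or death are not part of a fixed schema, so a row simply omits any cell it has no value for. In this data, Whistler has no sex, and only Bowser has a death. The plain-Java sketch below models the rows that way: each row is a map holding only the columns actually present. PetTable, row and countWith are hypothetical helpers for illustration. They are not part of any TableStore SDK, and the sketch does not talk to TableStore.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the pet table; not a TableStore API. */
public class PetTable {
    // Builds one row: the primary key "name" plus only the attribute columns that have values.
    static Map<String, String> row(String name, String... attrs) {
        Map<String, String> r = new LinkedHashMap<>();
        r.put("name", name);
        for (int i = 0; i + 1 < attrs.length; i += 2) {
            r.put(attrs[i], attrs[i + 1]); // attrs alternates column name, column value
        }
        return r;
    }

    static final List<Map<String, String>> ROWS = List.of(
        row("Fluffy", "owner", "Harold", "species", "cat", "sex", "f", "birth", "1993-02-04"),
        row("Claws", "owner", "Gwen", "species", "cat", "sex", "m", "birth", "1994-03-17"),
        row("Buffy", "owner", "Harold", "species", "dog", "sex", "f", "birth", "1989-05-13"),
        row("Fang", "owner", "Benny", "species", "dog", "sex", "m", "birth", "1990-08-27"),
        row("Bowser", "owner", "Diane", "species", "dog", "sex", "m", "birth", "1979-08-31",
            "death", "1995-07-29"),
        row("Chirpy", "owner", "Gwen", "species", "bird", "sex", "f", "birth", "1998-09-11"),
        row("Whistler", "owner", "Gwen", "species", "bird", "birth", "1997-12-09"), // no sex cell
        row("Slim", "owner", "Benny", "species", "snake", "sex", "m", "birth", "1996-04-29"),
        row("Puffball", "owner", "Diane", "species", "hamster", "sex", "f", "birth", "1999-03-30"));

    // Counts the rows that actually contain the given column.
    static long countWith(String column) {
        return ROWS.stream().filter(r -> r.containsKey(column)).count();
    }

    public static void main(String[] args) {
        System.out.println("rows=" + ROWS.size());
        System.out.println("with sex=" + countWith("sex"));
        System.out.println("with death=" + countWith("death"));
    }
}
```

Running main prints rows=9, with sex=8 and with death=1.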
  • The following example shows how to consume TableStore data in Spark. It reads every row of the pet table and prints the row count.
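```java
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import com.alicloud.openservices.tablestore.hadoop.Credential;
import com.alicloud.openservices.tablestore.hadoop.Endpoint;
import com.alicloud.openservices.tablestore.hadoop.PrimaryKeyWritable;
import com.alicloud.openservices.tablestore.hadoop.RowWritable;
import com.alicloud.openservices.tablestore.hadoop.TableStore;
import com.alicloud.openservices.tablestore.hadoop.TableStoreInputFormat;
import com.alicloud.openservices.tablestore.model.PrimaryKey;
import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
import com.alicloud.openservices.tablestore.model.RangeRowQueryCriteria;

// The enclosing class name is illustrative; the original snippet showed only the methods.
public class RowCounter {
    // Range over the whole pet table: name from INF_MIN (inclusive) to INF_MAX (exclusive), latest version only.
    private static RangeRowQueryCriteria fetchCriteria() {
        RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
        res.setMaxVersions(1);
        List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
        List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
        lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN));
        upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX));
        res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
        res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
        return res;
    }

    public static void main(String[] args) {
        // Connection settings; reading them from args in this order is an assumption of this example.
        String accessKeyId = args[0];
        String accessKeySecret = args[1];
        String endpoint = args[2];  // TableStore endpoint of the instance
        String instance = args[3];  // TableStore instance name
        String securityToken = args.length > 4 ? args[4] : null; // optional STS token (assumed nullable)
        SparkConf sparkConf = new SparkConf().setAppName("RowCounter");
        JavaSparkContext sc = null;
        try {
            sc = new JavaSparkContext(sparkConf);
            Configuration hadoopConf = new Configuration();
            // Register credentials, endpoint and the range to read with TableStoreInputFormat.
            TableStore.setCredential(
                hadoopConf,
                new Credential(accessKeyId, accessKeySecret, securityToken));
            Endpoint ep = new Endpoint(endpoint, instance);
            TableStore.setEndpoint(hadoopConf, ep);
            TableStoreInputFormat.addCriteria(hadoopConf, fetchCriteria());
            // Each RDD element is one TableStore row: (primary key, row).
            JavaPairRDD<PrimaryKeyWritable, RowWritable> rdd = sc.newAPIHadoopRDD(
                hadoopConf, TableStoreInputFormat.class,
                PrimaryKeyWritable.class, RowWritable.class);
            System.out.println(String.format("TOTAL: %d", rdd.count()));
        } finally {
            if (sc != null) {
                sc.close();
            }
        }
    }
}
```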
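fetchCriteria() reads the whole table because its start key INF_MIN sorts below every name and its end key INF_MAX sorts above every name. The start key is inclusive and the end key exclusive. The sketch below imitates those range semantics locally over the nine name values with a TreeSet, using null for an unbounded side. RangeSketch and range are hypothetical names that do not call TableStore. Ordering keys with String.compareTo is an assumption that should match TableStore's key order for plain ASCII names like these.

```java
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical local sketch of a primary-key range read: [start, end), null = unbounded. */
public class RangeSketch {
    // The primary keys of the pet table, kept in sorted order.
    static final NavigableSet<String> NAMES = new TreeSet<>(List.of(
        "Fluffy", "Claws", "Buffy", "Fang", "Bowser",
        "Chirpy", "Whistler", "Slim", "Puffball"));

    /** Returns the names in [start, end); a null bound plays the role of INF_MIN / INF_MAX. */
    static List<String> range(String start, String end) {
        NavigableSet<String> view = NAMES;
        if (start != null) {
            view = view.tailSet(start, true);  // inclusive start key
        }
        if (end != null) {
            view = view.headSet(end, false);   // exclusive end key
        }
        return List.copyOf(view);              // sorted snapshot of the selected keys
    }

    public static void main(String[] args) {
        System.out.println("TOTAL: " + range(null, null).size()); // same bounds as fetchCriteria()
        System.out.println(range("B", "F"));
        System.out.println(range("Fang", "Puffball"));
    }
}
```

If the table holds exactly the nine rows above, the full range gives 9, the same count the Spark job prints as TOTAL. range("B", "F") returns [Bowser, Buffy, Chirpy, Claws]. range("Fang", "Puffball") returns [Fang, Fluffy] because the end key itself is excluded.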

Appendix

For the complete sample code, see:
