E-MapReduce

MR + TableStore

Last updated: 2017-06-07 13:26:11

Accessing TableStore from MR

  • Prepare a data table

Create a table named pet, with name as its primary key. The sample rows are listed below; a minimal sketch of creating and filling this table with the TableStore Java SDK follows the table.

name     | owner  | species | sex | birth      | death
---------|--------|---------|-----|------------|-----------
Fluffy   | Harold | cat     | f   | 1993-02-04 |
Claws    | Gwen   | cat     | m   | 1994-03-17 |
Buffy    | Harold | dog     | f   | 1989-05-13 |
Fang     | Benny  | dog     | m   | 1990-08-27 |
Bowser   | Diane  | dog     | m   | 1979-08-31 | 1995-07-29
Chirpy   | Gwen   | bird    | f   | 1998-09-11 |
Whistler | Gwen   | bird    |     | 1997-12-09 |
Slim     | Benny  | snake   | m   | 1996-04-29 |
Puffball | Diane  | hamster | f   | 1999-03-30 |
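
If the pet table does not exist yet, it can be created and populated through the TableStore Java SDK. The snippet below is a minimal sketch rather than part of the original example: the endpoint, access key, and instance values are placeholders, and only the first sample row is inserted; the remaining rows follow the same pattern.

    import com.alicloud.openservices.tablestore.SyncClient;
    import com.alicloud.openservices.tablestore.model.*;

    public class PreparePetTable {
        public static void main(String[] args) {
            // Placeholders: use the endpoint and credentials of your own TableStore instance.
            String endpoint = "<your-endpoint>";
            String accessKeyId = "<your-access-key-id>";
            String accessKeySecret = "<your-access-key-secret>";
            String instance = "<your-instance-name>";
            SyncClient client = new SyncClient(endpoint, accessKeyId, accessKeySecret, instance);
            try {
                // The pet table has a single string primary key column: name.
                TableMeta meta = new TableMeta("pet");
                meta.addPrimaryKeyColumn(new PrimaryKeySchema("name", PrimaryKeyType.STRING));
                // Keep data forever (TTL = -1) and at most one version per column.
                client.createTable(new CreateTableRequest(meta, new TableOptions(-1, 1)));

                // Insert the first sample row; the other rows are written the same way.
                PrimaryKeyBuilder pk = PrimaryKeyBuilder.createPrimaryKeyBuilder();
                pk.addPrimaryKeyColumn("name", PrimaryKeyValue.fromString("Fluffy"));
                RowPutChange put = new RowPutChange("pet", pk.build());
                put.addColumn("owner", ColumnValue.fromString("Harold"));
                put.addColumn("species", ColumnValue.fromString("cat"));
                put.addColumn("sex", ColumnValue.fromString("f"));
                put.addColumn("birth", ColumnValue.fromString("1993-02-04"));
                client.putRow(new PutRowRequest(put));
            } finally {
                client.shutdown();
            }
        }
    }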
  • The following example shows how to consume data from TableStore in an MR job; a short note on running it follows the code.
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import com.alicloud.openservices.tablestore.model.PrimaryKey;
    import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
    import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
    import com.alicloud.openservices.tablestore.model.RangeRowQueryCriteria;
    // PrimaryKeyWritable, RowWritable, TableStore, and TableStoreInputFormat come from
    // the EMR TableStore (Hadoop) connector; their package depends on the connector version.

    public class RowCounter {
        // Emits one ("TOTAL", 1) pair per TableStore row fed in by TableStoreInputFormat.
        public static class RowCounterMapper
                extends Mapper<PrimaryKeyWritable, RowWritable, Text, LongWritable> {
            private final static Text agg = new Text("TOTAL");
            private final static LongWritable one = new LongWritable(1);

            @Override
            public void map(PrimaryKeyWritable key, RowWritable value, Context context)
                    throws IOException, InterruptedException {
                context.write(agg, one);
            }
        }

        // Sums the counts; also used as the combiner.
        public static class IntSumReducer
                extends Reducer<Text, LongWritable, Text, LongWritable> {
            @Override
            public void reduce(Text key, Iterable<LongWritable> values, Context context)
                    throws IOException, InterruptedException {
                long sum = 0;
                for (LongWritable val : values) {
                    sum += val.get();
                }
                context.write(key, new LongWritable(sum));
            }
        }

        // Scans the whole pet table: the primary key "name" ranges from INF_MIN to INF_MAX.
        private static RangeRowQueryCriteria fetchCriteria() {
            RangeRowQueryCriteria res = new RangeRowQueryCriteria("pet");
            res.setMaxVersions(1);
            List<PrimaryKeyColumn> lower = new ArrayList<PrimaryKeyColumn>();
            List<PrimaryKeyColumn> upper = new ArrayList<PrimaryKeyColumn>();
            lower.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MIN));
            upper.add(new PrimaryKeyColumn("name", PrimaryKeyValue.INF_MAX));
            res.setInclusiveStartPrimaryKey(new PrimaryKey(lower));
            res.setExclusiveEndPrimaryKey(new PrimaryKey(upper));
            return res;
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, RowCounter.class.getName());
            job.setJarByClass(RowCounter.class);
            job.setMapperClass(RowCounterMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            job.setInputFormatClass(TableStoreInputFormat.class);
            // accessKeyId, accessKeySecret, securityToken, endpoint, instance, and outputPath
            // are placeholders to be filled in with your own values.
            TableStore.setCredential(job, accessKeyId, accessKeySecret, securityToken);
            TableStore.setEndpoint(job, endpoint, instance);
            TableStoreInputFormat.addCriteria(job, fetchCriteria());
            FileOutputFormat.setOutputPath(job, new Path(outputPath));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
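
Packaged with the TableStore SDK and the EMR TableStore connector on the classpath and submitted with hadoop jar, this job performs a full-range scan of the pet table; with the nine sample rows above it should write a single record, TOTAL 9, to outputPath.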
  • The following example shows how to write data from an MR job into TableStore; a note on the destination table follows the code.
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    import com.alicloud.openservices.tablestore.model.Column;
    import com.alicloud.openservices.tablestore.model.ColumnValue;
    import com.alicloud.openservices.tablestore.model.PrimaryKey;
    import com.alicloud.openservices.tablestore.model.PrimaryKeyColumn;
    import com.alicloud.openservices.tablestore.model.PrimaryKeyValue;
    import com.alicloud.openservices.tablestore.model.RowPutChange;
    // PrimaryKeyWritable, RowWritable, BatchWriteWritable, TableStore, TableStoreInputFormat,
    // and TableStoreOutputFormat come from the EMR TableStore (Hadoop) connector.

    public class TableStoreOutputFormatExample {
        // Maps each pet row to (owner, {pet name -> species}).
        public static class OwnerMapper
                extends Mapper<PrimaryKeyWritable, RowWritable, Text, MapWritable> {
            @Override
            public void map(PrimaryKeyWritable key, RowWritable row, Context context)
                    throws IOException, InterruptedException {
                PrimaryKeyColumn pet = key.getPrimaryKey().getPrimaryKeyColumn("name");
                Column owner = row.getRow().getLatestColumn("owner");
                Column species = row.getRow().getLatestColumn("species");
                MapWritable m = new MapWritable();
                m.put(new Text(pet.getValue().asString()),
                      new Text(species.getValue().asString()));
                context.write(new Text(owner.getValue().asString()), m);
            }
        }

        // For each owner, builds one row keyed by owner whose attribute columns are the
        // owner's pets (column name = pet name, column value = species), and hands it to
        // TableStoreOutputFormat as a batch write.
        public static class IntoTableReducer
                extends Reducer<Text, MapWritable, Text, BatchWriteWritable> {
            @Override
            public void reduce(Text owner, Iterable<MapWritable> pets, Context context)
                    throws IOException, InterruptedException {
                List<PrimaryKeyColumn> pkeyCols = new ArrayList<PrimaryKeyColumn>();
                pkeyCols.add(new PrimaryKeyColumn("owner",
                    PrimaryKeyValue.fromString(owner.toString())));
                PrimaryKey pkey = new PrimaryKey(pkeyCols);
                List<Column> attrs = new ArrayList<Column>();
                for (MapWritable petMap : pets) {
                    for (Map.Entry<Writable, Writable> pet : petMap.entrySet()) {
                        Text name = (Text) pet.getKey();
                        Text species = (Text) pet.getValue();
                        attrs.add(new Column(name.toString(),
                            ColumnValue.fromString(species.toString())));
                    }
                }
                // outputTable is the name of the destination table; make it available to the
                // reducer, for example as a constant or through the job configuration.
                RowPutChange putRow = new RowPutChange(outputTable, pkey)
                    .addColumns(attrs);
                BatchWriteWritable batch = new BatchWriteWritable();
                batch.addRowChange(putRow);
                context.write(owner, batch);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, TableStoreOutputFormatExample.class.getName());
            job.setJarByClass(TableStoreOutputFormatExample.class);
            job.setMapperClass(OwnerMapper.class);
            job.setReducerClass(IntoTableReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(MapWritable.class);
            job.setInputFormatClass(TableStoreInputFormat.class);
            job.setOutputFormatClass(TableStoreOutputFormat.class);
            // accessKeyId, accessKeySecret, securityToken, endpoint, instance, and outputTable
            // are placeholders to be filled in with your own values.
            TableStore.setCredential(job, accessKeyId, accessKeySecret, securityToken);
            TableStore.setEndpoint(job, endpoint, instance);
            // Supply a RangeRowQueryCriteria over the pet table, as in the previous example.
            TableStoreInputFormat.addCriteria(job, ...);
            TableStoreOutputFormat.setOutputTable(job, outputTable);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
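
Note that the job writes into an existing table: the destination table referred to as outputTable above is expected to exist beforehand, with owner as its single string primary key (it can be created the same way as the pet table in the earlier sketch). After the job finishes, one output row can be spot-checked with a point read. The snippet below is a minimal sketch that reuses the SyncClient from the earlier snippet and uses pet_owners as a purely illustrative table name:

    // pet_owners is a hypothetical destination table name; use whatever outputTable was set to.
    PrimaryKeyBuilder pk = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    pk.addPrimaryKeyColumn("owner", PrimaryKeyValue.fromString("Gwen"));
    SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("pet_owners", pk.build());
    criteria.setMaxVersions(1);
    Row row = client.getRow(new GetRowRequest(criteria)).getRow();
    // With the sample data above, Gwen's row should carry the columns
    // Claws=cat, Chirpy=bird, and Whistler=bird.
    for (Column c : row.getColumns()) {
        System.out.println(c.getName() + " = " + c.getValue().asString());
    }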

Appendix

For the complete sample code, see:
