全部产品
MaxCompute

SQL运行结果的导出方法汇总

更新时间:2017-05-02 14:14:15   分享:   

本文通过几个例子,介绍了几种下载MaxCompute SQL计算结果的方法。为了减少篇幅,所有的SDK部分都只举例介绍Java的例子。

导出方式有:

  • 如果数据比较少,可以直接用SQL Task得到全部的查询结果。
  • 如果只是想导出某个表或者分区,可以用Tunnel直接导出数据。
  • 如果SQL比较复杂,需要Tunnel和SQL相互配合才行。
  • 大数据开发套件可以方便地帮我们运行SQL,同步数据,并有定时调度,配置任务依赖的功能。
  • 开源工具DataX能帮助我们很方便把MaxCompute里的数据导出到目标数据源,具体请看该文档,本文不做具体介绍。

SQLTask方式导出

SQL Task是SDK直接调用MaxCompute SQL的接口,能很方便得运行SQL并获得其返回结果。从文档可以看到,SQLTask.getResult(i); 返回的是一个List,可以循环迭代这个List,获得完整的SQL计算返回结果。不过这个方法有个缺陷,可以参考这里提到的SetProject READ_TABLE_MAX_ROW的功能。目前Select语句返回给客户端的数据条数最大可以调整到1万。也就是说如果在客户端上(包括SQLTask)直接Select,那相当于查询结果上最后加了个Limit N(如果是CREATE TABLE XX AS SELECT或者用INSERT INTO/OVERWRITE TABLE把结果固化到具体的表里就没关系)。

Tunnel方式导出

如果需要导出的查询结果就是某张表的全部内容(或者是具体的某个分区的全部内容),可以用Tunnel来做。官网提供了命令行工具和基于SDK编写的Tunnel SDK

在此提供一个Tunnel命令行导出数据的简单例子,Tunnel SDK的编写是在有一些命令行没办法支持的情况下才需要考虑,具体使用请阅读文档

  1. tunnel d wc_out c:\wc_out.dat;
  2. 2016-12-16 19:32:08 - new session: 201612161932082d3c9b0a012f68e7 total lines: 3
  3. 2016-12-16 19:32:08 - file [0]: [0, 3), c:\wc_out.dat
  4. downloading 3 records into 1 file
  5. 2016-12-16 19:32:08 - file [0] start
  6. 2016-12-16 19:32:08 - file [0] OK. total: 21 bytes
  7. download OK

SQLTask+Tunnel方式导出

从前面SQL Task方式导出介绍可以看到,SQLTask不能处理超过1万条记录,而Tunnel可以,两者存在互补。所以可以基于两者实现数据的导出。以下用一个代码的例子来实现:

  1. private static final String accessId = "userAccessId";
  2. private static final String accessKey = "userAccessKey";
  3. private static final String endPoint = "http://service.odps.aliyun.com/api";
  4. private static final String project = "userProject";
  5. private static final String sql = "userSQL";
  6. private static final String table = "Tmp_" + UUID.randomUUID().toString().replace("-", "_");//其实也就是随便找了个随机字符串作为临时表的名字
  7. private static final Odps odps = getOdps();
  8. public static void main(String[] args) {
  9. System.out.println(table);
  10. runSql();
  11. tunnel();
  12. }
  13. /*
  14. * 把SQLTask的结果下载过来
  15. * */
  16. private static void tunnel() {
  17. TableTunnel tunnel = new TableTunnel(odps);
  18. try {
  19. DownloadSession downloadSession = tunnel.createDownloadSession(
  20. project, table);
  21. System.out.println("Session Status is : "
  22. + downloadSession.getStatus().toString());
  23. long count = downloadSession.getRecordCount();
  24. System.out.println("RecordCount is: " + count);
  25. RecordReader recordReader = downloadSession.openRecordReader(0,
  26. count);
  27. Record record;
  28. while ((record = recordReader.read()) != null) {
  29. consumeRecord(record, downloadSession.getSchema());
  30. }
  31. recordReader.close();
  32. } catch (TunnelException e) {
  33. e.printStackTrace();
  34. } catch (IOException e1) {
  35. e1.printStackTrace();
  36. }
  37. }
  38. /*
  39. * 保存这条数据
  40. * 数据量少的话直接打印后拷贝走也是一种取巧的方法。实际场景可以用Java.io写到本地文件,或者写到远端数据等各种目标保存起来。
  41. * */
  42. private static void consumeRecord(Record record, TableSchema schema) {
  43. System.out.println(record.getString("username")+","+record.getBigint("cnt"));
  44. }
  45. /*
  46. * 运行SQL,把查询结果保存成临时表,方便后面用Tunnel下载
  47. * 这里保存数据的lifecycle为1天,所以哪怕删除步骤出了问题,也不会太浪费存储空间
  48. * */
  49. private static void runSql() {
  50. Instance i;
  51. StringBuilder sb = new StringBuilder("Create Table ").append(table)
  52. .append(" lifecycle 1 as ").append(sql);
  53. try {
  54. System.out.println(sb.toString());
  55. i = SQLTask.run(getOdps(), sb.toString());
  56. i.waitForSuccess();
  57. } catch (OdpsException e) {
  58. e.printStackTrace();
  59. }
  60. }
  61. /*
  62. * 初始化MaxCompute(原ODPS)的连接信息
  63. * */
  64. private static Odps getOdps() {
  65. Account account = new AliyunAccount(accessId, accessKey);
  66. Odps odps = new Odps(account);
  67. odps.setEndpoint(endPoint);
  68. odps.setDefaultProject(project);
  69. return odps;
  70. }

大数据开发套件的数据同步方式导出

前面介绍的方式解决了数据下载后保存的问题,但是没解决数据的生成以及两个步骤之间的调度依赖的问题。

本小节介绍的数加·大数据开发套件这个产品,可以运行SQL、配置数据同步任务,还可以设置自动周期性运行还有多任务之间的依赖,彻底解决了前面的烦恼。

我们将用一个简单例子介绍如何通过大数据开发套件运行SQL并配置数据同步任务完成数据生成和导出需求。

步骤一:创建一个工作流,工作流里创建一个SQL节点和一个数据同步节点,并将两个节点连线配置成依赖关系,SQL节点作为数据产出的节点,数据同步节点作为数据导出节点。

1

步骤二:配置SQL节点。注意,SQL这里的创建表要先执行一次再去配置同步(否则表都没有,同步任务没办法配置)

2

步骤三:配置数据同步任务。

3

步骤四:工作流调度配置好后(可以直接使用默认配置),保存提交工作流,然后操作测试运行。对数据同步任务查看运行日志可看到

  1. 2016-12-17 23:43:46.394 [job-15598025] INFO JobContainer -
  2. 任务启动时刻 : 2016-12-17 23:43:34
  3. 任务结束时刻 : 2016-12-17 23:43:46
  4. 任务总计耗时 : 11s
  5. 任务平均流量 : 31.36KB/s
  6. 记录写入速度 : 1668rec/s
  7. 读出记录总数 : 16689
  8. 读写失败总数 : 0

到mysql里查看数据同步结果。

4

本文导读目录
本文导读目录
以上内容是否对您有帮助?