本文通过代码示例向您介绍如何使用TableTunnel接口实现多线程下载。

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;

class DownloadThread implements Callable<Long> {
    private long id;
    private RecordReader recordReader;
    private TableSchema tableSchema;

    public DownloadThread(int id, RecordReader recordReader, TableSchema tableSchema) {
        this.id = id;
        this.recordReader = recordReader;
        this.tableSchema = tableSchema;
    }

    @Override
    public Long call() {
        Long recordNum = 0L;
        try {
            Record record;
            while ((record = recordReader.read()) != null) {
                recordNum++;
                System.out.print("Thread " + id + "\t");
                consumeRecord(record, tableSchema);
            }
            recordReader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return recordNum;
    }

    private static void consumeRecord(Record record, TableSchema schema) {
        for (int i = 0; i < schema.getColumns().size(); i++) {
            Column column = schema.getColumn(i);
            String colValue = null;
            switch (column.getType()) {
                case BIGINT: {
                    Long v = record.getBigint(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case BOOLEAN: {
                    Boolean v = record.getBoolean(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case DATETIME: {
                    Date v = record.getDatetime(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case DOUBLE: {
                    Double v = record.getDouble(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                case STRING: {
                    String v = record.getString(i);
                    colValue = v == null ? null : v.toString();
                    break;
                }
                default:
                    throw new RuntimeException("Unknown column type: " + column.getType());
            }
            System.out.print(colValue == null ? "null" : colValue);
            if (i != schema.getColumns().size()) System.out.print("\t");
        }
        System.out.println();
    }
}

public class DownloadThreadSample {
    private static String accessId = "<your access id>";
    private static String accessKey = "<your access Key>";
    private static String odpsUrl = "http://service.odps.aliyun.com/api";
    private static String tunnelUrl = "http://dt.cn-shanghai.maxcompute.aliyun-inc.com";
    //设置tunnelUrl,若需要走内网时必须设置,否则默认公网。此处给的是华东2经典网络Tunnel Endpoint,其他region请参见文档访问域名和数据中心。
    private static String project = "<your project>";
    private static String table = "<your table name>";
    private static String partition = "<your partition spec>";
    private static int threadNum = 10;

    public static void main(String args[]) {
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsUrl);
        odps.setDefaultProject(project);
        TableTunnel tunnel = new TableTunnel(odps);
        tunnel.setEndpoint(tunnelUrl);
        //tunnelUrl设置 
        PartitionSpec partitionSpec = new PartitionSpec(partition);
        DownloadSession downloadSession;
        try {
            downloadSession = tunnel.createDownloadSession(project, table, partitionSpec);
            System.out.println("Session Status is : " + downloadSession.getStatus().toString());
            long count = downloadSession.getRecordCount();
            System.out.println("RecordCount is: " + count);
            ExecutorService pool = Executors.newFixedThreadPool(threadNum);
            ArrayList<Callable<Long>> callers = new ArrayList<Callable<Long>>();
            long start = 0;
            long step = count / threadNum;
            for (int i = 0; i < threadNum - 1; i++) {
                RecordReader recordReader = downloadSession.openRecordReader(step * i, step);
                callers.add(new DownloadThread(i, recordReader, downloadSession.getSchema()));
            }
            RecordReader recordReader = downloadSession.openRecordReader(step * (threadNum - 1), count - ((threadNum - 1) * step));
            callers.add(new DownloadThread(threadNum - 1, recordReader, downloadSession.getSchema()));
            Long downloadNum = 0L;
            List<Future<Long>> recordNum = pool.invokeAll(callers);
            for (Future<Long> num : recordNum) downloadNum += num.get();
            System.out.println("Record Count is: " + downloadNum);
            pool.shutdown();
        } catch (TunnelException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
说明
对于Tunnel Endpoint,支持指定或者不指定。
  • 如果指定,按照指定的Endpoint下载。
  • 如果不指定,默认为公网Endpoint。
  • 文中给出的是华东2经典网络Tunnel Endpoint,其他region的Tunnel Endpoint设置可以参考文档访问域名和数据中心