本文通过代码示例向您介绍如何使用TableTunnel接口实现多线程下载,暂不支持其他接口使用多线程下载。
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
class DownloadThread implements Callable<Long> {
private long id;
private RecordReader recordReader;
private TableSchema tableSchema;
public DownloadThread(int id, RecordReader recordReader, TableSchema tableSchema) {
this.id = id;
this.recordReader = recordReader;
this.tableSchema = tableSchema;
}
@Override
public Long call() {
Long recordNum = 0L;
try {
Record record;
while ((record = recordReader.read()) != null) {
recordNum++;
System.out.print("Thread " + id + "\t");
consumeRecord(record, tableSchema);
}
recordReader.close();
} catch (IOException e) {
e.printStackTrace();
}
return recordNum;
}
private static void consumeRecord(Record record, TableSchema schema) {
for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
String colValue = null;
switch (column.getType()) {
case BIGINT: {
Long v = record.getBigint(i);
colValue = v == null ? null : v.toString();
break;
}
case BOOLEAN: {
Boolean v = record.getBoolean(i);
colValue = v == null ? null : v.toString();
break;
}
case DATETIME: {
Date v = record.getDatetime(i);
colValue = v == null ? null : v.toString();
break;
}
case DOUBLE: {
Double v = record.getDouble(i);
colValue = v == null ? null : v.toString();
break;
}
case STRING: {
String v = record.getString(i);
colValue = v == null ? null : v.toString();
break;
}
default:
throw new RuntimeException("Unknown column type: " + column.getType());
}
System.out.print(colValue == null ? "null" : colValue);
if (i != this.tableSchema.getColumns().size() - 1) System.out.print("\t");
}
System.out.println();
}
}
public class DownloadThreadSample {
// 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户
// 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您也可以根据业务需要,保存到配置文件里
// 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
private static String odpsUrl = "http://service.odps.aliyun.com/api";
private static String tunnelUrl = "http://dt.cn-shanghai.maxcompute.aliyun-inc.com";
//设置tunnelUrl,若为内网时必须设置,否则默认公网。此处给的是华东2地域的云产品互联网络Tunnel Endpoint
private static String project = "<your project>";
private static String table = "<your table name>";
private static String partition = "<your partition spec>";
private static int threadNum = 10;
public static void main(String args[]) {
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(odpsUrl);
odps.setDefaultProject(project);
TableTunnel tunnel = new TableTunnel(odps);
tunnel.setEndpoint(tunnelUrl);
//tunnelUrl设置
PartitionSpec partitionSpec = new PartitionSpec(partition);
DownloadSession downloadSession;
try {
downloadSession = tunnel.createDownloadSession(project, table, partitionSpec);
System.out.println("Session Status is : " + downloadSession.getStatus().toString());
long count = downloadSession.getRecordCount();
System.out.println("RecordCount is: " + count);
ExecutorService pool = Executors.newFixedThreadPool(threadNum);
ArrayList<Callable<Long>> callers = new ArrayList<Callable<Long>>();
long start = 0;
long step = count / threadNum;
for (int i = 0; i < threadNum - 1; i++) {
RecordReader recordReader = downloadSession.openRecordReader(step * i, step);
callers.add(new DownloadThread(i, recordReader, downloadSession.getSchema()));
}
RecordReader recordReader = downloadSession.openRecordReader(step * (threadNum - 1), count - ((threadNum - 1) * step));
callers.add(new DownloadThread(threadNum - 1, recordReader, downloadSession.getSchema()));
Long downloadNum = 0L;
List<Future<Long>> recordNum = pool.invokeAll(callers);
for (Future<Long> num : recordNum) downloadNum += num.get();
System.out.println("Record Count is: " + downloadNum);
pool.shutdown();
} catch (TunnelException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
说明
对于Tunnel Endpoint,支持指定或者不指定。
如果指定,按照指定的Endpoint下载。
如果不指定,默认为公网Endpoint。
文中使用的是华东2区域的云产品互联网络Tunnel Endpoint,其他地域的Tunnel Endpoint设置可以参考文档配置Endpoint。
文档内容是否对您有帮助?