本文为您介绍如何使用MaxCompute Java SDK实现数据下载。
使用TableTunnel的DownloadSession接口实现数据下载
典型的表数据下载流程:
创建TableTunnel。
创建DownloadSession。
创建RecordReader,读取Record。
示例
import java.io.IOException;
import java.util.Date;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
public class DownloadSample {
// 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户
// 此处以把AccessKey 和 AccessKeySecret 保存在环境变量为例说明。您也可以根据业务需要,保存到配置文件里
// 强烈建议不要把 AccessKey 和 AccessKeySecret 保存到代码里,会存在密钥泄漏风险
private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
private static String odpsUrl = "http://service.odps.aliyun.com/api";
private static String tunnelUrl = "https://dt.cn-shanghai-intranet.maxcompute.aliyun-inc.com";
//设置tunnelUrl,使用内网时需要设置,否则默认公网。
private static String project = "<your project>";
private static String table = "<your table name>";
private static String partition = "<your partition spec>";
public static void main(String args[]) {
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(odpsUrl);
odps.setDefaultProject(project);
TableTunnel tunnel = new TableTunnel(odps);
tunnel.setEndpoint(tunnelUrl);//设置tunnelUrl。
PartitionSpec partitionSpec = new PartitionSpec(partition);
try {
DownloadSession downloadSession = tunnel.createDownloadSession(project, table,
partitionSpec);
System.out.println("Session Status is : "
+ downloadSession.getStatus().toString());
long count = downloadSession.getRecordCount();
System.out.println("RecordCount is: " + count);
RecordReader recordReader = downloadSession.openRecordReader(0,
count);
Record record;
while ((record = recordReader.read()) != null) {
consumeRecord(record, downloadSession.getSchema());
}
recordReader.close();
} catch (TunnelException e) {
e.printStackTrace();
} catch (IOException e1) {
e1.printStackTrace();
}
}
private static void consumeRecord(Record record, TableSchema schema) {
for (int i = 0; i < schema.getColumns().size(); i++) {
Column column = schema.getColumn(i);
String colValue = null;
switch (column.getType()) {
case BIGINT: {
Long v = record.getBigint(i);
colValue = v == null ? null : v.toString();
break;
}
case BOOLEAN: {
Boolean v = record.getBoolean(i);
colValue = v == null ? null : v.toString();
break;
}
case DATETIME: {
Date v = record.getDatetime(i);
colValue = v == null ? null : v.toString();
break;
}
case DOUBLE: {
Double v = record.getDouble(i);
colValue = v == null ? null : v.toString();
break;
}
case STRING: {
String v = record.getString(i);
colValue = v == null ? null : v.toString();
break;
}
default:
throw new RuntimeException("Unknown column type: "
+ column.getType());
}
System.out.print(colValue == null ? "null" : colValue);
if (i != schema.getColumns().size())
System.out.print("\t");
}
System.out.println();
}
}
说明
上述示例给出的是华东2(上海)地域的云产品互联Tunnel Endpoint,其他Region的Tunnel Endpoint设置请参见Endpoint。
本例中,为方便测试,数据通过System.out.println
直接打印出来。在实际使用时,您可改写为直接输出到文本文件。
使用InstanceTunnel的DownloadSession接口实现数据下载
Odps odps = OdpsUtils.newDefaultOdps(); // 初始化Odps对象。
Instance i = SQLTask.run(odps, "select * from wc_in;");
i.waitForSuccess();
//创建InstanceTunnel。
InstanceTunnel tunnel = new InstanceTunnel(odps);
//根据instance id,创建DownloadSession。
InstanceTunnel.DownloadSession session = tunnel.createDownloadSession(odps.getDefaultProject(), i.getId());
long count = session.getRecordCount();
//输出结果条数。
System.out.println(count);
//获取数据的写法与TableTunnel一样。
TunnelRecordReader reader = session.openRecordReader(0, count);
Record record;
while ((record = reader.read()) != null) {
for (int col = 0; col < session.getSchema().getColumns().size(); ++col) {
//wc_in表字段均为STRING,这里直接打印输出。
System.out.println(record.get(col));
}
}
reader.close();
通过使用SQLTask.getResultSet()
静态方法实现数据下载
Odps odps = OdpsUtils.newDefaultOdps(); //初始化Odps对象。
Instance i = SQLTask.run(odps, "select * from wc_in;");
i.waitForSuccess();
//根据instance对象,获取结果迭代器。
ResultSet rs = SQLTask.getResultSet(i);
for (Record r : rs) {
//输出结果条数。
System.out.println(rs.getRecordCount());
for (int col = 0; col < rs.getTableSchema().getColumns().size(); ++col) {
//wc_in表字段均为STRING,这里直接打印输出。
System.out.println(r.get(col));
}
}
文档内容是否对您有帮助?