本文为您介绍如何通过StreamRecordPack接口实现IO与业务逻辑异步化。
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
/**
 * Base class for executor tasks that can be asked to stop.
 *
 * <p>This class only records the stop request; it is up to each subclass's
 * {@code call()} implementation to check {@link #isStop()} and return.
 */
abstract class StoppableThread implements Callable<Boolean> {

    /** Set to {@code true} once {@link #stop()} has been called; never reset. */
    private final AtomicBoolean stopRequested = new AtomicBoolean();

    /** Records a stop request. Calling it more than once has no further effect. */
    public void stop() {
        stopRequested.set(true);
    }

    /**
     * Reports whether a stop has been requested.
     *
     * @return {@code true} after {@link #stop()} has been called, {@code false} before
     */
    public boolean isStop() {
        return stopRequested.get();
    }
}
/**
 * Consumer task: takes filled record packs from the flushing queue, uploads
 * them with {@code flush()}, and puts successfully flushed packs on the
 * flushed queue so that writers can reuse them.
 *
 * <p>The task runs until {@link #stop()} is called. Packs still waiting in the
 * flushing queue at that moment are not processed by this task.
 */
class FlushThread extends StoppableThread {

    /** Maximum number of flush attempts for one pack before it is given up. */
    private static final int MAX_FLUSH_ATTEMPTS = 3;

    /** How long to wait on a queue operation before re-checking the stop flag. */
    private static final long QUEUE_TIMEOUT_MILLIS = 1000;

    /** Pause between two flush attempts on the same pack. */
    private static final long RETRY_DELAY_MILLIS = 500;

    private final BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue;
    private final BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue;

    /**
     * @param flushingQueue queue of filled packs waiting to be flushed (input)
     * @param flushedQueue  queue receiving packs that were flushed successfully (output)
     */
    public FlushThread(BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue,
                       BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue) {
        this.flushingQueue = flushingQueue;
        this.flushedQueue = flushedQueue;
    }

    /**
     * Polls the flushing queue and flushes every pack it receives until a stop
     * is requested.
     *
     * @return {@code true} if every pack taken from the queue was flushed,
     *         {@code false} if at least one pack had to be given up; never {@code null}
     * @throws Exception if the thread is interrupted while waiting, or if
     *                   {@code flush()} fails with something other than an {@link IOException}
     */
    @Override
    public Boolean call() throws Exception {
        boolean allFlushed = true;
        while (!isStop()) {
            // Timed poll so that the stop flag is re-checked at least once per timeout.
            TableTunnel.StreamRecordPack pack =
                    flushingQueue.poll(QUEUE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            if (pack != null && !flushWithRetry(pack)) {
                allFlushed = false;
            }
        }
        return allFlushed;
    }

    /**
     * Flushes one pack, retrying on {@link IOException} up to
     * {@link #MAX_FLUSH_ATTEMPTS} times in total.
     *
     * @return {@code true} if the pack was flushed, {@code false} if all attempts failed
     */
    private boolean flushWithRetry(TableTunnel.StreamRecordPack pack) throws Exception {
        for (int attempt = 1; attempt <= MAX_FLUSH_ATTEMPTS; attempt++) {
            try {
                // A successful flush means the data has been written and is visible immediately.
                // After a successful flush the pack can be reused, which avoids repeated
                // allocation and the resulting garbage collection.
                // A failed flush can simply be retried.
                // A pack whose flush failed must not be reused for new data; a new
                // StreamRecordPack has to be created instead, so it is never put on flushedQueue.
                String traceId = pack.flush();
                flushedQueue.offer(pack, QUEUE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                System.out.println("flush success:" + traceId);
                return true;
            } catch (IOException e) {
                e.printStackTrace();
                if (attempt < MAX_FLUSH_ATTEMPTS) {
                    Thread.sleep(RETRY_DELAY_MILLIS);
                }
            }
        }
        // Previously the pack was dropped here without any trace; report it explicitly.
        System.err.println("flush failed after " + MAX_FLUSH_ATTEMPTS
                + " attempts, record pack dropped");
        return false;
    }
}
/**
 * Producer task: opens a stream upload session for one table partition,
 * fills record packs with a sample record and hands them to the flushing queue.
 *
 * <p>Packs are taken from the flushed queue when one is available (reuse) and
 * created through the upload session otherwise. The task runs until
 * {@link #stop()} is called or the thread is interrupted.
 */
class WriteThread extends StoppableThread {

    /** Number of times the sample record is appended to each pack. */
    private static final int RECORDS_PER_PACK = 10;

    /** How long one attempt to enqueue a pack waits before re-checking the stop flag. */
    private static final long OFFER_TIMEOUT_MILLIS = 1000;

    private final BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue;
    private final BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue;
    private final String project;
    private final String table;
    private final String partition;
    private final TableTunnel tunnel;

    /**
     * @param flushingQueue queue receiving filled packs that are ready to be flushed
     * @param flushedQueue  queue of already flushed packs that may be reused
     * @param project       project name passed to the upload session builder
     * @param table         table name passed to the upload session builder
     * @param partition     partition spec string, parsed with {@code new PartitionSpec(partition)}
     * @param tunnel        tunnel used to build the stream upload session
     */
    public WriteThread(BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue,
                       BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue,
                       String project,
                       String table,
                       String partition,
                       TableTunnel tunnel) {
        this.flushingQueue = flushingQueue;
        this.flushedQueue = flushedQueue;
        this.project = project;
        this.table = table;
        this.partition = partition;
        this.tunnel = tunnel;
    }

    /**
     * Builds the upload session and produces packs until a stop is requested.
     *
     * @return {@code true} if the task ran until it was stopped; {@code false} if the
     *         session could not be set up, a pack could not be created, or the
     *         thread was interrupted
     * @throws RuntimeException if the table contains a column type this sample
     *                          does not populate
     */
    @Override
    public Boolean call() {
        try {
            PartitionSpec partitionSpec = new PartitionSpec(partition);
            TableTunnel.StreamUploadSession uploadSession = tunnel
                    .buildStreamUploadSession(project, table)
                    .setPartitionSpec(partitionSpec)
                    .build();
            // One record is populated once and appended repeatedly to every pack.
            Record record = uploadSession.newRecord();
            fillSampleRecord(uploadSession.getSchema(), record);

            while (!isStop()) {
                // Reuse a flushed pack when available; otherwise allocate a new one.
                TableTunnel.StreamRecordPack pack = flushedQueue.poll();
                if (pack == null) {
                    pack = uploadSession.newRecordPack();
                }
                try {
                    for (int i = 0; i < RECORDS_PER_PACK; i++) {
                        pack.append(record);
                    }
                } catch (IOException e) {
                    // The partially filled pack is abandoned; the next iteration starts a fresh one.
                    e.printStackTrace();
                    continue;
                }
                // Timed offer so that a full queue does not prevent the stop flag from being seen.
                while (!isStop()
                        && !flushingQueue.offer(pack, OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                    System.out.println("flushing Queue full");
                }
            }
            return true;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and end the task instead of looping on.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return false;
        } catch (TunnelException e) {
            e.printStackTrace();
            return false;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Sets a fixed sample value on every column of {@code record}, chosen by column type.
     *
     * @throws RuntimeException for any column type other than BIGINT, BOOLEAN,
     *                          DATETIME, DOUBLE and STRING
     */
    private static void fillSampleRecord(TableSchema schema, Record record) {
        int columnCount = schema.getColumns().size();
        for (int i = 0; i < columnCount; i++) {
            Column column = schema.getColumn(i);
            switch (column.getType()) {
                case BIGINT:
                    record.setBigint(i, 1L);
                    break;
                case BOOLEAN:
                    record.setBoolean(i, true);
                    break;
                case DATETIME:
                    record.setDatetime(i, new Date());
                    break;
                case DOUBLE:
                    record.setDouble(i, 0.0);
                    break;
                case STRING:
                    record.setString(i, "sample");
                    break;
                default:
                    throw new RuntimeException("Unknown column type: "
                            + column.getType());
            }
        }
    }
}
/**
 * Sample entry point: starts writer tasks that fill record packs and flusher
 * tasks that upload them, lets them run for a bounded time, then shuts
 * everything down in stages so that queued packs are flushed before exit.
 */
public class StreamUploadAsyncIOSample {
    // An Alibaba Cloud account AccessKey has access to every API, which is a high risk.
    // It is strongly recommended to create and use a RAM user for API access and daily
    // operations; log on to the RAM console to create one.
    // This sample reads the AccessKey ID and AccessKey Secret from environment variables.
    // They may also be kept in a configuration file if that suits your setup better.
    // Do not store the AccessKey ID and AccessKey Secret in source code: the keys may leak.
    private static String accessId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // Endpoint of the MaxCompute project; see the Endpoint documentation.
    private static String odpsEndpoint = "<endpoint>";
    // Tunnel endpoint of the MaxCompute project; see the Endpoint documentation.
    private static String tunnelEndpoint = "<tunnel_endpoint>";
    // Name of the MaxCompute project.
    private static String project = "<your_project>";
    // Name of the table in the MaxCompute project.
    private static String table = "<your_table_name>";
    // Partition spec of the table in the MaxCompute project.
    private static String partition = "<your_partition_spec>";
    private static int writeThreadNum = 10;
    private static int flushThreadNum = 10;
    private static int flushQueueSize = 100;
    // How long the writers produce data before the sample shuts down.
    private static long runMillis = 10000L;
    // Upper bound on how long shutdown waits for queued packs to be picked up by flushers.
    private static long drainTimeoutMillis = 30000L;
    // Interval between two checks of the flushing queue while draining.
    private static long drainPollMillis = 100L;
    private static BlockingDeque<TableTunnel.StreamRecordPack> flushingQueue = new LinkedBlockingDeque<>(flushQueueSize);
    private static BlockingDeque<TableTunnel.StreamRecordPack> flushedQueue = new LinkedBlockingDeque<>();

    /**
     * Runs the sample.
     *
     * <p>Tasks are submitted individually rather than through {@code invokeAll}:
     * {@code invokeAll} only returns once every task has completed, and these
     * tasks complete only after {@code stop()} has been called, so stopping them
     * after {@code invokeAll} could never happen.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(odpsEndpoint);
        odps.setDefaultProject(project);
        TableTunnel tunnel = new TableTunnel(odps);
        // Left disabled as in the original sample; enable to set the Tunnel endpoint explicitly.
        // tunnel.setEndpoint(tunnelEndpoint);

        ExecutorService pool = Executors.newFixedThreadPool(writeThreadNum + flushThreadNum);
        ArrayList<StoppableThread> writers = new ArrayList<StoppableThread>();
        ArrayList<StoppableThread> flushers = new ArrayList<StoppableThread>();
        ArrayList<Future<Boolean>> writerResults = new ArrayList<Future<Boolean>>();
        ArrayList<Future<Boolean>> flusherResults = new ArrayList<Future<Boolean>>();
        try {
            for (int i = 0; i < writeThreadNum; i++) {
                StoppableThread writer =
                        new WriteThread(flushingQueue, flushedQueue, project, table, partition, tunnel);
                writers.add(writer);
                writerResults.add(pool.submit(writer));
            }
            for (int i = 0; i < flushThreadNum; i++) {
                StoppableThread flusher = new FlushThread(flushingQueue, flushedQueue);
                flushers.add(flusher);
                flusherResults.add(pool.submit(flusher));
            }

            Thread.sleep(runMillis);

            // Stage 1: stop the producers and wait until none of them can enqueue any more.
            stopAll(writers);
            boolean succeeded = awaitAll(writerResults);

            // Stage 2: give the flushers a bounded amount of time to empty the queue.
            long deadline = System.currentTimeMillis() + drainTimeoutMillis;
            while (!flushingQueue.isEmpty() && System.currentTimeMillis() < deadline) {
                Thread.sleep(drainPollMillis);
            }
            if (!flushingQueue.isEmpty()) {
                System.err.println("flushing queue not drained, remaining packs: "
                        + flushingQueue.size());
                succeeded = false;
            }

            // Stage 3: stop the flushers; each one finishes the pack it is working on first.
            stopAll(flushers);
            succeeded = awaitAll(flusherResults) && succeeded;

            if (succeeded) {
                System.out.println("upload success!");
            } else {
                System.err.println("upload finished with errors");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Always ask every task to stop and release the pool so the JVM can exit.
            stopAll(writers);
            stopAll(flushers);
            pool.shutdown();
        }
    }

    /** Requests a stop on every task in the list. */
    private static void stopAll(ArrayList<StoppableThread> tasks) {
        for (StoppableThread task : tasks) {
            task.stop();
        }
    }

    /**
     * Waits for every future and reports whether all tasks ended without failure.
     * A task counts as failed when it threw an exception or returned {@code false};
     * a {@code null} result is not treated as a failure.
     */
    private static boolean awaitAll(ArrayList<Future<Boolean>> results) throws InterruptedException {
        boolean allOk = true;
        for (Future<Boolean> result : results) {
            try {
                if (Boolean.FALSE.equals(result.get())) {
                    allOk = false;
                }
            } catch (ExecutionException e) {
                e.getCause().printStackTrace();
                allOk = false;
            }
        }
        return allOk;
    }
}
文档内容是否对您有帮助?