查询加速MCQA(MaxCompute Query Acceleration)是MaxCompute内建的功能,使用原生的MaxCompute SQL语言,并且支持所有的MaxCompute内建函数以及权限系统。本文为您介绍如何使用MCQA功能。
背景信息
MCQA功能支持的接入方式如下:
- MaxCompute客户端:无缝接入,使用说明请参见MaxCompute客户端。
- DataWorks临时查询或数据开发:默认无缝接入,使用说明请参见DataWorks临时查询或数据开发。
- JDBC:无缝接入,使用说明请参见JDBC。
- SDK:需要配置Pom依赖,使用说明请参见基于Java SDK启用MCQA功能。
MaxCompute客户端
您可以通过MaxCompute客户端开启MCQA功能,操作步骤如下:
DataWorks临时查询或数据开发
DataWorks的临时查询及手动业务流程模块默认开启MCQA功能,您无需手动开启。如果您需要关闭MCQA功能,请提工单处理。
JDBC
JDBC使用场景如下:
- 使用JDBC连接MaxCompute,详情请参见JDBC使用说明。如果您需要在该场景下开启MCQA功能,需要修改相关配置,详情请参见配置JDBC启用MCQA功能。
- 使用JDBC连接Tableau,对MaxCompute中的数据进行可视化分析,详情请参见配置JDBC使用Tableau。如果您需要在该场景下开启MCQA功能,需要修改相关配置,详情请参见基于JDBC配置Tableau启用MCQA功能。
- 使用JDBC连接SQL Workbench/J,使用SQL Workbench/J对MaxCompute中的数据执行SQL语句,详情请参见配置JDBC使用SQL Workbench/J。如果您需要在该场景下开启MCQA功能,需要修改相关配置,详情请参见基于JDBC配置SQLWorkBench启用MCQA功能。
配置JDBC启用MCQA功能
使用JDBC连接MaxCompute时,您可以通过执行如下操作开启MCQA功能。使用JDBC连接MaxCompute的操作详情请参见JDBC使用说明。
基于JDBC配置Tableau启用MCQA功能
服务器增加interactiveMode=true
属性,用于开启MCQA功能。建议您同步增加enableOdpsLogger=true
属性,用于打印日志。配置操作详情请参见配置JDBC使用Tableau。
完整的服务器配置示例如下。
http://service.cn-beijing.maxcompute.aliyun.com/api?project=****_beijing&interactiveMode=true&enableOdpsLogger=true
如果只对项目空间中的部分表进行Tableau操作,您可以在服务器参数中增加table_list=table_name1, table_name2
属性选择需要的表,表之间用英文逗号(,)分隔。如果表过多,会导致Tableau打开缓慢,强烈建议使用此方式只载入需要的表。示例如下。http://service.cn-beijing.maxcompute.aliyun.com/api?project=****_beijing&interactiveMode=true&enableOdpsLogger=true&table_list=orders,customers
对于有大量分区的表不建议把所有分区的数据都设置成数据源,可以筛选需要的分区,或通过自定义SQL获取需要的数据。基于JDBC配置SQLWorkBench启用MCQA功能
完成JDBC驱动配置后,在Profile配置界面修改已填写的JDBC URL,支持SQLWorkBench使用MCQA功能。Profile配置操作详情请参见配置JDBC使用SQL Workbench/J。
需要配置的URL格式为
jdbc:odps:<MaxCompute_endpoint>?project=<MaxCompute_project_name>&accessId=<AccessKey
ID>&accessKey=<AccessKey Secret>&charset=UTF-8&interactiveMode=true
,参数说明如下:
- maxcompute_endpoint:MaxCompute服务所在区域的Endpoint。详情请参见配置Endpoint。
- maxcompute_project_name:MaxCompute项目空间名称。
- AccessKey ID:有访问指定项目空间权限的AccessKey ID。
- AccessKey Secret:AccessKey ID对应的AccessKey Secret。
- charset=UTF-8:字符集编码格式。
- interactiveMode:MCQA功能开关,
ture
表示开启MCQA功能。
基于Java SDK启用MCQA功能
Java SDK详情请参见Java SDK介绍。您需要通过Maven配置Pom依赖,配置示例如下。
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>0.35.5-public</version>
</dependency>
说明 版本需要为0.35.1及以上版本。
创建Java程序,命令示例如下。
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.ResultSet;
import com.aliyun.odps.sqa.*;
import java.io.IOException;
import java.util.*;
public class SQLExecutorExample {
public static void SimpleExample() {
// 设置账号和项目信息。
Account account = new AliyunAccount("your_access_id", "your_access_key");
Odps odps = new Odps(account);
odps.setDefaultProject("your_project_name");
odps.setEndpoint("http://service.<regionid>.maxcompute.aliyun.com/api");
// 准备构建SQLExecutor。
SQLExecutorBuilder builder = SQLExecutorBuilder.builder();
SQLExecutor sqlExecutor = null;
try {
// run in offline mode or run in interactive mode
if (false) {
// 创建一个默认执行离线SQL的Executor。
sqlExecutor = builder.odps(odps).executeMode(ExecuteMode.OFFLINE).build();
} else {
// 创建一个默认执行查询加速SQL的Executor。
sqlExecutor = builder.odps(odps).executeMode(ExecuteMode.INTERACTIVE).build();
}
// 如果需要的话可以传入查询的特殊设置。
Map<String, String> queryHint = new HashMap<>();
queryHint.put("odps.sql.mapper.split.size", "128");
// 提交一个查询作业,支持传入Hint。
sqlExecutor.run("select count(1) from test_table;", queryHint);
// 列举一些支持的常用获取信息的接口。
// UUID
System.out.println("ExecutorId:" + sqlExecutor.getId());
// 当前查询作业的logview。
System.out.println("Logview:" + sqlExecutor.getLogView());
// 当前查询作业的Instance对象(Interactive模式多个查询作业可能为同一个Instance)。
System.out.println("InstanceId:" + sqlExecutor.getInstance().getId());
// 当前查询作业的阶段进度(Console的进度条)
System.out.println("QueryStageProgress:" + sqlExecutor.getProgress());
// 当前查询作业的执行状态变化日志,例如回退信息。
System.out.println("QueryExecutionLog:" + sqlExecutor.getExecutionLog());
// 提供两种获取结果的接口。
if(false) {
// 直接获取全部查询作业结果,同步接口,可能会占用本线程直到查询成功或失败。
List<Record> records = sqlExecutor.getResult();
printRecords(records);
} else {
// 获取查询结果的迭代器ResultSet,同步接口,可能会占用本线程直到查询成功或失败。
ResultSet resultSet = sqlExecutor.getResultSet();
while (resultSet.hasNext()) {
printRecord(resultSet.next());
}
}
// run another query
sqlExecutor.run("select * from test_table;", new HashMap<>());
if(false) {
// 直接获取全部查询结果,同步接口,可能会占用本线程直到查询成功或失败。
List<Record> records = sqlExecutor.getResult();
printRecords(records);
} else {
// 获取查询结果的迭代器ResultSet,同步接口,可能会占用本线程直到查询成功或失败。
ResultSet resultSet = sqlExecutor.getResultSet();
while (resultSet.hasNext()) {
printRecord(resultSet.next());
}
}
} catch (OdpsException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
if (sqlExecutor != null) {
// 关闭Executor释放相关资源。
sqlExecutor.close();
}
}
}
// SQLExecutor can be reused by pool mode
public static void ExampleWithPool() {
// 设置账号和项目信息。
Account account = new AliyunAccount("your_access_id", "your_access_key");
Odps odps = new Odps(account);
odps.setDefaultProject("your_project_name");
odps.setEndpoint("http://service.<regionid>.maxcompute.aliyun.com/api");
// 通过连接池方式执行查询。
SQLExecutorPool sqlExecutorPool = null;
SQLExecutor sqlExecutor = null;
try {
// 准备连接池,设置连接池大小和默认执行模式。
SQLExecutorPoolBuilder builder = SQLExecutorPoolBuilder.builder();
builder.odps(odps)
.initPoolSize(1) // init pool executor number
.maxPoolSize(5) // max executors in pool
.executeMode(ExecuteMode.INTERACTIVE); // run in interactive mode
sqlExecutorPool = builder.build();
// 从连接池中获取一个Executor,如果不够将会在Max限制内新增Executor。
sqlExecutor = sqlExecutorPool.getExecutor();
// Executor具体用法和上一示例一致。
sqlExecutor.run("select count(1) from test_table;", new HashMap<>());
System.out.println("InstanceId:" + sqlExecutor.getId());
System.out.println("Logview:" + sqlExecutor.getLogView());
List<Record> records = sqlExecutor.getResult();
printRecords(records);
} catch (OdpsException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
sqlExecutor.close();
}
sqlExecutorPool.close();
}
private static void printRecord(Record record) {
for (int k = 0; k < record.getColumnCount(); k++) {
if (k != 0) {
System.out.print("\t");
}
if (record.getColumns()[k].getType().equals(OdpsType.STRING)) {
System.out.print(record.getString(k));
} else if (record.getColumns()[k].getType().equals(OdpsType.BIGINT)) {
System.out.print(record.getBigint(k));
} else {
System.out.print(record.get(k));
}
}
}
private static void printRecords(List<Record> records) {
for (Record record : records) {
printRecord(record);
System.out.println();
}
}
public static void main(String args[]) {
SimpleExample();
ExampleWithPool();
}
}
在文档使用中是否遇到以下问题
更多建议
匿名提交