说明
无地域属性的存储空间不支持使用 SelectObject。
背景信息 目前 Hadoop 3.0 已经支持 OSS 在 EMR 上运行 Spark、Hive、Presto 等服务,同时阿里云 MaxCompute 以及 Data Lake Analytics 均支持从 OSS 直接处理数据。
OSS 提供的GetObject 接口决定了大数据平台只能把 OSS 数据全部下载到本地然后进行分析过滤,在很多查询场景下浪费了大量带宽和客户端资源。
SelectObject 接口通过将条件和 Projection 下推到 OSS 层,只返回有用数据,减少带宽和处理量,节省 CPU 和内存资源,使基于 OSS 的数据分析更具吸引力。
费用说明 调用SelectObject 接口查询数据时,按扫描的原文件实际大小计费。更多信息,请参见数据处理费用 。
支持的文件类型 以下内容是对SelectObject 支持的文件类型、支持的 SQL 语法等的详细介绍。
RFC 4180 标准的 CSV(包括 TSV 等类 CSV 文件,行列分隔符和 Quote 字符可自定义)。
JSON 文件(UTF-8 编码),支持 DOCUMENT 和 LINES 两种格式:。
标准存储和低频访问存储类型的文件。归档、冷归档和深度冷归档存储类型文件需先执行解冻操作。
OSS 完全托管加密和 KMS 托管主密钥加密的文件。
支持的 SQL 语法 SQL 语句: Select From Where
数据类型:string、int(64bit)、double(64bit), decimal(128bit) 、timestamp、bool
操作: 逻辑条件(AND,OR,NOT), 算术表达式(+-*/%), 比较操作(>,=, <, >=, <=, !=),String 操作 (LIKE, || )
支持的数据类型 OSS 中的 CSV 数据默认是 String 类型,您可以使用 CAST 函数进行数据转换。
通过 SQL 查询语句将_1 和_2 转换为 int 的示例:Select * from OSSOBject where cast (_1 as int) > cast(_2 as int)
SelectObject 支持在 Where 条件中隐式转换,例如下面语句中的第一列和第二列将被转换成 int:
Select _1 from ossobject where _1 + _2 > 100
对于 JSON 文件,若 SQL 中未指定 cast 函数,其类型根据 JSON 数据的实际类型而定,标准 JSON 内建的数据类型包括 null、bool、int64、double、string 等类型。
常见的 SQL 用例 常见的 SQL 用例包括 CSV 及 JSON 两种。
CSV
应用场景
SQL 语句
返回前 10 行数据
select * from ossobject limit 10
返回第 1 列和第 3 列的整数,并且第 1 列大于第 3 列
select _1, _3 from ossobject where cast(_1 as int) > cast(_3 as int)
返回第 1 列以'陈'开头的记录的个数(注:此处 like 后的中文需要用 UTF-8 编码)
select count(*) from ossobject where _1 like '陈%'
返回所有第 2 列时间大于 2018-08-09 11:30:25 且第 3 列大于 200 的记录
select * from ossobject where _2 > cast('2018-08-09 11:30:25' as timestamp) and _3 > 200
返回第 2 列浮点数的平均值,总和,最大值,最小值
select AVG(cast(_6 as double)), SUM(cast(_6 as double)), MAX(cast(_6 as double)), MIN(cast(_6 as double)) from ossobject
返回第 1 列和第 3 列连接的字符串中以'Tom'为开头以’Anderson‘结尾的所有记录
select * from ossobject where (_1 || _3) like 'Tom%Anderson'
返回第 1 列能被 3 整除的所有记录
select * from ossobject where (_1 % 3) = 0
返回第 1 列大小在 1995 到 2012 之间的所有记录
select * from ossobject where _1 between 1995 and 2012
返回第 5 列值为 N,M,G,L 的所有记录
select * from ossobject where _5 in ('N', 'M', 'G', 'L')
返回第 2 列乘以第 3 列比第 5 列大 100 以上的所有记录
select * from ossobject where _2 * _3 > _5 + 100
JSON
假设 JSON 文件如下:
{
"contacts":[
{
"firstName": "John",
"lastName": "Smith",
"isAlive": true,
"age": 27,
"address": {
"streetAddress": "21 2nd Street",
"city": "New York",
"state": "NY",
"postalCode": "10021-3100"
},
"phoneNumbers": [
{
"type": "home",
"number": "212 555-1234"
},
{
"type": "office",
"number": "646 555-4567"
},
{
"type": "mobile",
"number": "123 456-7890"
}
],
"children": [],
"spouse": null
},…… #此处省略其他类似的节点
]}
SQL 用例如下:
应用场景
SQL 语句
返回所有 age 是 27 的记录
select * from ossobject.contacts[*] s where s.age = 27
返回所有的家庭电话
select s.number from ossobject.contacts[*].phoneNumbers[*] s where s.type = "home"
返回所有单身的记录
select * from ossobject s where s.spouse is null
返回所有没有孩子的记录
select * from ossobject s where s.children[0] is null
使用场景 SelectObject 通常用于大文件分片查询、JSON 文件查询、日志文件分析等场景。
大文件分片查询
和GetObject 提供的基于 Byte 的分片下载类似,SelectObject 也提供了分片查询的机制,包括以下两种分片方式:
如果确定 CSV 文件列中不包含换行符,则基于 Bytes 的分片由于不需要创建 Meta,其使用更为简便。如果列中包含换行符或者是 JSON 文件时,则使用以下步骤:
调用 CreateSelectObjectMeta API 获得该文件的总的 Split 数。如果该文件需要用 SelectObject,则建议在查询前异步调用该接口,以节省扫描时间。
根据客户端资源情况选择合适的并发度 n,用总的 Split 数除以并发度 n 得到每个分片查询应该包含的 Split 个数。
在请求 Body 中用诸如 split-range=1-20 的形式进行分片查询。
合并结果。
JSON 文件查询
查询 JSON 文件时,在 SQL 的 From 语句中尽可能缩小 From 后的 JSON Path 范围。
如下是 JSON 文件示例:
{
"contacts":[
{
"firstName": "John",
"lastName": "Smith",
"address": {
"streetAddress": "21 2nd Street",
"city": "New York",
"state": "NY",
"postalCode": "10021-3100"
},
"phoneNumbers": [
{
"type": "home",
"number": "212 555-1234"
},
{
"type": "office",
"number": "646 555-4567"
},
{
"type": "mobile",
"number": "123 456-7890"
}
]
}
]}
如果要查找所有 postalCode 为 10021 开头的 streetAddress,SQL 可以写为 select s.address.streetAddress from ossobject.contacts[*] s where s.address.postalCode like '10021%'
或者select s.streetAddress from ossobject.contacts[*].address s where s.postalCode like '10021%'
由于select s.streetAddress from ossobject.contacts[*].address s where s.postalCode like '10021%'
的 JSON Path 更加精确,因此性能更优。
在 JSON 文件中处理高精度浮点数
在 JSON 文件中需要进行高精度浮点数的数值计算时,建议设置 ParseJsonNumberAsString 选项为 true, 同时将该值 cast 成 Decimal。比如一个属性 a 值为 123456789.123456789,用select s.a from ossobject s where cast(s.a as decimal) > 123456789.12345
就可以保持原始数据的精度不丢失。
操作方式
使用 OSS 控制台
重要
通过控制台仅支持从 128 MB 以下的文件中提取 40 MB 以下的数据记录。
登录OSS 管理控制台 。
单击Bucket 列表 ,然后单击目标 Bucket 名称。
在左侧导航栏,选择文件管理 >文件列表 。
在目标文件右侧的操作栏下,选择。
在选取内容 面板,按以下说明设置各项参数。
参数
说明
文件类型
仅支持 CSV 和 JSON 两种文件类型。
分隔符
仅适用于 CSV 文件。请选择半角逗号(,)或自定义分隔符。
标题行
仅适用于 CSV 文件。请选择文件第一行是否包含列标题。
JSON 格式符
仅适用于 JSON 文件。请选择您的 JSON 文件对应的格式。
压缩格式
选择您当前的文件是否为压缩文件。目前压缩文件仅支持 GZIP 文件。
单击显示文件预览 。
重要
预览标准存储类型文件时,会产生 Select 扫描费用。预览低频访问、归档存储、冷归档存储或者深度冷归档存储类型文件时,会产生 Select 扫描费用和数据取回费用。更多信息,请参见数据处理费用 。
单击下一步 ,输入 SQL 语句并执行。
假设名为People 的 CSV 文件有 3 列数据,分别是姓名 、公司 和年龄 。
如果想查找年龄大于 50 岁,并且名字以 Lora 开头的人(其中_1,_2,_3 是列索引,代表第一列、第二列、第三列),可以执行以下 SQL 语句:
select * from ossobject where _1 like 'Lora*' and _3 > 50
如果想统计这个文件有多少行,最大年龄与最小年龄是多少,可以执行以下 SQL 语句:
select count(*), max(cast(_3 as int)), min(cast(_3 as int)) from ossobject
查看执行结果。
使用阿里云 SDK
当前仅支持通过 Java SDK 和 Python SDK 查询文件。
Java
import com.aliyun.oss.model.*;
import com.aliyun.oss.OSS;
import com.aliyun.oss.common.auth.*;
import com.aliyun.oss.OSSClientBuilder;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;
public class SelectObjectSample {
private static String endpoint = "https://oss-cn-hangzhou.aliyuncs.com" ;
private static String bucketName = "examplebucket" ;
public static void main (String[] args) throws Exception {
EnvironmentVariableCredentialsProvider credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
String region = "cn-hangzhou" ;
ClientBuilderConfiguration clientBuilderConfiguration = new ClientBuilderConfiguration ();
clientBuilderConfiguration.setSignatureVersion(SignVersion.V4);
OSS ossClient = OSSClientBuilder.create()
.endpoint(endpoint)
.credentialsProvider(credentialsProvider)
.clientConfiguration(clientBuilderConfiguration)
.region(region)
.build();
selectCsvSample("test.csv" , ossClient);
selectJsonSample("test.json" , ossClient);
ossClient.shutdown();
}
private static void selectCsvSample (String key, OSS ossClient) throws Exception {
String content = "name,school,company,age\r\n" +
"Lora Francis,School A,Staples Inc,27\r\n" +
"Eleanor Little,School B,\"Conectiv, Inc\",43\r\n" +
"Rosie Hughes,School C,Western Gas Resources Inc,44\r\n" +
"Lawrence Ross,School D,MetLife Inc.,24" ;
ossClient.putObject(bucketName, key, new ByteArrayInputStream (content.getBytes()));
SelectObjectMetadata selectObjectMetadata = ossClient.createSelectObjectMetadata(
new CreateSelectObjectMetadataRequest (bucketName, key)
.withInputSerialization(
new InputSerialization ().withCsvInputFormat(
new CSVFormat ().withHeaderInfo(CSVFormat.Header.Use).withRecordDelimiter("\r\n" ))));
System.out.println(selectObjectMetadata.getCsvObjectMetadata().getTotalLines());
System.out.println(selectObjectMetadata.getCsvObjectMetadata().getSplits());
SelectObjectRequest selectObjectRequest =
new SelectObjectRequest (bucketName, key)
.withInputSerialization(
new InputSerialization ().withCsvInputFormat(
new CSVFormat ().withHeaderInfo(CSVFormat.Header.Use).withRecordDelimiter("\r\n" )))
.withOutputSerialization(new OutputSerialization ().withCsvOutputFormat(new CSVFormat ()));
selectObjectRequest.setExpression("select * from ossobject where _4 > 40" );
OSSObject ossObject = ossClient.selectObject(selectObjectRequest);
BufferedReader reader = new BufferedReader (new InputStreamReader (ossObject.getObjectContent()));
while (true ) {
String line = reader.readLine();
if (line == null ) {
break ;
}
System.out.println(line);
}
reader.close();
ossClient.deleteObject(bucketName, key);
}
private static void selectJsonSample (String key, OSS ossClient) throws Exception {
final String content = "{\n" +
"\t\"name\": \"Lora Francis\",\n" +
"\t\"age\": 27,\n" +
"\t\"company\": \"Staples Inc\"\n" +
"}\n" +
"{\n" +
"\t\"name\": \"Eleanor Little\",\n" +
"\t\"age\": 43,\n" +
"\t\"company\": \"Conectiv, Inc\"\n" +
"}\n" +
"{\n" +
"\t\"name\": \"Rosie Hughes\",\n" +
"\t\"age\": 44,\n" +
"\t\"company\": \"Western Gas Resources Inc\"\n" +
"}\n" +
"{\n" +
"\t\"name\": \"Lawrence Ross\",\n" +
"\t\"age\": 24,\n" +
"\t\"company\": \"MetLife Inc.\"\n" +
"}" ;
ossClient.putObject(bucketName, key, new ByteArrayInputStream (content.getBytes()));
SelectObjectRequest selectObjectRequest =
new SelectObjectRequest (bucketName, key)
.withInputSerialization(new InputSerialization ()
.withCompressionType(CompressionType.NONE)
.withJsonInputFormat(new JsonFormat ().withJsonType(JsonType.LINES)))
.withOutputSerialization(new OutputSerialization ()
.withCrcEnabled(true )
.withJsonOutputFormat(new JsonFormat ()))
.withExpression("select * from ossobject as s where s.age > 40" );
OSSObject ossObject = ossClient.selectObject(selectObjectRequest);
BufferedReader reader = new BufferedReader (new InputStreamReader (ossObject.getObjectContent()));
while (true ) {
String line = reader.readLine();
if (line == null ) {
break ;
}
System.out.println(line);
}
reader.close();
ossClient.deleteObject(bucketName, key);
}
}
Python
import oss2
from oss2.credentials import EnvironmentVariableCredentialsProvider
def select_call_back (consumed_bytes, total_bytes = None ):
print ('Consumed Bytes:' + str (consumed_bytes) + '\n' )
auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())
endpoint = "https://oss-cn-hangzhou.aliyuncs.com"
region = "cn-hangzhou"
bucket = oss2.Bucket(auth, endpoint, "yourBucketName" , region=region)
key ='python_select.csv'
content ='Tom Hanks,USA,45\r\n' *1024
filename ='python_select.csv'
bucket.put_object(key, content)
csv_meta_params = {'RecordDelimiter' : '\r\n' }
select_csv_params = {'CsvHeaderInfo' : 'None' ,
'RecordDelimiter' : '\r\n' ,
'LineRange' : (500 , 1000 )}
csv_header = bucket.create_select_object_meta(key, csv_meta_params)
print (csv_header.rows)
print (csv_header.splits)
result = bucket.select_object(key, "select * from ossobject where _3 > 44" , select_call_back, select_csv_params)
select_content = result.read()
print (select_content)
result = bucket.select_object_to_file(key, filename,
"select * from ossobject where _3 > 44" , select_call_back, select_csv_params)
bucket.delete_object(key)
key = 'python_select.json'
content = "{\"contacts\":[{\"key1\":1,\"key2\":\"hello world1\"},{\"key1\":2,\"key2\":\"hello world2\"}]}"
filename = 'python_select.json'
bucket.put_object(key, content)
select_json_params = {'Json_Type' : 'DOCUMENT' }
result = bucket.select_object(key, "select s.key2 from ossobject.contacts[*] s where s.key1 = 1" , None , select_json_params)
select_content = result.read()
print (select_content)
result = bucket.select_object_to_file(key, filename,
"select s.key2 from ossobject.contacts[*] s where s.key1 = 1" , None , select_json_params)
bucket.delete_object(key)
key = 'python_select_lines.json'
content = "{\"key1\":1,\"key2\":\"hello world1\"}\n{\"key1\":2,\"key2\":\"hello world2\"}"
filename = 'python_select.json'
bucket.put_object(key, content)
select_json_params = {'Json_Type' : 'LINES' }
json_header = bucket.create_select_object_meta(key,select_json_params)
print (json_header.rows)
print (json_header.splits)
result = bucket.select_object(key, "select s.key2 from ossobject s where s.key1 = 1" , None , select_json_params)
select_content = result.read()
print (select_content)
result = bucket.select_object_to_file(key, filename,
"select s.key2 from ossobject s where s.key1 = 1" , None , select_json_params)
bucket.delete_object(key)
package main
import (
"context"
"flag"
"io"
"log"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss"
"github.com/aliyun/alibabacloud-oss-go-sdk-v2/oss/credentials"
)
var (
region string
bucketName string
objectName string
)
func init () {
flag.StringVar(®ion, "region" , "" , "The region in which the bucket is located." )
flag.StringVar(&bucketName, "bucket" , "" , "The name of the bucket." )
flag.StringVar(&objectName, "object" , "" , "The name of the object." )
}
func main () {
flag.Parse()
if len (bucketName) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, bucket name required" )
}
if len (region) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, region required" )
}
if len (objectName) == 0 {
flag.PrintDefaults()
log.Fatalf("invalid parameters, object name required" )
}
cfg := oss.LoadDefaultConfig().
WithCredentialsProvider(credentials.NewEnvironmentVariableCredentialsProvider()).
WithRegion(region)
client := oss.NewClient(cfg)
request := &oss.SelectObjectRequest{
Bucket: oss.Ptr(bucketName),
Key: oss.Ptr(objectName),
SelectRequest: &oss.SelectRequest{
Expression: oss.Ptr("select * from ossobject limit 10" ),
InputSerializationSelect: oss.InputSerializationSelect{
CsvBodyInput: &oss.CSVSelectInput{
FileHeaderInfo: oss.Ptr("Use" ),
},
},
OutputSerializationSelect: oss.OutputSerializationSelect{
OutputHeader: oss.Ptr(true ),
},
},
}
result, err := client.SelectObject(context.TODO(), request)
if err != nil {
log.Fatalf("failed to select object %v" , err)
}
content, err := io.ReadAll(result.Body)
if err != nil {
log.Fatalf("failed to read object %v" , err)
}
log.Printf("select object result:%#v\n" , string (content))
}
使用命令行工具 ossutil
您可以使用命令行工具 ossutil 来查询文件,ossutil 的安装请参见安装 ossutil 。
以下命令用于为存储空间examplebucket
中的 exampleobject 执行 SQL 语句,请求语法 CSV。
ossutil api select-object --bucket examplebucket --key exampleobject --select-request "{\"Expression\":\"c2VsZWN0IFllYXIsU3RhdGVBYmJyLCBDaXR5TmFtZSwgU2hvcnRfUXVlc3Rpb25fVGV4dCBmcm9tIG9zc29iamVjdA==\",\"InputSerialization\":{\"CSV\":{\"FileHeaderInfo\":\"Use\",\"Range\":\"line-range=0-100\"}},\"OutputSerialization\":{\"JSON\":{\"RecordDelimiter\":\",\"}}}"
关于该命令的更多信息,请参见select-object 。
相关 API 以上操作方式底层基于 API 实现,如果您的程序自定义要求较高,您可以直接发起 REST API 请求。直接发起 REST API 请求需要手动编写代码计算签名。更多信息,请参见SelectObject 。