您可以使用SelectObject对目标文件执行SQL语句,返回执行结果。

背景信息

目前Hadoop 3.0已经支持OSS在EMR上运行Spark、Hive、Presto等服务,同时阿里云MaxCompute以及Data Lake Analytics均支持从OSS直接处理数据。

OSS提供的GetObject接口决定了大数据平台只能把OSS数据全部下载到本地然后进行分析过滤,在很多查询场景下浪费了大量带宽和客户端资源。

SelectObject接口是对上述问题的解决方案。其核心思想是大数据平台将条件、Projection下推到OSS层,让OSS做基本的过滤,从而只返回有用的数据。客户端一方面可以减少网络带宽,另一方面也减少了数据的处理量,从而节省了CPU和内存用来做其他更多的事情。这使得基于OSS的数据仓库、数据分析成为一种更有吸引力的选择。

费用说明

调用SelectObject接口查询数据时,按扫描的原文件实际大小计费。更多信息,请参见数据处理费用

支持的文件类型

以下内容是对SelectObject支持的文件类型、支持的SQL语法等的详细介绍。

  • RFC 4180标准的CSV(包括TSV等类CSV文件,文件的行列分隔符以及Quote字符都可自定义)。
  • JSON文件,且文件编码为UTF-8。JSON支持DOCUMENT和LINES两种文件。
    • DOCUMENT是指整个文件是单一的JSON对象。
    • LINES表示整个文件由一行行的JSON对象组成,每一行是一个JSON对象(但整个文件本身并不是一个合法的JSON对象),行与行之间以换行分隔符隔开。OSS Select可以支持常见的\n,\r\n等分隔符,且无需用户指定。
  • 标准存储类型和低频访问存储类型的文件。归档存储和冷归档存储类型文件需要先执行解冻操作。
  • OSS完全托管加密、KMS托管主密钥加密的文件。

支持的SQL语法

  • SQL语句: Select From Where
  • 数据类型:string、int(64bit)、double(64bit), decimal(128bit) 、timestamp、bool
  • 操作: 逻辑条件(AND,OR,NOT), 算术表达式(+-*/%), 比较操作(>,=, <, >=, <=, !=),String 操作 (LIKE, || )
    重要 LIKE模糊匹配时对字母大小写敏感。

支持的数据类型

OSS中的CSV数据默认都是String类型,您可以使用CAST函数实现数据转换。

通过SQL查询语句将_1和_2转换为int的示例:Select * from OSSOBject where cast (_1 as int) > cast(_2 as int)

同时,对于SelectObject支持在Where条件中进行隐式转换,例如下面语句中的第一列和第二列将被转换成int:

Select _1 from ossobject where _1 + _2 > 100

对于JSON文件,如果在SQL中未指定cast函数,则其类型根据JSON数据的实际类型而定,标准JSON内建的数据类型包括null、bool、int64、double、string等类型。

常见的SQL用例

常见的SQL用例包括CSV及JSON两种。

  • CSV
    应用场景 SQL语句
    返回前10行数据 select * from ossobject limit 10
    返回第1列和第3列的整数,并且第1列大于第3列 select _1, _3 from ossobject where cast(_1 as int) > cast(_3 as int)
    返回第1列以'陈'开头的记录的个数(注:此处like后的中文需要用UTF-8编码) select count(*) from ossobject where _1 like '陈%'
    返回所有第2列时间大于2018-08-09 11:30:25且第3列大于200的记录 select * from ossobject where _2 > cast('2018-08-09 11:30:25' as timestamp) and _3 > 200
    返回第2列浮点数的平均值,总和,最大值,最小值

    select AVG(cast(_2 as double)), SUM(cast(_2 as double)), MAX(cast(_2 as double)), MIN(cast(_2 as double))

    返回第1列和第3列连接的字符串中以'Tom'为开头以’Anderson‘结尾的所有记录 select * from ossobject where (_1 || _3) like 'Tom%Anderson'
    返回第1列能被3整除的所有记录 select * from ossobject where (_1 % 3) = 0
    返回第1列大小在1995到2012之间的所有记录 select * from ossobject where _1 between 1995 and 2012
    返回第5列值为N,M,G,L的所有记录 select * from ossobject where _5 in ('N', 'M', 'G', 'L')
    返回第2列乘以第3列比第5列大100以上的所有记录 select * from ossobject where _2 * _3 > _5 + 100
  • JSON

    假设JSON文件如下:

    {
      "contacts":[
    {
      "firstName": "John",
      "lastName": "Smith",
      "isAlive": true,
      "age": 27,
      "address": {
        "streetAddress": "21 2nd Street",
        "city": "New York",
        "state": "NY",
        "postalCode": "10021-3100"
      },
      "phoneNumbers": [
        {
          "type": "home",
          "number": "212 555-1234"
        },
        {
          "type": "office",
          "number": "646 555-4567"
        },
        {
          "type": "mobile",
          "number": "123 456-7890"
        }
      ],
      "children": [],
      "spouse": null
    },…… #此处省略其他类似的节点
    ]}

    SQL用例如下:

    应用场景 SQL语句
    返回所有age是27的记录 select * from ossobject.contacts[*] s where s.age = 27
    返回所有的家庭电话 select s.number from ossobject.contacts[*].phoneNumbers[*] s where s.type = “home”
    返回所有单身的记录 select * from ossobject s where s.spouse is null
    返回所有没有孩子的记录 select * from ossobject s where s.children[0] is null
    说明 目前没有专用的空数组的表示方法,用以上语句代替。

使用场景

SelectObject通常用于大文件分片查询、JSON文件查询、日志文件分析等场景。

  • 大文件分片查询

    GetObject提供的基于Byte的分片下载类似,SelectObject也提供了分片查询的机制,包括以下两种分片方式:

    • 按行分片:常用的分片方式,然而对于稀疏数据来说,按行分片可能会导致分片时负载不均衡。
    • 按Split分片:Split是OSS用于分片的一个概念,一个Split包含多行数据,每个Split的数据大小大致相等。
    说明 按Spit分片比按行分片更加高效。

    如果确定CSV文件列中不包含换行符,则基于Bytes的分片由于不需要创建Meta,其使用更为简便。如果列中包含换行符或者是JSON文件时,则使用以下步骤:

    1. 调用CreateSelectObjectMeta API获得该文件的总的Split数。如果该文件需要用SelectObject,则建议在查询前异步调用该接口,以节省扫描时间。
    2. 根据客户端资源情况选择合适的并发度n,用总的Split数除以并发度n得到每个分片查询应该包含的Split个数。
    3. 在请求Body中用诸如split-range=1-20的形式进行分片查询。
    4. 合并结果。
  • JSON文件查询

    查询JSON文件时,在SQL的From语句中尽可能缩小From后的JSON Path范围。

    如下是JSON文件示例:

    {
      "contacts":[
    {
      "firstName": "John",
      "lastName": "Smith",
      "address": {
        "streetAddress": "21 2nd Street",
        "city": "New York",
        "state": "NY",
        "postalCode": "10021-3100"
      },
      "phoneNumbers": [
        {
          "type": "home",
          "number": "212 555-1234"
        },
        {
          "type": "office",
          "number": "646 555-4567"
        },
        {
          "type": "mobile",
          "number": "123 456-7890"
        }
      ]
    }
    ]}

    如果要查找所有postalCode为10021开头的streetAddress,SQL可以写为 select s.address.streetAddress from ossobject.contacts[*] s where s.address.postalCode like ‘10021%’或者select s.streetAddress from ossobject.contacts[*].address s where s.postalCode like ‘10021%’

    由于select s.streetAddress from ossobject.contacts[*].address s where s.postalCode like ‘10021%’的JSON Path更加精确,因此性能更优。

  • 在JSON文件中处理高精度浮点数

    在JSON文件中需要进行高精度浮点数的数值计算时,建议设置ParseJsonNumberAsString选项为true, 同时将该值cast成Decimal。比如一个属性a值为123456789.123456789,用select s.a from ossobject s where cast(s.a as decimal) > 123456789.12345就可以保持原始数据的精度不丢失。

使用OSS控制台

重要 通过控制台仅支持从128 MB以下的文件中提取40 MB以下的数据记录。
  1. 登录OSS管理控制台
  2. 单击Bucket列表,然后单击目标Bucket名称。
  3. 在左侧导航栏,选择文件管理 > 文件列表
  4. 在目标文件右侧的操作栏下,选择more > 选取内容
  5. 选取内容面板,按以下说明设置各项参数。
    参数 说明
    文件类型 仅支持CSV和JSON两种文件类型。
    分隔符 仅适用于CSV文件。请选择半角逗号(,)或自定义分隔符。
    标题行 仅适用于CSV文件。请选择文件第一行是否包含列标题。
    JSON格式符 仅适用于JSON文件。请选择您的JSON文件对应的格式。
    压缩格式 选择您当前的文件是否为压缩文件。目前压缩文件仅支持GZIP文件。
  6. 单击显示文件预览
    重要 预览标准存储类型文件时,会产生Select扫描费用。预览低频访问、归档和冷归档存储类型文件时,会产生Select扫描费用和数据取回费用。更多信息,请参见数据处理费用
  7. 单击下一步,输入SQL语句并执行。
    假设名为People的CSV文件有3列数据,分别是姓名公司年龄
    • 如果想查找年龄大于50岁,并且名字以Lora开头的人(其中_1,_2,_3是列索引,代表第一列、第二列、第三列),可以执行以下SQL语句:
      select * from ossobject where _1 like 'Lora*' and _3 > 50
    • 如果想统计这个文件有多少行,最大年龄与最小年龄是多少,可以执行以下SQL语句:
      select count(*), max(cast(_3 as int)), min(cast(_3 as int)) from oss_object
  8. 查看执行结果。
    您还可以单击下载,将所选取的内容下载到本地。

使用阿里云SDK

当前仅支持通过Java SDK和Python SDK查询文件。

import com.aliyun.oss.model.*;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.InputStreamReader;

/**
 * Examples of create select object metadata and select object.
 *
 */
public class SelectObjectSample {
    // yourEndpoint填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
    private static String endpoint = "yourEndpoint";
    // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
    private static String accessKeyId = "yourAccessKeyId";
    private static String accessKeySecret = "yourAccessKeySecret";
    // 填写Bucket名称,例如examplebucket。
    private static String bucketName = "examplebucket";

    public static void main(String[] args) throws Exception {
        OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
        // 填写Object完整路径后,根据SELECT语句查询文件中的数据。Object完整路径中不能包含Bucket名称。
        // 填写CSV格式的Object完整路径。
        selectCsvSample("test.csv", ossClient);
        // 填写JSON格式的Object完整路径。
        selectJsonSample("test.json", ossClient);
        ossClient.shutdown();
    }

    private static void selectCsvSample(String key, OSS ossClient) throws Exception {
        // 填写上传的内容。
        String content = "name,school,company,age\r\n" +
                "Lora Francis,School A,Staples Inc,27\r\n" +
                "Eleanor Little,School B,\"Conectiv, Inc\",43\r\n" +
                "Rosie Hughes,School C,Western Gas Resources Inc,44\r\n" +
                "Lawrence Ross,School D,MetLife Inc.,24";

        ossClient.putObject(bucketName, key, new ByteArrayInputStream(content.getBytes()));

        SelectObjectMetadata selectObjectMetadata = ossClient.createSelectObjectMetadata(
                new CreateSelectObjectMetadataRequest(bucketName, key)
                        .withInputSerialization(
                                new InputSerialization().withCsvInputFormat(
                                        // 填写内容中不同记录之间的分隔符,例如\r\n。
                                        new CSVFormat().withHeaderInfo(CSVFormat.Header.Use).withRecordDelimiter("\r\n"))));
        System.out.println(selectObjectMetadata.getCsvObjectMetadata().getTotalLines());
        System.out.println(selectObjectMetadata.getCsvObjectMetadata().getSplits());

        SelectObjectRequest selectObjectRequest =
                new SelectObjectRequest(bucketName, key)
                        .withInputSerialization(
                                new InputSerialization().withCsvInputFormat(
                                        new CSVFormat().withHeaderInfo(CSVFormat.Header.Use).withRecordDelimiter("\r\n")))
                        .withOutputSerialization(new OutputSerialization().withCsvOutputFormat(new CSVFormat()));
        // 使用SELECT语句查询第4列,值大于40的所有记录。
        selectObjectRequest.setExpression("select * from ossobject where _4 > 40");
        OSSObject ossObject = ossClient.selectObject(selectObjectRequest);

        // 读取内容。
        BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()));
        while (true) {
            String line = reader.readLine();
            if (line == null) {
                break;
            }
            System.out.println(line);
        }
        reader.close();

        ossClient.deleteObject(bucketName, key);
    }

    private static void selectJsonSample(String key, OSS ossClient) throws Exception {
        // 填写上传的内容。
        final String content = "{\n" +
                "\t\"name\": \"Lora Francis\",\n" +
                "\t\"age\": 27,\n" +
                "\t\"company\": \"Staples Inc\"\n" +
                "}\n" +
                "{\n" +
                "\t\"name\": \"Eleanor Little\",\n" +
                "\t\"age\": 43,\n" +
                "\t\"company\": \"Conectiv, Inc\"\n" +
                "}\n" +
                "{\n" +
                "\t\"name\": \"Rosie Hughes\",\n" +
                "\t\"age\": 44,\n" +
                "\t\"company\": \"Western Gas Resources Inc\"\n" +
                "}\n" +
                "{\n" +
                "\t\"name\": \"Lawrence Ross\",\n" +
                "\t\"age\": 24,\n" +
                "\t\"company\": \"MetLife Inc.\"\n" +
                "}";

        ossClient.putObject(bucketName, key, new ByteArrayInputStream(content.getBytes()));

        SelectObjectRequest selectObjectRequest =
                new SelectObjectRequest(bucketName, key)
                        .withInputSerialization(new InputSerialization()
                                .withCompressionType(CompressionType.NONE)
                                .withJsonInputFormat(new JsonFormat().withJsonType(JsonType.LINES)))
                        .withOutputSerialization(new OutputSerialization()
                                .withCrcEnabled(true)
                                .withJsonOutputFormat(new JsonFormat()))
                        .withExpression("select * from ossobject as s where s.age > 40"); // 使用SELECT语句查询文件中的数据。

        OSSObject ossObject = ossClient.selectObject(selectObjectRequest);

        // 读取内容。
        BufferedReader reader = new BufferedReader(new InputStreamReader(ossObject.getObjectContent()));
        while (true) {
            String line = reader.readLine();
            if (line == null) {
                break;
            }
            System.out.println(line);
        }
        reader.close();

        ossClient.deleteObject(bucketName, key);
    }
}
import oss2

def select_call_back(consumed_bytes, total_bytes =  None):
        print('Consumed Bytes:' + str(consumed_bytes) + '\n')

# 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
access_key_id = 'yourAccessKeyId'
access_key_secret = 'yourAccessKeySecret'
# 填写Bucket名称,例如examplebucket。
bucket_name = 'yourtBucketName'
# 填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
endpoint = 'https://oss-cn-hangzhou.aliyuncs.com'

# 创建Bucket,所有文件相关的方法都需要通过Bucket来调用。
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)
key =  'python_select.csv'
content =  'Tom Hanks,USA,45\r\n'*1024
filename =  'python_select.csv'

# 上传CSV文件。
bucket.put_object(key, content)
# Select API的参数。
csv_meta_params = {'RecordDelimiter': '\r\n'}
select_csv_params = {'CsvHeaderInfo': 'None',
                    'RecordDelimiter': '\r\n',
                    'LineRange': (500, 1000)}

csv_header = bucket.create_select_object_meta(key, csv_meta_params)
print(csv_header.rows)
print(csv_header.splits)
result = bucket.select_object(key, "select * from ossobject where _3 > 44", select_call_back, select_csv_params)
select_content = result.read()
print(select_content)

result = bucket.select_object_to_file(key, filename,
      "select * from ossobject where _3 > 44", select_call_back, select_csv_params)
bucket.delete_object(key)

###JSON DOCUMENT
key =  'python_select.json'
content =  "{\"contacts\":[{\"key1\":1,\"key2\":\"hello world1\"},{\"key1\":2,\"key2\":\"hello world2\"}]}"
filename =  'python_select.json'
# 上传JSON DOCUMENT。
bucket.put_object(key, content)
select_json_params = {'Json_Type': 'DOCUMENT'}
result = bucket.select_object(key, "select s.key2 from ossobject.contacts[*] s where s.key1 = 1", None, select_json_params)
select_content = result.read()
print(select_content)

result = bucket.select_object_to_file(key, filename,
      "select s.key2 from ossobject.contacts[*] s where s.key1 = 1", None, select_json_params)
bucket.delete_object(key)

###JSON LINES
key =  'python_select_lines.json'
content =  "{\"key1\":1,\"key2\":\"hello world1\"}\n{\"key1\":2,\"key2\":\"hello world2\"}"
filename =  'python_select.json'
# 上传JSON LINE。
bucket.put_object(key, content)
select_json_params = {'Json_Type': 'LINES'}
json_header = bucket.create_select_object_meta(key,select_json_params)
print(json_header.rows)
print(json_header.splits)

result = bucket.select_object(key, "select s.key2 from ossobject s where s.key1 = 1", None, select_json_params)
select_content =  result.read()
print(select_content)
result = bucket.select_object_to_file(key, filename,
           "select s.key2 from ossobject s where s.key1 = 1", None, select_json_params)
bucket.delete_object(key)
package main

import (
    "fmt"
    "github.com/aliyun/aliyun-oss-go-sdk/oss"
    "io/ioutil"
    "os"
)

func main() {
    // 创建OSSClient实例。
    // yourEndpoint填写Bucket对应的Endpoint,以华东1(杭州)为例,填写为https://oss-cn-hangzhou.aliyuncs.com。其它Region请按实际情况填写。
    // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
    client, err := oss.New("yourEndpoint", "yourAccessKeyId", "yourAccessKeySecret")
    if err != nil {
        fmt.Println("Error:", err)
        os.Exit(-1)
    }
    // 填写Bucket名称,例如examplebucket。
    bucket,err := client.Bucket("examplebucket")
    if err != nil {
        fmt.Println("Error:", err)
        os.Exit(-1)
    }
    // 填写Object完整路径,完整路径中不能包含Bucket名称,例如exampledir/exampledata.csv。
    key := "exampledir/exampledata.csv"
    // 填写本地CSV文件的完整路径,例如D:\\localpath\\exampledata.csv。
    localCsvFile := "D:\\localpath\\exampledata.csv"
    err = bucket.PutObjectFromFile(key, localCsvFile)
    if err != nil {
        fmt.Println("Error:", err)
        os.Exit(-1)
    }

    selReq := oss.SelectRequest{}
    // 使用SELECT语句查询文件中的数据。
    selReq.Expression =  `select * from ossobject`
    body,err := bucket.SelectObject(key, selReq)

    if err != nil {
        fmt.Println("Error:", err)
        os.Exit(-1)
    }
    // 读取内容。
    fc,err  := ioutil.ReadAll(body)

    if err != nil {
        fmt.Println("Error:", err)
        os.Exit(-1)
    }
    defer body.Close()
    fmt.Println(string(fc))
}                    

使用REST API

如果您的程序自定义要求较高,您可以直接发起REST API请求。直接发起REST API请求需要手动编写代码计算签名。更多信息,请参见SelectObject