成功构建ODPS对象后,您可对项目空间下的Tables和Instances等对象执行后续操作,包括SQL操作、数据上传/下载、表/分区管理,以及Instance管理等。
您可使用访问凭证配置方式中的任意一种方法来创建ODPS对象。为了方便,本文的示例代码均采用从config.ini文件中加载AK的方式。
执行SQL
您可通过SQLTask对象的run方法或MaxCompute SQL Driver执行各类MaxCompute SQL。
通过SDK执行SQL
您可通过SQLTask对象的run方法执行各类MaxCompute SQL,该方法会返回Instance对象。当执行SELECT语句时,如果查询结果大于10000行数据,需要使用Tunnel下载全部的查询结果。当查询结果小于10000行数据时,可以直接从Instance对象获取查询结果。下面将以SELECT语句为例介绍SQL执行方法。
示例一:执行SELECT并获取查询结果
当查询结果小于10000行数据时,查询结果可用CSV Reader的形式直接读取。
package main
import (
"fmt"
"io"
"log"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)
// main runs a SELECT statement through an SQL task and prints every row of
// the result, which is read back through a CSV reader.
func main() {
	// Load the connection settings from the ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	// SQL engine hints, for example odps.sql.skewjoin; none are set here.
	var hints map[string]string
	query := "select * from all_types_demo where p1>0 or p2 > '';"
	task := odps.NewSqlTask("select_demo", query, hints)

	instance, err := task.Run(client, client.DefaultProjectName())
	if err != nil {
		log.Fatalf("%+v", err)
	}
	if err = instance.WaitForSuccess(); err != nil {
		log.Fatalf("%+v", err)
	}

	rows, err := task.GetSelectResultAsCsv(instance, true)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Read until the reader reports io.EOF; any other error is fatal.
	for {
		row, readErr := rows.Read()
		if readErr == io.EOF {
			break
		}
		if readErr != nil {
			log.Fatalf("%+v", readErr)
		}
		fmt.Printf("%v\n", row)
	}
}
示例二:执行SELECT,通过Tunnel获取查询结果
当查询结果大于10000行数据时,查询结果需要通过Tunnel获取。
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)
// main runs a SELECT statement through an SQL task and downloads the result
// set page by page through an instance-result download session of Tunnel.
func main() {
	// Path of the configuration file.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project.
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	sql := "select * from all_types_demo where p1 = 20 and p2 = 'hangzhou';"
	// SQL engine hints, for example odps.sql.skewjoin.
	var hints map[string]string
	sqlTask := odps.NewSqlTask("select_demo", sql, hints)
	// Run the SQL with the quota associated with the project.
	projectName := odpsIns.DefaultProjectName()
	ins, err := sqlTask.Run(odpsIns, projectName)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	if err = ins.WaitForSuccess(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Generate a LogView URL that gives access to the job details.
	lv := odpsIns.LogView()
	lvUrl, err := lv.GenerateLogView(ins, 10)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println(lvUrl)

	project := odpsIns.DefaultProject()
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Create the Tunnel instance.
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
	session, err := tunnelIns.CreateInstanceResultDownloadSession(project.Name(), ins.Id())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	start := 0
	step := 200000
	recordCount := session.RecordCount()
	schema := session.Schema()
	total := 0
	for start < recordCount {
		reader, err := session.OpenRecordReader(start, step, 0, nil)
		if err != nil {
			log.Fatalf("%+v", err)
		}
		count := 0
		err = reader.Iterator(func(record data.Record, _err error) {
			if _err != nil {
				// A failed read must not advance the download offset.
				// NOTE(review): presumably Iterator also returns this error,
				// which is checked right after the call — confirm.
				return
			}
			count++
			for i, d := range record {
				if d == nil {
					fmt.Printf("%s=null", schema.Columns[i].Name)
				} else {
					fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
				}
				if i < record.Len()-1 {
					fmt.Printf(", ")
				} else {
					fmt.Println()
				}
			}
		})
		if err != nil {
			log.Fatalf("%+v", err)
		}
		start += count
		total += count
		log.Println(count)
		if err = reader.Close(); err != nil {
			log.Fatalf("%+v", err)
		}
		if count == 0 {
			// Without progress the offset would never move and the loop
			// would never terminate.
			log.Printf("no records read at offset %d, stopping", start)
			break
		}
	}
	fmt.Println("total count ", total)
}
通过MaxCompute SQL Driver执行SQL
示例一:执行CREATE TABLE语句
package main

import (
	"database/sql"
	"log"

	// Blank import: database/sql can only open a driver whose package has been
	// linked into the program.
	// NOTE(review): presumably this package registers itself under the name
	// "odps" — confirm against the driver documentation.
	_ "github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
)

// main creates a table by executing a CREATE TABLE statement through the
// database/sql interface.
func main() {
	// The AK is expected in the environment variables
	// ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECURITY.
	// NOTE(review): the second name looks like a typo for
	// ALIBABA_CLOUD_ACCESS_KEY_SECRET — confirm against the driver.
	dsn := "http://<endpoint>?project=<project>&odps.sql.type.system.odps2=true&odps.sql.decimal.odps2=true"
	db, err := sql.Open("odps", dsn)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	defer db.Close()

	sqlStr := "create table table_with_date (date_col DATE);"
	if _, err = db.Exec(sqlStr); err != nil {
		log.Fatalf("%+v", err)
	}
}
示例二:执行SELECT语句,并获取结果
package main
import (
"database/sql"
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/sqldriver"
"log"
"reflect"
)
// main runs a parameterized SELECT through the database/sql interface and
// prints every column of every returned row as name=value pairs.
func main() {
	config, err := sqldriver.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	dsn := config.FormatDsn()
	// Alternatively: dsn := "http://<accessId>:<accessKey>@<endpoint>?project=<project>"
	db, err := sql.Open("odps", dsn)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	defer db.Close()

	// NOTE(review): @p2 sits inside single quotes; confirm that the driver
	// still substitutes the named argument in that position.
	selectSql := "select * from all_types_demo where bigint_type=@bigint_type and p1=@p1 and p2='@p2';"
	rows, err := db.Query(
		selectSql,
		sql.Named("bigint_type", 100000000000),
		sql.Named("p1", 20),
		sql.Named("p2", "hangzhou"),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Release the result set once iteration is finished.
	defer rows.Close()

	columnTypes, err := rows.ColumnTypes()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Allocate one scan target per column, using the type the driver reports.
	record := make([]interface{}, len(columnTypes))
	for i, columnType := range columnTypes {
		record[i] = reflect.New(columnType.ScanType()).Interface()
		t := reflect.TypeOf(record[i])
		fmt.Printf("kind=%s, name=%s\n", t.Kind(), t.String())
	}
	columns, err := rows.Columns()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for rows.Next() {
		if err = rows.Scan(record...); err != nil {
			log.Fatalf("%+v", err)
		}
		for i, r := range record {
			rr := r.(sqldriver.NullAble)
			if rr.IsNull() {
				fmt.Printf("%s=NULL", columns[i])
			} else {
				switch v := r.(type) {
				case *sqldriver.NullInt8:
					fmt.Printf("%s=%d", columns[i], v.Int8)
				case *sqldriver.NullInt16:
					fmt.Printf("%s=%d", columns[i], v.Int16)
				case *sqldriver.NullInt32:
					fmt.Printf("%s=%d", columns[i], v.Int32)
				case *sqldriver.NullInt64:
					fmt.Printf("%s=%d", columns[i], v.Int64)
				case *sqldriver.NullFloat32:
					fmt.Printf("%s=%f", columns[i], v.Float32)
				case *sqldriver.NullFloat64:
					fmt.Printf("%s=%f", columns[i], v.Float64)
				case *sqldriver.NullString:
					fmt.Printf("%s=%s", columns[i], v.String)
				case *sqldriver.NullBool:
					fmt.Printf("%s=%v", columns[i], v.Bool)
				case *sqldriver.Binary,
					*sqldriver.Decimal,
					*sqldriver.NullDate,
					*sqldriver.NullDateTime,
					*sqldriver.NullTimeStamp,
					*sqldriver.Map,
					*sqldriver.Array,
					*sqldriver.Struct,
					*sqldriver.Json:
					// These values are passed to the %s verb as they are.
					fmt.Printf("%s=%s", columns[i], v)
				}
			}
			if i < len(record)-1 {
				fmt.Printf(", ")
			} else {
				fmt.Print("\n\n")
			}
		}
	}
	// rows.Next returns false both at the end of the result set and on
	// failure; rows.Err tells the two apart.
	if err = rows.Err(); err != nil {
		log.Fatalf("%+v", err)
	}
}
数据上传与下载
您可使用Tunnel对表/分区的数据进行批量上传与下载,也可通过流式数据通道将数据写入表/分区。
初始化Tunnel
Tunnel的初始化示例代码如下。您可使用访问凭证配置方式中的任意一种方法来创建ODPS对象。为了方便,接下来的示例代码均采用从config.ini文件中加载AK的方式。
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)
// main shows how to initialize Tunnel: it builds the ODPS client, looks up
// the Tunnel endpoint of the default project and creates a Tunnel instance.
func main() {
	// Read the settings from the configuration file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Initialize ODPS.
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	project := odpsIns.DefaultProject()
	// Look up the Tunnel endpoint.
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
	// Initialize Tunnel.
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
	// The builtin println does not interpret format verbs, so fmt.Printf is
	// needed for %+v to take effect.
	fmt.Printf("%+v\n", tunnelIns)
}
批量数据上传
上传表或分区数据时,在初始化Tunnel后,需要进行如下操作:
创建UploadSession,指定要将数据上传到哪张表/分区,以及指定上传数据使用的压缩算法等。
使用UploadSession创建Writer,Writer进行数据上传,一个Writer上传的数据被称为一个Block,使用一个INT类型值作为Block ID。为了提高上传速度,可以创建多个Writer进行并发数据上传。
Writer上传数据完毕后,需要调用UploadSession的Commit方法来最终完成上传,调用Commit时需要指定Block ID列表。
package main

import (
	"fmt"
	"log"

	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
)

// main uploads sample records into one partition of all_types_demo, using
// several concurrent writers, and commits their blocks at the end.
func main() {
	// Read the settings from the configuration file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Initialize ODPS.
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	project := odpsIns.DefaultProject()
	// Initialize Tunnel.
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
	// Create an UploadSession for the table/partition to write to.
	session, err := tunnelIns.CreateUploadSession(
		project.Name(),
		"all_types_demo",
		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
		tunnel.SessionCfg.WithDefaultDeflateCompressor(),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	writerNum := 3
	blockIds := make([]int, writerNum)
	for i := 0; i < writerNum; i++ {
		blockIds[i] = i
	}
	// Buffered so that every writer can report its result without blocking.
	errChan := make(chan error, writerNum)

	// Upload concurrently through several writers; each writer uses its
	// block id to identify the data it writes.
	for _, blockId := range blockIds {
		blockId := blockId // per-iteration copy for the goroutine below
		go func() {
			schema := session.Schema()
			record, err := makeRecord(schema)
			if err != nil {
				errChan <- err
				return
			}
			recordWriter, err := session.OpenRecordWriter(blockId)
			if err != nil {
				errChan <- err
				return
			}
			for i := 0; i < 100; i++ {
				err = recordWriter.Write(record)
				if err != nil {
					// The write error is the one reported; the close result
					// is deliberately dropped.
					_ = recordWriter.Close()
					errChan <- err
					return
				}
			}
			err = recordWriter.Close()
			if err == nil {
				fmt.Printf("success to upload %d record, %d bytes\n", recordWriter.RecordCount(), recordWriter.BytesCount())
			}
			errChan <- err
		}()
	}

	// Wait until every writer has finished uploading.
	for i := 0; i < writerNum; i++ {
		err := <-errChan
		if err != nil {
			log.Fatalf("%+v", err)
		}
	}

	// Commit all blocks to finish the upload; the data is then visible in the
	// table. Success is only logged after the commit error has been checked.
	if err = session.Commit(blockIds); err != nil {
		log.Fatalf("%+v", err)
	}
	log.Println("success to commit all blocks")
}

// makeRecord builds one sample record of 18 values. The map, array and struct
// values are created from the column types found at indexes 15, 16 and 17 of
// schema.
//
// NOTE(review): those indexes assume the 16th-18th columns of the target table
// are the map, array and struct columns; verify against the actual schema of
// all_types_demo. A mismatch is reported as an error instead of a panic.
func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
	varchar, err := data.NewVarChar(500, "varchar")
	if err != nil {
		return nil, err
	}
	char, err := data.NewVarChar(254, "char")
	if err != nil {
		return nil, err
	}
	s := data.String("hello world")
	date, err := data.NewDate("2022-10-19")
	if err != nil {
		return nil, err
	}
	datetime, err := data.NewDateTime("2022-10-19 17:00:00")
	if err != nil {
		return nil, err
	}
	timestamp, err := data.NewTimestamp("2022-10-19 17:00:00.000")
	if err != nil {
		return nil, err
	}

	if len(schema.Columns) < 18 {
		return nil, fmt.Errorf("expected at least 18 columns, got %d", len(schema.Columns))
	}

	mapType, ok := schema.Columns[15].Type.(datatype.MapType)
	if !ok {
		return nil, fmt.Errorf("column %q (index 15) is not a map column", schema.Columns[15].Name)
	}
	mapData := data.NewMapWithType(mapType)
	if err := mapData.Set("hello", 1); err != nil {
		return nil, err
	}
	if err := mapData.Set("world", 2); err != nil {
		return nil, err
	}

	arrayType, ok := schema.Columns[16].Type.(datatype.ArrayType)
	if !ok {
		return nil, fmt.Errorf("column %q (index 16) is not an array column", schema.Columns[16].Name)
	}
	arrayData := data.NewArrayWithType(arrayType)
	if err := arrayData.Append("a"); err != nil {
		return nil, err
	}
	if err := arrayData.Append("b"); err != nil {
		return nil, err
	}

	structType, ok := schema.Columns[17].Type.(datatype.StructType)
	if !ok {
		return nil, fmt.Errorf("column %q (index 17) is not a struct column", schema.Columns[17].Name)
	}
	structData := data.NewStructWithTyp(structType)
	arrFieldType, ok := structType.FieldType("arr").(datatype.ArrayType)
	if !ok {
		return nil, fmt.Errorf("struct field %q is not an array", "arr")
	}
	arr := data.NewArrayWithType(arrFieldType)
	if err := arr.Append("x"); err != nil {
		return nil, err
	}
	if err := arr.Append("y"); err != nil {
		return nil, err
	}
	if err := structData.SetField("arr", arr); err != nil {
		return nil, err
	}
	if err := structData.SetField("name", "tom"); err != nil {
		return nil, err
	}

	record := []data.Data{
		data.TinyInt(1),
		data.SmallInt(32767),
		data.Int(100),
		data.BigInt(100000000000),
		data.Binary("binary"),
		data.Float(3.14),
		data.Double(3.1415926),
		data.NewDecimal(38, 18, "3.1415926"),
		varchar,
		char,
		s,
		date,
		datetime,
		timestamp,
		data.Bool(true),
		mapData,
		arrayData,
		structData,
	}
	return record, nil
}
批量数据下载
下载表/分区数据时,在初始化Tunnel后,需要进行如下操作:
创建DownloadSession,指定从哪个表、分区下载数据,以及指定传输数据使用的压缩算法等。
使用DownloadSession创建Reader,Reader可以用分页的形式,分批将数据下载到本地。
package main

import (
	"fmt"
	"log"

	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
)

// main downloads one partition of all_types_demo page by page through a
// Tunnel download session and prints every record.
func main() {
	// Read the settings from the configuration file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	// Initialize ODPS.
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	project := odpsIns.DefaultProject()
	// Look up the Tunnel endpoint.
	tunnelEndpoint, err := project.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + tunnelEndpoint)
	tunnelIns := tunnel.NewTunnel(odpsIns, tunnelEndpoint)
	// Create a DownloadSession for the table/partition to read from.
	session, err := tunnelIns.CreateDownloadSession(
		project.Name(),
		"all_types_demo",
		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	recordCount := session.RecordCount()
	fmt.Printf("record count is %d\n", recordCount)
	start := 0
	step := 100001
	total := 0
	schema := session.Schema()
	for start < recordCount {
		reader, err := session.OpenRecordReader(start, step, nil)
		if err != nil {
			log.Fatalf("%+v", err)
		}
		count := 0
		err = reader.Iterator(func(record data.Record, _err error) {
			if _err != nil {
				// NOTE(review): presumably Iterator also returns this error,
				// which is checked right after the call — confirm.
				return
			}
			// Count every record that was read; the offset below is advanced
			// by this number, so without it the loop would never finish.
			count++
			for i, d := range record {
				if d == nil {
					fmt.Printf("%s=null", schema.Columns[i].Name)
				} else {
					fmt.Printf("%s=%s", schema.Columns[i].Name, d.Sql())
				}
				if i < record.Len()-1 {
					fmt.Printf(", ")
				} else {
					fmt.Println()
				}
			}
		})
		if err != nil {
			log.Fatalf("%+v", err)
		}
		start += count
		total += count
		log.Println(count)
		if err = reader.Close(); err != nil {
			log.Fatalf("%+v", err)
		}
		if count == 0 {
			// Without progress the offset would never move.
			log.Printf("no records read at offset %d, stopping", start)
			break
		}
	}
	fmt.Println("total count ", total)
}
流式数据上传
MaxCompute支持通过流式数据通道将数据写入表/分区。
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/data"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
"log"
)
// main writes sample records into one partition of all_types_demo through a
// stream upload session, flushing the pack writer twice.
func main() {
	// Read the settings from the configuration file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Initialize ODPS.
	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	client.SetDefaultProjectName(conf.ProjectName)
	defaultProject := client.DefaultProject()

	// Look up the Tunnel endpoint.
	endpoint, err := defaultProject.GetTunnelEndpoint()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	fmt.Println("tunnel endpoint: " + endpoint)

	tunnelClient := tunnel.NewTunnel(client, endpoint)
	session, err := tunnelClient.CreateStreamUploadSession(
		defaultProject.Name(),
		"all_types_demo",
		tunnel.SessionCfg.WithPartitionKey("p1=20,p2='hangzhou'"),
		// Create the partition when it does not exist yet.
		tunnel.SessionCfg.WithCreatePartition(),
		tunnel.SessionCfg.WithDefaultDeflateCompressor(),
	)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	writer := session.OpenRecordPackWriter()
	for round := 0; round < 2; round++ {
		rec, err := makeRecord(session.Schema())
		if err != nil {
			log.Fatalf("%+v", err)
		}

		// Keep appending the record until the buffered size reaches the threshold.
		for writer.DataSize() < 64 {
			if err = writer.Append(rec); err != nil {
				log.Fatalf("%+v", err)
			}
		}

		// Flush the buffered data.
		traceId, recordCount, bytesSend, err := writer.Flush()
		if err != nil {
			log.Fatalf("%+v", err)
		}
		fmt.Printf(
			"success to upload data with traceId=%s, record count=%d, record bytes=%d\n",
			traceId, recordCount, bytesSend,
		)
	}
}
// makeRecord builds one sample record of 18 values. The map, array and struct
// values are created from the column types found at indexes 15, 16 and 17 of
// schema.
//
// NOTE(review): those indexes assume the 16th-18th columns of the target table
// are the map, array and struct columns; verify against the actual schema of
// all_types_demo. A mismatch is reported as an error instead of a panic.
func makeRecord(schema tableschema.TableSchema) (data.Record, error) {
	varchar, err := data.NewVarChar(500, "varchar")
	if err != nil {
		return nil, err
	}
	char, err := data.NewVarChar(254, "char")
	if err != nil {
		return nil, err
	}
	s := data.String("hello world")
	date, err := data.NewDate("2022-10-19")
	if err != nil {
		return nil, err
	}
	datetime, err := data.NewDateTime("2022-10-19 17:00:00")
	if err != nil {
		return nil, err
	}
	timestamp, err := data.NewTimestamp("2022-10-19 17:00:00.000")
	if err != nil {
		return nil, err
	}

	if len(schema.Columns) < 18 {
		return nil, fmt.Errorf("expected at least 18 columns, got %d", len(schema.Columns))
	}

	mapType, ok := schema.Columns[15].Type.(datatype.MapType)
	if !ok {
		return nil, fmt.Errorf("column %q (index 15) is not a map column", schema.Columns[15].Name)
	}
	mapData := data.NewMapWithType(mapType)
	if err := mapData.Set("hello", 1); err != nil {
		return nil, err
	}
	if err := mapData.Set("world", 2); err != nil {
		return nil, err
	}

	arrayType, ok := schema.Columns[16].Type.(datatype.ArrayType)
	if !ok {
		return nil, fmt.Errorf("column %q (index 16) is not an array column", schema.Columns[16].Name)
	}
	arrayData := data.NewArrayWithType(arrayType)
	if err := arrayData.Append("a"); err != nil {
		return nil, err
	}
	if err := arrayData.Append("b"); err != nil {
		return nil, err
	}

	structType, ok := schema.Columns[17].Type.(datatype.StructType)
	if !ok {
		return nil, fmt.Errorf("column %q (index 17) is not a struct column", schema.Columns[17].Name)
	}
	structData := data.NewStructWithTyp(structType)
	arrFieldType, ok := structType.FieldType("arr").(datatype.ArrayType)
	if !ok {
		return nil, fmt.Errorf("struct field %q is not an array", "arr")
	}
	arr := data.NewArrayWithType(arrFieldType)
	if err := arr.Append("x"); err != nil {
		return nil, err
	}
	if err := arr.Append("y"); err != nil {
		return nil, err
	}
	if err := structData.SetField("arr", arr); err != nil {
		return nil, err
	}
	if err := structData.SetField("name", "tom"); err != nil {
		return nil, err
	}

	record := []data.Data{
		data.TinyInt(1),
		data.SmallInt(32767),
		data.Int(100),
		data.BigInt(100000000000),
		data.Binary("binary"),
		data.Float(3.14),
		data.Double(3.1415926),
		data.NewDecimal(38, 18, "3.1415926"),
		varchar,
		char,
		s,
		date,
		datetime,
		timestamp,
		data.Bool(true),
		mapData,
		arrayData,
		structData,
	}
	return record, nil
}
Schema管理
// An executable program must live in package main for its main function to
// be used as the entry point.
package main

import (
	"log"

	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)

// main walks through the Schema operations: listing schemas, selecting the
// current schema, listing the tables of a schema, and creating, deleting and
// inspecting a schema.
func main() {
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project.
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// schemas stands for all schemas of the default project.
	schemas := odpsIns.Schemas()
	// List every schema of the default project.
	schemas.List(func(schema *odps.Schema, err error) {
		// Check the error before touching schema.
		if err != nil {
			log.Fatalf("%+v", err)
		}
		print(schema.Name() + "\n")
	})

	// Select the current schema.
	odpsIns.SetCurrentSchemaName("default_schema")
	// Operate on a table directly; when no schema has been selected, the
	// table is looked up in the "default" schema.
	table := odpsIns.Table("table") // the full table name is "project.default_schema.table"
	print(table.SchemaName())

	// List every table of a given schema (here: schema_A).
	tablesInSchemaA := odps.NewTables(odpsIns, conf.ProjectName, "schema_A")
	tablesInSchemaA.List(func(table *odps.Table, err error) {
		// Check the error before touching table.
		if err != nil {
			log.Fatalf("%+v", err)
		}
		print(table.Name() + "\n")
	})

	// NOTE(review): whatever Create, Delete and Load return is not inspected
	// here; if they return errors, check them in real code.
	// Create a schema.
	schemas.Create("new_schema", false, "comment")
	// Delete a schema.
	schemas.Delete("to_delete_schema")
	// Read the metadata of a schema.
	schema := schemas.Get("new_schema")
	schema.Load()
	schema.Name()
	schema.ProjectName()
	schema.Type()
	schema.Owner()
	schema.Comment()
	schema.CreateTime()
	schema.ModifiedTime()
}
表管理
创建表
创建普通表
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
// main creates the partitioned table all_types_demo, which has one column
// per supported data type, with a lifecycle of 2 days.
func main() {
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	client.SetDefaultProjectName(conf.ProjectName)

	// Complex types used by the last columns.
	mapType := datatype.NewMapType(datatype.StringType, datatype.BigIntType)
	arrayType := datatype.NewArrayType(datatype.StringType)
	structType := datatype.NewStructType(
		datatype.NewStructFieldType("arr", arrayType),
		datatype.NewStructFieldType("name", datatype.StringType),
	)
	jsonType := datatype.NewJsonType()

	// Data columns, in table order.
	columns := []tableschema.Column{
		{Name: "tiny_int_type", Type: datatype.TinyIntType},
		{Name: "small_int_type", Type: datatype.SmallIntType},
		{Name: "int_type", Type: datatype.IntType},
		{Name: "bigint_type", Type: datatype.BigIntType},
		{Name: "binary_type", Type: datatype.BinaryType},
		{Name: "float_type", Type: datatype.FloatType},
		{Name: "double_type", Type: datatype.DoubleType},
		{Name: "decimal_type", Type: datatype.NewDecimalType(10, 8)},
		{Name: "varchar_type", Type: datatype.NewVarcharType(500)},
		{Name: "char_type", Type: datatype.NewCharType(254)},
		{Name: "string_type", Type: datatype.StringType},
		{Name: "date_type", Type: datatype.DateType},
		{Name: "datetime_type", Type: datatype.DateTimeType},
		{Name: "timestamp_type", Type: datatype.TimestampType},
		{Name: "timestamp_ntz_type", Type: datatype.TimestampNtzType},
		{Name: "boolean_type", Type: datatype.BooleanType},
		{Name: "map_type", Type: mapType},
		{Name: "array_type", Type: arrayType},
		{Name: "struct_type", Type: structType},
		{Name: "json_type", Type: jsonType},
	}
	// Partition columns.
	partitions := []tableschema.Column{
		{Name: "p1", Type: datatype.BigIntType},
		{Name: "p2", Type: datatype.StringType},
	}

	builder := tableschema.NewSchemaBuilder()
	builder.Name("all_types_demo").
		Columns(columns...).
		PartitionColumns(partitions...).
		Lifecycle(2) // unit: days
	schema := builder.Build()

	tables := client.Tables()
	// If the project uses data type edition 1.0, these hints are needed to
	// enable the MaxCompute 2.0 data types.
	hints := map[string]string{
		"odps.sql.type.system.odps2": "true",
		"odps.sql.decimal.odps2":     "true",
	}
	if err = tables.Create(schema, true, hints, nil); err != nil {
		log.Fatalf("%+v", err)
	}
}
创建聚簇表
创建Hash聚簇表
Hash聚簇表的详细介绍请参考Hash Clustering。
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
// main creates the hash-clustered, partitioned table test_hash_clustering.
func main() {
	// Load the connection settings from the ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	// The table built below corresponds to the DDL statement:
	//   CREATE TABLE test_hash_clustering (a string, b string, c bigint)
	//   PARTITIONED BY (dt string)
	//   CLUSTERED BY (c)
	//   SORTED by (c) INTO 1024 BUCKETS;
	colA := tableschema.Column{Name: "a", Type: datatype.StringType}
	colB := tableschema.Column{Name: "b", Type: datatype.StringType}
	colC := tableschema.Column{Name: "c", Type: datatype.BigIntType}
	// Partition column.
	partCol := tableschema.Column{Name: "dt", Type: datatype.StringType}

	// Sort key: optional, but in most cases it is recommended to keep it
	// identical to the cluster key to get the best optimization.
	sortKeys := []tableschema.SortColumn{{Name: colC.Name, Order: tableschema.SORT_ORDER.ASC}}

	builder := tableschema.NewSchemaBuilder()
	builder.Name("test_hash_clustering"). // table name
						Columns(colA, colB, colC).                  // columns
						PartitionColumns(partCol).                  // partition column
						ClusterType(tableschema.CLUSTER_TYPE.Hash). // hash clustering
						ClusterColumns([]string{colC.Name}).        // cluster key, i.e. the hash key
						ClusterSortColumns(sortKeys).
						ClusterBucketNum(1024) // number of hash buckets

	tables := client.Tables()
	schema := builder.Build()
	println(schema.ToSQLString("test_cluster", "", true))
	if err = tables.Create(schema, true, nil, nil); err != nil {
		log.Fatalf("%+v", err)
	}
}
创建Range聚簇表
Range聚簇表的详细介绍请参考Range Clustering。
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
// main creates the range-clustered, partitioned table test_range_clustering.
func main() {
	// Load the connection settings from the ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	// The table built below corresponds to the DDL statement (column c is
	// declared with BigIntType and dt with StringType in the code):
	//   CREATE TABLE test_range_clustering (a string, b string, c bigint)
	//   PARTITIONED BY (dt string)
	//   RANGE CLUSTERED BY (c)
	//   SORTED by (c)
	//   INTO 1024 BUCKETS;
	colA := tableschema.Column{Name: "a", Type: datatype.StringType}
	colB := tableschema.Column{Name: "b", Type: datatype.StringType}
	colC := tableschema.Column{Name: "c", Type: datatype.BigIntType}
	// Partition column.
	partCol := tableschema.Column{Name: "dt", Type: datatype.StringType}

	// Sort key: optional, but in most cases it is recommended to keep it
	// identical to the cluster key to get the best optimization.
	sortKeys := []tableschema.SortColumn{{Name: colC.Name, Order: tableschema.SORT_ORDER.ASC}}

	builder := tableschema.NewSchemaBuilder()
	builder.Name("test_range_clustering"). // table name
						Columns(colA, colB, colC).                   // columns
						PartitionColumns(partCol).                   // partition column
						ClusterType(tableschema.CLUSTER_TYPE.Range). // range clustering
						ClusterColumns([]string{colC.Name}).         // range cluster key
						ClusterSortColumns(sortKeys).
						ClusterBucketNum(1024) // number of buckets (optional)

	tables := client.Tables()
	schema := builder.Build()
	println(schema.ToSQLString("test_cluster", "", true))
	if err = tables.Create(schema, true, nil, nil); err != nil {
		log.Fatalf("%+v", err)
	}
}
创建OSS外表
关于创建OSS外表的详细信息可以参考创建OSS外部表。
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/datatype"
"github.com/aliyun/aliyun-odps-go-sdk/odps/tableschema"
"log"
)
// main creates an OSS external table that is read through a user-defined
// storage handler, printing the generated DDL first.
func main() {
	// Path of the configuration file.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project.
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// The table built below corresponds to a statement of this shape:
	//
	// create external table if not exists go_sdk_regression_testing.`testCreateExternalTableWithUserDefinedStorageHandler` (
	//     `a` STRING ,
	//     `b` STRING ,
	//     `c` BIGINT
	// )
	// comment 'External table using user defined TextStorageHandler'
	// partitioned by (`dt` STRING)
	// stored by 'com.aliyun.odps.udf.example.text.TextStorageHandler'
	// with serdeproperties('odps.text.option.delimiter'='|', 'my.own.option'='value')
	// location 'MOCKoss://full/uri/path/to/oss/directory/'
	// lifecycle 10;
	tableName := "testCreateExternalTableWithUserDefinedStorageHandler"
	c1 := tableschema.Column{
		Name: "a",
		Type: datatype.StringType,
	}
	c2 := tableschema.Column{
		Name: "b",
		Type: datatype.StringType,
	}
	c3 := tableschema.Column{
		Name: "c",
		Type: datatype.BigIntType,
	}
	// Partition column.
	pc := tableschema.Column{
		Name: "dt",
		Type: datatype.StringType,
	}
	sb := tableschema.NewSchemaBuilder()
	sb.Name(tableName). // table name
				Columns(c1, c2, c3).  // columns
				PartitionColumns(pc). // partition columns
				Location("oss://full/uri/path/to/oss/directory/").
				StorageHandler("com.aliyun.odps.udf.example.text.TextStorageHandler").
				Comment("External table using user defined TextStorageHandler").
				Lifecycle(10)
	tablesIns := odpsIns.Tables()
	schema := sb.Build()

	// SerDe properties.
	serDeProperties := map[string]string{
		"odps.text.option.delimiter": "|",
		"my.own.option":              "value",
	}
	// Hints.
	hints := map[string]string{
		"odps.sql.preparse.odps2":       "lot",
		"odps.sql.planner.mode":         "lot",
		"odps.sql.planner.parser.odps2": "true",
		"odps.sql.ddl.odps2":            "true",
		"odps.compiler.output.format":   "lot,pot",
	}

	sql, err := schema.ToExternalSQLString(odpsIns.DefaultProjectName(), "", true, serDeProperties, nil)
	// Check the error before it is overwritten by the next call.
	if err != nil {
		log.Fatalf("%+v", err)
	}
	print(sql)

	err = tablesIns.CreateExternal(schema, true, serDeProperties, nil, hints, nil)
	if err != nil {
		log.Fatalf("%+v", err)
	}
}
获取表列表
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main lists the tables of the configured project, filtered by name prefix
// and by table type, and prints their names.
func main() {
	// Load the connection settings from the ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tables := proj.Tables()

	// Called once per listed table; a listing error is fatal.
	printName := func(t *odps.Table, err error) {
		if err != nil {
			log.Fatalf("%+v", err)
		}
		println(t.Name())
	}

	tables.List(
		printName,
		// Filter by table name prefix.
		odps.TableFilter.NamePrefix("all_type"),
		// Filter by table type; other table types include VirtualView and ExternalTable.
		odps.TableFilter.Type(odps.ManagedTable),
	)
}
获取单个表信息
判断表是否存在
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main reports whether the table all_types_demo exists in the configured project.
func main() {
	// Load the connection settings from the ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	projTables := proj.Tables()
	demo := projTables.Get("all_types_demo")

	exists, err := demo.Exists()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	println(exists)
}
获取表的大小、行数
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main loads the metadata of all_types_demo and prints the table size and
// the number of records.
func main() {
	// Load the connection settings from the ini file.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	projTables := proj.Tables()
	demo := projTables.Get("all_types_demo")
	if err = demo.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Table size, in bytes.
	println("size = ", demo.Size())
	// Number of rows of the table.
	println("rowCount = ", demo.RecordNum())
}
获取表的CreatedTime、LastDDLTime、ModifiedTime
package main

import (
	"fmt"
	"log"

	"github.com/aliyun/aliyun-odps-go-sdk/odps"
	"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)

// main loads the metadata of all_types_demo and prints its creation time,
// the time of its last DDL operation and its last modification time.
func main() {
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)
	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")
	if err = table.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// fmt.Println is used instead of the builtin println: it can render a
	// value of any type and writes to standard output.
	// Creation time of the table.
	createTime := table.CreatedTime()
	fmt.Println("create time = ", createTime)
	// Time of the most recent DDL operation.
	lastDDLTime := table.LastDDLTime()
	fmt.Println("last ddl time = ", lastDDLTime)
	// Time of the most recent modification of the table.
	lastModifiedTime := table.LastModifiedTime()
	fmt.Println("last modified time = ", lastModifiedTime)
}
获取表的所有者
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main loads the metadata of all_types_demo and prints the table owner.
func main() {
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	projTables := proj.Tables()
	demo := projTables.Get("all_types_demo")
	if err = demo.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Owner of the table.
	println("owner is ", demo.Owner())
}
获取表类型
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main loads the metadata of all_types_demo and prints the table type.
func main() {
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	projTables := proj.Tables()
	demo := projTables.Get("all_types_demo")
	if err = demo.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	// Type of the table.
	kind := demo.Type()
	println("type is ", kind)
}
获取表结构
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
"strings"
)
// main prints the schema of the table "test_cluster_table": its name, its
// lifecycle (when positive), its columns, its partition columns and, when a
// cluster type is set, its clustering information.
func main() {
	// The ini file supplies the AccessKey pair, endpoint and project name.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("test_cluster_table")
	// Load the table before reading its schema; abort on failure.
	if err := tbl.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	sch := tbl.Schema()
	println("table name = ", sch.TableName)
	// The lifecycle is only reported when it is a positive number.
	if lifecycle := tbl.LifeCycle(); lifecycle > 0 {
		println("table lifecycle = ", lifecycle)
	}

	// Regular columns.
	for _, col := range sch.Columns {
		fmt.Printf("column %s %s comment '%s'\n", col.Name, col.Type, col.Comment)
	}

	// Partition columns.
	for _, col := range sch.PartitionColumns {
		fmt.Printf("partition column %s %s comment '%s'\n", col.Name, col.Type, col.Comment)
	}

	// Clustering information is printed only when a cluster type is set.
	cluster := sch.ClusterInfo
	if cluster.ClusterType == "" {
		return
	}
	println("cluster type = ", cluster.ClusterType)
	println("cluster columns = ", strings.Join(cluster.ClusterCols, ", "))
	println("cluster bucket num = ", cluster.BucketNum)
}
删除表
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main deletes the table "test_cluster_table" from the project named by
// ./config.ini.
func main() {
	// The ini file supplies the AccessKey pair, endpoint and project name.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("test_cluster_table")
	// Issue the delete; abort with the full error on failure.
	if err := tbl.Delete(); err != nil {
		log.Fatalf("%+v", err)
	}
}
分区管理
获取分区列表
通过MaxCompute SDK既可以获取一个表的所有“分区值”列表,也可以获取一个表的所有“分区对象”列表。“分区对象”中包含分区的一些基本信息,如size、lastModifiedTime等。
获取分区值列表
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main prints every partition value of the table "all_types_demo", one per
// line.
func main() {
	// The ini file supplies the AccessKey pair, endpoint and project name.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("all_types_demo")

	values, err := tbl.GetPartitionValues()
	if err != nil {
		log.Fatalf("%+v", err)
	}
	for _, value := range values {
		println(value)
	}
}
获取分区对象列表
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main lists the partition objects of the table "all_types_demo" and prints
// the value, timestamps and size of each one.
func main() {
	// The ini file supplies the AccessKey pair, endpoint and project name.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("all_types_demo")

	parts, err := tbl.GetPartitions()
	if err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf("get %d partitions\n", len(parts))
	// One output line per partition.
	const rowFormat = "value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n"
	for _, part := range parts {
		fmt.Printf(rowFormat, part.Value(), part.CreatedTime(), part.LastDDLTime(), part.LastModifiedTime(), part.Size())
	}
}
获取单个分区信息
获取分区的基本信息
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main fetches the partition "p1=20/p2=hangzhou" of the table
// "all_types_demo" and prints its value, timestamps and size.
func main() {
	// The ini file supplies the AccessKey pair, endpoint and project name.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("all_types_demo")

	part, err := tbl.GetPartition("p1=20/p2=hangzhou")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	const rowFormat = "value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n"
	fmt.Printf(rowFormat, part.Value(), part.CreatedTime(), part.LastDDLTime(), part.LastModifiedTime(), part.Size())
}
获取分区的扩展信息
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main fetches the partition "p1=20/p2=hangzhou" of the table
// "all_types_demo", prints its basic information, then loads and prints its
// extended information (archived flag, lifecycle and physical size).
func main() {
	// The ini file supplies the AccessKey pair, endpoint and project name.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	project := odpsIns.Project(conf.ProjectName)
	tables := project.Tables()
	table := tables.Get("all_types_demo")

	p, err := table.GetPartition("p1=20/p2=hangzhou")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// Basic partition information.
	fmt.Printf(
		"value=%s, createTime=%s, lastDDLTime=%s, lastModifiedTime=%s, size=%d\n",
		p.Value(), p.CreatedTime(), p.LastDDLTime(), p.LastModifiedTime(), p.Size(),
	)

	// Extended partition information: LoadExtended is called before the
	// *Ex getters are read.
	if err := p.LoadExtended(); err != nil {
		log.Fatalf("%+v", err)
	}

	// The trailing newline keeps the last output line from running into the
	// shell prompt and matches the other Printf calls in this example.
	fmt.Printf(
		"isArchived=%t, lifeCycle=%d, physicalSize=%d\n",
		p.IsArchivedEx(), p.LifeCycleEx(), p.PhysicalSizeEx(),
	)
}
添加分区
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main adds the partitions "p1=23/p2=beijing" and "p1=24/p2=shanghai" to the
// table "all_types_demo".
func main() {
	// The ini file supplies the AccessKey pair, endpoint and project name.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("all_types_demo")

	newPartitions := []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"}
	// NOTE(review): the first argument is presumably an "if not exists"
	// flag — confirm against the SDK reference.
	if err := tbl.AddPartitions(true, newPartitions); err != nil {
		log.Fatalf("%+v", err)
	}
}
删除分区
package main
import (
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main deletes the partitions "p1=23/p2=beijing" and "p1=24/p2=shanghai"
// from the table "all_types_demo".
func main() {
	// The ini file supplies the AccessKey pair, endpoint and project name.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	proj := client.Project(conf.ProjectName)
	tbls := proj.Tables()
	tbl := tbls.Get("all_types_demo")

	stalePartitions := []string{"p1=23/p2=beijing", "p1=24/p2=shanghai"}
	// NOTE(review): the first argument is presumably an "if exists" flag —
	// confirm against the SDK reference.
	if err := tbl.DeletePartitions(true, stalePartitions); err != nil {
		log.Fatalf("%+v", err)
	}
}
Instance管理
MaxCompute执行SQL后返回Instance对象,Instance表示MaxCompute SQL作业,用于追踪SQL执行状态、结果。
获取Instance列表
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
"time"
)
// main lists the instances of the default project that match a time range
// and the odps.InstanceTerminated status filter, and prints the id, owner,
// start time, end time and status of each one.
func main() {
	// The ini file supplies the AccessKey pair, endpoint and project name.
	configPath := "./config.ini"
	conf, err := odps.NewConfigFromIni(configPath)
	if err != nil {
		log.Fatalf("%+v", err)
	}

	aliAccount := account.NewAliyunAccount(conf.AccessId, conf.AccessKey)
	odpsIns := odps.NewOdps(aliAccount, conf.Endpoint)
	// Set the default MaxCompute project.
	odpsIns.SetDefaultProjectName(conf.ProjectName)

	// Bounds of the time-range filter. Parse errors are checked so that an
	// edited, malformed timestamp fails loudly instead of silently turning
	// into the zero time.
	const timeFormat = "2006-01-02 15:04:05"
	startTime, err := time.Parse(timeFormat, "2024-10-11 02:15:30")
	if err != nil {
		log.Fatalf("%+v", err)
	}
	endTime, err := time.Parse(timeFormat, "2024-10-13 06:22:02")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// printInstance is the callback passed to List. It does not inspect the
	// enclosing err variable: that value is always nil by the time List
	// runs, so such a check could never fire.
	printInstance := func(i *odps.Instance) {
		println(
			fmt.Sprintf(
				"%s, %s, %s, %s, %s",
				i.Id(), i.Owner(), i.StartTime().Format(timeFormat), i.EndTime().Format(timeFormat), i.Status(),
			))
	}

	instances := odpsIns.Instances()
	// NOTE(review): if List returns an error in the SDK version in use, it
	// should be checked here — confirm against the SDK reference.
	instances.List(
		printInstance,
		odps.InstanceFilter.TimeRange(startTime, endTime),
		odps.InstanceFilter.Status(odps.InstanceTerminated),
	)
}
获取Instance信息
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"log"
)
// main loads the instance identified by the placeholder "<yourInstanceId>"
// and prints its owner, status, start time, end time and task results.
func main() {
	// The ini file supplies the AccessKey pair, endpoint and project name.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	// Replace the placeholder with a real instance id before running.
	instance := client.Instances().Get("<yourInstanceId>")
	if err := instance.Load(); err != nil {
		log.Fatalf("%+v", err)
	}

	fmt.Printf("owner=%s\n", instance.Owner())
	fmt.Printf("status=%s\n", instance.Status())
	fmt.Printf("startTime=%s\n", instance.StartTime())
	fmt.Printf("endTime=%s\n", instance.EndTime())
	fmt.Printf("result=%+v\n", instance.TaskResults())
}
权限管理
MaxCompute可以通过操作权限的相关命令进行权限管理,关于授权方案详情,请参见授权方案概述。下述示例通过DESC ROLE命令查看角色的相关信息。
package main
import (
"fmt"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
"github.com/aliyun/aliyun-odps-go-sdk/odps/security"
"log"
)
// main runs the statement "desc role role_project_admin;" through a security
// manager bound to the configured project and prints the result.
func main() {
	// The ini file supplies the AccessKey pair, endpoint and project name.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	// The security manager is built from the client's REST client and the
	// project name.
	manager := security.NewSecurityManager(client.RestClient(), conf.ProjectName)

	// NOTE(review): the meaning of the second and third RunQuery arguments
	// (true, "") is not visible here — confirm against the SDK reference.
	result, err := manager.RunQuery("desc role role_project_admin;", true, "")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	message := fmt.Sprintf("ok: %s", result)
	println(message)
}
Logview
您可以通过Logview查看已提交的MaxCompute作业,并进行Debug调试,详情请参见使用Logview查看作业运行信息。
package main
import (
"log"
"github.com/aliyun/aliyun-odps-go-sdk/odps"
"github.com/aliyun/aliyun-odps-go-sdk/odps/account"
)
// main submits a SELECT statement as a SQL task in the default project and
// prints the Logview generated for the resulting instance.
func main() {
	// The ini file supplies the AccessKey pair, endpoint and project name.
	conf, err := odps.NewConfigFromIni("./config.ini")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	client := odps.NewOdps(account.NewAliyunAccount(conf.AccessId, conf.AccessKey), conf.Endpoint)
	// Set the default MaxCompute project.
	client.SetDefaultProjectName(conf.ProjectName)

	// SQL engine settings such as odps.sql.skewjoin would go here; nil means
	// no hints are passed.
	var hints map[string]string
	query := "select * from all_types_demo where p1>0 or p2 > '';"
	task := odps.NewSqlTask("select", query, hints)

	// Run the task in the default project.
	instance, err := task.Run(client, client.DefaultProjectName())
	if err != nil {
		log.Fatalf("%+v", err)
	}

	// NOTE(review): the second argument is presumably the Logview validity
	// period in hours — confirm against the SDK reference.
	logView, err := client.LogView().GenerateLogView(instance, 1)
	if err != nil {
		log.Fatalf("%+v", err)
	}
	println(logView)
}