介绍
Go 的 High-Level sdk 目前只支持 Producer,Consumer 暂时还不支持,Consumer 支持后,会在此文档中进行更新。
身份认证
AccessKey(简称AK)是阿里云提供给阿里云用户的访问密钥,用于访问阿里云OpenAPI时的身份验证。AccessKey包括AccessKey ID和AccessKey Secret,需妥善保管。AK如果泄露,会威胁该账号下所有资源的安全。访问阿里云OpenAPI时,如果在代码中硬编码明文AK,容易因代码仓库权限管理不当造成AK泄露。
Alibaba Cloud Credentials是阿里云为阿里云开发者用户提供的身份凭证管理工具。配置了Credentials默认凭据链后,访问阿里云OpenAPI时,您无需在代码中硬编码明文AK,可有效保证您账号下云资源的安全。
前提条件
已获取RAM用户账号的AccessKey ID和AccessKey Secret。相关操作,请参见查看RAM用户的AccessKey信息。
配置方案
本文示例的全部通采用配置环境变量方式获取 AK 信息,更多方式请访问管理访问凭据
配置方法
配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
。
Linux和macOS系统配置方法
执行以下命令:
export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
<access_key_id>
需替换为已准备好的AccessKey ID,<access_key_secret>
替换为AccessKey Secret。
Windows系统配置方法
新建环境变量文件,添加环境变量
ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
,并写入已准备好的AccessKey ID和AccessKey Secret。重启Windows系统。
代码示例
credential, err := credentials.NewCredential(nil)
if err != nil {
fmt.Println(err)
// TODO: handle error
}
Producer 介绍
Go SDK 支持同步和异步两种写入方式,下面会分别介绍两种写入方式。
依赖如下所示,也可以去 github 上查看当前最新的版本。
require (
github.com/aliyun/aliyun-datahub-sdk-go v1.1.0
)
参数介绍
参数名称 | 类型 | 是否必须 | 默认值 | 场景 | 描述 |
Account | Account | 是 | - | 同步 / 异步 | 账号类型 |
UserAgent | string | 否 | - | 同步 / 异步 | 请求的 UserAgent |
Endpoint | string | 是 | - | 同步 / 异步 | 访问 DataHub 服务的域名 |
Project | string | 是 | - | 同步 / 异步 | 访问 DataHub 的 project 名称 |
Topic | string | 是 | - | 同步 / 异步 | 访问的 DataHub 的 topic 名称 |
MaxRetry | int | 否 | 3 | 同步 / 异步 | 请求出错时的重试次数 |
RetryInterval | time.Duration | 否 | 500ms | 同步 / 异步 | 每次重试的间隔 |
SendStrategy | SendStrategy | 否 | RoundRobin | 同步 / 异步 | 不指定 shard 发送数据时,选择 shard 的策略。 RoundRobin:所有 shard 轮询; Random:随机; |
Parittioner | 函数指针 | 否 | DefaultPartitionFunc | 同步 / 异步 | 通过哈希值选择 shard 时的方法,用户可以根据需求自定义,也可以使用默认哈希函数。 |
Protocol | Protocol | 否 | Batch | 同步 / 异步 | 写入 DataHub 的时的序列化协议,目前支持: Batch/Protobuf,一般情况下,不建议用户修改此参数。 |
MaxAsyncFlightingNum | int | 否 | 16 | 异步 | 异步写入攒批完成,等待写入的数据批数,每个 shard 单独限制,打满后会阻塞异步写入的调用,主要用于防止程序 OOM。一般情况下,不建议修改此参数。 |
MaxAsyncBufferNum | int | 否 | 1000 | 异步 | 异步写入时,默认的攒批条数。 |
MaxAsyncBufferTime | time.Duration | 否 | 5s | 异步 | 异步写入时,默认的攒批超时时间。 |
EnableSuccessCh | bool | 否 | true | 异步 | 异步写入时,如果为 true,所有写入成功的数据,都会放入 channel 中,用户需要及时消费掉 AsyncProducer.Successes()中的数据,否则会阻塞发送。 |
EnableErrorCh | bool | 否 | true | 异步 | 异步写入时,如果为 true,所有写入失败的数据,都会放入 channel 中,用户需要及时消费掉 AsyncProducer.Errors()中的数据,否则会阻塞发送。 |
异步写入(推荐)
异步写入可以让用户不攒批,委托给 sdk 进行攒批,只需要监控回调的 channel 来处理写入成功和写入失败的做出相应的处理,这可以极大的降低用户使用的复杂度。
func handleSuccessRun(producer datahub.AsyncProducer) {
for suc := range producer.Successes() {
// handle request success
fmt.Printf("shard:%s, rid:%s, records:%d, latency:%v\n",
suc.ShardId, suc.RequestId, len(suc.Records), suc.Latency)
}
}
func handleFailedRun(producer datahub.AsyncProducer) {
// handle request failed
for err := range producer.Errors() {
fmt.Printf("shard:%s, records:%d, latency:%v, error:%v\n",
err.ShardId, len(err.Records), err.Latency, err.Err)
}
}
func main() {
cfg := datahub.NewProducerConfig()
credential, err := credentials.NewCredential(nil)
if err != nil {
fmt.Println(err)
// TODO: handle error
}
cfg.Account = datahub.NewCredentialAccount(credential)
cfg.Endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
cfg.Project = "test_project"
cfg.Topic = "test_topic"
producer := datahub.NewAsyncProducer(cfg)
err = producer.Init()
if err != nil {
// TODO: handle error
fmt.Println(err)
}
schema, err := producer.GetSchema()
if err != nil {
// TODO: handle error
fmt.Println(err)
}
// 处理success channle
go handleSuccessRun(producer)
// 处理error channle
go handleFailedRun(producer)
// 循环生成数据
for i := 0; i < 1000; i++ {
record := datahub.NewTupleRecord(schema)
// 每次设置值都需要检查设置是否成功
err = record.SetValueByName("f1", "val1")
if err != nil {
fmt.Println(err)
return
}
err = record.SetValueByName("f2", 1234)
if err != nil {
fmt.Println(err)
return
}
producer.Input() <- record
}
err = producer.Close()
if err != nil {
fmt.Println(err)
}
}
Hash 写入
如果数据有保序的需求,那么需要根据一些信息进行 hash,相同 hash 值的数据会写入到同一个 shard,单个 shard 的数据是可以保证顺序的,一般 hash 写入建议使用异步的方式写入。
func handleSuccessRun(producer datahub.AsyncProducer) {
for suc := range producer.Successes() {
// handle request success
fmt.Printf("shard:%s, rid:%s, records:%d, latency:%v\n",
suc.ShardId, suc.RequestId, len(suc.Records), suc.Latency)
}
}
func handleFailedRun(producer datahub.AsyncProducer) {
// handle request failed
for err := range producer.Errors() {
fmt.Printf("shard:%s, records:%d, latency:%v, error:%v\n",
err.ShardId, len(err.Records), err.Latency, err.Err)
}
}
func main() {
cfg := datahub.NewProducerConfig()
credential, err := credentials.NewCredential(nil)
if err != nil {
fmt.Println(err)
// TODO: handle error
}
cfg.Account = datahub.NewCredentialAccount(credential)
cfg.Endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
cfg.Project = "test_project"
cfg.Topic = "test_topic"
producer := datahub.NewAsyncProducer(cfg)
err = producer.Init()
if err != nil {
// TODO: handle error
fmt.Println(err)
}
schema, err := producer.GetSchema()
if err != nil {
// TODO: handle error
fmt.Println(err)
}
// 处理success channle
go handleSuccessRun(producer)
// 处理error channle
go handleFailedRun(producer)
// 循环生成数据
for i := 0; i < 1000; i++ {
record := datahub.NewTupleRecord(schema)
// 每次设置值都需要检查设置是否成功
err = record.SetValueByName("f1", "val1")
if err != nil {
fmt.Println(err)
return
}
err = record.SetValueByName("f2", 1234)
if err != nil {
fmt.Println(err)
return
}
// 对record设置hash值
record.SetPartitionKey("pk-val")
producer.Input() <- record
}
err = producer.Close()
if err != nil {
fmt.Println(err)
}
}
同步写入
同步写入需要用户自行攒批,推荐攒批 512K~1MB。
func main() {
cfg := datahub.NewProducerConfig()
credential, err := credentials.NewCredential(nil)
if err != nil {
fmt.Println(err)
// TODO: handle error
}
cfg.Account = datahub.NewCredentialAccount(credential)
cfg.Endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
cfg.Project = "test_project"
cfg.Topic = "test_topic"
producer := datahub.NewProducer(cfg)
err = producer.Init()
if err != nil {
// TODO: handle error
fmt.Println(err)
}
schema, err := producer.GetSchema()
if err != nil {
// TODO: handle error
fmt.Println(err)
}
// 循环生成数据,推荐攒批512K~1MB
records := make([]datahub.IRecord, 0)
for i := 0; i < 1000; i++ {
record := datahub.NewTupleRecord(schema)
err = record.SetValueByName("f1", "val1")
if err != nil {
fmt.Println(err)
return
}
err = record.SetValueByName("f2", 1234)
if err != nil {
fmt.Println(err)
return
}
records = append(records, record)
}
shardId, err := producer.Send(records)
if err != nil {
fmt.Printf("write failed, error:%v\n", err)
} else {
fmt.Printf("write success, shardId:%v\n", shardId)
}
err = producer.Close()
if err != nil {
fmt.Println(err)
}
}