Go High-Level SDK

介绍

Go 的 High-Level sdk 目前只支持 Producer,Consumer 暂时还不支持,Consumer 支持后,会在此文档中进行更新。

身份认证

AccessKey(简称AK)是阿里云提供给阿里云用户的访问密钥,用于访问阿里云OpenAPI时的身份验证。AccessKey包括AccessKey IDAccessKey Secret,需妥善保管。AK如果泄露,会威胁该账号下所有资源的安全。访问阿里云OpenAPI时,如果在代码中硬编码明文AK,容易因代码仓库权限管理不当造成AK泄露。

Alibaba Cloud Credentials是阿里云为阿里云开发者用户提供的身份凭证管理工具。配置了Credentials默认凭据链后,访问阿里云OpenAPI时,您无需在代码中硬编码明文AK,可有效保证您账号下云资源的安全。

前提条件

配置方案

本文示例的全部通采用配置环境变量方式获取 AK 信息,更多方式请访问管理访问凭据

配置方法

配置环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET

LinuxmacOS系统配置方法

执行以下命令:

export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>

<access_key_id>需替换为已准备好的AccessKey ID,<access_key_secret>替换为AccessKey Secret。

Windows系统配置方法
  1. 新建环境变量文件,添加环境变量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET,并写入已准备好的AccessKey IDAccessKey Secret。

  2. 重启Windows系统。

代码示例
credential, err := credentials.NewCredential(nil)
if err != nil {
    fmt.Println(err)
    // TODO: handle error
}

Producer 介绍

Go SDK 支持同步和异步两种写入方式,下面会分别介绍两种写入方式。

依赖如下所示,也可以去 github 上查看当前最新的版本。

require (
	github.com/aliyun/aliyun-datahub-sdk-go v1.1.0
)

参数介绍

参数名称

类型

是否必须

默认值

场景

描述

Account

Account

-

同步 / 异步

账号类型

UserAgent

string

-

同步 / 异步

请求的 UserAgent

Endpoint

string

-

同步 / 异步

访问 DataHub 服务的域名

Project

string

-

同步 / 异步

访问 DataHub 的 project 名称

Topic

string

-

同步 / 异步

访问的 DataHub 的 topic 名称

MaxRetry

int

3

同步 / 异步

请求出错时的重试次数

RetryInterval

time.Duration

500ms

同步 / 异步

每次重试的间隔

SendStrategy

SendStrategy

RoundRobin

同步 / 异步

不指定 shard 发送数据时,选择 shard 的策略。

RoundRobin:所有 shard 轮询;

Random:随机;

Parittioner

函数指针

DefaultPartitionFunc

同步 / 异步

通过哈希值选择 shard 时的方法,用户可以根据需求自定义,也可以使用默认哈希函数。

Protocol

Protocol

Batch

同步 / 异步

写入 DataHub 的时的序列化协议,目前支持: Batch/Protobuf,一般情况下,不建议用户修改此参数。

MaxAsyncFlightingNum

int

16

异步

异步写入攒批完成,等待写入的数据批数,每个 shard 单独限制,打满后会阻塞异步写入的调用,主要用于防止程序 OOM。一般情况下,不建议修改此参数。

MaxAsyncBufferNum

int

1000

异步

异步写入时,默认的攒批条数。

MaxAsyncBufferTime

time.Duration

5s

异步

异步写入时,默认的攒批超时时间。

EnableSuccessCh

bool

true

异步

异步写入时,如果为 true,所有写入成功的数据,都会放入 channel 中,用户需要及时消费掉 AsyncProducer.Successes()中的数据,否则会阻塞发送。

EnableErrorCh

bool

true

异步

异步写入时,如果为 true,所有写入失败的数据,都会放入 channel 中,用户需要及时消费掉 AsyncProducer.Errors()中的数据,否则会阻塞发送。

异步写入(推荐)

异步写入可以让用户不攒批,委托给 sdk 进行攒批,只需要监控回调的 channel 来处理写入成功和写入失败的做出相应的处理,这可以极大的降低用户使用的复杂度。

func handleSuccessRun(producer datahub.AsyncProducer) {
	for suc := range producer.Successes() {
		// handle request success
		fmt.Printf("shard:%s, rid:%s, records:%d, latency:%v\n",
			suc.ShardId, suc.RequestId, len(suc.Records), suc.Latency)
	}
}

func handleFailedRun(producer datahub.AsyncProducer) {
	// handle request failed
	for err := range producer.Errors() {
		fmt.Printf("shard:%s, records:%d, latency:%v, error:%v\n",
			err.ShardId, len(err.Records), err.Latency, err.Err)
	}
}

func main() {
	cfg := datahub.NewProducerConfig()
	credential, err := credentials.NewCredential(nil)
	if err != nil {
		fmt.Println(err)
		// TODO: handle error
	}
	cfg.Account = datahub.NewCredentialAccount(credential)
	cfg.Endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
	cfg.Project = "test_project"
	cfg.Topic = "test_topic"

	producer := datahub.NewAsyncProducer(cfg)
	err = producer.Init()

	if err != nil {
		// TODO: handle error
		fmt.Println(err)
	}

	schema, err := producer.GetSchema()
	if err != nil {
		// TODO: handle error
		fmt.Println(err)
	}

	// 处理success channle
	go handleSuccessRun(producer)

	// 处理error channle
	go handleFailedRun(producer)

	// 循环生成数据
	for i := 0; i < 1000; i++ {
		record := datahub.NewTupleRecord(schema)
        // 每次设置值都需要检查设置是否成功
		err = record.SetValueByName("f1", "val1")
		if err != nil {
			fmt.Println(err)
			return
		}

		err = record.SetValueByName("f2", 1234)
		if err != nil {
			fmt.Println(err)
			return
		}

		producer.Input() <- record
	}

	err = producer.Close()
	if err != nil {
		fmt.Println(err)
	}
}

Hash 写入

如果数据有保序的需求,那么需要根据一些信息进行 hash,相同 hash 值的数据会写入到同一个 shard,单个 shard 的数据是可以保证顺序的,一般 hash 写入建议使用异步的方式写入。

func handleSuccessRun(producer datahub.AsyncProducer) {
	for suc := range producer.Successes() {
		// handle request success
		fmt.Printf("shard:%s, rid:%s, records:%d, latency:%v\n",
			suc.ShardId, suc.RequestId, len(suc.Records), suc.Latency)
	}
}

func handleFailedRun(producer datahub.AsyncProducer) {
	// handle request failed
	for err := range producer.Errors() {
		fmt.Printf("shard:%s, records:%d, latency:%v, error:%v\n",
			err.ShardId, len(err.Records), err.Latency, err.Err)
	}
}

func main() {
	cfg := datahub.NewProducerConfig()
	credential, err := credentials.NewCredential(nil)
	if err != nil {
		fmt.Println(err)
		// TODO: handle error
	}
	cfg.Account = datahub.NewCredentialAccount(credential)
	cfg.Endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
	cfg.Project = "test_project"
	cfg.Topic = "test_topic"

	producer := datahub.NewAsyncProducer(cfg)
	err = producer.Init()

	if err != nil {
		// TODO: handle error
		fmt.Println(err)
	}

	schema, err := producer.GetSchema()
	if err != nil {
		// TODO: handle error
		fmt.Println(err)
	}

	// 处理success channle
	go handleSuccessRun(producer)

	// 处理error channle
	go handleFailedRun(producer)

	// 循环生成数据
	for i := 0; i < 1000; i++ {
		record := datahub.NewTupleRecord(schema)
        // 每次设置值都需要检查设置是否成功
		err = record.SetValueByName("f1", "val1")
		if err != nil {
			fmt.Println(err)
			return
		}

		err = record.SetValueByName("f2", 1234)
		if err != nil {
			fmt.Println(err)
			return
		}

        // 对record设置hash值
        record.SetPartitionKey("pk-val")
		producer.Input() <- record
	}

	err = producer.Close()
	if err != nil {
		fmt.Println(err)
	}
}

同步写入

同步写入需要用户自行攒批,推荐攒批 512K~1MB。

func main() {
	cfg := datahub.NewProducerConfig()
	credential, err := credentials.NewCredential(nil)
	if err != nil {
		fmt.Println(err)
		// TODO: handle error
	}
	cfg.Account = datahub.NewCredentialAccount(credential)
	cfg.Endpoint = "https://dh-cn-hangzhou.aliyuncs.com"
	cfg.Project = "test_project"
	cfg.Topic = "test_topic"

	producer := datahub.NewProducer(cfg)
	err = producer.Init()

	if err != nil {
		// TODO: handle error
		fmt.Println(err)
	}

	schema, err := producer.GetSchema()
	if err != nil {
		// TODO: handle error
		fmt.Println(err)
	}

	// 循环生成数据,推荐攒批512K~1MB
	records := make([]datahub.IRecord, 0)
	for i := 0; i < 1000; i++ {
		record := datahub.NewTupleRecord(schema)
		err = record.SetValueByName("f1", "val1")
		if err != nil {
			fmt.Println(err)
			return
		}

		err = record.SetValueByName("f2", 1234)
		if err != nil {
			fmt.Println(err)
			return
		}

		records = append(records, record)
	}

	shardId, err := producer.Send(records)
	if err != nil {
		fmt.Printf("write failed, error:%v\n", err)
	} else {
		fmt.Printf("write success, shardId:%v\n", shardId)
	}

	err = producer.Close()
	if err != nil {
		fmt.Println(err)
	}
}