console命令工具

您可以通过console工具访问DataHub项目并运行命令。本文为您介绍如何安装、配置和运行客户端并提供客户端相关使用说明信息。

前提条件

在使用console工具前,请您确认已满足如下条件:

  • 待安装console的设备已安装Java 8或以上版本

安装并配置console客户端

  1. 下载命令行工具进行datahub_console.tar.gz并解压

  2. 解压下载的安装包文件,得到bin、conf、lib文件夹

  3. 进入conf文件夹,在conf目录中的datahub.properties文件填写ak endpoint信息,文件内容如下

datahub.accessid=
datahub.accesskey=
datahub.endpoint=

参数详细信息如下:

参数

是否必填

描述

示例

datahub.accessid

阿里云账号或RAM用户的AccessKey ID。

datahub.accesskey

AccessKey ID对应的AccessKey Secret。

datahub.endpoint

DataHub服务的连接地址。

您需要根据创建DataHub项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint值,请参见域名列表

https://dh-cn-hangzhou.aliyuncs.com

运行console客户端

console客户端可通过如下方式启动,您可以任选其中一种:

  • 方式一:在console客户端安装路径下的bin文件夹中,双击datahubcmd.bat文件(Windows系统),即可启动console客户端,返回如下信息,表明已经成功启动

image.png

  • 方式二:在系统的命令行执行窗口,进入console客户端安装路径下的bin目录,执行datahubcmd命令(Windows系统)或者 sh datahubcmd.sh (Linux系统或者Mac系统),即可启动console客户端,如下图所示表明已经成功连接到DataHub

image.png

获取命令帮助

您可以通过如下方式快速获取console客户端的命令帮助,您可以任选其中一种:

方式一:在console客户端查看命令帮助信息

  • 查看全部命令的帮助

help
  • 通过指定关键字查看相关命令帮助信息。

例如获取Topic列表

DataHub=>help lt
NAME
        lt - List topic

SYNOPSYS
        lt [-p] string

OPTIONS
        -p  string
                projectName
                [Mandatory]

方式二:在系统的命令行执行窗口,切换到console客户端安装路径下的bin目录,执行如下命令查看全部命令的帮助信息

...\bin>datahubcmd help

使用指南

Project操作

创建Project

  • -p: project名称

  • -c: project描述

cp -p test_project  -c test_comment

删除Project

  • -p: project名称

dp -p test_project

注意:删除Project前需要删除Project下所有的资源(包括Topic以及Topic中的订阅和同步任务),否则删除报错

获取Project列表

lp

Topic操作

创建Topic

  • -p:project名称

  • -t: topic名称

  • -m: 表示不同的Topic类型,BLOB代表创建BLOB类型的Topic,Tuple表示创建Tuple类型的Topic

  • -f: Tuple类型 Topic字段格式为[(fieldName,fieldType,isNull)],多个字段以逗号隔开

  • -s: shard数量

  • -l: 数据生命周期,范围(1-7)天

  • -c: topic描述

ct -p test_project -t test_topic -m TUPLE -f [(name,string,true)] -s 3 -l 3 -c test_comment

删除Topic

  • -p: project名称

  • -t: topic名称

dt -p test_project -t test_topic

获取Topic信息

  • -p: project名称

  • -t: topic名称

gt -p test_project -t test_topic

导出Topic schema结构为JSON文件

  • -f:保存文件路径

  • -p: project名称

  • -t: topic名称

gts -f filepath -p test_project -t test_topic

获取Topic列表

  • -p: project名称

lt -p test_project

导入Json文件创建Topic

  • -s: shard数量

  • -l: 数据生命周期,范围(1-7)天

  • -f: 文件路径

  • -p: project名称

  • -t: topic名称

rtt -s 3 -l 3 -c test_comment -f filepath -p test_project -t test_topic

修改Topic生命周期

  • -p: project名称

  • -t: topic名称

  • -l: topic生命周期

  • -c: topic描述

utl -p test_project -t test_topic -l 3 -c test_comment

Connector操作

创建ODPS connector

  • -p: project名称

  • -t: topic名称

  • -m: 参数表示不同的同步类型,目前同步到odps的有SYSTEM_TIME、USER_DEFINE、EVENT_TIME、META_TIME四种同步类型

  • -e: odps endpoint,请填写经典网络地址

  • -op: odps Project名称

  • -oa: 访问odps的accessId

  • -ok: 访问odps的accessKey

  • -tr参数表示分区的时间间隔,console工具默认为60分钟

  • -tf参数分区格式,ds 表示按天分区,ds hh表示按小时分区,ds hh mm表示按分钟分区

coc -p test_project -t test_topic -m SYSTEM_TIME -e odpsEndpoint -op odpsProject -ot odpsTable -oa odpsAccessId -ok odpsAccessKey -tr 60 -c (field1,field2) -tf ds hh mm

同步odps新增字段

  • -p: project名称

  • -t: topic名称

  • -c: connectorId,可通过数据同步页签查看

  • -f: fieldName,新增字段名称

acf -p test_project -t test_topic -c connectorId -f fieldName

创建同步到MYSQL/RDS connector

  • -p: project名称

  • -t: topic名称

  • -h: host,请填写经典网络地址

  • -po: port

  • -ty参数表示同步的类型,共有两种

  • SINK_MYSQL表示创建同步到MySQL的connector

  • SINK_ADS 表示创建同步到ads的connector

  • -d: database名称

  • -ta: table名称

  • -u: userName

  • -pa: password

  • -ht表示插入方式,共有两种

  • IGNORE

  • OVERWRITE

  • -n表示同步的字段,示例:(field1,field2)

cdc -p test_project -t test_topic -h host -po 3306 -ty mysql -d mysql_database -ta msyql_table -u username -pa password -ht IGNORE -n (field1,field2)

创建 DATAHUB connector

  • -p: project名称

  • -t: topic名称

  • -sp: sinkProject,数据导入的Project

  • -st: sinkTopic,数据导入的Topic

  • -m: 表示认证类型

  • AK表示通过AK认证,需要填写accessId和accessKey

  • STS表示通过STS认证

cdhc -p test_project -t test_topic -sp sinkProject -st sinkTopic -m AK -i accessid k accessKey

创建FC connector

  • -p: project名称

  • -t: topic名称

  • -e: fc endpoint,请填写经典网络地址

  • -s: fc Service名称

  • -f: fc Function名称

  • -au: 认证方式

  • AK表示通过AK认证,需要填写accessId和accessKey

  • STS表示通过STS认证

  • -n表示同步的字段,例如:(field1,field2)

cfc -p test_project -t test_topic -e endpoint -s service -f function -au AK -i accessId -k accessKey -n (field1,field2)

创建HOLOGRES connector

  • -p: project名称

  • -t: topic名称

  • -e: endpoint

  • -cl: 同步到hologres的字段

  • -au表示认证方式,目前同步到holo只支持AK认证

  • -m表示解析类型,选择Delimiter需要指定lineDelimiter、parseData、columnDelimiter属性,选择IngormaticaJson需要指定parseData属性

  • Delimiter

  • InformaticaJson

chc -p test_project -t test_topic -e endpoint -cl (field,field2) -au AK -hp holoProject -ht holoTopic -i accessId -k accessKey -m Delimiter -l 1 -b false -n (field1,field2)

创建OTSconnector

  • -p: project名称

  • -t: topic名称

  • -it: ots Instance名称

  • -m表示认证类型,默认使用STS

  • AK表示通过AK认证,需要填写accessId和accessKey

  • STS表示通过STS认证

  • -t: ots Table名称

  • -wm表示写入方式,支持两种写入方式

  • PUT

  • UPDATE

  • -c表示同步的字段,例如:(field1,field2)

cotsc -p test_project -t test_topic -i accessId -k accessKey -it instanceId -m AK -t table -wm PUT -c (field1,field2)

创建 OSS connector

  • -p: project名称

  • -t: topic名称

  • -b: oss Bucket名称

  • -e: oss Endpoint名称

  • -pr: 同步到OSS的目录前缀

  • -tf:同步时间格式,例如:%Y%m%d%H%M表示按照分钟级别进行分区

  • -tr: 分区的时间间隔

  • -c: 同步字段

csc -p test_project -t test_topic -b bucket -e endpoint -pr ossPrefix -tf ossTimeFormat -tr timeRange -c (f1,f2)

删除connector(可传入多个connectorid,以空格分隔)

  • -p: project名称

  • -t: topic名称

  • -c: connectorId,可通过数据同步页签查看

dc -p test_project -t test_topic -c connectorId

获取connector详情信息

  • -p: project名称

  • -t: topic名称

  • -c: connectorId,可通过数据同步页签查看

gc -p test_project -t test_topic -c connectorId

获取某个Topic下面的connector列表

  • -p: project名称

  • -t: topic名称

lc -p test_project -t test_topic

重启connector

  • -p: project名称

  • -t: topic名称

  • -c: connectorId,可通过数据同步页签查看

rc -p test_project -t test_topic -c connectorId

更新connector ak

  • -p: project名称

  • -t: topic名称

  • -ty: 同步类型 比如:SINK_ODPS

uca -p test_project -t test_topic -ty SINK_ODPS  -a accessId -k accessKey

shard操作

合并shard

  • -p: project名称

  • -t: topic名称

  • -s: 要合并的shardId

  • -a: 要合并的另一个shardId

ms -p test_project -t test_topic -s shardId -a adjacentShardId

分裂shard

  • -p: project名称

  • -t: topic名称

  • -s: 要分裂的shardId

ss -p test_project -t test_topic -s shardId

获取某个topic下面的所有shard

  • -p: project名称

  • -t: topic名称

ls -p test_project -t topicName

获取同步shard的状态

  • -p: project名称

  • -t: topic名称

  • -s: shardId

  • -c: connectorId,可通过数据同步页签查看

gcs -p test_project -t test_topic -s shardId -c connectorId

获取订阅消费的每个shard点位

  • -p: project名称

  • -t: topic名称

  • -s: 订阅id

  • -i: shardId

gso -p test_project -t test_topic -s subid -i shardId

订阅操作

创建订阅

  • -p: project名称

  • -t: topic名称

  • -c: 订阅描述

css -p test_project -t test_topic -c comment

删除订阅

  • -p: project名称

  • -t: topic名称

  • -s: 订阅id

dsc -p test_project -t test_topic -s subId

查询订阅列表

  • -p: project名称

  • -t: topic名称

lss -p test_project -t test_topic

上传下载数据

上传数据

  • -f: 参数表示文件路径,注意:windows路径下请添加转义符,例如:D:\\test\\test.txt

  • -p project名称

  • -t: topic名称

  • -m: 参数表示文本分隔符,目前支持逗号、空格分隔符

  • -n: 参数表示每次上传batchsize大小,默认为1000

uf -f filepath -p test_topic -t test_topic -m "," -n 1000

示例: CSV文件上传

下面以CSV文件为例,说明下如何使用console工具将CSV文件上传到DataHub数据。CSV文件的格式如下所示:

1. 0,qe614c760fuk8judu01tn5x055rpt1,true,100.1,14321111111
2. 1,znv1py74o8ynn87k66o32ao4x875wi,true,100.1,14321111111
3. 2,7nm0mtpgo1q0ubuljjjx9b000ybltl,true,100.1,14321111111
4. 3,10t0n6pvonnan16279w848ukko5f6l,true,100.1,14321111111
5. 4,0ub584kw88s6dczd0mta7itmta10jo,true,100.1,14321111111
6. 5,1ltfpf0jt7fhvf0oy4lo8m3z62c940,true,100.1,14321111111
7. 6,zpqsfxqy9379lmcehd7q8kftntrozb,true,100.1,14321111111
8. 7,ce1ga9aln346xcj761c3iytshyzuxg,true,100.1,14321111111
9. 8,k5j2id9a0ko90cykl40s6ojq6gruyi,true,100.1,14321111111
10. 9,ns2zcx9bdip5y0aqd1tdicf7bkdmsm,true,100.1,14321111111
11. 10,54rs9cm1xau2fk66pzyz62tf9tsse4,true,100.1,14321111111

上述CSV文件中每行一条Record,按照(,)区分字段。保存在本地路径/temp/test.csv中。DataHub Topic格式如下:

字段名称

字段类型

id

BIGINT

name

STRING

gender

BOOLEAN

salary

DOUBLE

my_time

TIMESTAMP

使用console工具命令如下

uf -f /temp/test.csv -p test_topic -t test_topic -m "," -n 1000

下载数据

  • -f: 参数表示文件路径,注意:windows路径下请添加转义符,例如:D:\\test\\test.txt

  • -p: project名称

  • -t: topic名称

  • -s: shardId

  • -d: 订阅id

  • -f: 下载路径

  • -ti: 参数表示读取该时间之后的点位,格式为:yyyy-mm-dd hh:mm:ss

  • -l: 参数表示每次读取的数量

  • -g: 参数表示是否一直读

  • 0表示只读一次,即获取当前recordsize后不再消费

  • 1表示一直读取

down -p test_project -t test_topic -s shardId -d subId -f filePath -ti "1970-01-01 00:00:00" -l 100 -g 0

常见问题

  • 脚本启动失败:windows环境下运行脚本检查脚本路径是否包含括号