您可以通过console工具访问DataHub项目并运行命令。本文为您介绍如何安装、配置和运行客户端并提供客户端相关使用说明信息。
前提条件
在使用console工具前,请您确认已满足如下条件:
待安装console的设备已安装Java 8或以上版本
安装并配置console客户端
下载命令行工具进行datahub_console.tar.gz并解压
解压下载的安装包文件,得到bin、conf、lib文件夹
进入conf文件夹,在conf目录中的datahub.properties文件填写ak endpoint信息,文件内容如下
datahub.accessid=
datahub.accesskey=
datahub.endpoint=
参数详细信息如下:
参数 | 是否必填 | 描述 | 示例 |
datahub.accessid | 是 | 阿里云账号或RAM用户的AccessKey ID。 | 无 |
datahub.accesskey | 是 | AccessKey ID对应的AccessKey Secret。 | 无 |
datahub.endpoint | 是 | DataHub服务的连接地址。 您需要根据创建DataHub项目时选择的地域以及网络连接方式配置Endpoint。各地域及网络对应的Endpoint值,请参见域名列表 | https://dh-cn-hangzhou.aliyuncs.com |
运行console客户端
console客户端可通过如下方式启动,您可以任选其中一种:
方式一:在console客户端安装路径下的bin文件夹中,双击datahubcmd.bat文件(Windows系统),即可启动console客户端,返回如下信息,表明已经成功启动
方式二:在系统的命令行执行窗口,进入console客户端安装路径下的bin目录,执行datahubcmd命令(Windows系统)或者 sh datahubcmd.sh (Linux系统或者Mac系统),即可启动console客户端,如下图所示表明已经成功连接到DataHub
获取命令帮助
您可以通过如下方式快速获取console客户端的命令帮助,您可以任选其中一种:
方式一:在console客户端查看命令帮助信息
查看全部命令的帮助
help
通过指定关键字查看相关命令帮助信息。
例如获取Topic列表
DataHub=>help lt
NAME
lt - List topic
SYNOPSYS
lt [-p] string
OPTIONS
-p string
projectName
[Mandatory]
方式二:在系统的命令行执行窗口,切换到console客户端安装路径下的bin目录,执行如下命令查看全部命令的帮助信息
...\bin>datahubcmd help
使用指南
Project操作
创建Project
-p: project名称
-c: project描述
cp -p test_project -c test_comment
删除Project
-p: project名称
dp -p test_project
注意:删除Project前需要删除Project下所有的资源(包括Topic以及Topic中的订阅和同步任务),否则删除报错
获取Project列表
lp
Topic操作
创建Topic
-p:project名称
-t: topic名称
-m: 表示不同的Topic类型,BLOB代表创建BLOB类型的Topic,Tuple表示创建Tuple类型的Topic
-f: Tuple类型 Topic字段格式为[(fieldName,fieldType,isNull)],多个字段以逗号隔开
-s: shard数量
-l: 数据生命周期,范围(1-7)天
-c: topic描述
ct -p test_project -t test_topic -m TUPLE -f [(name,string,true)] -s 3 -l 3 -c test_comment
删除Topic
-p: project名称
-t: topic名称
dt -p test_project -t test_topic
获取Topic信息
-p: project名称
-t: topic名称
gt -p test_project -t test_topic
导出Topic schema结构为JSON文件
-f:保存文件路径
-p: project名称
-t: topic名称
gts -f filepath -p test_project -t test_topic
获取Topic列表
-p: project名称
lt -p test_project
导入Json文件创建Topic
-s: shard数量
-l: 数据生命周期,范围(1-7)天
-f: 文件路径
-p: project名称
-t: topic名称
rtt -s 3 -l 3 -c test_comment -f filepath -p test_project -t test_topic
修改Topic生命周期
-p: project名称
-t: topic名称
-l: topic生命周期
-c: topic描述
utl -p test_project -t test_topic -l 3 -c test_comment
Connector操作
创建ODPS connector
-p: project名称
-t: topic名称
-m: 参数表示不同的同步类型,目前同步到odps的有SYSTEM_TIME、USER_DEFINE、EVENT_TIME、META_TIME四种同步类型
-e: odps endpoint,请填写经典网络地址
-op: odps Project名称
-oa: 访问odps的accessId
-ok: 访问odps的accessKey
-tr参数表示分区的时间间隔,console工具默认为60分钟
-tf参数分区格式,ds 表示按天分区,ds hh表示按小时分区,ds hh mm表示按分钟分区
coc -p test_project -t test_topic -m SYSTEM_TIME -e odpsEndpoint -op odpsProject -ot odpsTable -oa odpsAccessId -ok odpsAccessKey -tr 60 -c (field1,field2) -tf ds hh mm
同步odps新增字段
-p: project名称
-t: topic名称
-c: connectorId,可通过数据同步页签查看
-f: fieldName,新增字段名称
acf -p test_project -t test_topic -c connectorId -f fieldName
创建同步到MYSQL/RDS connector
-p: project名称
-t: topic名称
-h: host,请填写经典网络地址
-po: port
-ty参数表示同步的类型,共有两种
SINK_MYSQL表示创建同步到MySQL的connector
SINK_ADS 表示创建同步到ads的connector
-d: database名称
-ta: table名称
-u: userName
-pa: password
-ht表示插入方式,共有两种
IGNORE
OVERWRITE
-n表示同步的字段,示例:(field1,field2)
cdc -p test_project -t test_topic -h host -po 3306 -ty mysql -d mysql_database -ta msyql_table -u username -pa password -ht IGNORE -n (field1,field2)
创建 DATAHUB connector
-p: project名称
-t: topic名称
-sp: sinkProject,数据导入的Project
-st: sinkTopic,数据导入的Topic
-m: 表示认证类型
AK表示通过AK认证,需要填写accessId和accessKey
STS表示通过STS认证
cdhc -p test_project -t test_topic -sp sinkProject -st sinkTopic -m AK -i accessid k accessKey
创建FC connector
-p: project名称
-t: topic名称
-e: fc endpoint,请填写经典网络地址
-s: fc Service名称
-f: fc Function名称
-au: 认证方式
AK表示通过AK认证,需要填写accessId和accessKey
STS表示通过STS认证
-n表示同步的字段,例如:(field1,field2)
cfc -p test_project -t test_topic -e endpoint -s service -f function -au AK -i accessId -k accessKey -n (field1,field2)
创建HOLOGRES connector
-p: project名称
-t: topic名称
-e: endpoint
-cl: 同步到hologres的字段
-au表示认证方式,目前同步到holo只支持AK认证
-m表示解析类型,选择Delimiter需要指定lineDelimiter、parseData、columnDelimiter属性,选择IngormaticaJson需要指定parseData属性
Delimiter
InformaticaJson
chc -p test_project -t test_topic -e endpoint -cl (field,field2) -au AK -hp holoProject -ht holoTopic -i accessId -k accessKey -m Delimiter -l 1 -b false -n (field1,field2)
创建OTSconnector
-p: project名称
-t: topic名称
-it: ots Instance名称
-m表示认证类型,默认使用STS
AK表示通过AK认证,需要填写accessId和accessKey
STS表示通过STS认证
-t: ots Table名称
-wm表示写入方式,支持两种写入方式
PUT
UPDATE
-c表示同步的字段,例如:(field1,field2)
cotsc -p test_project -t test_topic -i accessId -k accessKey -it instanceId -m AK -t table -wm PUT -c (field1,field2)
创建 OSS connector
-p: project名称
-t: topic名称
-b: oss Bucket名称
-e: oss Endpoint名称
-pr: 同步到OSS的目录前缀
-tf:同步时间格式,例如:%Y%m%d%H%M表示按照分钟级别进行分区
-tr: 分区的时间间隔
-c: 同步字段
csc -p test_project -t test_topic -b bucket -e endpoint -pr ossPrefix -tf ossTimeFormat -tr timeRange -c (f1,f2)
删除connector(可传入多个connectorid,以空格分隔)
-p: project名称
-t: topic名称
-c: connectorId,可通过数据同步页签查看
dc -p test_project -t test_topic -c connectorId
获取connector详情信息
-p: project名称
-t: topic名称
-c: connectorId,可通过数据同步页签查看
gc -p test_project -t test_topic -c connectorId
获取某个Topic下面的connector列表
-p: project名称
-t: topic名称
lc -p test_project -t test_topic
重启connector
-p: project名称
-t: topic名称
-c: connectorId,可通过数据同步页签查看
rc -p test_project -t test_topic -c connectorId
更新connector ak
-p: project名称
-t: topic名称
-ty: 同步类型 比如:SINK_ODPS
uca -p test_project -t test_topic -ty SINK_ODPS -a accessId -k accessKey
shard操作
合并shard
-p: project名称
-t: topic名称
-s: 要合并的shardId
-a: 要合并的另一个shardId
ms -p test_project -t test_topic -s shardId -a adjacentShardId
分裂shard
-p: project名称
-t: topic名称
-s: 要分裂的shardId
ss -p test_project -t test_topic -s shardId
获取某个topic下面的所有shard
-p: project名称
-t: topic名称
ls -p test_project -t topicName
获取同步shard的状态
-p: project名称
-t: topic名称
-s: shardId
-c: connectorId,可通过数据同步页签查看
gcs -p test_project -t test_topic -s shardId -c connectorId
获取订阅消费的每个shard点位
-p: project名称
-t: topic名称
-s: 订阅id
-i: shardId
gso -p test_project -t test_topic -s subid -i shardId
订阅操作
创建订阅
-p: project名称
-t: topic名称
-c: 订阅描述
css -p test_project -t test_topic -c comment
删除订阅
-p: project名称
-t: topic名称
-s: 订阅id
dsc -p test_project -t test_topic -s subId
查询订阅列表
-p: project名称
-t: topic名称
lss -p test_project -t test_topic
上传下载数据
上传数据
-f: 参数表示文件路径,注意:windows路径下请添加转义符,例如:D:\\test\\test.txt
-p project名称
-t: topic名称
-m: 参数表示文本分隔符,目前支持逗号、空格分隔符
-n: 参数表示每次上传batchsize大小,默认为1000
uf -f filepath -p test_topic -t test_topic -m "," -n 1000
示例: CSV文件上传
下面以CSV文件为例,说明下如何使用console工具将CSV文件上传到DataHub数据。CSV文件的格式如下所示:
1. 0,qe614c760fuk8judu01tn5x055rpt1,true,100.1,14321111111
2. 1,znv1py74o8ynn87k66o32ao4x875wi,true,100.1,14321111111
3. 2,7nm0mtpgo1q0ubuljjjx9b000ybltl,true,100.1,14321111111
4. 3,10t0n6pvonnan16279w848ukko5f6l,true,100.1,14321111111
5. 4,0ub584kw88s6dczd0mta7itmta10jo,true,100.1,14321111111
6. 5,1ltfpf0jt7fhvf0oy4lo8m3z62c940,true,100.1,14321111111
7. 6,zpqsfxqy9379lmcehd7q8kftntrozb,true,100.1,14321111111
8. 7,ce1ga9aln346xcj761c3iytshyzuxg,true,100.1,14321111111
9. 8,k5j2id9a0ko90cykl40s6ojq6gruyi,true,100.1,14321111111
10. 9,ns2zcx9bdip5y0aqd1tdicf7bkdmsm,true,100.1,14321111111
11. 10,54rs9cm1xau2fk66pzyz62tf9tsse4,true,100.1,14321111111
上述CSV文件中每行一条Record,按照(,)区分字段。保存在本地路径/temp/test.csv中。DataHub Topic格式如下:
字段名称 | 字段类型 |
id | BIGINT |
name | STRING |
gender | BOOLEAN |
salary | DOUBLE |
my_time | TIMESTAMP |
使用console工具命令如下
uf -f /temp/test.csv -p test_topic -t test_topic -m "," -n 1000
下载数据
-f: 参数表示文件路径,注意:windows路径下请添加转义符,例如:D:\\test\\test.txt
-p: project名称
-t: topic名称
-s: shardId
-d: 订阅id
-f: 下载路径
-ti: 参数表示读取该时间之后的点位,格式为:yyyy-mm-dd hh:mm:ss
-l: 参数表示每次读取的数量
-g: 参数表示是否一直读
0表示只读一次,即获取当前recordsize后不再消费
1表示一直读取
down -p test_project -t test_topic -s shardId -d subId -f filePath -ti "1970-01-01 00:00:00" -l 100 -g 0
常见问题
脚本启动失败:windows环境下运行脚本检查脚本路径是否包含括号