全部产品

console命令工具

console命令行工具

操作视频

准备工作

  1. 下载命令行工具进行datahub_console.tar.gz并解压
  2. 在conf目录中的datahub.properties文件填写ak endpoint信息2参数说明:
    • datahub.accessid: 访问DataHub的accessId
    • datahub.accesskey: 访问DataHuv的accessKey信息
    • datahub.endpoint: 域名,具体可查看域名列表
  3. 在bin目录执行启动脚本
    • Linux环境:sh start.sh
    • Windows环境:直接运行start.cmd文件

使用指南

基础操作

  • 查看所有可执行命令
  • help后输入其他命令可查看该命令所需参数,例如:help lt即可查看lt所需要的参数
  1. help
  • 清除屏幕
  1. clear
  • 退出程序
  1. exit
  • 查看命令报错详情,命令报错后可使用stacktrace查看具体错误
  1. stacktrace
  • 运行多命令脚本
  1. script

Project操作

  • 创建Project
    • -p: project名称
    • -c: project描述
  1. cp -p test_project -c test_comment
  • 删除Project
    • -p: project名称
  1. dp -p test_project
  • 获取Project列表
  1. lp

Topic操作

  • 创建Topic
    • -p:project名称
    • -t: topic名称
    • -m: 表示不同的Topic类型,BLOB代表创建BLOB类型的Topic,Tuple表示创建Tuple类型的Topic
    • -f: Tuple类型 Topic字段格式为[(fieldName,fieldType,isNull)],多个字段以逗号隔开
    • -s: shard数量
    • -l: 数据生命周期,范围(1-7)天
    • -c: topic描述
  1. ct -p test_project -t test_topic -m TUPLE -f [(name,string,true)] -s 3 -l 3 -c test_comment
  • 删除Topic
    • -p: project名称
    • -t: topic名称
  1. dt -p test_project -t test_topic
  • 获取Topic信息
    • -p: project名称
    • -t: topic名称
  1. gt -p test_project -t test_topic
  • 导出Topic schema结构为json文件
    • -f:保存文件路径
    • -p: project名称
    • -t: topic名称
  1. gts -f filepath -p test_project -t test_topic
  • 获取Topic列表
    • -p: project名称
  1. lt -p test_project
  • 导入Json文件创建Topic
    • -s: shard数量
    • -l: 数据生命周期,范围(1-7)天
    • -f: 文件路径
    • -p: project名称
    • -t: topic名称
  1. rtt -s 3 -l 3 -c test_comment -f filepath -p test_project -t test_topic

Connector操作

  • 创建ODPS connector
    • -p: project名称
    • -t: topic名称
    • -m: 参数表示不同的同步类型,目前同步到odps的有SYSTEM_TIME、USER_DEFINE、EVENT_TIME、META_TIME四种同步类型
    • -e: odps endpoint,请填写经典网络地址
    • -op: odps Project名称
    • -oa: 访问odps的accessId
    • -ok: 访问odps的accessKey
    • -tr参数表示分区的时间间隔,console工具默认为60分钟
    • -tf参数 分区格式,ds 表示按天分区,ds hh表示按小时分区,ds hh mm表示按分钟分区
  1. coc -p test_project -t test_topic -m SYSTEM_TIME -e odpsEndpoint -op odpsProject -ot odpsTable -oa odpsAccessId -ok odpsAccessKey -tr 60 -c (field1,field2) -tf ds hh mm
  • 新增connector字段
    • -p: project名称
    • -t: topic名称
    • -c: connectorId,可通过数据同步页签查看
    • -f: fieldName,新增字段名称
  1. acf -p test_project -t test_topic -c connectorId -f fieldName
  • 创建同步到MYSQL/RDS connector
    • -p: project名称
    • -t: topic名称
    • -h: host,请填写经典网络地址
    • -po: port
    • -ty参数表示同步的类型,共有两种
      • mysql表示创建同步到mysql的connector
      • ads 表示创建同步到ads的connector
    • -d: database名称
    • -ta: table名称
    • -u: userName
    • -pa: password
    • -ht表示插入方式,共有两种
      • IGNORE
      • OVERWRITE
    • -n表示同步的字段,示例:(field1,field2)
  1. cdc -p test_project -t test_topic -h host -po 3306 -ty mysql -d mysql_database -ta msyql_table -u username -pa password -ht IGNORE -n (field1,field2)
  • 创建 DATAHUB connector

    • -p: project名称
    • -t: topic名称
    • -sp: sinkProject,数据导入的Project
    • -st: sinkTopic,数据导入的Topic

    • -m: 表示认证类型

      • AK表示通过AK认证,需要填写accessId和accessKey
      • STS表示通过STS认证
  1. cdhc -p test_project -t test_topic -sp sinkProject -st sinkTopic -m AK -i accessid k accessKey
  • 创建FC connector
    • -p: project名称
    • -t: topic名称
    • -e: fc endpoint,请填写经典网络地址
    • -s: fc Service名称
    • -f: fc Function名称
    • -au: 认证方式
      • AK表示通过AK认证,需要填写accessId和accessKey
      • STS表示通过STS认证
    • -n表示同步的字段,例如:(field1,field2)
  1. cfc -p test_project -t test_topic -e endpoint -s service -f function -au AK -i accessId -k accessKey -n (field1,field2)
  • 创建HOLOGRES connector
    • -p: project名称
    • -t: topic名称
    • -e: endpoint
    • -cl: 同步到hologres的字段
    • -au表示认证方式,目前同步到holo只支持AK认证
    • -m表示解析类型,选择Delimiter需要指定lineDelimiter、parseData、columnDelimiter属性,选择IngormaticaJson需要指定parseData属性
      • Delimiter
      • InformaticaJson
  1. chc -p test_project -t test_topic -e endpoint -cl (field,field2) -au AK -hp holoProject -ht holoTopic -i accessId -k accessKey -m Delimiter -l 1 -b false -n (field1,field2)
  • 创建OTSconnector

    • -p: project名称
    • -t: topic名称
    • -it: ots Instance名称

    • -m表示认证类型,默认使用STS

      • AK表示通过AK认证,需要填写accessId和accessKey
      • STS表示通过STS认证
    • -t: ots Table名称
    • -wm表示写入方式,支持两种写入方式
      • PUT
      • UPDATE
    • -c表示同步的字段,例如:(field1,field2)
  1. cotsc -p test_project -t test_topic -i accessId -k accessKey -it instanceId -m AK -t table -wm PUT -c (field1,field2)
  • 创建 OSS connector
    • -p: project名称
    • -t: topic名称
    • -b: oss Bucket名称
    • -e: oss Endpoint名称
    • -pr: 同步到OSS的目录前缀
    • -tf: 同步时间格式,例如:%Y%m%d%H%M表示按照分钟级别进行分区
    • -tr: 分区的时间间隔
    • -c: 同步字段
  1. csc -p test_project -t test_topic -b bucket -e endpoint -pr ossPrefix -tf ossTimeFormat -tr timeRange -c (f1,f2)
  • 删除connector,可传入多个connectorid,以空格分隔
    • -p: project名称
    • -t: topic名称
    • -c: connectorId,可通过数据同步页签查看
  1. dc -p test_project -t test_topic -c connectorId
  • 获取connector详情信息
    • -p: project名称
    • -t: topic名称
    • -c: connectorId,可通过数据同步页签查看
  1. gc -p test_project -t test_topic -c connectorId
  • 获取某个Topic下面的connector列表
    • -p: project名称
    • -t: topic名称
  1. lc -p test_project -t test_topic
  • 重启connector
    • -p: project名称
    • -t: topic名称
    • -c: connectorId,可通过数据同步页签查看
  1. rc -p test_project -t test_topic -c connectorId
  • 更新connector ak
    • -p: project名称
    • -t: topic名称
    • -c: connectorId,可通过数据同步页签查看
    • -ty: 同步类型,目前支持SINK_ODPS 、SINK_HOLOGRES、SINK_OTS、SINK_OSS、SINK_FC类型
  1. uca -p test_project -t test_topic -c connectorId -ty connectorType -a accessId -k accessKey

shard操作

  • 合并shard
    • -p: project名称
    • -t: topic名称
    • -s: 要合并的shardId
    • -a: 要合并的另一个shardId
  1. ms -p test_project -t test_topic -s shardId -a adjacentShardId
  • 分裂shard
    • -p: project名称
    • -t: topic名称
    • -s: 要分裂的shardId
  1. ss -p test_project -t test_topic -s shardId
  • 获取某个topic下面的所有shard
    • -p: project名称
    • -t: topic名称
  1. ls -p test_project -t topicName
  • 获取同步shard的状态
    • -p: project名称
    • -t: topic名称
    • -s: shardId
    • -c: connectorId,可通过数据同步页签查看
  1. gcs -p test_project -t test_topic -s shardId -c connectorId
  • 获取订阅消费的每个shard点位
    • -p: project名称
    • -t: topic名称
    • -s: 订阅id
    • -i: shardId
  1. gso -p test_project -t test_topic -s subid -i shardId

订阅操作

  • 创建订阅
    • -p: project名称
    • -t: topic名称
    • -c: 订阅描述
  1. css -p test_project -t test_topic -c comment
  • 删除订阅
    • -p: project名称
    • -t: topic名称
    • -s: 订阅id
  1. dsc -p test_project -t test_topic -s subId
  • 查询订阅列表
    • -p: project名称
    • -t: topic名称
  1. lss -p test_project -t test_topic

上传下载数据

  • 上传数据
    • -f: 参数表示文件路径,注意:windows路径下请添加转义符,例如:D:\\test\\test.txt
    • -p project名称
    • -t: topic名称
    • -m: 参数表示文本分隔符,目前支持逗号、空格分隔符
    • -n: 参数表示每次上传batchsize大小,默认为1000
  1. uf -f filepath -p test_topic -t test_topic -m "," -n 1000
示例: CSV文件上传

下面以CSV文件为例,说明下如何使用console工具将CSV文件上传到DataHub数据。CSV文件的格式如下图所示:

  1. 1. 0,qe614c760fuk8judu01tn5x055rpt1,true,100.1,14321111111
  2. 2. 1,znv1py74o8ynn87k66o32ao4x875wi,true,100.1,14321111111
  3. 3. 2,7nm0mtpgo1q0ubuljjjx9b000ybltl,true,100.1,14321111111
  4. 4. 3,10t0n6pvonnan16279w848ukko5f6l,true,100.1,14321111111
  5. 5. 4,0ub584kw88s6dczd0mta7itmta10jo,true,100.1,14321111111
  6. 6. 5,1ltfpf0jt7fhvf0oy4lo8m3z62c940,true,100.1,14321111111
  7. 7. 6,zpqsfxqy9379lmcehd7q8kftntrozb,true,100.1,14321111111
  8. 8. 7,ce1ga9aln346xcj761c3iytshyzuxg,true,100.1,14321111111
  9. 9. 8,k5j2id9a0ko90cykl40s6ojq6gruyi,true,100.1,14321111111
  10. 10. 9,ns2zcx9bdip5y0aqd1tdicf7bkdmsm,true,100.1,14321111111
  11. 11. 10,54rs9cm1xau2fk66pzyz62tf9tsse4,true,100.1,14321111111

上述CSV文件中每行一条Record,按照(,)区分字段。保存在本地路径/temp/test.csv中。DataHub Topic格式如下:

字段名称 字段类型
id BIGINT
name STRING
gender BOOLEAN
salary DOUBLE
my_time TIMESTAMP

使用console工具命令如下

  1. uf -f /temp/test.csv -p test_topic -t test_topic -m "," -n 1000
  • 下载数据
    • -f: 参数表示文件路径,注意:windows路径下请添加转义符,例如:D:\\test\\test.txt
    • -p: project名称
    • -t: topic名称
    • -s: shardId
    • -d: 订阅id
    • -f: 下载路径
    • -ti: 参数表示读取该时间之后的点位,格式为:yyyy-mm-dd hh:mm:ss
    • -l: 参数表示每次读取的数量
    • -g: 参数表示是否一直读
      • 0表示只读一次,即获取当前recordsize后不再消费
      • 1表示一直读取
  1. down -p test_project -t test_topic -s shardId -d subId -f filePath -ti "1970-01-01 00:00:00" -l 100 -g 0

常见问题

  • 脚本启动失败:windows环境下运行脚本检查脚本路径是否包含括号