全部产品
云市场

多语言(C++/Python/Go等)API访问

更新时间:2019-10-15 14:49:50

简介

HBase增强版通过Thrift支持多语言访问,只要是Thrift支持的语言,都可以访问HBase增强版。HBase增强版服务端的Thrift版本是0.12.0,虽然说thrift支持向后兼容,但如果有条件的用户,最好还是下载0.12.0的thrift,点击这里下载。一些语言提供了管理依赖的方法,可以遵循这些语言的使用习惯来安装thrift,如Python语言可以通过pip install thrift来安装,Go语言可以直接在代码里import {"github.com/apache/thrift/lib/go/thrift"}即可。

HBase增强版使用的Thrift接口定义是HBase的thrift2。因此需要下载thrift2的定义文件生成相应语言的接口。相比thrift1定义,HBase中的thrift2接口定义更加清晰,用户可以获得和Java语言类似的API调用体验。之前的thrift2之所以使用不广泛是因为thrift2的接口不全,不支持建表删表等DDL操作。但是目前thrift2相关的API定义已经由阿里云的开发人员补全并回馈给了社区(HBASE-21649)。目前thrift2比thrift1的功能更全而且更容易使用。

准备

  1. 下载Thrift安装包,点击这里下载
  2. 下载HBase Thrift2定义文件
  3. 获取集群的连接地址,多语言访问使用的连接为连接信息中的“非 JAVA 语言 Thrift2 访问地址”访问地址部分(注意专有网络地址和外网地址的区别)。

访问

Thrift的使用帮助可以参考Apache Thrift的官方文档,下面给出简单的使用方法。

1.生成对应语言的接口定义文件

从上述地址下载接口定义文件后,按照如下语法生成接口定义文件

  1. thrift --gen <language> Hbase.thrift

例如:

  1. thrift --gen php Hbase.thrift
  2. thrift --gen cpp Hbase.thrift
  3. thrift --gen py Hbase.thrift

2. 构造客户端访问HBase增强版

HBase增强版Thrift服务器端的transport层使用的是HTTP,因此在构造客户端时,需要thrift中的ThttpClient(各个语言都有相应实现)。并且在ACL开启的情况下,需要在ThttpClient上加上两个header来向服务器传输用户名和密码进行认证(如果关闭ACL则不需要)。Thrift在每个语言实现的ThttpClient都有加定制header的函数。

python示例

  1. # -*- coding: utf-8 -*-
  2. # 以下两个模块可以通过 pip install thrift 安装获得
  3. from thrift.protocol import TBinaryProtocol
  4. from thrift.transport import THttpClient
  5. # 下面的模块通过 thrift --gen py hbase.thrift 来生成
  6. from hbase import THBaseService
  7. from hbase.ttypes import TColumnValue, TColumn, TTableName, TTableDescriptor, TColumnFamilyDescriptor, TNamespaceDescriptor, TGet, TPut, TScan
  8. # 连接地址
  9. url = "http://host:9190"
  10. transport = THttpClient.THttpClient(url)
  11. headers = {}
  12. # 用户名
  13. headers["ACCESSKEYID"]="root";
  14. # 密码
  15. headers["ACCESSSIGNATURE"]="root"
  16. transport.setCustomHeaders(headers)
  17. protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
  18. client = THBaseService.Client(protocol)
  19. transport.open()
  20. # create Namespace
  21. client.createNamespace(TNamespaceDescriptor(name="ns"))
  22. # create table
  23. tableName = TTableName(ns="ns", qualifier="table1")
  24. client.createTable(TTableDescriptor(tableName=tableName, columns=[
  25. TColumnFamilyDescriptor(name="f")
  26. ]), None)
  27. # 做DML操作时,表名参数为bytes,表名的规则是namespace + 冒号 + 表名
  28. tableInbytes = "ns:table1".encode("utf8")
  29. # 插入数据
  30. client.put(table=tableInbytes, tput=TPut(row="row1".encode("utf8"), columnValues=[TColumnValue(family="f".encode("utf8"), qualifier="q1".encode("utf8"), value="value".encode("utf8"))]))
  31. # 批量插入数据
  32. puts = []
  33. put1 = TPut(row="row2".encode("utf8"), columnValues=[TColumnValue(family="f".encode("utf8"), qualifier="q1".encode("utf8"), value="value2".encode("utf8"))])
  34. put2 = TPut(row="row3".encode("utf8"), columnValues=[TColumnValue(family="f".encode("utf8"), qualifier="q1".encode("utf8"), value="value3".encode("utf8"))])
  35. puts.append(put1)
  36. puts.append(put2)
  37. client.putMultiple(table=tableInbytes, tputs=puts)
  38. # 单行查询数据
  39. result = client.get(tableInbytes, TGet(row="row1".encode("utf8")))
  40. print "Get result: ", result
  41. # 批量单行查询
  42. gets = []
  43. get1 = TGet(row="row2".encode("utf8"))
  44. get2 = TGet(row="row3".encode("utf8"))
  45. gets.append(get1)
  46. gets.append(get2)
  47. results = client.getMultiple(tableInbytes, gets)
  48. print "getMultiple results: ", results
  49. # 范围扫描
  50. # 扫描时需要设置startRow和stopRow,否则会变成全表扫描
  51. startRow = "row0".encode("utf8")
  52. stopRow = "row9".encode("utf8")
  53. scan = TScan(startRow=startRow, stopRow=stopRow)
  54. # caching的大小为每次从服务器返回的行数,设置太大会导致服务器处理过久,太小会导致范围扫描与服务器做过多交互
  55. # 根据每行的大小,caching的值一般设置为10到100之间
  56. caching = 2
  57. # 扫描的结果
  58. results = []
  59. # 此函数可以找到比当前row大的最小row,方法是在当前row后加入一个0x00的byte
  60. # 从比当前row大的最小row开始scan,可以保证中间不会漏扫描数据
  61. def createClosestRowAfter(row):
  62. array = bytearray(row)
  63. array.append(0x00)
  64. return bytes(array)
  65. while True:
  66. lastResult = None
  67. # getScannerResults会自动完成open,close 等scanner操作,HBase增强版必须使用此方法进行范围扫描
  68. currentResults = client.getScannerResults(tableInbytes, scan, caching)
  69. for result in currentResults:
  70. results.append(result)
  71. lastResult = result
  72. # 如果一行都没有扫描出来,说明扫描已经结束,我们已经获得startRow和stopRow之间所有的result
  73. if lastResult is None:
  74. break
  75. # 如果此次扫描是有结果的,我们必须构造一个比当前最后一个result的行大的最小row,继续进行扫描,以便返回所有结果
  76. else:
  77. nextStartRow = createClosestRowAfter(lastResult.row)
  78. scan = TScan(startRow=nextStartRow, stopRow=stopRow)
  79. print "Scan results: ", results
  80. # disable 表
  81. client.disableTable(TTableName(ns="ns", qualifier="table1"))
  82. # 删除表
  83. client.deleteTable(TTableName(ns="ns", qualifier="table1"))
  84. # close 连接
  85. transport.close()

Go 语言示例

  1. package main
  2. import (
  3. // hbase模块通过 thrift --gen go hbase.thrift 来生成
  4. "./hbase"
  5. "context"
  6. "fmt"
  7. "github.com/apache/thrift/lib/go/thrift"
  8. "os"
  9. )
  10. const (
  11. HOST = "http://host:9190"
  12. // 用户名
  13. USER = "root"
  14. // 密码
  15. PASSWORD = "root"
  16. )
  17. func main() {
  18. defaultCtx := context.Background()
  19. protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()
  20. trans, err := thrift.NewTHttpClient(HOST)
  21. if err != nil {
  22. fmt.Fprintln(os.Stderr, "error resolving address:", err)
  23. os.Exit(1)
  24. }
  25. // 设置用户名密码
  26. httClient := trans.(*thrift.THttpClient)
  27. httClient.SetHeader("ACCESSKEYID", USER)
  28. httClient.SetHeader("ACCESSSIGNATURE", PASSWORD)
  29. client := hbase.NewTHBaseServiceClientFactory(trans, protocolFactory)
  30. if err := trans.Open(); err != nil {
  31. fmt.Fprintln(os.Stderr, "Error opening "+HOST, err)
  32. os.Exit(1)
  33. }
  34. // create Namespace
  35. err = client.CreateNamespace(defaultCtx, &hbase.TNamespaceDescriptor{Name: "ns"})
  36. if err != nil {
  37. fmt.Fprintln(os.Stderr, "error CreateNamespace:", err)
  38. os.Exit(1)
  39. }
  40. // create table
  41. tableName := hbase.TTableName{Ns: []byte("ns"), Qualifier: []byte("table1")}
  42. err = client.CreateTable(defaultCtx, &hbase.TTableDescriptor{TableName: &tableName, Columns: []*hbase.TColumnFamilyDescriptor{&hbase.TColumnFamilyDescriptor{Name: []byte("f")}}}, nil)
  43. if err != nil {
  44. fmt.Fprintln(os.Stderr, "error CreateTable:", err)
  45. os.Exit(1)
  46. }
  47. //做DML操作时,表名参数为bytes,表名的规则是namespace + 冒号 + 表名
  48. tableInbytes := []byte("ns:table1")
  49. // 插入数据
  50. err = client.Put(defaultCtx, tableInbytes, &hbase.TPut{Row: []byte("row1"), ColumnValues: []*hbase.TColumnValue{&hbase.TColumnValue{
  51. Family: []byte("f"),
  52. Qualifier: []byte("q1"),
  53. Value: []byte("value1")}}})
  54. if err != nil {
  55. fmt.Fprintln(os.Stderr, "error Put:", err)
  56. os.Exit(1)
  57. }
  58. //批量插入数据
  59. puts := []*hbase.TPut{&hbase.TPut{Row: []byte("row2"), ColumnValues: []*hbase.TColumnValue{&hbase.TColumnValue{
  60. Family: []byte("f"),
  61. Qualifier: []byte("q1"),
  62. Value: []byte("value2")}}}, &hbase.TPut{Row: []byte("row3"), ColumnValues: []*hbase.TColumnValue{&hbase.TColumnValue{
  63. Family: []byte("f"),
  64. Qualifier: []byte("q1"),
  65. Value: []byte("value3")}}}}
  66. err = client.PutMultiple(defaultCtx, tableInbytes, puts)
  67. if err != nil {
  68. fmt.Fprintln(os.Stderr, "error PutMultiple:", err)
  69. os.Exit(1)
  70. }
  71. // 单行查询数据
  72. result, err := client.Get(defaultCtx, tableInbytes, &hbase.TGet{Row: []byte("row1")})
  73. fmt.Println("Get result:")
  74. fmt.Println(result)
  75. // 批量单行查询
  76. gets := []*hbase.TGet{&hbase.TGet{Row: []byte("row2")}, &hbase.TGet{Row: []byte("row3")}}
  77. results, err := client.GetMultiple(defaultCtx, tableInbytes, gets)
  78. fmt.Println("GetMultiple result:")
  79. fmt.Println(results)
  80. //范围扫描
  81. //扫描时需要设置startRow和stopRow,否则会变成全表扫描
  82. startRow := []byte("row0")
  83. stopRow := []byte("row9")
  84. scan := &hbase.TScan{StartRow: startRow, StopRow: stopRow}
  85. //caching的大小为每次从服务器返回的行数,设置太大会导致服务器处理过久,太小会导致范围扫描与服务器做过多交互
  86. //根据每行的大小,caching的值一般设置为10到100之间
  87. caching := 2
  88. // 扫描的结果
  89. var scanResults []*hbase.TResult_
  90. for true {
  91. var lastResult *hbase.TResult_ = nil
  92. // getScannerResults会自动完成open,close 等scanner操作,HBase增强版必须使用此方法进行范围扫描
  93. currentResults, _ := client.GetScannerResults(defaultCtx, tableInbytes, scan, int32(caching))
  94. for _, tResult := range currentResults {
  95. lastResult = tResult
  96. scanResults = append(scanResults, tResult)
  97. }
  98. // 如果一行都没有扫描出来,说明扫描已经结束,我们已经获得startRow和stopRow之间所有的result
  99. if lastResult == nil {
  100. break
  101. } else {
  102. // 如果此次扫描是有结果的,我们必须构造一个比当前最后一个result的行大的最小row,继续进行扫描,以便返回所有结果
  103. nextStartRow := createClosestRowAfter(lastResult.Row)
  104. scan = &hbase.TScan{StartRow: nextStartRow, StopRow: stopRow}
  105. }
  106. }
  107. fmt.Println("Scan result:")
  108. fmt.Println(scanResults)
  109. //disable table
  110. err = client.DisableTable(defaultCtx, &tableName)
  111. if err != nil {
  112. fmt.Fprintln(os.Stderr, "error DisableTable:", err)
  113. os.Exit(1)
  114. }
  115. // delete table
  116. err = client.DeleteTable(defaultCtx, &tableName)
  117. if err != nil {
  118. fmt.Fprintln(os.Stderr, "error DisableTable:", err)
  119. os.Exit(1)
  120. }
  121. // delete namespace
  122. err = client.DeleteNamespace(defaultCtx, "ns")
  123. if err != nil {
  124. fmt.Fprintln(os.Stderr, "error DisableTable:", err)
  125. os.Exit(1)
  126. }
  127. //tableName, err := client.GetTableNamesByNamespace(defaultCtx, "default")
  128. if err != nil {
  129. fmt.Fprintln(os.Stderr, "error getting table:", err)
  130. os.Exit(1)
  131. }
  132. //s := string(tableName[0].Qualifier)
  133. //fmt.Println(s)
  134. }
  135. // 此函数可以找到比当前row大的最小row,方法是在当前row后加入一个0x00的byte
  136. // 从比当前row大的最小row开始scan,可以保证中间不会漏扫描数据
  137. func createClosestRowAfter(row []byte) []byte {
  138. var nextRow []byte
  139. var i int
  140. for i = 0; i < len(row); i++ {
  141. nextRow = append(nextRow, row[i])
  142. }
  143. nextRow = append(nextRow, 0x00)
  144. return nextRow
  145. }

php 示例

  1. //设置用户名和密码,ACCESSKEYID的值为用户名,ACCESSSIGNATURE的值为密码
  2. $headers = array('ACCESSKEYID' => 'root','ACCESSSIGNATURE' => 'root');
  3. //新建HTTP协议的客户端
  4. $socket = new THttpClient('hb-xxxx.aliyuncs.com', 9190);
  5. //将header设置到http client中,访问时携带用户名密码
  6. $socket->addHeaders$headers
  7. $transport = new TBufferedTransport($socket);
  8. $protocol = new TBinaryProtocol($transport);
  9. $client = new HbaseClient($protocol);
  10. $transport->open();

更多语言的使用请参见Thrfit官方文档