全部产品

在程序中通过ADB Client高效写入数据

更新时间:2019-05-05 09:52:48

背景信息

ADB Client旨在为用户提供一个高效、简单地插入数据到分析型数据库MySQL版的方法。您只需要通过接口将数据提交给ADB Client,便可以直接插入数据到分析型数据库MySQL版中。使用ADB Client时,您无需关心分区聚合、连接池等问题,而且对数据实时写入也有更多的自主能力,不再强依赖DataHub等服务。

client

Maven repositories

通过Maven管理配置新的SDK版本,Maven的配置信息如下:

  1. <dependency>
  2. <groupId>com.alibaba.cloud.analyticdb</groupId>
  3. <artifactId>adbclient</artifactId>
  4. <version>1.0.2</version>
  5. </dependency>

接口列表

DatabaseConfig类

接口名 描述
setHost(String adbHost) 设置需要连接的分析型数据库MySQL版主机/域名。
setPort(int port) 设置需要连接的分析型数据库MySQL版端口。
setDatabase(String database) 需要连接的分析型数据库MySQL版库名。
setUser(String username) 设置连接分析型数据库MySQL版使用的用户名,请填AccessKeyId。

如何获取AccessKeyId,请参见账号与权限管理

setPassword(String pwd) 设置连接分析型数据库MySQL版使用的密码,请填AccessKeySecret。
setTable(List<String> table) 需要写入的表名List,建议小写。
setColumns(String tableName, List<String> columnList) 需要插入表的字段名,若是全字段插入,则使用columnList.add("*")即可。
table列表中的所有表都需要设置小写的字段名,否则无法通过检查。
setIngnoreInsertError(boolean isIngnoreInsertError) 针对配置的所有表,设置是否忽略插入时遇到的error,默认为false。
setInsertIgnore(boolean insertIgnore) 针对配置的所有表,请根据业务场景判断是否使用insert ignore into语句。
setEmptyAsNull(boolean emptyAsNull) 针对配置的所有表,将empty数据设置为null,默认为true。
setParallelNumber(int parallelNumber) 针对配置的所有表,设置写入分析型数据库MySQL版时的并发线程数,默认为4。
setLogger(Logger logger) 设置client中使用的logger对象,此处使用slf4j.Logger
setRetryTimes(int retryTimes) 设置提交时写入分析型数据库MySQL版出现异常时重试的次数,默认为0。
setRetryIntervalTime(long retryIntervalTime) 设置重试间隔的时间,单位是ms,默认为0。
setCommitSize(long commitSize) 设置自动提交的SQL长度(单位Byte),默认为32KB,一般不建议设置。
setInsertWithColumnName(boolean insertWithColumnName) 设置在拼接insert SQL时,是否需要带字段名,默认为true,即:
insert into tableName (col1,col2) values ('value1','value2');
如果为false,语句则类似insert into tableName values ('value1','value2');,此种情况下如果在表中加字段可能会导致写入失败。此设置只有在column列设置为“*”的情况下才会生效。

Row类

接口名 描述
setColumn(int index, Object value) 设置Row字段列表的值,必须确认字段的顺序。
此种方式下,Row实例不可复用,每条数据必须使用单独的Row实例。
setColumnValues(List<Object> values) 直接将List格式数据行写入Row中。
updateColumn(int index, Object value) 更新Row字段列表的值。
此方法中,Row实例可以复用,只需更新Row实例中的数据即可。

AdbClient类

接口名 描述
addRow(String tableName, Row row) / addRows(String tableName, List<Row> rows) 插入对应表的Row格式化的数据,即一条记录。数据会按照分区聚合的形式在sdk的内存中缓存,等待提交。
如果SQL长度已满,则会在addRow或者addMap的时候做一次自动提交,然后将最新的数据新增进来。
如果在自动提交时失败,调用方需要处理异常,并且会在异常中得到失败的数据列表。
addMap(String tableName, Map<String, String> dataMap) / addMaps(String tableName, List<Map<String, String>> dataMaps) 对应于addRow,支持map格式数据的写入。
如果SQL长度已满,则会在addRow或者addMap时做一次自动提交,然后将最新的数据新增进来。
如果在自动c提交时失败,调用方需要处理此异常,并且会在异常中得到失败的数据列表。
addStrictMap(String table, Map<String, String> dataMap) / addStrictMaps(String tableName, List<Map<String, String>> dataMaps) 作用和addMap/addMaps类似,不同之处是传入的Map数据的key必须是表的字段。如果不是,会重新获取一次表结构信息,如果还没有改key的字段,则直接报错。
其他功能与addMap/addMaps一致。
如果没有动态表结构变更,不建议使用改接口,性能会有所损耗。
commit() 将缓存的数据进行提交,写入分析型数据库MySQL版中。若提交失败,会把执行错误的语句放在异常中抛出,调用方需要对此异常进行处理。
TableInfo getTableInfo(String tableName) 获取对应表的结构信息。
List<ColumnInfo> getColumnInfo(String tableName) 获取对应表的字段列表信息,字段类是ColumnInfo,可以通过columnInfo.isNullable()获取该字段是否能为null。
Connection getConnection() throws SQLException 从客户端连接池获取mysql Connection连接,调用方可以使用获得的Connection做非插入操作,使用方式和jdbc的连接使用方式一致。
注意:使用结束后需要释放掉相应的资源(如ResultSet、Statement、Connection)。

ColumnInfo类

接口名 描述
boolean isNullable() 判断该字段是否能为null。

AdbClientException错误码

错误码名 错误码值 描述
SQL_LENGTH_LIMIT 100 SQL长度超过限制的长度,默认为32k。
COMMIT_ERROR_DATA_LIST 101 提交中某些数据出现异常,会返回异常的数据,通过e.getErrData()即可获得异常数据List<String>。
此错误码在addMap(s)、addRow(s)和提交操作的时候都可能会发生,需要单独处理此错误码的异常。
COMMIT_ERROR_OTHER 102 commit提交中的其他异常。
ADD_DATA_ERROR 103 新增数据过程中出现的异常。
CREATE_CONNECTION_ERROR 104 创建连接出现异常。
CLOSE_CONNECTION_ERROR 105 关闭连接出现异常。
CONFIG_ERROR 106 配置DatabaseConfig出现配置错误。
STOP_ERROR 107 停止实例时的报错。
OTHER 999 默认异常错误码。

示例代码

  1. public class AdbClientUsage {
  2. public void demo() {
  3. DatabaseConfig databaseConfig = new DatabaseConfig();
  4. // 分析型数据库MySQL版的主机名或者URL
  5. databaseConfig.setHost("100.100.100.100");
  6. // 分析型数据库MySQL版的连接端口号
  7. databaseConfig.setPort(8888);
  8. // 分析型数据库MySQL版的所属账号的AccessKeyId
  9. databaseConfig.setUser("your db username");
  10. // 分析型数据库MySQL版的所属账号的AccessKeySecret
  11. databaseConfig.setPassword("your db password");
  12. databaseConfig.setDatabase("your db name");
  13. // 设置需要写入的表名列表
  14. List<String> tables = new ArrayList<String>();
  15. tables.add("your table name");
  16. tables.add("your table name 2");
  17. // 一旦new Client实例之后,表配置是不可修改的
  18. databaseConfig.setTable(tables);
  19. // 设置需要写入的表字段
  20. List<String> columns = new ArrayList<String>();
  21. columns.add("column1");
  22. columns.add("column2");
  23. // 如果是所有字段,字段列表使用columns.add("*")即可
  24. databaseConfig.setColumns("your table name", columns);
  25. databaseConfig.setColumns("your table name 2", Collections.singletonList("*"));
  26. // 如果出现插入失败,是否跳过
  27. //忽略插入失败可能导致数据丢失
  28. databaseConfig.setIgnoreInsertError(false);
  29. //如果该列的值为空,则直接设置为null即可。
  30. databaseConfig.setEmptyAsNull(true);
  31. // 使用insert ignore into方式进行插入
  32. databaseConfig.setInsertIgnore(true);
  33. // 提交时,写入分析型数据库MySQL版出现异常时重试的3次
  34. databaseConfig.setRetryTimes(3);
  35. // 重试间隔的时间为1s,单位是ms
  36. databaseConfig.setRetryIntervalTime(1000);
  37. // 初始化AdbClient,初始化实例之后,databaseConfig的配置信息无法再进行修改
  38. AdbClient adbClient = new AdbClient(databaseConfig);
  39. // 数据需要攒批,分多次添加,再提交
  40. for (int i = 0; i < 10; i++) {
  41. // Add row(s) to buffer. One instance for one record
  42. Row row = new Row(columns.size());
  43. // Set column
  44. // the column index must be same as the sequence of columns
  45. // the column value can be any type, internally it will be formatted according to column type
  46. row.setColumn(0, i); // Number value
  47. row.setColumn(1, "string value"); // String value
  48. // 如果SQL长度已满,则会在addRow或者addMap的时进行一次自动提交
  49. // 如果提交失败,则返回AdbClientException异常,错误码为COMMIT_ERROR_DATA_LIST
  50. adbClient.addRow("your table name", row);
  51. }
  52. Row row = new Row();
  53. row.setColumn(0, 10); // Number value
  54. row.setColumn(1, "2018-01-01 08:00:00"); //参数2的数据类型为 Date、Timestamp或者Time value
  55. adbClient.addRow("your table name 2", row);
  56. // Update column. Row实例可复用
  57. row.updateColumn(0, 11);
  58. row.updateColumn(1, "2018-01-02 08:00:00");
  59. adbClient.addRow("your table name 2", row);
  60. // 将map加入缓存中
  61. Map<String, String> rowMap = new HashMap<String, String>();
  62. rowMap.put("column1", "124");
  63. rowMap.put("column2", "string value");
  64. // 数据需要攒批,最好多次添加之后再进行提交
  65. adbClient.addMap("your table name", rowMap);
  66. //将缓存中的数据提交到分析型数据库MySQL版
  67. //数据成功提交到分析型数据库MySQL版之后,缓存中的数据会被清除
  68. try {
  69. adbClient.commit();
  70. } catch (Exception e) {
  71. } finally {
  72. adbClient.stop();
  73. }
  74. }
  75. }

注意事项

  • ADB Client本身依赖 druid(1.1.10)、mysql-connector-java(5.1.45)、commons-lang3(3.4)、slf4j-api(1.7.25)、slf4j-log4j12(1.7.25),如果在使用过程中,出现版本冲突,请检查这几个包的版本并解决冲突。

  • ADB Client SDK是非线程安全的,如果多线程调用时,需要每个线程维护自己的Client对象。

强烈不建议多线程共用SDK实例,除了线程安全问题外,还容易使Client成为写入性能的瓶颈。

  • 数据必须在调用commit成功后才算成功写入分析型数据库MySQL版。

  • 针对Client抛出的异常,调用方要根据错误码的意义自行判断如何处理。如果是数据写入有问题,可以重复提交或者记录下有问题的数据后跳过。

  • 很多时候写入线程并不是越多越好,因为业务程序会涉及到攒数据的场景,所以对内存的消耗比较明显,业务调用方一定要多关注应用程序的GC情况。

  • 数据攒批数量不要太小,如果太小,分区聚合写意义就不大了。最好通过消息数*消息长度≈分区数*32k大概得出攒批的消息数。

  • DatabaseConfig配置在new AdbClient成功之后是无法进行修改,所有配置项必须在Client对象初始化之前完成配置。