全部产品
弹性计算 会员服务 网络 安全 移动云 数加·大数据分析及展现 数加·大数据应用 管理与监控 云通信 阿里云办公 培训与认证 更多
存储与CDN 数据库 域名与网站(万网) 应用服务 数加·人工智能 数加·大数据基础服务 互联网中间件 视频服务 开发者工具 解决方案 物联网 智能硬件
数据传输服务DTS

订阅数据 SQL 封装简介

更新时间:2018-01-30 10:12:37

数据订阅 SDK 订阅到的数据为 DTS 自定义格式,本小节简单介绍各种类型 SQL 语句解析的代码实现。

DDL 解析

如果一个 Record 是 DDL 语句,那么这个 Record 的操作类型为 DDL,且 DDL 语句存储在第一列的 value 中。获取 DDL 语句的代码示例如下:

  1. String ddl_string;
  2. Record.Type type=record.getOpt();
  3. if(type.equals(Record.Type.DDL)){
  4. List<DataMessage.Record.Field> fields = record.getFieldList();
  5. ddl_string = fields.get(0).getValue().toString();
  6. }

Insert 解析

如果一个 Record 是 Insert 语句,那么这个 Record 的操作类型为 INSERT。获取 Insert 完整语句的代码示例如下:

  1. StringBuilder insert_string=new StringBuilder();
  2. Record.Type type=record.getOpt();
  3. DataMessage.Record.Field field;
  4. StringBuilder FieldName=new StringBuilder();
  5. StringBuilder FieldValue = new StringBuilder();
  6. if(type.equals(Record.Type.INSERT)){
  7. int i=0;
  8. List<DataMessage.Record.Field> fields = record.getFieldList();
  9. for (; i < fields.size(); i++) {
  10. field = fields.get(i); FieldName.append('`'+field.getFieldname().toLowerCase()+'`');
  11. FieldValue.append(field.getValue());
  12. if (i != fields.size() - 1) {
  13. FieldName.append(',');
  14. FieldValue.append(',');
  15. }
  16. }
  17. insert_string.append("insert "+ record.getTablename()+"("+FieldName.toString()+") values("+FieldValue.toString()+");");
  18. }

Update 解析

如果一个 Record 是 Update 语句,那么这个 Record 的操作类型为 UPDATE。

update 更新前的字段存储在 Record.getFieldList() 中索引为偶数的 Field,更新后的字段值存储在索引为奇数的 Field。

下面的示例代码介绍当 update 的表有主键时,如何获取 Update 完整语句:

  1. StringBuilder update_string=new StringBuilder();
  2. Record.Type type=record.getOpt();
  3. DataMessage.Record.Field field;
  4. StringBuilder SetValue = new StringBuilder();
  5. StringBuilder WhereCondition = new StringBuilder();
  6. String ConditionStr;
  7. boolean hasPk=false;
  8. boolean pkMode=false;
  9. boolean hasSet=false;
  10. if(type.equals(Record.Type.UPDATE)){
  11. int i=0;
  12. DataMessage.Record.Field OldField = null;
  13. DataMessage.Record.Field NewField = null;
  14. List<DataMessage.Record.Field> fields = record.getFieldList();
  15. for (; i <fields.size() ; i++) {
  16. if (i % 2 == 0) {
  17. OldField = fields.get(i);
  18. continue;
  19. }
  20. NewField = fields.get(i);
  21. field = NewField;
  22. if (field.isPrimary()) {
  23. if (hasPk) {
  24. WhereCondition.append(" and ");
  25. }
  26. //where old value
  27. ConditionStr = getFieldValue(OldField);
  28. if(ConditionStr==null){ WhereCondition.append("`"+field.getFieldname().toLowerCase()+"`" + " " + "is null");
  29. }else{
  30. WhereCondition.append("`"+field.getFieldname().toLowerCase()+"`"+" = "+ NewField.getValue());
  31. }
  32. hasPk = true;
  33. }
  34. if (hasSet) {
  35. SetValue.append(COMMA);
  36. }
  37. SetValue.append("`"+field.getFieldname().toLowerCase()+"`" + " = " + field.getValue());
  38. String setStr = getFieldValue(field);
  39. hasSet = true;
  40. }
  41. update_string.append("Update "+record.getTablename() +" Set " + SetValue + " Where "+WhereCondition +";");
  42. }
  43. protected String getFieldValue(Field field) throws Exception {
  44. ByteString byteString = field.getValue();
  45. if (byteString == null) {
  46. return null;
  47. }
  48. else {
  49. String value;
  50. if (field.getType() == com.taobao.drc.client.message.DataMessage.Record.Field.Type.STRING && field.getEncoding() != null && field.getEncoding() != "ASCII") {
  51. value = field.getValue().toString(field.getEncoding());
  52. }
  53. else {
  54. value = byteString.toString();
  55. }
  56. return value;
  57. }
  58. }

Delete 解析

如果一个 Record 是 Delete 语句,那么这个 Record 的操作类型为 DELETE。下面的代码示例介绍当 Delete 对应的表有主键时,如何获取完整的 Delete 语句:

  1. StringBuilder delete_string=new StringBuilder();
  2. Record.Type type=record.getOpt();
  3. DataMessage.Record.Field field;
  4. StringBuilder FieldName=new StringBuilder();
  5. StringBuilder FieldValue = new StringBuilder();
  6. StringBuilder DeleteCondition = new StringBuilder();
  7. boolean hasPk=false;
  8. boolean pkMode=false;
  9. if(type.equals(Record.Type.DELETE)){
  10. int i=0;
  11. List<DataMessage.Record.Field> fields = record.getFieldList();
  12. delete_string.append("Delete From" + record.getTablename() + "where");
  13. // 表是否有主键?
  14. if (record.getPrimaryKeys() != null) {
  15. pkMode = record.getPrimaryKeys().length() > 0 ? true : false;
  16. }
  17. for (; i < fields.size(); i++) {
  18. if ((pkMode && !field.isPrimary())) {
  19. continue;
  20. }
  21. if (hasPk) {
  22. delete_string.append(" and ");
  23. }
  24. delete_string.append(field.getFieldname() + "=" + field.getValue());
  25. hasPk = true;
  26. }
  27. delete_string.append(";");
  28. }

Replace 解析

如果源库执行了 Replace 语句,那么这个 Record 的操作类型为 UPDATE 或 INSERT。当 Replace 设置的值不存在时,Record 的操作类型为 INSERT,当 Replace 设置的值存在时,Record 的操作类型为 UPDATE。

Begin 解析

如果一个 Record 是 Begin 语句,那么这个 Record 的操作类型为 BEGIN。begin 语句没有实际内容,所以不需要对 Field 做处理,只需要判断操作为 begin 即可,代码示例如下:

  1. StringBuilder sql_string = new StringBuilder();
  2. Record.Type type = record.getOpt();
  3. if(type.equals(Record.Type.BEGIN)){
  4. sql_string.append("Begin");
  5. }

Commit 解析

如果一个 Record 是 Commit 语句,那么这个 Record 的操作类型为 COMMIT。commit 语句没有实际内容,所以不需要对 Field 做处理,只需要判断操作为 commit 即可,代码示例如下:

  1. StringBuilder sql_string = new StringBuilder();
  2. Record.Type type = record.getOpt();
  3. if(type.equals(Record.Type.COMMIT)){
  4. sql_string.append("commit");
  5. }
本文导读目录