SpringBatch 连接 OceanBase 数据库

本文介绍 SpringBatch 连接示例。

环境配置

  • JDK 1.8。

  • OceanBase 3.x(MySQL 模式)。

  • 基于 spring-boot 集成 spring-batch。

配置依赖

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!--数据库驱动-->
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
<!--连接池-->
<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>druid</artifactId>
<version>1.1.22</version>
</dependency>
<!--基于实体类自动建表-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<!--spring-boot核心等其他依赖省略。。。。-->

配置文件

#yml 文件:
spring:
  datasource:
    url: jdbc:oceanbase://xxx.xxx.xxx.xxx:3306/test
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: a****
    password: ******
  jpa:
    hibernate:
      ddl-auto: update

示例代码

通过调用数据库写入类方法写入数据,具体步骤如下:

  1. 创建需要的类。

    实体类:PeoplePeopleDESC

    //基本 people 类
    
    @Entity
    @Table(name = "PEOPLE")
    public class People {
        @Id
        @GeneratedValue
        private int id;
        private String lastName;
        private String firstName;
        //略
    }
    //加工后的 peopledesc 类(增加 desc 属性)
    
    @Entity
    @Table(name = "OK_PEOPLE")
    public class PeopleDESC {
    
        @Id
        @GeneratedValue  //strategy = AUTO / IDENTITY / SEQUENCE  三种主键生成策略均不支持
        private int id;
        private String lastName;
        private String firstName;
        private String desc1;
        //略
    }

    批处理类:AddPeopleDescProcessorAddDescPeopleWriter

    //用于给 people 添加 desc 属性返回一个 peopledesc 类的处理器
    
    public class AddPeopleDescProcessor implements ItemProcessor<People, PeopleDESC> {
    
        public PeopleDESC process(People item) throws Exception {
    
            return new PeopleDESC(item.getId() , 
                                  item.getLastName(), 
                                  item.getFirstName(), 
                                  Thread.currentThread().getName());
        }
    }
  2. 将 peopledesc 写入数据库的类。

    //将 peopledesc 写入数据库的类
    
    public class AddDescPeopleWriter implements ItemWriter<PeopleDESC> {
        private JdbcTemplate jdbcTemplate;
        public void setDataSource(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }
    
        public JdbcTemplate getJdbcTemplate() {
            return jdbcTemplate;
        }
    
        public void write(List<? extends PeopleDESC> items) throws Exception {
            for (PeopleDESC peopleDESC : items) {
                this.jdbcTemplate
                    .update("insert into ok_people (id,first_name,last_name,desc1) values (?, ?, ?, ?)",
                            peopleDESC.getId(), peopleDESC.getFirstName(),
                            peopleDESC.getLastName(), peopleDESC.getDesc());
            }
        }
    }
  3. 启动工程 spring-data-jpa 会根据实体类 PeoplePeopleDESC 自动建表,示例结果如下。

    SpringBatch 连接示例1

  4. 批量添加测试数据至 people

    public void addData(){
        jdbcTemplate = new JdbcTemplate(dataSource);
        StringBuilder stringBuilder = new StringBuilder("");
        for (int i = 1 ; i<=100 ; i++){
            stringBuilder.append("insert into people values ("+i+",'first_test"+i+"','last_test"+i+"');");
            jdbcTemplate.execute(stringBuilder.toString());
            stringBuilder.delete(0, stringBuilder.length());
        }
    }

    测试下结果,显示如下:

    SpringBatch 连接示例2

  5. 执行批处理方法。

    public void writerTest() throws Exception {
        //获得 people 表的结果集
        String sql = "select * from people;";
        preparedStatement = dataSource.getConnection().prepareStatement(sql);
        ResultSet resultSet = preparedStatement.executeQuery();
        List list = new ArrayList<PeopleDESC>();
        //基于添加 desc 属性的处理器对结果集进行加工,并封装成一个 List<PeopleDESC>
        while (resultSet.next()){
            People people = peopleRowMapper.mapRow(resultSet, resultSet.getRow());
            PeopleDESC desc = addPeopleDescProcessor.process(people);
            list.add(desc);
        }
        //调用数据库写入类方法写入数据
        addDescPeopleWriter.setDataSource(dataSource);
        addDescPeopleWriter.write(list);
    }