全部产品
云市场

BulkLoad数据导入

更新时间:2019-11-01 12:04:24

准备工作

该工具通过文件流接口快速导入数据到cassandra集群,是最快地将线下数据迁移到线上cassandra集群方法之一,准备工作如下:

  • 线上cassandra集群
  • 线下数据,sstable格式或者csv格式。
  • 同vpc准备一台独立的ecs,开放安全组,确保能访问cassandra集群

1. 准备同vpc下客户端ecs

建议独立的ecs,不要和线上cassandra集群混用,混用会影响线上服务。

2. 创建schema

  1. $ cqlsh -f schema.cql -u USERNAME -p PASSWORD [host]

3. 准备数据

3.1 sstable数据格式

按data/${keyspace}/${table} 格式组织目录,将sstable数据放入目录,如下示例

  1. ls /tmp/quote/historical_prices/
  2. md-1-big-CompressionInfo.db md-1-big-Data.db md-1-big-Digest.crc32 md-1-big-Filter.db md-1-big-Index.db md-1-big-Statistics.db md-1-big-Summary.db md-1-big-TOC.txt

上述示例中keyspace为quota,table为historical_prices

导入数据

执行sstableloader,在cassandra发行包bin目录下,指定数据目录 data/${ks}/${table}

  1. ${cassandra_home}/bin/sstableloader -d <ip address of the node> data/${ks}/${table}

静候sstable数据导入成功,使用cqlsh检查bin/cqlsh -u USERNAME -p PASSWORD [host]

  1. $ bin/cqlsh
  2. cqlsh> select * from quote.historical_prices;
  3. ticker | date | adj_close | close | high | low | open | volume
  4. --------+---------------------------------+-----------+-----------+-----------+-----------+-----------+--------
  5. ORCL | 2019-10-29 16:00:00.000000+0000 | 26.160000 | 26.160000 | 26.809999 | 25.629999 | 26.600000 | 181000
  6. ORCL | 2019-10-28 16:00:00.000000+0000 | 26.559999 | 26.559999 | 26.700001 | 22.600000 | 22.900000 | 555000

3.2 csv数据格式

csv格式数据需要先将csv数据转成sstable格式,cassandra给我们提供了CQLSSTableWriter工具,用于生成生成sstable,通过它可以将任意格式数据转化成sstable格式。因为csv格式也是需要自己预先组织,所以需要自己编写csv格式解析代码,然后编译执行。该工具使用示例代码如下,完整工具参考git repo

  1. // Prepare SSTable writer
  2. CQLSSTableWriter.Builder builder = CQLSSTableWriter.builder();
  3. // set output directory
  4. builder.inDirectory(outputDir)
  5. // set target schema
  6. .forTable(SCHEMA)
  7. // set CQL statement to put data
  8. .using(INSERT_STMT)
  9. // set partitioner if needed
  10. // default is Murmur3Partitioner so set if you use different one.
  11. .withPartitioner(new Murmur3Partitioner());
  12. CQLSSTableWriter writer = builder.build();
  13. //TODO: 读取csv文件,迭代读取每一行
  14. while ((line = csvReader.read()) != null)
  15. {
  16. writer.addRow(ticker,
  17. DATE_FORMAT.parse(line.get(0)),
  18. new BigDecimal(line.get(1)),
  19. new BigDecimal(line.get(2)),
  20. new BigDecimal(line.get(3)),
  21. new BigDecimal(line.get(4)),
  22. Long.parseLong(line.get(6)),
  23. new BigDecimal(line.get(5)));
  24. }
  25. writer.close();

执行自定义程序生成sstable格式数据后,照3.1 章节导入数据。