全部产品
云市场

数据实时写入

更新时间:2019-08-30 18:10:39

代码示例如下:

  1. public class RealtimeWriterExample extends Example{
  2. public static void main(String[] args){
  3. MaxGraph maxGraph = null;
  4. try {
  5. //Create MaxGraph Connection
  6. maxGraph = MaxGraph.newConnector(graphName, endpoint).setUserName(username)
  7. .setPassword(password).connect();
  8. //Realtime dataClient init
  9. RealtimeDataClient realtimeDataClient = maxGraph.createRealTimeClient(true);
  10. //write vertex
  11. writeDomain(realtimeDataClient);
  12. writeIp(realtimeDataClient);
  13. //write edge
  14. writeResolvedTo(realtimeDataClient);
  15. writeCnameTo(realtimeDataClient);
  16. //close
  17. realtimeDataClient.close();
  18. }catch (Exception ex) {
  19. ex.printStackTrace();
  20. }finally {
  21. if(maxGraph != null){
  22. maxGraph.close();
  23. }
  24. }
  25. }
  26. private static void writeDomain(RealtimeDataClient realtimeDataClient) throws Exception {
  27. List<VertexDataRecord> records = Lists.newArrayList();
  28. IntStream.range(1, 1001).forEach(i -> {
  29. Map<String, Object> properties = Maps.newHashMap();
  30. properties.put("domain_name", "domain_"+ i);
  31. DataRecord.VertexDataRecord domain = new DataRecord.VertexDataRecord("domain", properties)
  32. .insertOverwriteOrUpdate(true);
  33. records.add(domain);
  34. });
  35. realtimeDataClient.batchInsertOverwriteOrUpdateVertex(records, false);
  36. }
  37. private static void writeIp(RealtimeDataClient realtimeDataClient) throws Exception {
  38. List<VertexDataRecord> records = Lists.newArrayList();
  39. IntStream.range(1, 1001).forEach(i -> {
  40. Map<String, Object> properties = Maps.newHashMap();
  41. properties.put("ip", "ip_"+ i);
  42. DataRecord.VertexDataRecord ip = new DataRecord.VertexDataRecord("ip", properties)
  43. .insertOverwriteOrUpdate(true);
  44. records.add(ip);
  45. });
  46. realtimeDataClient.batchInsertOverwriteOrUpdateVertex(records, false);
  47. }
  48. private static void writeResolvedTo(RealtimeDataClient realtimeDataClient) throws Exception {
  49. final DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
  50. Map<String, Object> srcPKProperties = Maps.newHashMap();
  51. Map<String, Object> dstPKProperties = Maps.newHashMap();
  52. Map<String, Object> edgeProperties = Maps.newHashMap();
  53. srcPKProperties.put("domain_name", String.format("domain_%s", 1));
  54. dstPKProperties.put("ip", String.format("ip_%s", 1));
  55. edgeProperties.put("query_time", dateFormat.format(new Date()));
  56. DataRecord.EdgeDataRecord edgeDataRecord = new DataRecord.EdgeDataRecord()
  57. .edge("resolved_to", "domain", "ip")
  58. .srcPk(srcPKProperties)
  59. .dstPk(dstPKProperties)
  60. .properties(edgeProperties)
  61. .insertOrUpdate(true);
  62. List<DataRecord.EdgeDataRecord> edgeDataRecords = Lists.newArrayList(edgeDataRecord);
  63. realtimeDataClient.batchInsertOverwriteOrUpdateEdge(edgeDataRecords, false);
  64. }
  65. private static void writeCnameTo(RealtimeDataClient realtimeDataClient) throws Exception {
  66. // sample data : domain_1 -> domain_2 -> domain_3 -> domain_4 -> domain_5
  67. List<EdgeDataRecord> edgeDataRecords = Lists.newArrayList();
  68. IntStream.range(1, 5).forEach(i -> {
  69. Map<String, Object> srcPKProperties = Maps.newHashMap();
  70. Map<String, Object> dstPKProperties = Maps.newHashMap();
  71. srcPKProperties.put("domain_name", String.format("domain_%s", i));
  72. dstPKProperties.put("domain_name", String.format("domain_%s", i+1));
  73. DataRecord.EdgeDataRecord edgeDataRecord = new DataRecord.EdgeDataRecord()
  74. .edge("cname_to", "domain", "domain")
  75. .srcPk(srcPKProperties)
  76. .dstPk(dstPKProperties)
  77. .properties(Maps.newHashMap())
  78. .insertOrUpdate(true);
  79. edgeDataRecords.add(edgeDataRecord);
  80. });
  81. realtimeDataClient.batchInsertOverwriteOrUpdateEdge(edgeDataRecords, false);
  82. }
  83. }