全部产品
云市场

从MaxCompute获取数据

更新时间:2019-08-30 18:11:12

从MaxCompute导入数据到点:

  1. public class BulkLoadFromODPSExample extends Example{
  2. public static void main(String[] args) {
  3. MaxGraph maxGraph = null;
  4. try {
  5. maxGraph = MaxGraph.newConnector(graphName, endpoint).setUserName(username)
  6. .setPassword(password).connect();
  7. //init BulkLoadClient
  8. BulkLoadClient bulkLoadClient = maxGraph.bulkLoadClient();
  9. //config property->column mapping
  10. Parameter.MappingMessage domain_mapping = new Parameter.MappingMessage("domain_name", "domain_name");
  11. //config import job
  12. Parameter jobParameter = new Parameter("domain", VERTEX, "biggraph_dev.domain", "", 0,
  13. Platform.ODPS, OnlineMode.PARTITION, Lists.newArrayList(domain_mapping));
  14. //build
  15. BuildResp buildResp = bulkLoadClient.build(jobParameter);
  16. if(buildResp.getResponse().getErrCode() !=0 ){
  17. System.out.println("import build err. msg:"+buildResp.getResponse().getErrMsg());
  18. System.exit(-1);
  19. }
  20. Status buildStatus = bulkLoadClient.buildStatus(buildResp.getBuildId(), Platform.ODPS).getStatus();
  21. while(!buildStatus.equals(JobStatus.Status.SUCCESS)) {
  22. Thread.sleep(10000);
  23. buildStatus = bulkLoadClient.buildStatus(buildResp.getBuildId(), Platform.ODPS).getStatus();
  24. System.out.println("waiting for build finish. logview:"+buildResp.getLogview());
  25. if(buildStatus.equals(Status.FAILED)){
  26. System.out.println("import build failed. logview:"+buildResp.getLogview());
  27. System.exit(-1);
  28. }
  29. }
  30. //online
  31. Response onlineResp = bulkLoadClient.online(jobParameter, buildResp.getOnlineId());
  32. if(onlineResp.getErrCode() !=0 ){
  33. System.out.println("import online err. msg:"+onlineResp.getErrMsg());
  34. System.exit(-1);
  35. }
  36. Status onlineStatus = bulkLoadClient.onlineStatus(buildResp.getOnlineId()).getStatus();
  37. while(! onlineStatus.equals(JobStatus.Status.SUCCESS)) {
  38. Thread.sleep(10000);
  39. onlineStatus = bulkLoadClient.onlineStatus(buildResp.getOnlineId()).getStatus();
  40. System.out.println("waiting for online finish. msg:"+onlineResp.getErrMsg());
  41. if(onlineStatus.equals(Status.FAILED)){
  42. System.out.println("import online failed. msg:"+onlineResp.getErrMsg());
  43. System.exit(-1);
  44. }
  45. }
  46. System.out.println("import job success.");
  47. } catch (Exception e) {
  48. e.printStackTrace();
  49. } finally {
  50. if(maxGraph != null){
  51. maxGraph.close();
  52. }
  53. }
  54. }
  55. }

从MaxCompute导入数据到边:

  1. public void loadEdgeTest() {
  2. try (MaxGraph maxGraph = MaxGraph.newConnector(graphName, webUrl)
  3. .setUserName(userName)
  4. .setPassword(password)
  5. .connect()) {
  6. BulkLoadClient bulkLoadClient = maxGraph.bulkLoadClient();
  7. // 起点的主键 和 column name
  8. Parameter.MappingMessage srcPKMapping = new Parameter.MappingMessage("id", "person_id");
  9. // 终点的 主键 和 column name
  10. Parameter.MappingMessage dstPKMapping = new Parameter.MappingMessage("id", "person_id2");
  11. List<Parameter.MappingMessage> srcPrimaryList = new ArrayList<>();
  12. List<Parameter.MappingMessage> dstPrimaryList = new ArrayList<>();
  13. srcPrimaryList.add(srcPKMapping);
  14. dstPrimaryList.add(dstPKMapping);
  15. Parameter.MappingMessage pro_id = new Parameter.MappingMessage("id", "id");
  16. Parameter.MappingMessage pro_weight = new Parameter.MappingMessage("weight", "weight");
  17. List<Parameter.MappingMessage> propertyList = new ArrayList<>();
  18. propertyList.add(pro_id);
  19. propertyList.add(pro_weight);
  20. Parameter.UserOdpsProject userOdpsProject = new Parameter.UserOdpsProject(userAccessId, userAccessKey, userProject, userEndpoint);
  21. Parameter edgeParameter = new Parameter("knows", EDGE, "person", "person", "maxgraph_dev.maxgraph_demo_knows", "", 10,
  22. Platform.ODPS, OnlineMode.PARTITION, propertyList, srcPrimaryList, dstPrimaryList, userOdpsProject);
  23. BuildResp buildResp = bulkLoadClient.build(edgeParameter);
  24. String buildId = buildResp.getBuildId();
  25. int errCode = buildResp.getResponse().getErrCode();
  26. if(errCode != ExceptionUtils.ErrorCode.OK.toInt()) {
  27. System.out.println("build failed");
  28. System.out.println(buildResp.getResponse().getErrMsg());
  29. return;
  30. }
  31. int onlineId = buildResp.getOnlineId();
  32. String logview = buildResp.getLogview();
  33. System.out.println("logView: " + logview);
  34. System.out.println("build id: " + buildId);
  35. while(! bulkLoadClient.buildStatus(buildId, userOdpsProject, Platform.ODPS).getStatus().equals(JobStatus.Status.SUCCESS)) {
  36. if(bulkLoadClient.buildStatus(buildId, userOdpsProject, Platform.ODPS).getStatus().equals(JobStatus.Status.FAILED)) {
  37. System.out.println("build failed");
  38. return;
  39. } else if(bulkLoadClient.buildStatus(buildId, userOdpsProject, Platform.ODPS).getStatus().equals(JobStatus.Status.UNKNOWN)) {
  40. System.out.println("build unknown");
  41. } else if (bulkLoadClient.buildStatus(buildId, userOdpsProject, Platform.ODPS).getStatus().equals(JobStatus.Status.RUNNING)){
  42. Thread.sleep(10000);
  43. System.out.println("building");
  44. } else {
  45. System.out.println("error");
  46. }
  47. }
  48. Response response = bulkLoadClient.online(edgeParameter, onlineId);
  49. if(response.getErrCode() != ExceptionUtils.ErrorCode.OK.toInt()) {
  50. System.out.println("submit online failed");
  51. System.out.println(response.getErrMsg());
  52. return;
  53. }
  54. while(! bulkLoadClient.onlineStatus(onlineId).getStatus().equals(JobStatus.Status.SUCCESS)) {
  55. if(bulkLoadClient.onlineStatus(onlineId).getStatus().equals(JobStatus.Status.FAILED)) {
  56. System.out.println("online failed");
  57. } else if(bulkLoadClient.onlineStatus(onlineId).getStatus().equals(JobStatus.Status.FAILED)) {
  58. System.out.println("online unknown");
  59. }
  60. else {
  61. Thread.sleep(10000);
  62. System.out.println("onlining");
  63. }
  64. }
  65. } catch (Exception e) {
  66. e.printStackTrace();
  67. }
  68. }