
Import data from MaxCompute

Last updated: 2019-11-04 15:59:11

Prerequisites

GraphCompute can bulk-import MaxCompute tables. Because the import reads your MaxCompute tables, you must first grant the GraphCompute public account permission to read them. Run the following statements:

    -- 1. Switch to the project that contains your MaxCompute table
    use project ***;
    -- 2. Add the MaxGraph public account and grant it read access
    add user ALIYUN$big_graph@aliyun-inner.com;
    GRANT Describe,Select ON TABLE table_name TO USER ALIYUN$big_graph@aliyun-inner.com;
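
When both a vertex table and an edge table are imported, the GRANT statement has to be repeated for each table. The following is a minimal sketch that generates the statements; `GrantStatements` is a hypothetical helper and not part of any GraphCompute or MaxCompute SDK. The account name is copied from the statements above, and the sketch assumes that you have already switched to the right project before running its output in the MaxCompute client.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of any SDK): builds the authorization
// statements for several tables of the currently selected project.
class GrantStatements {
    // Public account name, copied from the statements above.
    static final String ACCOUNT = "ALIYUN$big_graph@aliyun-inner.com";

    static List<String> build(List<String> tables) {
        List<String> statements = new ArrayList<>();
        // The account is added once, then granted access table by table.
        statements.add("add user " + ACCOUNT + ";");
        for (String table : tables) {
            statements.add("GRANT Describe,Select ON TABLE " + table
                    + " TO USER " + ACCOUNT + ";");
        }
        return statements;
    }

    public static void main(String[] args) {
        // person and knows are the example tables used later in this topic.
        for (String statement : build(Arrays.asList("person", "knows"))) {
            System.out.println(statement);
        }
    }
}
```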

Procedure and examples

Before you import graph data with the SDK, you must create the Client and build the Schema.

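Building on that, this topic describes how to initialize the mapping information (which field of a MaxCompute table row is written to which property of a GraphCompute vertex or edge) and how to set the bulk load configuration.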

Step 1: Prepare the source data

Assume that the following tables exist in MaxCompute and can serve as source data for the graph model.

The MaxCompute table person has the following structure and can serve as the source data for the vertex type person in the GraphCompute graph model.

Field name    Field type
id            bigint
name          string
age           int

The MaxCompute table knows has the following structure and can serve as the source data for the edge type knows in the GraphCompute graph model.

Field name    Field type
id            bigint
srcid         bigint
dstid         bigint
weight        double

Step 2: Specify the mapping

In the SDK, you must define how graph properties correspond to MaxCompute table columns. The following code is an example:

    private String odpsEndpoint = "***";
    private String yourOdpsAccessId = "***";
    private String yourOdpsAccessKey = "**";
    private String yourBizId = "**";
    private String yourOdpsProject = "**";
    // Name of the ODPS vertex table
    private String yourOdpsVertexTable = "**";
    // Name of the ODPS edge table
    private String yourOdpsEdgeTable = "**";
    // Columns of the ODPS vertex table
    private String odpsPersonIdField = "**";
    private String odpsPersonNameField = "**";
    private String odpsPersonAgeField = "**";
    // Vertex properties in MaxGraph
    private String personIdProp = "**";
    private String personNameProp = "**";
    private String personAgeProp = "**";
    // Columns of the ODPS edge table
    private String odpsKnowsIdField = "**";
    private String odpsKnowsWeightField = "**";
    // Edge properties in MaxGraph
    private String knowsIdProp = "**";
    private String knowsWeightProp = "**";
    // Edge table columns that hold the primary keys of the source and destination vertices
    private String knowsSrcPersonIdField = "**";
    private String knowsDstPersonIdField = "**";
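
To make the placeholders concrete, the sketch below fills in the column-to-property mapping for the person and knows tables from step 1. The column names come from those tables. The property names (`id`, `name`, `age`, `weight`) are assumptions chosen to match the columns, so replace them with the names defined in your own schema. `SampleMappings` is a hypothetical class, not part of the SDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration: MaxCompute column -> graph property.
// Property names are assumed to equal the column names.
class SampleMappings {

    // Mapping for the vertex table person (columns id, name, age).
    static Map<String, String> personColumns() {
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("id", "id");
        mapping.put("name", "name");
        mapping.put("age", "age");
        return mapping;
    }

    // Mapping for the edge table knows. The srcid and dstid columns are
    // not listed here because they identify the end vertices rather than
    // carry edge properties.
    static Map<String, String> knowsColumns() {
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("id", "id");
        mapping.put("weight", "weight");
        return mapping;
    }

    public static void main(String[] args) {
        System.out.println("person: " + personColumns());
        System.out.println("knows: " + knowsColumns());
    }
}
```

A `LinkedHashMap` keeps the insertion order, so the mapping is listed in the same order as the table columns.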

Step 3: Bulk-import vertex data from the MaxCompute table into person

    ResultSet resultSet = client.submit("graph.bulkloadVertexFromOdps('person')" +
        ".endpoint('" + odpsEndpoint + "')" +
        ".accessId('" + yourOdpsAccessId + "')" +
        ".accessKey('" + yourOdpsAccessKey + "')" +
        ".bizOwnerId('" + yourBizId + "')" +
        ".project('" + yourOdpsProject + "')" +
        ".table('" + yourOdpsVertexTable + "')" +
        //".partition('" + yourOdpsPartition + "')" +
        //".maxInvalidDataCount(10)" + // Default invalid data count is 0
        // mappingColumn maps a column of an ODPS table row to a vertex property in MaxGraph
        ".mappingColumn('" + odpsPersonIdField + "', '" + personIdProp + "')" +
        ".mappingColumn('" + odpsPersonNameField + "', '" + personNameProp + "')" +
        ".mappingColumn('" + odpsPersonAgeField + "', '" + personAgeProp + "')" +
        ".signature('" + getCredentialsManager().getPassword() + "')");
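
The script above is assembled by string concatenation, which becomes error-prone as the number of mapped columns grows. The following is a minimal sketch of a helper that builds the same script text from ordered maps. `VertexBulkLoadScript` is hypothetical and not part of the GraphCompute SDK; it only produces the string that would be passed to `client.submit`. It rejects values that contain a single quote instead of escaping them, because the escaping rules of the script language are not described here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the GraphCompute SDK: assembles the
// vertex bulk load script text from ordered maps.
class VertexBulkLoadScript {

    // Wraps a value in single quotes. Values that contain a single quote
    // are rejected, because they would end the quoted literal early.
    static String quote(String value) {
        if (value.indexOf('\'') >= 0) {
            throw new IllegalArgumentException("single quote not allowed: " + value);
        }
        return "'" + value + "'";
    }

    // options: call name -> argument (for example endpoint, accessId,
    // accessKey, bizOwnerId, project, table), emitted in insertion order.
    // columnToProperty: MaxCompute column -> vertex property.
    static String build(String label, Map<String, String> options,
                        Map<String, String> columnToProperty, String signature) {
        StringBuilder script = new StringBuilder(
                "graph.bulkloadVertexFromOdps(" + quote(label) + ")");
        for (Map.Entry<String, String> option : options.entrySet()) {
            script.append('.').append(option.getKey())
                  .append('(').append(quote(option.getValue())).append(')');
        }
        for (Map.Entry<String, String> column : columnToProperty.entrySet()) {
            script.append(".mappingColumn(").append(quote(column.getKey()))
                  .append(", ").append(quote(column.getValue())).append(')');
        }
        return script.append(".signature(").append(quote(signature)).append(')').toString();
    }

    public static void main(String[] args) {
        // All values below are placeholders.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("project", "my_project");
        options.put("table", "person");
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("id", "id");
        mapping.put("name", "name");
        System.out.println(build("person", options, mapping, "placeholder_signature"));
    }
}
```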

Step 4: Bulk-import edge data from the MaxCompute table into knows

    ResultSet resultSet = client.submit("graph.bulkloadEdgeFromOdps('knows')" +
        ".endpoint('" + odpsEndpoint + "')" +
        ".accessId('" + yourOdpsAccessId + "')" +
        ".accessKey('" + yourOdpsAccessKey + "')" +
        ".bizOwnerId('" + yourBizId + "')" +
        ".project('" + yourOdpsProject + "')" +
        ".table('" + yourOdpsEdgeTable + "')" +
        //".partition('" + yourOdpsPartition + "')" +
        //".maxInvalidDataCount(10)" + // Default invalid data count is 0
        // mappingColumn maps a column of an ODPS table row to an edge property in MaxGraph
        ".mappingColumn('" + odpsKnowsIdField + "', '" + knowsIdProp + "')" +
        ".mappingColumn('" + odpsKnowsWeightField + "', '" + knowsWeightProp + "')" +
        // Bind the source and destination vertices through their primary keys
        ".srcVertex('person')" +
        ".mappingSrcPrimaryKey('" + knowsSrcPersonIdField + "','" + personIdProp + "')" +
        ".dstVertex('person')" +
        ".mappingDstPrimaryKey('" + knowsDstPersonIdField + "','" + personIdProp + "')" +
        ".signature('" + getCredentialsManager().getPassword() + "')");
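
The edge import differs from the vertex import in the four calls that bind each edge to its end vertices. The sketch below shows what that part of the script looks like for the knows table from step 1, where `srcid` and `dstid` hold the primary keys of the source and destination person vertices. The primary-key property name `id` is an assumption, and `EdgeEndpointClause` is a hypothetical helper, not part of the SDK.

```java
// Hypothetical helper, not part of the GraphCompute SDK: builds the part
// of the edge bulk load script that binds an edge to its end vertices.
class EdgeEndpointClause {

    // srcColumn and dstColumn are edge table columns. srcKeyProp and
    // dstKeyProp are the primary-key properties of the vertex types.
    static String build(String srcLabel, String srcColumn, String srcKeyProp,
                        String dstLabel, String dstColumn, String dstKeyProp) {
        return ".srcVertex('" + srcLabel + "')"
                + ".mappingSrcPrimaryKey('" + srcColumn + "','" + srcKeyProp + "')"
                + ".dstVertex('" + dstLabel + "')"
                + ".mappingDstPrimaryKey('" + dstColumn + "','" + dstKeyProp + "')";
    }

    public static void main(String[] args) {
        // Both ends of a knows edge are person vertices.
        System.out.println(build("person", "srcid", "id", "person", "dstid", "id"));
    }
}
```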