Instructions

Use the jar command on the MaxCompute client to submit and run a Graph job. The command syntax mirrors the JAR command used in MapReduce.

Submit a job

The JAR command syntax is:

Usage: jar [<GENERIC_OPTIONS>] <MAIN_CLASS> [ARGS]
    -conf <configuration_file>         Specify an application configuration file
    -classpath <local_file_list>       classpaths used to run mainClass
    -D <name>=<value>                  Property value pair, which will be used to run mainClass
    -local                             Run job in local mode
    -resources <resource_name_list>    file/table resources used in graph, seperate by comma

JAR command options

| Option | Scope | Description |
| --- | --- | --- |
| -conf <configuration_file> | | JobConf configuration file for the job. |
| -classpath <local_file_list> | Local only | Classpath for running <MAIN_CLASS> locally. Specifies the JAR where the main function lives. Separate multiple paths with ; on Windows or : on Linux. |
| -D <name>=<value> | Local only | Java property passed to <MAIN_CLASS> at local execution. Specify multiple times for multiple properties. |
| -local | | Runs the job in local mode for debugging. |
| -resources <resource_name_list> | Distributed only | Comma-separated list of MaxCompute resources the Graph job reads at runtime. For cross-project resources, prefix with PROJECT_NAME/resources/, for example -resources otherproject/resources/resfile. |

-resources and -classpath serve different purposes even when both reference the same JAR. -resources makes the JAR available on distributed workers. -classpath makes the JAR available to the local JVM that runs the main function.
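
As a rough illustration of how these options combine, the sketch below assembles a jar command line as a plain string. It only uses the JDK and does not talk to the MaxCompute client. The class name JarCommandSketch, the main class com.example.graph.PageRank, the table names, and the local path lib/mapreduce-examples.jar are hypothetical placeholders, not values from the product.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: assembles a Graph jar command line from the options described above. */
final class JarCommandSketch {

    /**
     * @param resources     MaxCompute resource names, joined with commas for -resources
     * @param classpath     local JAR paths, joined with pathSeparator for -classpath
     * @param pathSeparator ";" on Windows, ":" on Linux
     * @param mainClass     fully qualified name of the class that holds main()
     * @param args          arguments passed through to the main class
     */
    static String build(List<String> resources, List<String> classpath,
                        String pathSeparator, String mainClass, List<String> args) {
        List<String> parts = new ArrayList<>();
        parts.add("jar");
        if (!resources.isEmpty()) {
            parts.add("-resources");
            parts.add(String.join(",", resources));
        }
        if (!classpath.isEmpty()) {
            parts.add("-classpath");
            parts.add(String.join(pathSeparator, classpath));
        }
        parts.add(mainClass);
        parts.addAll(args);
        return String.join(" ", parts);
    }

    public static void main(String[] unused) {
        // Every name below is a hypothetical placeholder.
        String command = build(
            Arrays.asList("mapreduce-examples.jar", "otherproject/resources/resfile"),
            Arrays.asList("lib/mapreduce-examples.jar"),
            ":",
            "com.example.graph.PageRank",
            Arrays.asList("pagerank_in", "pagerank_out", "30"));
        System.out.println(command);
    }
}
```

Note that the same JAR appears twice in the sketch: once as a resource for the distributed workers and once on the local classpath for the JVM that runs the main function.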

Submit without the MaxCompute client

Run the main function directly to submit the job programmatically. The following example uses the PageRank algorithm to show a complete job setup using GraphJob:

public static void main(String[] args) throws Exception {
  if (args.length < 2)
    printUsage();

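  // Authenticate and connect to MaxCompute.
  // accessId, accessKey, endPoint, and project are placeholders for your own credentials and settings.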
  Account account = new AliyunAccount(accessId, accessKey);
  Odps odps = new Odps(account);
  odps.setEndpoint(endPoint);
  odps.setDefaultProject(project);

  SessionState ss = SessionState.get();
  ss.setOdps(odps);
  ss.setLocalRun(false);

  // Configure the Graph job
  String resource = "mapreduce-examples.jar";
  GraphJob job = new GraphJob();

  // Add the JAR to the class cache — equivalent to -libjars in the jar command
  job.addCacheResourcesToClassPath(resource);
  job.setGraphLoaderClass(PageRankVertexReader.class);
  job.setVertexClass(PageRankVertex.class);

  // Set input and output tables
  job.addInput(TableInfo.builder().tableName(args[0]).build());
  job.addOutput(TableInfo.builder().tableName(args[1]).build());

  // Cap the job at 30 iterations unless a third argument overrides the limit
  job.setMaxIteration(30);
  if (args.length >= 3)
    job.setMaxIteration(Integer.parseInt(args[2]));

  // Run the job and report elapsed time
  long startTime = System.currentTimeMillis();
  job.run();
  System.out.println("Job Finished in "
      + (System.currentTimeMillis() - startTime) / 1000.0
      + " seconds");
}

Configure input and output

Graph job input and output must be MaxCompute tables. Custom input and output formats are not supported.

Job input

GraphJob job = new GraphJob();

// Entire table as input
job.addInput(TableInfo.builder().tableName("tblname").build());

// Specific partition as input
job.addInput(TableInfo.builder().tableName("tblname").partSpec("pt1=a/pt2=b").build());

// Specific columns from a partition — accessed via record.get(0) in GraphLoader.load()
// Columns are read in the order specified here (col2 -> index 0, col0 -> index 1)
job.addInput(
    TableInfo.builder().tableName("tblname").partSpec("pt1=a/pt2=b").build(),
    new String[]{"col2", "col0"}
);
Multiple inputs are supported. The framework reads records from every table registered with addInput and passes them to your GraphLoader implementation. Partition filter conditions are not supported. See Limits of MaxCompute Graph for details.
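
To make the two string conventions above concrete, here is a minimal JDK-only sketch. It splits a partition specification such as pt1=a/pt2=b into its levels and shows which record index each selected column ends up at. The class InputSpecSketch and its methods are hypothetical helpers for illustration and are not part of the MaxCompute SDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch: illustrates the partSpec format and the column-to-index mapping. */
final class InputSpecSketch {

    /** Splits "pt1=a/pt2=b" into an ordered map of partition level to value. */
    static Map<String, String> parsePartSpec(String partSpec) {
        Map<String, String> levels = new LinkedHashMap<>();
        for (String level : partSpec.split("/")) {
            int eq = level.indexOf('=');
            if (eq <= 0 || eq == level.length() - 1) {
                throw new IllegalArgumentException("Expected key=value but got: " + level);
            }
            levels.put(level.substring(0, eq).trim(), level.substring(eq + 1).trim());
        }
        return levels;
    }

    /** Maps each selected column name to the record index it occupies. */
    static Map<String, Integer> recordIndexes(String[] selectedColumns) {
        Map<String, Integer> indexes = new LinkedHashMap<>();
        for (int i = 0; i < selectedColumns.length; i++) {
            indexes.put(selectedColumns[i], i);
        }
        return indexes;
    }

    public static void main(String[] args) {
        System.out.println(parsePartSpec("pt1=a/pt2=b"));
        // The order passed to addInput decides the index, not the table schema order.
        System.out.println(recordIndexes(new String[]{"col2", "col0"}));
    }
}
```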

Job output

GraphJob job = new GraphJob();

// Partitioned table output — the last partition level must be specified
job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").build());

// Labeled output with overwrite behavior:
// true  = overwrite (equivalent to INSERT OVERWRITE)
// false = append   (equivalent to INSERT INTO)
job.addOutput(
    TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").label("output1").build(),
    true
);
Multiple outputs are supported. Label each output with .label(String). At runtime, write to an output by calling WorkerContext.write() with the corresponding label.
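
The difference between the two overwrite settings can be pictured with a small in-memory model. This is only a sketch of the INSERT OVERWRITE versus INSERT INTO semantics described above. It does not call the MaxCompute SDK, and the class OutputModeSketch and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model only; it does not call the MaxCompute SDK. */
final class OutputModeSketch {
    private final Map<String, List<String>> tables = new HashMap<>();

    /** Commits rows to a table: overwrite=true replaces existing rows, false appends. */
    void commit(String table, List<String> rows, boolean overwrite) {
        List<String> existing = tables.computeIfAbsent(table, t -> new ArrayList<>());
        if (overwrite) {
            existing.clear();
        }
        existing.addAll(rows);
    }

    /** Returns the rows currently held for a table, or an empty list. */
    List<String> rows(String table) {
        return tables.getOrDefault(table, new ArrayList<>());
    }

    public static void main(String[] args) {
        OutputModeSketch store = new OutputModeSketch();
        store.commit("table_name", Arrays.asList("r1", "r2"), false);
        store.commit("table_name", Arrays.asList("r3"), false); // append keeps r1 and r2
        System.out.println(store.rows("table_name"));
        store.commit("table_name", Arrays.asList("r4"), true);  // overwrite drops earlier rows
        System.out.println(store.rows("table_name"));
    }
}
```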

Read resources

Besides the -resources option of the jar command, a Graph job can declare resources in code and then read them at runtime.

Declare resources on GraphJob

Call these methods before job.run() to register resources:

void addCacheResources(String resourceNames)
void addCacheResourcesToClassPath(String resourceNames)

Read resources from WorkerContext

You can read resources from the WorkerContext object:

public byte[] readCacheFile(String resourceName) throws IOException;
public Iterable<byte[]> readCacheArchive(String resourceName) throws IOException;
public Iterable<byte[]> readCacheArchive(String resourceName, String relativePath) throws IOException;
public Iterable<WritableRecord> readResourceTable(String resourceName);
public BufferedInputStream readCacheFileAsStream(String resourceName) throws IOException;
public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName) throws IOException;
public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName, String relativePath) throws IOException;
Use the stream APIs (readCacheFileAsStream, readCacheArchiveAsStream) when processing large resources. They reduce memory consumption because data is read and processed incrementally instead of being loaded all at once. Alternatively, read resources in the setup() method of WorkerComputer, store them in WorkerValue, and retrieve them later with getWorkerValue.
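
The following sketch shows one way to consume such a stream incrementally. It takes a BufferedInputStream, the type returned by readCacheFileAsStream, and parses it line by line. The tab-separated key/value layout, the class StreamingResourceSketch, and the method loadLookup are assumptions made for the example. The demo feeds the method from an in-memory byte array rather than a real resource.

```java
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Sketch: processes a resource stream one line at a time. */
final class StreamingResourceSketch {

    /**
     * Parses "key<TAB>value" lines from the stream. Only the parsed entries are
     * kept, so the raw file is never held in memory as a single byte array.
     */
    static Map<String, String> loadLookup(BufferedInputStream in) throws IOException {
        Map<String, String> lookup = new HashMap<>();
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                int tab = line.indexOf('\t');
                if (tab < 0) {
                    continue; // skip lines that do not match the assumed layout
                }
                lookup.put(line.substring(0, tab), line.substring(tab + 1));
            }
        }
        return lookup;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the stream a worker would obtain from readCacheFileAsStream.
        byte[] demo = "a\t1\nb\t2\n".getBytes(StandardCharsets.UTF_8);
        Map<String, String> lookup =
            loadLookup(new BufferedInputStream(new ByteArrayInputStream(demo)));
        System.out.println(lookup.get("b"));
    }
}
```

Inside a real job, a method like this would typically be called once from setup() so that later supersteps reuse the parsed result instead of reading the resource again.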

What's next