Use the jar command on the MaxCompute client to submit and run a Graph job. The command syntax mirrors the JAR command used in MapReduce.
Submit a job
The JAR command syntax is:
Usage: jar [<GENERIC_OPTIONS>] <MAIN_CLASS> [ARGS]
-conf <configuration_file> Specify an application configuration file
-classpath <local_file_list> classpaths used to run mainClass
-D <name>=<value> Property value pair, which will be used to run mainClass
-local Run job in local mode
-resources <resource_name_list> file/table resources used in graph, seperate by comma
JAR command options
| Option | Scope | Description |
|---|---|---|
| -conf <configuration_file> | - | JobConf configuration file for the job. |
| -classpath <local_file_list> | Local only | Classpath for running <MAIN_CLASS> locally. Specifies the JAR where the main function lives. Separate multiple paths with ; on Windows or : on Linux. |
| -D <name>=<value> | Local only | Java property passed to <MAIN_CLASS> at local execution. Specify multiple times for multiple properties. |
| -local | - | Runs the job in local mode for debugging. |
| -resources <resource_name_list> | Distributed only | Comma-separated list of MaxCompute resources the Graph job reads at runtime. For cross-project resources, prefix the resource name with PROJECT_NAME/resources/, for example, -resources otherproject/resources/resfile. |
-resources and -classpath serve different purposes even when both reference the same JAR. -resources makes the JAR available on distributed workers. -classpath makes the JAR available to the local JVM that runs the main function.
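To make the option syntax concrete, the following is a minimal sketch that assembles a jar command line from its parts. The class GraphJarCommand, the main class name com.example.PageRank, the local path, and the table names are hypothetical and used only for illustration. The sketch assumes that options precede <MAIN_CLASS>, as shown in the usage text above.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative helper (not part of any SDK): builds a jar command string. */
final class GraphJarCommand {

    /**
     * @param resources     MaxCompute resource names, joined with commas for -resources
     * @param classpath     local JAR paths, joined with pathSeparator for -classpath
     * @param pathSeparator ";" on Windows, ":" on Linux
     * @param mainClass     fully qualified name of the class that holds main()
     * @param args          arguments passed through to the main class
     */
    static String build(List<String> resources, List<String> classpath,
                        String pathSeparator, String mainClass, List<String> args) {
        List<String> parts = new ArrayList<>();
        parts.add("jar");
        if (!resources.isEmpty()) {
            parts.add("-resources");
            parts.add(String.join(",", resources));
        }
        if (!classpath.isEmpty()) {
            parts.add("-classpath");
            parts.add(String.join(pathSeparator, classpath));
        }
        // Options come first, then the main class, then its arguments.
        parts.add(mainClass);
        parts.addAll(args);
        return String.join(" ", parts);
    }

    public static void main(String[] ignored) {
        // The same JAR appears twice: once as a MaxCompute resource for the
        // distributed workers, once as a local path for the client-side JVM.
        String cmd = build(
            Arrays.asList("mapreduce-examples.jar"),
            Arrays.asList("/home/user/mapreduce-examples.jar"), // hypothetical path
            File.pathSeparator,                                 // platform separator
            "com.example.PageRank",                             // hypothetical class
            Arrays.asList("pagerank_in", "pagerank_out"));      // hypothetical tables
        System.out.println(cmd);
    }
}
```

Passing File.pathSeparator picks the classpath separator for the platform the client runs on, which matches the Windows and Linux rule in the options table.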
Submit without the MaxCompute client
Run the main function directly to submit the job programmatically. The following example uses the PageRank algorithm to show a complete job setup using GraphJob:
public static void main(String[] args) throws Exception {
    if (args.length < 2)
        printUsage();
    // Authenticate and connect to MaxCompute
    Account account = new AliyunAccount(accessId, accessKey);
    Odps odps = new Odps(account);
    odps.setEndpoint(endPoint);
    odps.setDefaultProject(project);
    SessionState ss = SessionState.get();
    ss.setOdps(odps);
    ss.setLocalRun(false);
    // Configure the Graph job
    String resource = "mapreduce-examples.jar";
    GraphJob job = new GraphJob();
    // Add the JAR to the class cache (equivalent to -libjars in the jar command)
    job.addCacheResourcesToClassPath(resource);
    job.setGraphLoaderClass(PageRankVertexReader.class);
    job.setVertexClass(PageRankVertex.class);
    // Set input and output tables
    job.addInput(TableInfo.builder().tableName(args[0]).build());
    job.addOutput(TableInfo.builder().tableName(args[1]).build());
    // Default max iteration is 30; override if a third argument is provided
    job.setMaxIteration(30);
    if (args.length >= 3)
        job.setMaxIteration(Integer.parseInt(args[2]));
    // Run the job and report elapsed time
    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0
        + " seconds");
}
Configure input and output
Graph job input and output must be MaxCompute tables. Custom input and output formats are not supported.
Job input
GraphJob job = new GraphJob();
// Entire table as input
job.addInput(TableInfo.builder().tableName("tblname").build());
// Specific partition as input
job.addInput(TableInfo.builder().tableName("tblname").partSpec("pt1=a/pt2=b").build());
// Specific columns from a partition — accessed via record.get(0) in GraphLoader.load()
// Columns are read in the order specified here (col2 -> index 0, col0 -> index 1)
job.addInput(
    TableInfo.builder().tableName("tblname").partSpec("pt1=a/pt2=b").build(),
    new String[]{"col2", "col0"}
);
Multiple inputs are supported. For each table registered with addInput, the framework reads records from the input table and passes them to your GraphLoader implementation. Partition filter conditions are not supported. See Limits of MaxCompute Graph for details.
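The column-order rule from the input example can be illustrated with plain Java. This is only a stand-in simulation under stated assumptions, not SDK code: the class ColumnOrderDemo and its project method are hypothetical, and a Map plays the role of a table row. It shows that positions in the loaded record follow the order of the requested column array, not the order of columns in the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in that mimics column selection order; not SDK code. */
final class ColumnOrderDemo {

    /** Returns the values of the selected columns, in the order they were requested. */
    static Object[] project(Map<String, Object> row, String[] selectedColumns) {
        Object[] record = new Object[selectedColumns.length];
        for (int i = 0; i < selectedColumns.length; i++) {
            String column = selectedColumns[i];
            if (!row.containsKey(column)) {
                throw new IllegalArgumentException("Unknown column: " + column);
            }
            record[i] = row.get(column);
        }
        return record;
    }

    public static void main(String[] args) {
        // A table row with columns in their declared order.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("col0", "v0");
        row.put("col1", "v1");
        row.put("col2", "v2");

        // Same selection as the addInput example: {"col2", "col0"}.
        Object[] record = project(row, new String[]{"col2", "col0"});
        System.out.println("index 0 = " + record[0]); // value of col2
        System.out.println("index 1 = " + record[1]); // value of col0
    }
}
```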
Job output
GraphJob job = new GraphJob();
// Partitioned table output — the last partition level must be specified
job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").build());
// Labeled output with overwrite behavior:
// true = overwrite (equivalent to INSERT OVERWRITE)
// false = append (equivalent to INSERT INTO)
job.addOutput(
    TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").label("output1").build(),
    true
);
Multiple outputs are supported. Label each output with .label(String). At runtime, write to an output by calling WorkerContext.write() with the corresponding label.
Read resources
There are two ways to make resources available to a Graph job.
Declare resources on GraphJob
Call these methods before job.run() to register resources:
void addCacheResources(String resourceNames)
void addCacheResourcesToClassPath(String resourceNames)
Read resources from WorkerContext
You can read resources from the WorkerContext object:
public byte[] readCacheFile(String resourceName) throws IOException;
public Iterable<byte[]> readCacheArchive(String resourceName) throws IOException;
public Iterable<byte[]> readCacheArchive(String resourceName, String relativePath) throws IOException;
public Iterable<WritableRecord> readResourceTable(String resourceName);
public BufferedInputStream readCacheFileAsStream(String resourceName) throws IOException;
public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName) throws IOException;
public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName, String relativePath) throws IOException;
Use the stream APIs (readCacheFileAsStream, readCacheArchiveAsStream) when processing large resources. They reduce memory consumption by reading and processing data incrementally rather than loading everything at once. Alternatively, read resources in the setup() method of WorkerComputer, store them in WorkerValue, and retrieve them later with getWorkerValue.
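The memory benefit of the stream APIs comes from consuming the resource piece by piece. The following is a minimal sketch of that pattern using only the Java standard library. The class StreamingResourceReader is hypothetical, and it assumes the resource is UTF-8 text with one entry per line. In a real job the BufferedInputStream would come from readCacheFileAsStream on the WorkerContext; here a ByteArrayInputStream stands in for it.

```java
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: processes a resource stream one line at a time. */
final class StreamingResourceReader {

    /** Counts non-empty lines while holding only the current line in memory. */
    static long countNonEmptyLines(BufferedInputStream in) throws IOException {
        long count = 0;
        // try-with-resources closes the reader and the underlying stream.
        try (BufferedReader reader =
                 new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.trim().isEmpty()) {
                    count++;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for: context.readCacheFileAsStream("some_resource")
        byte[] data = "a\n\nb\nc\n".getBytes(StandardCharsets.UTF_8);
        BufferedInputStream in = new BufferedInputStream(new ByteArrayInputStream(data));
        System.out.println(countNonEmptyLines(in));
    }
}
```

By contrast, readCacheFile returns the whole resource as a single byte array, so the full content must fit in worker memory before any processing starts.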
What's next
- SSSP example: Single Source Shortest Path implementation using MaxCompute Graph
- PageRank algorithm: full PageRank implementation and explanation
- Limits of MaxCompute Graph: constraints on inputs, outputs, and resources