When your SQL query in MaxCompute runs slowly, the root cause is often not the SQL logic itself but how the workload is distributed. An unsuitable degree of parallelism (DOP), that is, the number of parallel instances executing your job, can create severe performance bottlenecks. Too little parallelism can force all data onto a single worker. Too much creates excessive overhead from launching many instances for a small task, which leads to long queue times. This guide provides practical techniques to diagnose and tune the DOP for your specific workload. By aligning the DOP with your data and query structure, you can substantially improve execution speed and resource utilization.
Degree of parallelism (DOP) optimization
The degree of parallelism (DOP) measures how many parallel instances run a job. For example, if a task with the ID M1 uses 1,000 instances, the DOP of M1 is 1,000. Properly configuring the DOP can significantly improve job execution efficiency.
The following sections describe common DOP optimization scenarios.
Operations that force single-instance execution
Certain operations force a job onto a single instance, eliminating all parallelism and creating a severe bottleneck. These operations include:
- Aggregating without a GROUP BY clause, or with a GROUP BY clause on a constant.
- Using a window function whose OVER clause specifies PARTITION BY a constant. Omitting PARTITION BY entirely has the same effect: all data is sorted and aggregated on a single instance.
- Using a DISTRIBUTE BY or CLUSTER BY clause on a constant.
To avoid this bottleneck, review whether a global aggregation is required. If it is, use a two-stage aggregation pattern: first group by a high-cardinality key to run a partial aggregation across all instances in parallel, then perform the final aggregation on the much smaller intermediate result set.
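As a sketch of the two-stage pattern, assume a hypothetical table orders with a high-cardinality user_id column (neither name comes from this guide). A global distinct count can be rewritten so that deduplication runs in parallel before the final count. Compare the plans of both versions, for example with EXPLAIN, before adopting the rewrite, because how much it helps depends on your data and on how the optimizer already handles the single-stage form.

```sql
-- Hypothetical table: orders(user_id STRING, amount DOUBLE).

-- Single-stage form: the global distinct count can end up finalized on one instance.
SELECT COUNT(DISTINCT user_id) AS uv
FROM orders;

-- Two-stage form:
-- Stage 1 groups by the high-cardinality key user_id, so deduplication
-- is spread across many instances in parallel.
-- Stage 2 counts the much smaller deduplicated intermediate result.
SELECT COUNT(*) AS uv
FROM (
    SELECT user_id
    FROM orders
    GROUP BY user_id
) t;
```

The same shape works for other global aggregates: compute partial results per key in the inner query, then combine them (for example, SUM of partial sums) in the outer query.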
Impact of incorrect instance count
Higher parallelism does not always lead to better performance. Using too many instances can slow execution for two reasons:
Excessive parallelism increases resource contention and queue wait times.
Each instance has an initialization phase. At high DOP, the cumulative initialization overhead reduces the time available for actual computation.
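The initialization overhead can be illustrated with a deliberately simplified model. This is an assumption for illustration, not a description of the MaxCompute scheduler: D is the total input, r the processing rate of one instance, t_0 the fixed initialization time per instance, and n the number of instances. Queue waiting is ignored.

```latex
\begin{aligned}
T_{\text{total}}(n) &= n\left(t_0 + \frac{D}{n\,r}\right) = n\,t_0 + \frac{D}{r},\\
f_{\text{compute}}(n) &= \frac{D/(n\,r)}{t_0 + D/(n\,r)}.
\end{aligned}
```

With the hypothetical values D = 102,400 MB, r = 50 MB/s and t_0 = 5 s, the compute term D/r is always 2,048 instance-seconds. At n = 400, each instance computes for 5.12 s, the total is 4,048 instance-seconds, and about 51% of each instance's time is computation. At n = 10,000, each instance computes for 0.2048 s, the total is 52,048 instance-seconds, and only about 3.9% of each instance's time is computation. In this model the useful work stays constant while the overhead n t_0 grows linearly with the instance count.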
The following scenarios commonly produce a suboptimal instance count:
- Reading a large number of small partitions: If a query scans 10,000 partitions, the system may launch 10,000 instances. Each instance finishes its computation in milliseconds but spends most of its lifetime waiting in the queue. Reduce the number of scanned partitions by applying effective partition pruning early in the query, filtering out unnecessary partitions, or splitting the query into smaller, more targeted jobs.
- A mapper split size that is small relative to the input: At the default split size of 256 MB, a very large input dataset is divided into a large number of instances. Each instance runs only briefly and spends most of that time queuing for resources rather than computing. Increase the split size so each mapper instance processes more data. You can also set the reducer instance count explicitly:
  SET odps.stage.mapper.split.size=<value larger than 256>;
  SET odps.stage.reducer.num=<maximum number of concurrent instances>;
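A minimal sketch of the partition-pruning advice, assuming a hypothetical table logs partitioned by a STRING column ds in yyyymmdd format with one partition per day. A constant predicate on the partition column lets the system read only the matching partitions instead of every partition in the table.

```sql
-- Hypothetical table: logs(event_type STRING, ...) PARTITIONED BY (ds STRING).

-- No predicate on the partition column: every partition is scanned,
-- which may launch roughly one instance per small partition.
-- Depending on project settings, this query may also be rejected as a full scan.
SELECT COUNT(*) AS clicks
FROM logs
WHERE event_type = 'click';

-- Constant range on the partition column: only the partitions for
-- 2024-01-01 through 2024-01-07 are read.
SELECT COUNT(*) AS clicks
FROM logs
WHERE ds >= '20240101' AND ds <= '20240107'
  AND event_type = 'click';
```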
Configure the number of instances
- For table read tasks (mappers):
  - Method 1: Set a global split size parameter.
    -- Configure the maximum amount of input data per mapper instance. Unit: MB.
    -- Default value: 256. Valid values: [1,Integer.MAX_VALUE].
    SET odps.sql.mapper.split.size=<value>;
  - Method 2: Use a query hint for per-table control. The split_size hint overrides the global parameter for a specific table read operation, giving you finer-grained control without affecting the rest of the query.
    -- Split the src table into subtasks of 1 MB each.
    SELECT a.key FROM src a /*+split_size(1)*/ JOIN src2 b ON a.key=b.key;
  - Method 3: Split data at the table level by size, row count, or a specified DOP.
    The odps.sql.mapper.split.size parameter applies globally to all mapper stages and has a minimum value of 1 MB. When rows are small but computationally expensive, even a 1 MB split can hold more rows than one instance processes efficiently, and you may need more instances than a size-based setting produces, or need them for only some tables. In these cases, use the following table-level parameters:
    - Set the data split size per instance at the table level:
      SET odps.sql.split.size = {"table1": 1024, "table2": 512};
    - Set the number of rows processed per instance at the table level:
      SET odps.sql.split.row.count = {"table1": 100, "table2": 500};
    - Set the DOP directly at the table level:
      SET odps.sql.split.dop = {"table1": 1, "table2": 5};
    Note: The odps.sql.split.row.count and odps.sql.split.dop parameters apply only to tables that are internal, non-transactional, and non-clustered.
- For non-read tasks (reducers and joiners):
  - Method 1: Set the reducer instance count. This setting applies to all reducer tasks in the query.
    -- Set the number of reducer instances.
    -- Valid values: [1,99999].
    SET odps.stage.reducer.num=<value>;
  - Method 2: Set the joiner instance count. This setting applies to all joiner tasks in the query.
    -- Set the number of joiner instances.
    -- Valid values: [1,99999].
    SET odps.stage.joiner.num=<value>;
  - Method 3: Adjust the upstream mapper count. The reducer instance count is derived from its preceding mapper stage, so increasing the mapper count indirectly increases reducer parallelism.
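The settings above can be combined in one script. The following is a sketch only: the tables fact_orders and dim_users, their columns, and every numeric value are hypothetical, and suitable values depend on your data volume and quota. For roughly 10,240 MB of input, a 1,024 MB split yields on the order of 10 mapper instances instead of about 40 at the 256 MB default; the exact count also depends on how the input files are laid out.

```sql
-- Hypothetical tables: fact_orders(user_id STRING, amount DOUBLE)
--                      dim_users(user_id STRING, region STRING)

-- Larger splits: fewer, longer-running mapper instances for every table read.
SET odps.sql.mapper.split.size=1024;
-- Alternative scoped to one table only (table-level parameter described above):
-- SET odps.sql.split.size = {"fact_orders": 1024};

-- Explicit parallelism for the join stage and the final aggregation stage.
SET odps.stage.joiner.num=50;
SET odps.stage.reducer.num=20;

SELECT u.region, SUM(o.amount) AS total_amount
FROM fact_orders o
JOIN dim_users u ON o.user_id = u.user_id
GROUP BY u.region;
```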
Window function optimization
Each window function in a query typically triggers a separate reduce job. When a query contains multiple window functions, this multiplies resource consumption significantly. MaxCompute automatically merges multiple window functions into a single reduce job when both conditions are met:
- The OVER clauses are identical: same PARTITION BY and ORDER BY conditions.
- The window functions appear in the same SELECT statement.
The following query qualifies for automatic merging because both RANK() and ROW_NUMBER() share the same OVER clause and appear in the same SELECT statement. MaxCompute runs them in a single reduce job instead of two.
SELECT
RANK() OVER (PARTITION BY A ORDER BY B desc) AS RANK,
ROW_NUMBER() OVER (PARTITION BY A ORDER BY B desc) AS row_num
FROM MyTable;
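By contrast, in the following sketch (column C is hypothetical), the two window functions partition by different columns. Their OVER clauses are not identical, so the automatic merge conditions are not met and you should expect a separate reduce job for each. Where your business logic allows it, giving window functions the same OVER clause and keeping them in one SELECT statement lets them share a job.

```sql
-- Different PARTITION BY columns: the OVER clauses differ,
-- so these window functions are not merged into one reduce job.
SELECT
    RANK()       OVER (PARTITION BY A ORDER BY B DESC) AS rank_in_a,
    ROW_NUMBER() OVER (PARTITION BY C ORDER BY B DESC) AS row_num_in_c
FROM MyTable;
```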
Subquery optimization
Consider a query that filters using an IN subquery:
SELECT * FROM table_a a WHERE a.col1 IN (SELECT col1 FROM table_b b WHERE xxx);
If the subquery on table_b returns more than 9,999 values for col1, MaxCompute reports: records returned from subquery exceeded limit of 9999. Rewrite the query as a JOIN to remove this limit:
SELECT a.* FROM table_a a JOIN (SELECT DISTINCT col1 FROM table_b b WHERE xxx) c ON (a.col1 = c.col1);
Omitting DISTINCT can cause duplicate col1 values from subquery c to multiply rows from table a, producing more rows than the original IN query returns. However, DISTINCT forces the subquery onto a single reducer, which becomes a bottleneck for large datasets. If your business logic guarantees that col1 values are unique, remove DISTINCT to avoid the single-reducer bottleneck.
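Before removing DISTINCT, you can check whether col1 is actually unique under the same filter. This sketch reuses the placeholder filter xxx from the examples above; replace it with your actual condition. An empty result only shows that no col1 value repeats in the current data; your business logic still has to guarantee that it stays that way.

```sql
-- Lists up to 10 col1 values that appear more than once in the filtered subquery.
-- An empty result means that, for the current data, the JOIN without DISTINCT
-- does not duplicate rows of table_a.
SELECT col1, COUNT(*) AS cnt
FROM table_b b
WHERE xxx
GROUP BY col1
HAVING COUNT(*) > 1
LIMIT 10;
```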
JOIN statement optimization
For MaxCompute to apply partition pruning during a JOIN, filter partitioned tables before the join executes—not after. Without early filtering, the system performs the JOIN across all partitions first, then applies the filter, scanning far more data than necessary.
Follow these rules:
- Apply partition-limiting conditions on the primary table in a subquery before the JOIN, or in the final WHERE clause.
- Place other WHERE conditions that filter the primary table at the end of the SQL statement.
- Apply partition-limiting conditions on the secondary table in the ON clause or in a subquery, not in the final WHERE clause.
The following examples illustrate these practices.
-- Recommended: B is filtered in a subquery before the JOIN; the partition condition on A is in the WHERE clause.
SELECT * FROM A JOIN (SELECT * FROM B WHERE dt=20150301) B ON B.id=A.id WHERE A.dt=20150301;
-- Not recommended: the system performs the JOIN before partition pruning on B, which increases the amount of data processed and degrades query performance.
SELECT * FROM A JOIN B ON B.id=A.id WHERE B.dt=20150301;
-- Recommended: both tables are filtered in subqueries before the JOIN.
SELECT * FROM (SELECT * FROM A WHERE dt=20150301) A JOIN (SELECT * FROM B WHERE dt=20150301) B ON B.id=A.id;
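The three rules can also be applied together in one statement, sketched below. The column status is hypothetical; the table names and partition value follow the examples above. The primary table A is pruned in a subquery, its remaining filter sits in the final WHERE clause, and the secondary table B is pruned in the ON clause.

```sql
SELECT *
FROM (SELECT * FROM A WHERE dt = 20150301) A  -- primary table: partition filter in a subquery
JOIN B
  ON B.id = A.id
 AND B.dt = 20150301                          -- secondary table: partition filter in the ON clause
WHERE A.status = 'active';                    -- other primary-table filters at the end
```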
Aggregate function optimization
For string aggregation, wm_concat generally outperforms collect_list. The following examples show equivalent operations using each function.
-- Sorted concatenation with collect_list.
SELECT concat_ws(',', sort_array(collect_list(key))) FROM src;
-- Equivalent sorted concatenation with wm_concat (better performance).
SELECT wm_concat(',', key) WITHIN GROUP (ORDER BY key) FROM src;
-- Unsorted concatenation with collect_list.
SELECT array_join(collect_list(key), ',') FROM src;
-- Equivalent unsorted concatenation with wm_concat (better performance).
SELECT wm_concat(',', key) FROM src;
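The same substitution applies to grouped aggregation. In this sketch, src is assumed to have STRING columns key and value; the value column is an assumption and does not appear elsewhere in this guide.

```sql
-- Per-key concatenation with collect_list.
SELECT key, concat_ws(',', collect_list(value)) AS vals
FROM src
GROUP BY key;

-- Same per-key concatenation with wm_concat.
-- Element order within each group is not guaranteed in either query.
SELECT key, wm_concat(',', value) AS vals
FROM src
GROUP BY key;
```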