SQL query optimization


When an SQL query in MaxCompute runs slowly, the root cause is often not the SQL logic itself but how the workload is distributed. An improperly configured degree of parallelism (DOP), which is the number of parallel instances that execute your job, can create severe performance bottlenecks. A DOP that is too low can force all data onto a single worker. A DOP that is too high launches too many instances for a small task, so startup overhead and queue times outweigh the actual work. This topic describes practical techniques to diagnose and tune the DOP for your workload. It also covers optimizations for window functions, subqueries, JOIN statements, and aggregate functions. Aligning the DOP with your data volume and query structure can substantially improve execution speed and resource utilization.

Degree of parallelism (DOP) optimization

The degree of parallelism (DOP) measures how many parallel instances run a job. For example, if a task with the ID M1 uses 1,000 instances, the DOP of M1 is 1,000. Properly configuring the DOP can significantly improve job execution efficiency.

The following sections describe common DOP optimization scenarios.

Operations that force a single instance

Certain operations force a job onto a single instance, eliminating all parallelism and creating a severe bottleneck. These operations include:

  • Aggregating without a GROUP BY clause, or with a GROUP BY clause on a constant.

  • Using a window function whose OVER clause specifies PARTITION BY on a constant. Omitting PARTITION BY entirely has the same effect: all data is sorted and processed on a single instance.

  • Using a DISTRIBUTE BY or CLUSTER BY clause on a constant.

To avoid this bottleneck, review whether a global aggregation is required. If it is, use a two-stage aggregation pattern: first group by a high-cardinality key to run a partial aggregation across all instances in parallel, then perform the final aggregation on the much smaller intermediate result set.
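The following is a minimal sketch of the two-stage pattern. It assumes a hypothetical table orders(user_id STRING, amount DOUBLE). A global COUNT(DISTINCT) without GROUP BY is a common case that runs on a single instance. The rewrite moves deduplication into a GROUP BY on user_id, which can run on many instances. Only the per-user intermediate rows are left for the final aggregation.

```sql
-- Hypothetical table: orders(user_id STRING, amount DOUBLE).

-- Before: no GROUP BY, so deduplicating user_id happens on a single instance.
SELECT COUNT(DISTINCT user_id) AS buyer_cnt,
       SUM(amount)             AS total_amount
FROM orders;

-- After, stage 1 (inner query): GROUP BY user_id deduplicates users and
-- pre-sums amount per user, spread across parallel reducer instances.
-- Stage 2 (outer query): aggregates only the per-user partial results.
-- COUNT(user_id) skips a NULL user_id group, matching COUNT(DISTINCT user_id).
SELECT COUNT(user_id)  AS buyer_cnt,
       SUM(user_total) AS total_amount
FROM (
    SELECT user_id, SUM(amount) AS user_total
    FROM orders
    GROUP BY user_id
) t;
```

The outer query still runs on one instance. However, it reads one row per distinct user instead of one row per order.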

Impact of incorrect instance count

Important

Higher parallelism does not always lead to better performance. Using too many instances can slow execution for two reasons:

  • Excessive parallelism increases resource contention and queue wait times.

  • Each instance has an initialization phase. At high DOP, the cumulative initialization overhead reduces the time available for actual computation.

The following scenarios commonly produce a suboptimal instance count:

  • Reading a large number of small partitions: If a query scans 10,000 partitions, the system may launch 10,000 instances. Each instance computes for only milliseconds, so most of the job's run time is spent waiting in the resource queue rather than processing data.

    Reduce the partition scan count by applying effective partition pruning early in the query, filtering out unnecessary partitions, or splitting the query into smaller, more targeted jobs.

  • A mapper split size that is too small: With the default split size of 256 MB, a large input is divided among many mapper instances. If each instance processes its share quickly, most of its run time is consumed by resource queuing and initialization rather than computation.

    Increase the split size so that each mapper instance processes more data. You can also set the reducer instance count explicitly.

    -- Maximum input per mapper instance, in MB. Use a value greater than the default of 256.
    SET odps.stage.mapper.split.size=<value>;
    -- Number of reducer instances.
    SET odps.stage.reducer.num=<maximum number of concurrent instances>;
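The following sketch illustrates partition pruning. It assumes a hypothetical table logs that is partitioned by a STRING column ds, with one partition per day. The first query has no condition on ds and scans every partition. The second query restricts ds so that only seven daily partitions are read.

```sql
-- Hypothetical table: logs(event STRING), partitioned by ds STRING ('yyyymmdd').

-- No filter on the partition column: every partition is scanned,
-- which can launch a very large number of short-lived mapper instances.
SELECT COUNT(*) FROM logs WHERE event = 'click';

-- Filter on the partition column: only the partitions from
-- 20240101 through 20240107 are read.
SELECT COUNT(*)
FROM logs
WHERE ds >= '20240101' AND ds <= '20240107'
  AND event = 'click';
```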

Configure the number of instances

  • For table read tasks (mappers)

    • Method 1: Set a global split size parameter.

      -- Configure the maximum amount of input data per mapper instance. Unit: MB.
      -- Default value: 256. Valid values: [1,Integer.MAX_VALUE].
      SET odps.sql.mapper.split.size=<value>;
    • Method 2: Use a query hint for per-table control. The split_size hint overrides the global parameter for a specific table read operation, giving you finer-grained control without affecting the rest of the query.

      -- Split the src table into subtasks of 1 MB each.
      SELECT a.key FROM src a /*+split_size(1)*/ JOIN src2 b ON a.key=b.key;
    • Method 3: Split data at the table level by size, row count, or a specified DOP.

    The odps.sql.mapper.split.size parameter applies globally to all mapper stages and has a minimum value of 1 MB. When rows are small but expensive to process, even a 1 MB split can hold too many rows for one instance to finish quickly. In that case, use the following table-level parameters. They split input by size or by row count, or set a fixed DOP for each table:

    • Set the data split size per instance at the table level, in MB.

      SET odps.sql.split.size = {"table1": 1024, "table2": 512};
    • Set the row count processed per instance at the table level.

      SET odps.sql.split.row.count = {"table1": 100, "table2": 500};
    • Set the DOP directly at the table level.

      SET odps.sql.split.dop = {"table1": 1, "table2": 5};
    Note

    The odps.sql.split.row.count and odps.sql.split.dop parameters apply only to internal tables, non-transactional tables, and non-clustered tables.

  • For non-read tasks (reducers and joiners)

    • Method 1: Set the reducer instance count. This setting applies to all reducer tasks in the query.

      -- Set the number of reducer instances.
      -- Valid values: [1,99999].
      SET odps.stage.reducer.num=<value>;
    • Method 2: Set the joiner instance count. This setting applies to all joiner tasks in the query.

      -- Set the number of joiner instances.
      -- Valid values: [1,99999].
      SET odps.stage.joiner.num=<value>;
    • Method 3: Adjust the upstream mapper count. The reducer instance count is derived from its preceding mapper stage. Increasing the mapper count indirectly increases reducer parallelism.
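The following sketch shows where the SET statements go for a single query. It combines a global mapper setting with a joiner setting. The tables orders and order_items and the parameter values are hypothetical. Choose values based on your own data volume.

```sql
-- Hypothetical tables: orders(order_id, user_id) and order_items(order_id, item).
-- The values below are illustrative, not recommendations.

-- Let each mapper read up to 512 MB, which roughly halves the mapper count
-- compared with the 256 MB default for the same input.
SET odps.sql.mapper.split.size=512;
-- Run the JOIN stage on 200 joiner instances.
SET odps.stage.joiner.num=200;

SELECT o.user_id, i.item
FROM orders o
JOIN order_items i ON o.order_id = i.order_id;
```

The joiner setting may not apply if the optimizer plans the JOIN differently, for example as a map join.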

Window function optimization

Each window function in a query typically triggers a separate reduce job, so a query with several window functions consumes correspondingly more resources. MaxCompute automatically merges multiple window functions into a single reduce job when both of the following conditions are met:

  • The OVER clauses are identical, with the same PARTITION BY and ORDER BY conditions.

  • The window functions appear in the same SELECT statement.

The following query qualifies for automatic merging because both RANK() and ROW_NUMBER() share the same OVER clause and appear in the same SELECT statement. MaxCompute runs them in a single reduce job instead of two.

SELECT
    RANK()       OVER (PARTITION BY A ORDER BY B DESC) AS rank_num,
    ROW_NUMBER() OVER (PARTITION BY A ORDER BY B DESC) AS row_num
FROM MyTable;
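For contrast, the following sketch uses OVER clauses that differ. RANK() partitions by A, but ROW_NUMBER() partitions by a hypothetical column C. Because the first merge condition is not met, each window function is expected to need its own reduce job.

```sql
-- MyTable is assumed to also have a column C (hypothetical).
-- The OVER clauses differ (PARTITION BY A vs. PARTITION BY C),
-- so the two window functions are not merged into one reduce job.
SELECT
    RANK()       OVER (PARTITION BY A ORDER BY B DESC) AS rank_num,
    ROW_NUMBER() OVER (PARTITION BY C ORDER BY B DESC) AS row_num
FROM MyTable;
```

If your business logic allows it, aligning the PARTITION BY and ORDER BY conditions lets MaxCompute merge the functions into one reduce job.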

Subquery optimization

Consider a query that filters using an IN subquery:

SELECT * FROM table_a a WHERE a.col1 IN (SELECT col1 FROM table_b b WHERE xxx);

If the subquery on table_b returns more than 9,999 values for col1, MaxCompute reports: records returned from subquery exceeded limit of 9999. Rewrite the query as a JOIN to remove this limit:

SELECT a.* FROM table_a a JOIN (SELECT DISTINCT col1 FROM table_b b WHERE xxx) c ON (a.col1 = c.col1);
Note
  • Without DISTINCT, each duplicate col1 value in subquery c produces an extra copy of the matching row from table_a, so the query returns more rows than expected.

  • DISTINCT forces the subquery onto a single reducer, which becomes a bottleneck for large datasets.

  • If your business logic guarantees that col1 values are unique, remove DISTINCT to avoid the single-reducer bottleneck.
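Another option is LEFT SEMI JOIN, which MaxCompute supports. This is a different technique from the JOIN rewrite described above. A semi join returns each row of table_a at most once when a match exists. Duplicate col1 values in table_b therefore do not multiply the result, and DISTINCT is not needed. As in the original query, xxx stands for the filter condition on table_b.

```sql
-- Returns each row of table_a that has at least one matching col1 in the
-- filtered table_b; duplicates in table_b do not produce extra rows.
-- xxx is the original filter condition on table_b.
SELECT a.*
FROM table_a a
LEFT SEMI JOIN (SELECT col1 FROM table_b b WHERE xxx) c
ON a.col1 = c.col1;
```

Only columns of table_a can appear in the SELECT list of a semi join.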

JOIN statement optimization

For MaxCompute to apply partition pruning during a JOIN, filter partitioned tables before the JOIN executes, not after. Without early filtering, the system joins all partitions first and then applies the filter, scanning far more data than necessary.

Follow these rules:

  • Apply partition-limiting conditions on the primary table inside a subquery before the JOIN.

  • Place other filter conditions on the primary table in the WHERE clause at the end of the SQL statement.

  • Apply partition-limiting conditions on the secondary table in the ON clause or in a subquery, not in the final WHERE clause.

The following examples illustrate these practices.

-- B is filtered in a subquery before the JOIN.
SELECT * FROM A JOIN (SELECT * FROM B WHERE dt=20150301) B ON B.id=A.id WHERE A.dt=20150301;
-- Not recommended: the JOIN runs before partition pruning is applied to B. This increases the amount of data processed and degrades query performance.
SELECT * FROM A JOIN B ON B.id=A.id WHERE B.dt=20150301;
-- Both tables are filtered in subqueries before the JOIN.
SELECT * FROM (SELECT * FROM A WHERE dt=20150301) A JOIN (SELECT * FROM B WHERE dt=20150301) B ON B.id=A.id;
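The examples above use inner joins. The following sketch shows the ON-clause option from the third rule with a LEFT OUTER JOIN, using the same tables A and B. With an outer join, placement also affects correctness. A condition such as B.dt = 20150301 in the final WHERE clause would discard rows of A that have no match in B, because their B columns are NULL.

```sql
-- Primary table A: partition filter applied in a subquery before the JOIN.
-- Secondary table B: partition filter applied in the ON clause, so only
-- partition dt=20150301 of B takes part in the JOIN. Every row of the
-- filtered A is kept, whether or not it has a match in B.
SELECT *
FROM (SELECT * FROM A WHERE dt = 20150301) A
LEFT OUTER JOIN B
ON B.id = A.id AND B.dt = 20150301;
```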

Aggregate function optimization

For string aggregation, wm_concat generally outperforms collect_list. The following examples show equivalent operations using each function.

-- Sorted, comma-separated list using collect_list.
SELECT concat_ws(',', sort_array(collect_list(key))) FROM src;
-- Equivalent result using wm_concat, which performs better.
SELECT wm_concat(',', key) WITHIN GROUP (ORDER BY key) FROM src;

-- Comma-separated list (order not guaranteed) using collect_list.
SELECT array_join(collect_list(key), ',') FROM src;
-- Equivalent result using wm_concat, which performs better.
SELECT wm_concat(',', key) FROM src;
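wm_concat is an aggregate function, so it can also be applied per group. The following sketch assumes a hypothetical table orders(user_id STRING, item STRING). It builds one sorted, comma-separated item list per user.

```sql
-- Hypothetical table: orders(user_id STRING, item STRING).
-- One row per user, with that user's items sorted and joined by commas.
SELECT user_id,
       wm_concat(',', item) WITHIN GROUP (ORDER BY item) AS items
FROM orders
GROUP BY user_id;
```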