本文介绍MSE XXL-JOB性能测试方法。
测试环境准备
示例代码
创建任务:参考如下代码通过API批量创建压测任务。
根据实际情况修改如下参数。
说明通过调整时间表达式与创建的任务数,控制任务并发总量,以控制测试负载压力。
access_key_id:云账号 AccessKeyID。
access_key_secret:云账号 AccessKeySecret。
cluster_id:集群ID。
app_name:应用名称。
time_expression:时间表达式。
task_count:创建的任务数。
task_name_prefix:任务名前缀。
import os import sys from typing import List from alibabacloud_schedulerx320240624.client import Client as SchedulerX320240624Client from alibabacloud_credentials.client import Client as CredentialClient from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_schedulerx320240624 import models as scheduler_x320240624_models from alibabacloud_tea_util import models as util_models from alibabacloud_tea_util.client import Client as UtilClient class Sample: def __init__(self): pass @staticmethod def create_client() -> SchedulerX320240624Client: """ 使用凭据初始化账号Client @return: Client @throws Exception """ # 工程代码建议使用更安全的无AK方式,凭据配置方式请参见:https://help.aliyun.com/document_detail/378659.html。 config = open_api_models.Config( access_key_id=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID'), access_key_secret=os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET') ) # Endpoint 请参考 https://api.aliyun.com/product/SchedulerX3 config.endpoint = f'schedulerx3.cn-hangzhou.aliyuncs.com' return SchedulerX320240624Client(config) @staticmethod def create_multiple_jobs( job_count: int, base_name: str = 'test-job' ) -> None: """ 创建多个任务 @param job_count: 要创建的任务数量 @param base_name: 任务名称前缀 """ client = Sample.create_client() runtime = util_models.RuntimeOptions() for i in range(job_count): job_name = f'{base_name}{i:03d}' # 格式化任务名称,如 test-job000, test-job001... create_job_request = scheduler_x320240624_models.CreateJobRequest( cluster_id='YOUR_CLUSTER_ID', app_name='YOUR_APP_NAME', name=job_name, job_type='script_python', route_strategy=1, job_handler='testJobVoidHandler', time_type=1, time_expression='*/1 * * * * ?' ) try: # 复制代码运行请自行打印 API 的返回值 client.create_job_with_options(create_job_request, runtime) print(f"成功创建任务: {job_name}") except Exception as error: # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 # 错误 message print(f"创建任务 {job_name} 失败: {error.message}") # 诊断地址 if error.data and error.data.get("Recommend"): print(f"诊断地址: {error.data.get('Recommend')}") @staticmethod def main( args: List[str], ) -> None: client = Sample.create_client() create_job_request = scheduler_x320240624_models.CreateJobRequest( cluster_id='YOUR_CLUSTER_ID', app_name='YOUR_APP_NAME', name='test-job091', job_type='script_python', route_strategy=1, job_handler='testJobVoidHandler', time_type=1, time_expression='*/1 * * * * ?' ) runtime = util_models.RuntimeOptions() try: # 复制代码运行请自行打印 API 的返回值 client.create_job_with_options(create_job_request, runtime) except Exception as error: # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 # 错误 message print(error.message) # 诊断地址 print(error.data.get("Recommend")) UtilClient.assert_as_string(error.message) @staticmethod async def main_async( args: List[str], ) -> None: client = Sample.create_client() create_job_request = scheduler_x320240624_models.CreateJobRequest( cluster_id='YOUR_CLUSTER_ID', app_name='YOUR_APP_NAME', name='test-job091', job_type='script_python', route_strategy=1, job_handler='testJobVoidHandler', time_type=1, time_expression='*/1 * * * * ?' ) runtime = util_models.RuntimeOptions() try: # 复制代码运行请自行打印 API 的返回值 await client.create_job_with_options_async(create_job_request, runtime) except Exception as error: # 此处仅做打印展示,请谨慎对待异常处理,在工程项目中切勿直接忽略异常。 # 错误 message print(error.message) # 诊断地址 print(error.data.get("Recommend")) UtilClient.assert_as_string(error.message) if __name__ == '__main__': # 任务数与任务名前缀 task_count = 2 task_name_prefix = 'my-task-' Sample.create_multiple_jobs(task_count, task_name_prefix)
该文章对您有帮助吗?