Develop Spark applications by using the Java SDK
AnalyticDB for MySQL clusters allow you to develop Spark applications and Spark SQL jobs by using the Java SDK. This topic describes how to use the Java SDK to submit a Spark job, query the state and log of a Spark job, terminate a Spark job, and query historical Spark jobs.
Prerequisites
JDK 1.8 or later is installed.
The cluster is of the Enterprise Edition, Basic Edition, or Data Lakehouse Edition.
A Job resource group is created in the AnalyticDB for MySQL cluster. For more information, see Create a resource group.
A storage path for Spark logs is configured.
Note: You can configure the storage path of Spark logs in one of the following ways:
In the AnalyticDB for MySQL console, go to the Spark Jar Development page and click Log Settings in the upper-right corner to set the storage path of Spark logs.
Use the spark.app.log.rootPath configuration item to specify an OSS path for storing the execution logs of Spark jobs.
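For reference, the following snippet is a minimal sketch of how such a configuration item can be carried in the conf block of the job JSON that is later submitted (see the complete example in the procedure below). The variable name and the OSS path are placeholders, not values from this topic:

// Sketch only: replace the OSS path placeholder with your own bucket and directory.
String dataWithLogPath = "{\n" +
        "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
        "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
        "    \"conf\": {\n" +
        "        \"spark.driver.resourceSpec\": \"medium\",\n" +
        "        \"spark.executor.instances\": 2,\n" +
        "        \"spark.executor.resourceSpec\": \"medium\",\n" +
        "        \"spark.app.log.rootPath\": \"oss://<your-bucket>/<spark-log-dir>/\"\n" +
        "    }\n" +
        "}";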
Procedure
Configure the Maven dependencies in the pom.xml file. Sample code:
<dependencies>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>adb20211201</artifactId>
        <version>1.0.16</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
    </dependency>
</dependencies>
Note: We recommend that you set the AnalyticDB for MySQL Java SDK version to 1.0.16.
Set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables. For more information, see Configure environment variables in Linux, macOS, and Windows. The following sample code shows how to submit a Spark job, query the state and log of a Spark job, terminate a Spark job, and query historical Spark jobs.
import com.aliyun.adb20211201.Client;
import com.aliyun.adb20211201.models.*;
import com.aliyun.teaopenapi.models.Config;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.SneakyThrows;

import java.util.List;

public class SparkExample {
    private static Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();

    /**
     * Submit a Spark job.
     *
     * @param clusterId: cluster ID
     * @param rgName: resource group name
     * @param data: for Batch jobs, the JSON that describes the Spark job; for SQL jobs, the SQL statement
     * @param type: Spark job type; valid values: BATCH and SQL
     * @param client: Alibaba Cloud client
     * @return: Spark job ID
     **/
    @SneakyThrows
    public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
        SubmitSparkAppRequest request = new SubmitSparkAppRequest();
        request.setDBClusterId(clusterId);
        request.setResourceGroupName(rgName);
        request.setData(data);
        request.setAppType(type);
        System.out.println("Start to build Submit request " + gson.toJson(request));
        SubmitSparkAppResponse submitSparkAppResponse = client.submitSparkApp(request);
        System.out.println("Submit app response: " + gson.toJson(submitSparkAppResponse));
        return submitSparkAppResponse.getBody().getData().getAppId();
    }

    /**
     * Query the state of a Spark job.
     *
     * @param appId: Spark job ID
     * @param client: Alibaba Cloud client
     * @return: state of the Spark job
     */
    @SneakyThrows
    public static String getAppState(String appId, Client client) {
        GetSparkAppStateRequest request = new GetSparkAppStateRequest();
        request.setAppId(appId);
        System.out.println("Start to get app state request " + gson.toJson(request));
        GetSparkAppStateResponse sparkAppState = client.getSparkAppState(request);
        System.out.println("App state response: " + gson.toJson(sparkAppState));
        return sparkAppState.getBody().getData().getState();
    }

    /**
     * Query the details of a Spark job.
     *
     * @param appId: Spark job ID
     * @param client: Alibaba Cloud client
     * @return: details of the Spark job
     */
    @SneakyThrows
    public static SparkAppInfo getAppInfo(String appId, Client client) {
        GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
        request.setAppId(appId);
        System.out.println("Start to get app info request " + gson.toJson(request));
        GetSparkAppInfoResponse sparkAppInfo = client.getSparkAppInfo(request);
        System.out.println("App info response: " + gson.toJson(sparkAppInfo));
        return sparkAppInfo.getBody().getData();
    }

    /**
     * Query the log of a Spark job.
     *
     * @param appId: Spark job ID
     * @param client: Alibaba Cloud client
     * @return: log content of the Spark job
     */
    @SneakyThrows
    public static String getAppDriverLog(String appId, Client client) {
        GetSparkAppLogRequest request = new GetSparkAppLogRequest();
        request.setAppId(appId);
        System.out.println("Start to get app log request " + gson.toJson(request));
        GetSparkAppLogResponse sparkAppLog = client.getSparkAppLog(request);
        System.out.println("App log response: " + gson.toJson(sparkAppLog));
        return sparkAppLog.getBody().getData().getLogContent();
    }

    /**
     * Query historical Spark jobs.
     *
     * @param dbClusterId: cluster ID
     * @param pageNumber: page number; a positive integer; default value: 1
     * @param pageSize: number of entries per page
     * @param client: Alibaba Cloud client
     * @return: details of the Spark jobs
     */
    @SneakyThrows
    public static List<SparkAppInfo> listSparkApps(String dbClusterId, long pageNumber, long pageSize, Client client) {
        ListSparkAppsRequest request = new ListSparkAppsRequest();
        request.setDBClusterId(dbClusterId);
        request.setPageNumber(pageNumber);
        request.setPageSize(pageSize);
        System.out.println("Start to list spark apps request " + gson.toJson(request));
        ListSparkAppsResponse listSparkAppsResponse = client.listSparkApps(request);
        System.out.println("List spark apps response: " + gson.toJson(listSparkAppsResponse));
        return listSparkAppsResponse.getBody().getData().getAppInfoList();
    }

    /**
     * Example of submitting a Spark application.
     *
     * @param args Access Key ID, Access Key Secret, ADB Cluster ID, ADB Resource Group Name, Submit data, Submit type
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // Alibaba Cloud client
        Config config = new Config();
        // Obtain the AccessKey ID from the ALIBABA_CLOUD_ACCESS_KEY_ID environment variable
        config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // Obtain the AccessKey secret from the ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable
        config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // Region ID of the cluster
        config.setRegionId("cn-hangzhou");
        // Endpoint; cn-hangzhou is the region ID of the cluster
        config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");
        Client client = new Client(config);
        // Cluster ID
        String dbClusterId = "amv-bp1mhnosdb38****";
        // Resource group name
        String resourceGroupName = "test";
        // Content of the Spark job
        String data = "{\n" +
                "    \"comments\": [\"-- Here is just an example of SparkPi. Modify the content and run your spark program.\"],\n" +
                "    \"args\": [\"1000\"],\n" +
                "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
                "    \"name\": \"SparkPi\",\n" +
                "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
                "    \"conf\": {\n" +
                "        \"spark.driver.resourceSpec\": \"medium\",\n" +
                "        \"spark.executor.instances\": 2,\n" +
                "        \"spark.executor.resourceSpec\": \"medium\"}\n" +
                "}\n";
        // Spark job type
        String type = "Batch";
        // Maximum running duration of the Spark job, in milliseconds
        long sparkAppMaxRunningTimeInMilli = 60000;
        // Interval between two consecutive state queries, in milliseconds
        long getAppStateInterval = 2000;
        // Submit the Spark job
        String appId = submitSparkApp(dbClusterId, resourceGroupName, data, type, client);
        // Query the state of the Spark job until it finishes or the timeout is reached
        String state;
        long startTimeInMillis = System.currentTimeMillis();
        do {
            state = getAppState(appId, client);
            if (System.currentTimeMillis() - startTimeInMillis > sparkAppMaxRunningTimeInMilli) {
                System.out.println("Timeout");
                break;
            } else {
                System.out.println("Current state: " + state);
                Thread.sleep(getAppStateInterval);
            }
        } while (!"COMPLETED".equalsIgnoreCase(state)
                && !"FATAL".equalsIgnoreCase(state)
                && !"FAILED".equalsIgnoreCase(state));
        // Query the details of the Spark job
        SparkAppInfo appInfo = getAppInfo(appId, client);
        String x = String.format("State: %s\n WebUI: %s\n submit time: %s\n terminated time: %s\n",
                state,
                appInfo.getDetail().webUiAddress,
                appInfo.getDetail().submittedTimeInMillis,
                appInfo.getDetail().terminatedTimeInMillis);
        System.out.println(x);
        // Query the log of the Spark job
        String log = getAppDriverLog(appId, client);
        System.out.println(log);
        // Query historical Spark jobs
        List<SparkAppInfo> sparkAppInfos = listSparkApps(dbClusterId, 1, 50, client);
        sparkAppInfos.forEach(sparkAppInfo -> {
            String y = String.format("AppId: %s\n State: %s\n WebUI: %s\n submit time: %s\n terminated time: %s\n",
                    sparkAppInfo.getAppId(),
                    sparkAppInfo.getState(),
                    sparkAppInfo.getDetail().webUiAddress,
                    sparkAppInfo.getDetail().submittedTimeInMillis,
                    sparkAppInfo.getDetail().terminatedTimeInMillis);
            System.out.println(y);
        });
        // Terminate the Spark job
        KillSparkAppRequest request = new KillSparkAppRequest();
        request.setAppId(appId);
        client.killSparkApp(request);
    }
}
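The complete example above submits a Batch job. As noted in the comment of submitSparkApp, the type parameter also accepts SQL, in which case the data parameter carries the SQL statement instead of a JSON job description. The following lines are a minimal sketch that reuses the helper and variables defined in the example above; the SQL statement is only a placeholder:

// Sketch only: submit a Spark SQL job by reusing submitSparkApp from the example above.
// dbClusterId, resourceGroupName, and client are the variables initialized in main().
String sqlAppId = submitSparkApp(dbClusterId, resourceGroupName, "SHOW DATABASES;", "SQL", client);
System.out.println("Submitted Spark SQL app: " + sqlAppId);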