本文通过示例为您介绍如何在E-MapReduce on ACK的Flink集群中配置OSS来存储Flink作业的Checkpoint和Savepoint。
前提条件
已在E-MapReduce on ACK控制台创建Flink集群,详情请参见快速入门。
操作步骤
Flink on ACK使用的默认镜像已处理好读写OSS所需的依赖,您只需按文档配置好相应的参数即可。
- 通过kubectl连接Kubernetes集群,详情请参见通过kubectl工具连接集群。您也可以通过API等方式连接Kubernetes集群,详情请参见使用Kubernetes API。
- 新建basic-emr-oss-example.yaml文件,文件内容如下。
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-emr-oss-example spec: flinkVersion: v1_13 flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.savepoints.dir: oss://xxxxx state.checkpoints.dir: oss://xxxxx fs.oss.endpoint: <endpoint, e.g. oss-cn-hangzhou-internal.aliyuncs.com> fs.oss.accessKeyId: <accessKeyId> fs.oss.accessKeySecret: <accessKeySecret> serviceAccount: flink podTemplate: spec: serviceAccount: flink containers: - name: flink-main-container volumeMounts: - mountPath: /flink-data name: flink-volume volumes: - name: flink-volume emptyDir: {} jobManager: replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar parallelism: 2 upgradeMode: stateless
说明- 文件名您可以自定义,本文以basic-emr-oss-example.yaml为例介绍。
- 本文以Flink 1.13版本为例,如果您使用其他版本请修改flinkVersion的配置。
以下参数需要您手动替换。参数 描述 state.savepoints.dir Savepoint的保存目录。 state.checkpoints.dir Checkpoint的保存目录。 fs.oss.endpoint OSS的Endpoint。例如,oss-cn-***-internal.aliyuncs.com。 fs.oss.accessKeyId OSS的AccessKey ID。 fs.oss.accessKeySecret OSS的AccessKey Secret。 - 执行以下命令,提交作业。
kubectl apply -f basic-emr-oss-example.yaml
提交成功后,您可以通过OSS或者Flink Web UI查看Checkpoint的使用与更新。Flink Web UI的访问方式请参见访问Flink Web UI。