建一个包含自定义算子的任务

本文为您介绍如何在自动驾驶数据管理中编写一个用户自定义算子构建一个简单的自动驾驶云上数据预处理流程。

当平台内置的算子无法解决您的数据处理需求时,您需要自己开发一个算子(如天气识别算子)并打包成像,上传到自定义算子管理模块,点击上线后,该算子将会在任务配置的算子列表中出现,即可参与任务的编排。大致可分为以下两个步骤:

  1. 上传自定义算子

  2. 创建一个包含自定义算子的任务

视频教程

以上视频介绍建一个包含自定义算子任务的流程。下面为实际操作步骤

Step 1 上传自定义算子

自定义算子的上传分为以下三个步骤:

  • 在您的算子代码中引入平台提供的开发框架

  • 将您的算子打包成镜像并上传到阿里云ACR代码仓库

  • 自定义算子管理模块上传该镜像

以下为您详细介绍这三个步骤。

Step 1.1 引入平台提供的开发框架

1) 安装SDK

  • 项目根目录创建SDK目录,下载SDK文件并放置在该目录下:ali_autodrive-0.0.1.tar.gz

  • 安装模块:pip install sdk/ali_autodrive-0.0.1.tar.gz

2) 实现抽象类

  • 项目根目录创建模块(例如example),在example模块中创建数据处理类(如DataProcessor)

  • DataProcessor实现抽象方法DataProcessTaskAbstract,重写拆分方法和处理方法。

import json
from abc import ABC
from PIL import Image as pilImage
from ali_autodrive.parallel_compute.executor_agent.LogUtil import *
from ali_autodrive.parallel_compute.model.FileContent import FileContent
from ali_autodrive.parallel_compute.utils.tree_util import *
from ali_autodrive.parallel_compute.DataProcessTaskAbstract import DataProcessTaskAbstract
from ali_autodrive.parallel_compute.biz_enum.DataTypeEnum import DataTypeEnum

# 节点参数
NODE_DATA_TRANSFORM_PARAMS = "transformParams"
# 车辆ID
VEHICLE_ID = "vehicleId"


class DataProcessor(DataProcessTaskAbstract, ABC):
    def __init__(self):
        super(DataProcessor, self).__init__()

    # 拆分方法
    def data_partition(self, context):
        self.get_logger().info("Data partition start.")

        # 数据分片,每个文件拆分成一个分片的场景
        file_list = get_sub_node_file_list(self.get_file_tree())
        for file in file_list:
            content = FileContent()
            content.file_path = file

            # 每次保存生成一个分片
            self.save_data_partition([json.dumps(content.__dict__)])

        self.get_logger().info("Data partition end.")

    # 处理方法,图片格式转换为JPEG
    def data_process(self, context):
        self.get_logger().info("Data process start.")

        # 获取工作空间
        workspace = self.get_user_workspace()
        self.get_logger().info("Workspace is " + workspace)

        # 获取节点配置参数
        params = json.loads(self.get_parameters()[NODE_DATA_TRANSFORM_PARAMS])
        vehicle_id = params[VEHICLE_ID]
        self.get_logger().info("VehicleId is " + vehicle_id)

        # 获取当前分片文件本地路径列表
        local_file_list = get_sub_node_local_file_list(self.get_file_tree())
        # 转换图片格式,获取新文件列表
        file_contents = self.__convert_image_format(workspace, local_file_list)
        # 存储新文件
        self.save_data_partition(file_contents)

        self.get_logger().info("Data process end.")

    def __convert_image_format(self, workspace, local_file_list):
        # 工作空间创建临时图片目录
        image_path = os.path.join(workspace, "image")
        if not os.path.exists(image_path):
            os.makedirs(image_path)

        # 逐个转换图片格式,临时目录创建新文件保存
        image_file_list = []
        for file in local_file_list:
            new_file_name = os.path.basename(file).split(".")[-2] + ".JPEG"
            new_file_path = os.path.join(image_path, new_file_name)
            im = pilImage.open(file)
            im.save(new_file_path, format='JPEG')
            im.close()
            image_file_list.append(self.__get_file_content(new_file_path, im.height, im.width))

        return image_file_list

    # 获取文件描述信息,json结构, 会自动创建数据集,可以在自动驾驶平台数据检索和回放
    @staticmethod
    def __get_file_content(file_path, height, width):
        # 文件打标
        file_tag = {"header": {}}
        file_tag["header"]["data_type"] = DataTypeEnum.camera_img.name
        file_tag["height"] = height
        file_tag["width"] = width
        file_tag["format"] = 'JPEG'

        # 待保存文件描述信息
        content = FileContent()
        content.file_tag = file_tag
        content.file_path = file_path
        return json.dumps(content.__dict__)

Step 1.2 本地测试

1)创建配置文件

项目根目录创建配置文件(如config.ini)

from ali_autodrive.parallel_compute.service_startup import *

# ./config.ini   配置文件路径,可以是相对当前目录路径,也可以是绝对路径
# example.DataProcessor  算子所在模块
# DataProcessor  算子实现类
test("./config.ini", "example.DataProcessor", "DataProcessor")

2)创建启动脚本

项目根目录创建启动脚本(如test_start.py)

from ali_autodrive.parallel_compute.service_startup import *

# ./config.ini   配置文件路径,可以是相对当前目录路径,也可以是绝对路径
# example.DataProcessor  算子所在模块
# DataProcessor  算子实现类
test("./config.ini", "example.DataProcessor", "DataProcessor")

3)运行启动脚本

运行启动脚本

python test_start.py

Step 1.3创建镜像

1)打包模块

项目根目录创建setup.py文件,执行打包命令:python setup.py sdist

注意:打包生成的文件存储在dist目录下

# -*- coding:utf-8 -*-
from setuptools import (setup, find_packages)

setup(
    # 包名
    name="example",
    # 版本
    version="0.0.1",
    # 需要包含的子包列表
    packages=find_packages(),
    # 添加依赖
    install_requires=[
        #'python-lzf==0.2.4',
    ]
)

2)创建Dockerfile

项目根目录创建Dockerfile文件

FROM python:3.8
COPY . /app
WORKDIR /app

RUN pip install sdk/ali_autodrive-0.0.1.tar.gz
ADD   sdk/ali_autodrive-0.0.1.tar.gz  ali_autodrive
RUN pip install dist/example-0.0.1.tar.gz
WORKDIR /app/ali_autodrive/ali_autodrive-0.0.1/ali_autodrive/parallel_compute

EXPOSE 5000
#example.DataProcessor  算子所在模块 
#DataProcessor  算子实现类
CMD ["python","service_startup.py" ,"example.DataProcessor","DataProcessor"]
说明

为了提升镜像打包速度,可以将上一个版本当作基础镜像,基于基础镜像打包,节省模块安装耗时。

3)上传镜像

制作镜像并上传到ACR

docker login --username=jieran.gjj@city-brain-pro auto-driver-registry.cn-hangzhou.cr.aliyuncs.com
docker build -t parallel-compute-example:0.0.1 .
docker tag parallel-compute-example:0.0.1 auto-driver-registry.cn-hangzhou.cr.aliyuncs.com/partition_compute/parallel-compute-example:0.0.1
docker push auto-driver-registry.cn-hangzhou.cr.aliyuncs.com/partition_compute/parallel-compute-example:0.0.1

其中auto-driver-registry.cn-hangzhou.cr.aliyuncs.com为ACR仓库地址,parallel_compute为ACR命名空间,需要替换为自己的地址,并使用自己的账号进行登录。

其中parallel-compute-example:0.0.1为镜像名:版本号,可以自定义名称和版本号。

Step 1.4 上传自定义算子

  • 打开产品左侧目录,在数据定义模块下面,找到自定义算子管理

  • 点击右上角的添加算子,选择对应类别,按照提示上传算子

  • 在算子列表中找到您刚刚上传的算子,点击上线

Step 2 创建包含自定义算子的任务

以上步骤完成后,您既可在任务配置页面的系统节点列表中,找到您刚刚上传的算子。此时,该算子和系统内置算子一样使用,可以单独使用,也可以和内置算子一起混合编排。

具体步骤,可参考 使用内置算子建一个简单的数据处理任务 .

阿里云首页 自动驾驶云开发平台 相关技术圈