自动驾驶场景设计ROS2 2025 ——激光雷达识别障碍

更新时间:

采用激光雷达进行障碍物识别并停车。

场景简介

  • 采用激光雷达进行障碍物识别并停车。

背景知识

  • 学生需要有一定的python基础来完成此实验。

实验室资源方式简介

进入实操前,请确保阿里云账号满足以下条件:

  • 个人账号资源

    • 使用您个人的云资源进行操作,资源归属于个人。

    • 平台仅提供手册参考,不会对资源做任何操作。

  • 确保已完成云工开物300元代金券领取。

  • 已通过实名认证且账户余额≥100元。

本次实验将在您的账号下开通实操所需计算型实例规格族c7a,费用约为:3元/时(具体金额取决于实验完成的时间),需要您通过阿里云云工开物学生专属300元抵扣金兑换本次实操的云资源。

如果您调整了资源规格、使用时长,或执行了本方案以外的操作,可能导致费用发生变化,请以控制台显示的实际价格和最终账单为准。

领取专属权益及创建实验资源

在开始实验之前,请先点击右侧屏幕的“进入实操”再进行后续操作

image

领取300元高校专属权益优惠券(若已领取请跳过)

image

重要

实验产生的费用优先使用优惠券,优惠券使用完毕后需您自行承担。

学生认证

实验步骤

  • 1、服务部署

    • 点击链接,进入部署页面

    • 按下图设置【服务实例名称】、【地域】、【实例密码】

      • 服务实例名称:test(可自定义命名)

      • 地域华东2(上海)

      • 实例密码:Sjtu@520

        说明

        输入实例密码时请注意大小写,请记住您设置的实例名称及对应密码,后续实验过程会用到。

        image

    • 完成填写后点击【下一步:确认订单】

      image

    • 核对实例信息及价格预览,无误请点击【立即创建】

      image

      重要

      领取300元优惠券后,资源应为0元/小时,且会提示【您当前账户的余额充足】!若提示余额不足等,请检查是否正确领取优惠券

    • 创建成功,点击【去列表查看】

      image

    • 查看实例,并等待部署

      image

      点击左侧的图标展开目录,选择目录中的【云服务器ECS】

      image

    • 云服务器ECS—实例—远程连接

      image

    • 下拉展开更多登录方式,选择【通过VNC远程连接】

      image

    • 点击 ecs—user,并输入Sjtu@520(请输入您设置的密码)后回车

      image

      image

    • 进入镜像后,点击左侧第3图标,打开文件夹后点击car_ws文件夹

      image

    • 进入car_ws文件夹,在空白处点击鼠标右键,选择Open in Terminal

      image

      image

  • 2、代码部分

    launch文件生成小车模型时,会同时调用servo_command.py文件。该文件中订阅其他文件发布的AckermannDriveStamped类型消息,并将其转化为Float64MultiArray类型消息,并发布给gazebo以控制场景中小车的行驶。

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    
    """
    阿克曼控制指令转换节点
    功能:将上层控制指令转换为各独立执行器的控制命令
    消息流:
    /cmd_vel (AckermannDriveStamped) → 转换处理 → 6个执行器控制器命令
    """
    
    # ROS2核心库
    import rclpy
    from rclpy.node import Node
    
    # ROS消息类型
    from std_msgs.msg import Float64MultiArray
    from ackermann_msgs.msg import AckermannDriveStamped
    
    class ServoCommand(Node):
        """执行器控制指令转换节点
        主要功能:
        1. 订阅阿克曼控制指令(速度+转向角)
        2. 转换为6个独立执行器的控制命令:
           - 4个轮速控制器(前后左右车轮)
           - 2个转向铰链控制器(左右转向机构)
        """
        
        def __init__(self, node_name):
            """节点初始化"""
            super().__init__(node_name)
            
            # --------------------------
            # 创建消息订阅者(输入接口)
            # --------------------------
            # 参数说明:
            # msg_type : AckermannDriveStamped 标准阿克曼控制消息
            # topic_name : /cmd_vel 标准控制话题
            # callback : self.set_throttle_steer 消息处理回调
            # qos_profile : 10 队列深度(平衡实时性与可靠性)
            self.subscription = self.create_subscription(
                AckermannDriveStamped,
                '/cmd_vel',
                self.set_throttle_steer,
                10)
            
            # --------------------------
            # 创建命令发布者(输出接口)
            # --------------------------
            # 控制器命名规范:<位置>_<类型>_controller/commands
            # 队列深度设为1保证最新指令优先
            
            # 左后轮速度控制器
            self.pub_vel_left_rear_wheel = self.create_publisher(
                Float64MultiArray, 
                "/left_rear_wheel_velocity_controller/commands", 
                1)
            
            # 右后轮速度控制器
            self.pub_vel_right_rear_wheel = self.create_publisher(
                Float64MultiArray,
                "/right_rear_wheel_velocity_controller/commands",
                1)
            
            # 左前轮速度控制器 
            self.pub_vel_left_front_wheel = self.create_publisher(
                Float64MultiArray,
                "/left_front_wheel_velocity_controller/commands",
                1)
            
            # 右前轮速度控制器
            self.pub_vel_right_front_wheel = self.create_publisher(
                Float64MultiArray,
                "/right_front_wheel_velocity_controller/commands",
                1)
            
            # 左转向铰链位置控制器
            self.pub_pos_left_steering_hinge = self.create_publisher(
                Float64MultiArray,
                "/left_steering_hinge_position_controller/commands",
                1)
            
            # 右转向铰链位置控制器
            self.pub_pos_right_steering_hinge = self.create_publisher(
                Float64MultiArray,
                "/right_steering_hinge_position_controller/commands",
                1)
    
        def set_throttle_steer(self, msg):
            """控制指令回调函数
            参数:
                msg (AckermannDriveStamped): 输入控制指令,包含:
                    - drive.speed: 纵向速度(m/s)
                    - drive.steering_angle: 前轮转向角(rad)
            处理逻辑:
                1. 速度信号处理:放大28倍(根据电机特性调整)
                2. 转向信号处理:直接传递角度值
                3. 全轮驱动模式:四个车轮接收相同速度指令
                4. 转向对称处理:左右转向机构接收相同角度
            """
            # 速度指令转换
            throttle = Float64MultiArray()
            throttle.data = [msg.drive.speed * 28]
            
            # 转向指令转换
            steer = Float64MultiArray() 
            steer.data = [msg.drive.steering_angle]
            
            # --------------------------
            # 发布到各执行器控制器
            # --------------------------
            # 速度指令发布(全轮相同速度)
            self.pub_vel_left_rear_wheel.publish(throttle)
            self.pub_vel_right_rear_wheel.publish(throttle)
            self.pub_vel_left_front_wheel.publish(throttle)
            self.pub_vel_right_front_wheel.publish(throttle)
            
            # 转向指令发布(对称转向)
            self.pub_pos_left_steering_hinge.publish(steer)
            self.pub_pos_right_steering_hinge.publish(steer)
            
            # 日志记录(INFO级别,带数据格式化)
            self.get_logger().info(
                f"指令已发布 | 速度: {throttle.data[0]:.1f} RPM | 转向角: {steer.data[0]:.2f} rad",
                throttle_once=True  # 节流功能防止高频日志
            )
    
    def main(args=None):
        """ROS2节点主入口"""
        rclpy.init(args=args)
        
        # 节点初始化
        servo_node = ServoCommand("servo_control_node")
        
        try:
            # 保持节点运行
            rclpy.spin(servo_node)
        except KeyboardInterrupt:
            # 捕获Ctrl+C信号
            servo_node.get_logger().info("节点被手动终止")
        finally:
            # 资源清理
            servo_node.destroy_node()
            rclpy.shutdown()
    
    if __name__ == '__main__':
        main()
    

    激光雷达识别障碍由laser_scan.py实现。其中结合了视觉巡线和激光雷达避障功能。主要包含两个回调函数:一个是处理摄像头图像的camera_callback,用于巡线;另一个是处理激光雷达数据的laser_scan_callback,用于识别障碍物。此外,还有avoidance方法根据障碍物的位置调整运动策略,以及follow_line方法实现视觉巡线逻辑。

    #!/usr/bin/env python3
    """
    基于ROS2的多模态导航控制节点
    功能:融合ZED摄像头视觉巡线与激光雷达动态避障
    技术特性:
    1. 视觉巡线采用HSV颜色空间阈值分割+ROI区域检测
    2. 激光雷达采用动态窗口法进行障碍物检测
    3. 双模式互斥切换机制保障行驶安全
    """
    
    import rclpy
    import cv2
    import cv_bridge
    from rclpy.node import Node
    from sensor_msgs.msg import Image, LaserScan
    from geometry_msgs.msg import Twist
    from ackermann_msgs.msg import AckermannDriveStamped
    import numpy as np
    from itertools import *
    from operator import itemgetter
    import time
    import math
    
    def show_img(title: str, img: np.ndarray):
        """图像可视化辅助函数
        参数:
            title : 窗口标题(支持中文)
            img   : OpenCV图像矩阵(BGR格式)
        功能:
            1. 创建可调整窗口
            2. 图像缩放50%显示
            3. 保持5ms刷新频率
        """
        cv2.namedWindow(title, cv2.WINDOW_NORMAL)
        cv2.setWindowProperty(title, cv2.WND_PROP_AUTOSIZE, cv2.WINDOW_AUTOSIZE)
        img = cv2.resize(img, (int(img.shape[1] / 2), int(img.shape[0] / 2)))
        cv2.imshow(title, img)
    
    # 全局控制参数
    turn_angle = 90       # 默认转向角度
    THRESHOLD = 1.5       # 激光雷达安全距离阈值(米)
    PI = 3.14             # 圆周率近似值
    Kp1 = 0.047           # 近距障碍比例系数
    Kp2 = 0.0425          # 中距障碍比例系数 
    Kp3 = 2               # 远距障碍比例系数
    
    class Follow(Node):
        """多模态导航主节点类
        功能架构:
        1. 视觉巡线模式:HSV颜色分割+质心跟踪
        2. 激光避障模式:动态窗口障碍检测+角度控制
        3. 安全互斥机制:双模式自动切换
        """
        def __init__(self, name):
            super().__init__(name)
            
            #--------------------------
            # 传感器订阅者初始化
            #--------------------------
            # ZED左目相机话题(分辨率需匹配摄像头输出)
            self.img_sub = self.create_subscription(
                Image,
                "/zed_camera_left_sensor/image_raw",
                self.camera_callback,
                10)
            
            # 激光雷达话题(需与雷达驱动配置一致)
            self.lidar_sub = self.create_subscription(
                LaserScan,
                "/scan",
                self.laser_scan_callback,
                10)
    
            #--------------------------
            # 控制指令发布者
            #--------------------------
            self.cmd_pub = self.create_publisher(
                AckermannDriveStamped,
                "/cmd_vel",
                10)
    
            #--------------------------
            # 控制参数初始化
            #--------------------------
            self.barrier_free = True  # 安全状态标识(True:巡线模式 False:避障模式)
            self.akm = AckermannDriveStamped()  # 阿克曼控制消息对象
            self.x_direction_speed = 0.15       # 基准线速度(m/s)
            
            # 视觉巡线参数
            self.lower_yellow = np.array([26, 43, 46])  # HSV低阈值(H:26-34对应黄色)
            self.upper_yellow = np.array([34, 255, 255])
            self.refresh_rate = 5              # 图像刷新间隔(ms)
            # 扫描阈值
            self.threshold = 1.5
    
        def camera_callback(self, msg: Image):
            """视觉巡线主回调
            处理流程:
            1. ROS图像转OpenCV格式
            2. 安全状态下执行巡线算法
            """
            bridge = cv_bridge.CvBridge()
            cv_img = bridge.imgmsg_to_cv2(msg, 'bgr8')
            if self.barrier_free:
                self.follow_line(cv_img)
    
        def laser_scan_callback(self, msg: LaserScan):
            """激光雷达避障回调
            处理流程:
            1. 检测超过安全阈值的扫描区域
            2. 计算最大连续安全区域
            3. 生成避障控制指令
            """
            # 获取超过安全距离的扫描点索引
            # 检测扫描大于设置阈值的下标索引并返回list
            greater_thershold_scan_index = np.where(
                np.array(msg.ranges) > self.threshold)[0].tolist()
            # 定义列表储存数据
            gap_list = list()
            # 对扫描大于阈值的结果做分类,分类依据为下表索引与顺序的关系
            for _, g in groupby(enumerate(greater_thershold_scan_index), lambda temp: temp[0] - temp[1]):gap_list.append(list(map(itemgetter(1), g)))
            # 对列表进行排序并查询最大列
            gap_list.sort(key=len)
            gap_max = gap_list[-1]
            # print(gap_max[0], gap_max[-1])
            min_angle, max_angle = gap_max[0]*((msg.angle_increment)*180/math.pi), \
                gap_max[-1]*((msg.angle_increment)*180/math.pi)
            # 计算平分角大小
            gap_avg = (max_angle - min_angle) / 2
            turn_angle = min_angle + gap_avg
            self.avoidance(gap_avg, turn_angle)
    
    
        def avoidance(self, gap_avg: float, turn_angle: float):
            if gap_avg < 40:
                self.barrier_free = False
                self.stop()
            elif turn_angle < 45 or turn_angle > 135:
                self.barrier_free = False
                LINX = 0.028
                angz = Kp1 * (-1) * (90 - turn_angle)
                print("前方检测到障碍物")
                self.stop()
            elif turn_angle < 52 or turn_angle > 123:
                self.barrier_free = False
                LINX = 0.038
                angz = Kp2 * (-1) * (90 - turn_angle)
                print("侧前方检测到障碍物")
                self.stop()
            elif turn_angle < 55 or turn_angle > 125:
                self.barrier_free = False
                self.barrier_free = False
                LINX = 0.087
                angz = Kp3 / (90 - turn_angle)
                print("侧面检测到障碍物")
                self.stop()
            else:
                self.barrier_free = True
    
    
        def follow_line(self, image: np.ndarray):
            """视觉巡线核心算法
            处理流程:
            1. HSV颜色空间转换
            2. 二值化阈值处理
            3. ROI区域裁剪
            4. 质心计算与控制量生成
            """
            # 图像预处理
            hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
            mask = cv2.inRange(hsv, self.lower_yellow, self.upper_yellow)
            
            # 设置ROI(聚焦下方1/4区域)
            mask = self.set_roi_forward(mask)
            show_img("region of interest", mask)
    
            # 计算图像矩获取质心
            M = cv2.moments(mask)
            if M['m00'] > 0:  # 有效检测区域
                cx = int(M['m10']/M['m00']) - 235  # 摄像头安装位置补偿
                cy = int(M['m01']/M['m00'])
                
                # 在质心画圆
                cv2.circle(image, (cx, cy), radius=20, color=(0, 0, 255), thickness=-1)
                
                # PID控制
                midpoint = mask.shape[1] / 2
                err = cx - midpoint - 50  # 横向偏差计算
                self.akm.drive.speed = self.x_direction_speed
                self.akm.drive.steering_angle = -float(err/2.0)/50  # 比例控制
                
                self.cmd_pub.publish(self.akm)
    
            # 可视化输出
            show_img("camera", image)
            cv2.waitKey(self.refresh_rate)
    
        def stop(self):
            """紧急停止协议
            安全策略:
                1. 立即发布零速指令
                2. 锁定控制模式
                3. 关闭节点
            """
            self.akm.drive.speed = 0.0
            self.akm.drive.steering_angle = 0.0
            self.cmd_pub.publish(self.akm)
            self.get_logger().info("stop now")
            self.barrier_free = False
            exit(0)
    
        def set_roi_forward(self, mask: np.ndarray) -> np.ndarray:
            """设置前方感兴趣区域(ROI)
            参数:
                mask : 二值化图像矩阵
            返回:
                裁剪后的图像矩阵
            设计要点:
                - 聚焦图像下方1/4区域(假设引导线位于视野前方)
                - 排除上部3/4区域干扰
            """
            h, w = mask.shape
            search_top = int(3 * h / 4)
            search_bot = search_top + 20
            mask[:search_top, :] = 0
            mask[search_bot:, :] = 0
            return mask
    
    def main(args=None):
        """ROS2节点主入口"""
        rclpy.init(args=args)
        node = Follow("laser_scan")
        rclpy.spin(node)
        node.destroy_node()
        rclpy.shutdown()
    
  • 3、实验操作

    找到car_ws/src/car_bringup/launch/car_sim.launch.py文件,并取消其中spawn_obstacle的注释并保存文件。

    image

    car_ws文件夹中新建终端,输入以下命令。

    ./laserscan.sh

    image

    gazebo场景生成后,小车开始巡线行驶。

    image

    当检测到附近有障碍物时,小车停止行驶并退出程序。

    image

清理资源

  • 计算巢—服务实例—复制服务实例ID,点击【删除】

    image

  • 在弹窗粘贴服务实例ID,并进行勾选,点击【确定删除】

    image

  • 完成安全验证后,即可成功释放实例。

    image

  • 回到云服务器ECS——实例,检查是否成功释放资源

    image

关闭实验

  • 在完成实验后,点击 结束实操

    image

  • 点击 取消 回到实验页面,点击 确定 跳转实验评分

    image

  • 请为本次实验评分,并给出您的建议,点击 确认,结束本次实验

    image