自动驾驶场景设计ROS2 2025 ——激光雷达识别障碍
采用激光雷达进行障碍物识别并停车。
场景简介
采用激光雷达进行障碍物识别并停车。
背景知识
学生需要有一定的python基础来完成此实验。
实验室资源方式简介
进入实操前,请确保阿里云账号满足以下条件:
个人账号资源
使用您个人的云资源进行操作,资源归属于个人。
平台仅提供手册参考,不会对资源做任何操作。
确保已完成云工开物300元代金券领取。
已通过实名认证且账户余额≥100元。
本次实验将在您的账号下开通实操所需计算型实例规格族c7a,费用约为:3元/时(具体金额取决于实验完成的时间),需要您通过阿里云云工开物学生专属300元抵扣金兑换本次实操的云资源。
如果您调整了资源规格、使用时长,或执行了本方案以外的操作,可能导致费用发生变化,请以控制台显示的实际价格和最终账单为准。
领取专属权益及创建实验资源
在开始实验之前,请先点击右侧屏幕的“进入实操”再进行后续操作
领取300元高校专属权益优惠券(若已领取请跳过)
实验产生的费用优先使用优惠券,优惠券使用完毕后需您自行承担。
实验步骤
1、服务部署
点击链接,进入部署页面
按下图设置【服务实例名称】、【地域】、【实例密码】
服务实例名称:test(可自定义命名)
地域:华东2(上海)
实例密码:Sjtu@520
说明输入实例密码时请注意大小写,请记住您设置的实例名称及对应密码,后续实验过程会用到。
完成填写后点击【下一步:确认订单】
核对实例信息及价格预览,无误请点击【立即创建】
重要领取300元优惠券后,资源应为0元/小时,且会提示【您当前账户的余额充足】!若提示余额不足等,请检查是否正确领取优惠券
创建成功,点击【去列表查看】
查看实例,并等待部署
点击左侧的图标展开目录,选择目录中的【云服务器ECS】
云服务器ECS—实例—远程连接
下拉展开更多登录方式,选择【通过VNC远程连接】
点击 ecs—user,并输入Sjtu@520(请输入您设置的密码)后回车
进入镜像后,点击左侧第3个图标,打开文件夹后点击car_ws文件夹
进入car_ws文件夹,在空白处点击鼠标右键,选择Open in Terminal
2、代码部分
在launch文件生成小车模型时,会同时调用servo_command.py文件。该文件中订阅其他文件发布的AckermannDriveStamped类型消息,并将其转化为Float64MultiArray类型消息,并发布给gazebo以控制场景中小车的行驶。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 阿克曼控制指令转换节点 功能:将上层控制指令转换为各独立执行器的控制命令 消息流: /cmd_vel (AckermannDriveStamped) → 转换处理 → 6个执行器控制器命令 """ # ROS2核心库 import rclpy from rclpy.node import Node # ROS消息类型 from std_msgs.msg import Float64MultiArray from ackermann_msgs.msg import AckermannDriveStamped class ServoCommand(Node): """执行器控制指令转换节点 主要功能: 1. 订阅阿克曼控制指令(速度+转向角) 2. 转换为6个独立执行器的控制命令: - 4个轮速控制器(前后左右车轮) - 2个转向铰链控制器(左右转向机构) """ def __init__(self, node_name): """节点初始化""" super().__init__(node_name) # -------------------------- # 创建消息订阅者(输入接口) # -------------------------- # 参数说明: # msg_type : AckermannDriveStamped 标准阿克曼控制消息 # topic_name : /cmd_vel 标准控制话题 # callback : self.set_throttle_steer 消息处理回调 # qos_profile : 10 队列深度(平衡实时性与可靠性) self.subscription = self.create_subscription( AckermannDriveStamped, '/cmd_vel', self.set_throttle_steer, 10) # -------------------------- # 创建命令发布者(输出接口) # -------------------------- # 控制器命名规范:<位置>_<类型>_controller/commands # 队列深度设为1保证最新指令优先 # 左后轮速度控制器 self.pub_vel_left_rear_wheel = self.create_publisher( Float64MultiArray, "/left_rear_wheel_velocity_controller/commands", 1) # 右后轮速度控制器 self.pub_vel_right_rear_wheel = self.create_publisher( Float64MultiArray, "/right_rear_wheel_velocity_controller/commands", 1) # 左前轮速度控制器 self.pub_vel_left_front_wheel = self.create_publisher( Float64MultiArray, "/left_front_wheel_velocity_controller/commands", 1) # 右前轮速度控制器 self.pub_vel_right_front_wheel = self.create_publisher( Float64MultiArray, "/right_front_wheel_velocity_controller/commands", 1) # 左转向铰链位置控制器 self.pub_pos_left_steering_hinge = self.create_publisher( Float64MultiArray, "/left_steering_hinge_position_controller/commands", 1) # 右转向铰链位置控制器 self.pub_pos_right_steering_hinge = self.create_publisher( Float64MultiArray, "/right_steering_hinge_position_controller/commands", 1) def set_throttle_steer(self, msg): """控制指令回调函数 参数: msg (AckermannDriveStamped): 输入控制指令,包含: - drive.speed: 纵向速度(m/s) - drive.steering_angle: 前轮转向角(rad) 处理逻辑: 1. 速度信号处理:放大28倍(根据电机特性调整) 2. 转向信号处理:直接传递角度值 3. 全轮驱动模式:四个车轮接收相同速度指令 4. 转向对称处理:左右转向机构接收相同角度 """ # 速度指令转换 throttle = Float64MultiArray() throttle.data = [msg.drive.speed * 28] # 转向指令转换 steer = Float64MultiArray() steer.data = [msg.drive.steering_angle] # -------------------------- # 发布到各执行器控制器 # -------------------------- # 速度指令发布(全轮相同速度) self.pub_vel_left_rear_wheel.publish(throttle) self.pub_vel_right_rear_wheel.publish(throttle) self.pub_vel_left_front_wheel.publish(throttle) self.pub_vel_right_front_wheel.publish(throttle) # 转向指令发布(对称转向) self.pub_pos_left_steering_hinge.publish(steer) self.pub_pos_right_steering_hinge.publish(steer) # 日志记录(INFO级别,带数据格式化) self.get_logger().info( f"指令已发布 | 速度: {throttle.data[0]:.1f} RPM | 转向角: {steer.data[0]:.2f} rad", throttle_once=True # 节流功能防止高频日志 ) def main(args=None): """ROS2节点主入口""" rclpy.init(args=args) # 节点初始化 servo_node = ServoCommand("servo_control_node") try: # 保持节点运行 rclpy.spin(servo_node) except KeyboardInterrupt: # 捕获Ctrl+C信号 servo_node.get_logger().info("节点被手动终止") finally: # 资源清理 servo_node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
激光雷达识别障碍由laser_scan.py实现。其中结合了视觉巡线和激光雷达避障功能。主要包含两个回调函数:一个是处理摄像头图像的camera_callback,用于巡线;另一个是处理激光雷达数据的laser_scan_callback,用于识别障碍物。此外,还有avoidance方法根据障碍物的位置调整运动策略,以及follow_line方法实现视觉巡线逻辑。
#!/usr/bin/env python3 """ 基于ROS2的多模态导航控制节点 功能:融合ZED摄像头视觉巡线与激光雷达动态避障 技术特性: 1. 视觉巡线采用HSV颜色空间阈值分割+ROI区域检测 2. 激光雷达采用动态窗口法进行障碍物检测 3. 双模式互斥切换机制保障行驶安全 """ import rclpy import cv2 import cv_bridge from rclpy.node import Node from sensor_msgs.msg import Image, LaserScan from geometry_msgs.msg import Twist from ackermann_msgs.msg import AckermannDriveStamped import numpy as np from itertools import * from operator import itemgetter import time import math def show_img(title: str, img: np.ndarray): """图像可视化辅助函数 参数: title : 窗口标题(支持中文) img : OpenCV图像矩阵(BGR格式) 功能: 1. 创建可调整窗口 2. 图像缩放50%显示 3. 保持5ms刷新频率 """ cv2.namedWindow(title, cv2.WINDOW_NORMAL) cv2.setWindowProperty(title, cv2.WND_PROP_AUTOSIZE, cv2.WINDOW_AUTOSIZE) img = cv2.resize(img, (int(img.shape[1] / 2), int(img.shape[0] / 2))) cv2.imshow(title, img) # 全局控制参数 turn_angle = 90 # 默认转向角度 THRESHOLD = 1.5 # 激光雷达安全距离阈值(米) PI = 3.14 # 圆周率近似值 Kp1 = 0.047 # 近距障碍比例系数 Kp2 = 0.0425 # 中距障碍比例系数 Kp3 = 2 # 远距障碍比例系数 class Follow(Node): """多模态导航主节点类 功能架构: 1. 视觉巡线模式:HSV颜色分割+质心跟踪 2. 激光避障模式:动态窗口障碍检测+角度控制 3. 安全互斥机制:双模式自动切换 """ def __init__(self, name): super().__init__(name) #-------------------------- # 传感器订阅者初始化 #-------------------------- # ZED左目相机话题(分辨率需匹配摄像头输出) self.img_sub = self.create_subscription( Image, "/zed_camera_left_sensor/image_raw", self.camera_callback, 10) # 激光雷达话题(需与雷达驱动配置一致) self.lidar_sub = self.create_subscription( LaserScan, "/scan", self.laser_scan_callback, 10) #-------------------------- # 控制指令发布者 #-------------------------- self.cmd_pub = self.create_publisher( AckermannDriveStamped, "/cmd_vel", 10) #-------------------------- # 控制参数初始化 #-------------------------- self.barrier_free = True # 安全状态标识(True:巡线模式 False:避障模式) self.akm = AckermannDriveStamped() # 阿克曼控制消息对象 self.x_direction_speed = 0.15 # 基准线速度(m/s) # 视觉巡线参数 self.lower_yellow = np.array([26, 43, 46]) # HSV低阈值(H:26-34对应黄色) self.upper_yellow = np.array([34, 255, 255]) self.refresh_rate = 5 # 图像刷新间隔(ms) # 扫描阈值 self.threshold = 1.5 def camera_callback(self, msg: Image): """视觉巡线主回调 处理流程: 1. ROS图像转OpenCV格式 2. 安全状态下执行巡线算法 """ bridge = cv_bridge.CvBridge() cv_img = bridge.imgmsg_to_cv2(msg, 'bgr8') if self.barrier_free: self.follow_line(cv_img) def laser_scan_callback(self, msg: LaserScan): """激光雷达避障回调 处理流程: 1. 检测超过安全阈值的扫描区域 2. 计算最大连续安全区域 3. 生成避障控制指令 """ # 获取超过安全距离的扫描点索引 # 检测扫描大于设置阈值的下标索引并返回list greater_thershold_scan_index = np.where( np.array(msg.ranges) > self.threshold)[0].tolist() # 定义列表储存数据 gap_list = list() # 对扫描大于阈值的结果做分类,分类依据为下表索引与顺序的关系 for _, g in groupby(enumerate(greater_thershold_scan_index), lambda temp: temp[0] - temp[1]):gap_list.append(list(map(itemgetter(1), g))) # 对列表进行排序并查询最大列 gap_list.sort(key=len) gap_max = gap_list[-1] # print(gap_max[0], gap_max[-1]) min_angle, max_angle = gap_max[0]*((msg.angle_increment)*180/math.pi), \ gap_max[-1]*((msg.angle_increment)*180/math.pi) # 计算平分角大小 gap_avg = (max_angle - min_angle) / 2 turn_angle = min_angle + gap_avg self.avoidance(gap_avg, turn_angle) def avoidance(self, gap_avg: float, turn_angle: float): if gap_avg < 40: self.barrier_free = False self.stop() elif turn_angle < 45 or turn_angle > 135: self.barrier_free = False LINX = 0.028 angz = Kp1 * (-1) * (90 - turn_angle) print("前方检测到障碍物") self.stop() elif turn_angle < 52 or turn_angle > 123: self.barrier_free = False LINX = 0.038 angz = Kp2 * (-1) * (90 - turn_angle) print("侧前方检测到障碍物") self.stop() elif turn_angle < 55 or turn_angle > 125: self.barrier_free = False self.barrier_free = False LINX = 0.087 angz = Kp3 / (90 - turn_angle) print("侧面检测到障碍物") self.stop() else: self.barrier_free = True def follow_line(self, image: np.ndarray): """视觉巡线核心算法 处理流程: 1. HSV颜色空间转换 2. 二值化阈值处理 3. ROI区域裁剪 4. 质心计算与控制量生成 """ # 图像预处理 hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) mask = cv2.inRange(hsv, self.lower_yellow, self.upper_yellow) # 设置ROI(聚焦下方1/4区域) mask = self.set_roi_forward(mask) show_img("region of interest", mask) # 计算图像矩获取质心 M = cv2.moments(mask) if M['m00'] > 0: # 有效检测区域 cx = int(M['m10']/M['m00']) - 235 # 摄像头安装位置补偿 cy = int(M['m01']/M['m00']) # 在质心画圆 cv2.circle(image, (cx, cy), radius=20, color=(0, 0, 255), thickness=-1) # PID控制 midpoint = mask.shape[1] / 2 err = cx - midpoint - 50 # 横向偏差计算 self.akm.drive.speed = self.x_direction_speed self.akm.drive.steering_angle = -float(err/2.0)/50 # 比例控制 self.cmd_pub.publish(self.akm) # 可视化输出 show_img("camera", image) cv2.waitKey(self.refresh_rate) def stop(self): """紧急停止协议 安全策略: 1. 立即发布零速指令 2. 锁定控制模式 3. 关闭节点 """ self.akm.drive.speed = 0.0 self.akm.drive.steering_angle = 0.0 self.cmd_pub.publish(self.akm) self.get_logger().info("stop now") self.barrier_free = False exit(0) def set_roi_forward(self, mask: np.ndarray) -> np.ndarray: """设置前方感兴趣区域(ROI) 参数: mask : 二值化图像矩阵 返回: 裁剪后的图像矩阵 设计要点: - 聚焦图像下方1/4区域(假设引导线位于视野前方) - 排除上部3/4区域干扰 """ h, w = mask.shape search_top = int(3 * h / 4) search_bot = search_top + 20 mask[:search_top, :] = 0 mask[search_bot:, :] = 0 return mask def main(args=None): """ROS2节点主入口""" rclpy.init(args=args) node = Follow("laser_scan") rclpy.spin(node) node.destroy_node() rclpy.shutdown()
3、实验操作
找到car_ws/src/car_bringup/launch/car_sim.launch.py文件,并取消其中spawn_obstacle的注释并保存文件。
在car_ws文件夹中新建终端,输入以下命令。
./laserscan.sh
当gazebo场景生成后,小车开始巡线行驶。
当检测到附近有障碍物时,小车停止行驶并退出程序。
清理资源
计算巢—服务实例—复制服务实例ID,点击【删除】
在弹窗粘贴服务实例ID,并进行勾选,点击【确定删除】
完成安全验证后,即可成功释放实例。
回到云服务器ECS——实例,检查是否成功释放资源
关闭实验
在完成实验后,点击 结束实操
点击 取消 回到实验页面,点击 确定 跳转实验评分
请为本次实验评分,并给出您的建议,点击 确认,结束本次实验