自动驾驶场景设计ROS2 2025 ——滑动窗口
订阅小车摄像头信息对于图像进行滑动窗口等图像操作进行沿黄线行驶。
场景简介
订阅小车摄像头信息对于图像进行滑动窗口等图像操作进行沿黄线行驶。
背景知识
学生需要有一定的python基础来完成此实验。
实验室资源方式简介
进入实操前,请确保阿里云账号满足以下条件:
个人账号资源
使用您个人的云资源进行操作,资源归属于个人。
平台仅提供手册参考,不会对资源做任何操作。
确保已完成云工开物300元代金券领取。
已通过实名认证且账户余额≥100元。
本次实验将在您的账号下开通实操所需计算型实例规格族c7a,费用约为:3元/时(具体金额取决于实验完成的时间),需要您通过阿里云云工开物学生专属300元抵扣金兑换本次实操的云资源。
如果您调整了资源规格、使用时长,或执行了本方案以外的操作,可能导致费用发生变化,请以控制台显示的实际价格和最终账单为准。
领取专属权益及创建实验资源
在开始实验之前,请先点击右侧屏幕的“进入实操”再进行后续操作
领取300元高校专属权益优惠券(若已领取请跳过)
实验产生的费用优先使用优惠券,优惠券使用完毕后需您自行承担。
实验步骤
1、服务部署
点击链接,进入部署页面
按下图设置【服务实例名称】、【地域】、【实例密码】
服务实例名称:test(可自定义命名)
地域:华东2(上海)
实例密码:Sjtu@520
说明输入实例密码时请注意大小写,请记住您设置的实例名称及对应密码,后续实验过程会用到。
完成填写后点击【下一步:确认订单】
核对实例信息及价格预览,无误请点击【立即创建】
重要领取300元优惠券后,资源应为0元/小时,且会提示【您当前账户的余额充足】!若提示余额不足等,请检查是否正确领取优惠券
创建成功,点击【去列表查看】
查看实例,并等待部署
点击左侧的图标展开目录,选择目录中的【云服务器ECS】
云服务器ECS—实例—远程连接
下拉展开更多登录方式,选择【通过VNC远程连接】
点击 ecs—user,并输入Sjtu@520(请输入您设置的密码)后回车
进入镜像后,点击左侧第3个图标,打开文件夹后点击car_ws文件夹
进入car_ws文件夹,在空白处点击鼠标右键,选择Open in Terminal
2、代码部分
在launch文件生成小车模型时,会同时调用servo_command.py文件。该文件中订阅其他文件发布的AckermannDriveStamped类型消息,并将其转化为Float64MultiArray类型消息,并发布给gazebo以控制场景中小车的行驶。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 阿克曼控制指令转换节点 功能:将上层控制指令转换为各独立执行器的控制命令 消息流: /cmd_vel (AckermannDriveStamped) → 转换处理 → 6个执行器控制器命令 """ # ROS2核心库 import rclpy from rclpy.node import Node # ROS消息类型 from std_msgs.msg import Float64MultiArray from ackermann_msgs.msg import AckermannDriveStamped class ServoCommand(Node): """执行器控制指令转换节点 主要功能: 1. 订阅阿克曼控制指令(速度+转向角) 2. 转换为6个独立执行器的控制命令: - 4个轮速控制器(前后左右车轮) - 2个转向铰链控制器(左右转向机构) """ def __init__(self, node_name): """节点初始化""" super().__init__(node_name) # -------------------------- # 创建消息订阅者(输入接口) # -------------------------- # 参数说明: # msg_type : AckermannDriveStamped 标准阿克曼控制消息 # topic_name : /cmd_vel 标准控制话题 # callback : self.set_throttle_steer 消息处理回调 # qos_profile : 10 队列深度(平衡实时性与可靠性) self.subscription = self.create_subscription( AckermannDriveStamped, '/cmd_vel', self.set_throttle_steer, 10) # -------------------------- # 创建命令发布者(输出接口) # -------------------------- # 控制器命名规范:<位置>_<类型>_controller/commands # 队列深度设为1保证最新指令优先 # 左后轮速度控制器 self.pub_vel_left_rear_wheel = self.create_publisher( Float64MultiArray, "/left_rear_wheel_velocity_controller/commands", 1) # 右后轮速度控制器 self.pub_vel_right_rear_wheel = self.create_publisher( Float64MultiArray, "/right_rear_wheel_velocity_controller/commands", 1) # 左前轮速度控制器 self.pub_vel_left_front_wheel = self.create_publisher( Float64MultiArray, "/left_front_wheel_velocity_controller/commands", 1) # 右前轮速度控制器 self.pub_vel_right_front_wheel = self.create_publisher( Float64MultiArray, "/right_front_wheel_velocity_controller/commands", 1) # 左转向铰链位置控制器 self.pub_pos_left_steering_hinge = self.create_publisher( Float64MultiArray, "/left_steering_hinge_position_controller/commands", 1) # 右转向铰链位置控制器 self.pub_pos_right_steering_hinge = self.create_publisher( Float64MultiArray, "/right_steering_hinge_position_controller/commands", 1) def set_throttle_steer(self, msg): """控制指令回调函数 参数: msg (AckermannDriveStamped): 输入控制指令,包含: - drive.speed: 纵向速度(m/s) - drive.steering_angle: 前轮转向角(rad) 处理逻辑: 1. 速度信号处理:放大28倍(根据电机特性调整) 2. 转向信号处理:直接传递角度值 3. 全轮驱动模式:四个车轮接收相同速度指令 4. 转向对称处理:左右转向机构接收相同角度 """ # 速度指令转换 throttle = Float64MultiArray() throttle.data = [msg.drive.speed * 28] # 转向指令转换 steer = Float64MultiArray() steer.data = [msg.drive.steering_angle] # -------------------------- # 发布到各执行器控制器 # -------------------------- # 速度指令发布(全轮相同速度) self.pub_vel_left_rear_wheel.publish(throttle) self.pub_vel_right_rear_wheel.publish(throttle) self.pub_vel_left_front_wheel.publish(throttle) self.pub_vel_right_front_wheel.publish(throttle) # 转向指令发布(对称转向) self.pub_pos_left_steering_hinge.publish(steer) self.pub_pos_right_steering_hinge.publish(steer) # 日志记录(INFO级别,带数据格式化) self.get_logger().info( f"指令已发布 | 速度: {throttle.data[0]:.1f} RPM | 转向角: {steer.data[0]:.2f} rad", throttle_once=True # 节流功能防止高频日志 ) def main(args=None): """ROS2节点主入口""" rclpy.init(args=args) # 节点初始化 servo_node = ServoCommand("servo_control_node") try: # 保持节点运行 rclpy.spin(servo_node) except KeyboardInterrupt: # 捕获Ctrl+C信号 servo_node.get_logger().info("节点被手动终止") finally: # 资源清理 servo_node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
识别路牌停车由lane.py实现。基于ROS2的视觉巡线系统通过摄像头实时采集图像,利用OpenCV进行颜色分割和特征提取,计算转向角控制车辆沿路径行驶,并绘制显示滑动窗口。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 基于ROS2的几何视觉巡线节点 功能:通过摄像头识别蓝色引导线,基于几何变换计算转向角度,实现精准路径跟踪 技术要点: 1. 透视变换消除图像畸变 2. 滑动窗口检测车道线 3. 二次曲线拟合路径 4. 阿克曼几何控制模型 """ # ROS2核心库 import rclpy from rclpy.node import Node # 图像处理库 import cv2 import cv_bridge import numpy as np import math # ROS消息类型 from sensor_msgs.msg import Image from ackermann_msgs.msg import AckermannDriveStamped class Lane(Node): """几何视觉巡线主节点类 功能架构: 1. 图像采集与预处理 2. 透视变换校正 3. 车道线特征提取 4. 几何控制量计算 5. 阿克曼指令发布 """ def __init__(self, name): super().__init__(name) # -------------------------- # 图像订阅者初始化 # -------------------------- # ZED左目相机话题(分辨率需与摄像头输出匹配) self.img_sub = self.create_subscription( Image, "/zed_camera_left_sensor/image_raw", self.camera_callback, 10) # -------------------------- # 控制指令发布者 # -------------------------- self.cmd_pub = self.create_publisher( AckermannDriveStamped, "/cmd_vel", 10) # -------------------------- # 视觉参数初始化 # -------------------------- self.color_dist = { 'blue': { 'Lower': np.array([30, 0, 0]), # HSV低阈值(H:30-45对应蓝色) 'Upper': np.array([45, 255, 255]) # HSV高阈值 } } def camera_callback(self, msg: Image): """图像处理主回调""" # ROS图像转OpenCV格式 bridge = cv_bridge.CvBridge() cv_img = bridge.imgmsg_to_cv2(msg, 'bgr8') # 执行巡线算法 self.lane(cv_img) # 保持图像显示(5ms刷新) cv2.waitKey(5) def lane(self, img: np.ndarray): """核心巡线算法 处理流程: 1. 透视变换校正图像畸变 2. 滑动窗口检测车道线 3. 二次曲线拟合路径 4. 计算目标点转向角 """ # ================= 图像预处理阶段 ================= x_cmPerPixel = 34.0 / 640.0 y_cmPerPixel = 37.0 / 480.0 roadWidth = 570 y_offset = 0.0 # cm # 轴间距 I = 20.0 # 摄像头坐标系与车中心间距 D = 28.0 # 计算cmdSteer的系数 k = -1.5 aP = [0.0, 0.0] lastP = [0.0, 0.0] Timer = 0 # 定义透视变换区域(梯形校正区域) src_points = np.array([ [240., 200.], # 左上 [1., 479.], # 左下 [400., 200.], # 右上 [639., 479.] # 右下 ], dtype="float32") # 目标变换区域(矩形输出) dst_points = np.array([ [1., 1.], # 左上 [1., 479.], # 左下 [639., 1.], # 右上 [639., 479.] # 右下 ], dtype="float32") # 生成透视变换矩阵 M = cv2.getPerspectiveTransform(src_points, dst_points) img = cv2.circle(img, (int(src_points[0][0]), int( src_points[0][1])), 3, (0, 0, 255), -1) img = cv2.circle(img, (int(src_points[1][0]), int( src_points[1][1])), 3, (0, 0, 255), -1) img = cv2.circle(img, (int(src_points[2][0]), int( src_points[2][1])), 3, (0, 255, 0), -1) img = cv2.circle(img, (int(src_points[3][0]), int( src_points[3][1])), 3, (0, 255, 0), -1) # 高斯滤波降噪 blur = cv2.GaussianBlur(img, (1, 1), 0) # HSV颜色空间转换 hsv_img = cv2.cvtColor(blur, cv2.COLOR_BGR2HSV) # 腐蚀操作 kernel = np.ones((3, 3), dtype=np.uint8) erode_hsv = cv2.erode(hsv_img, kernel, 1) # 二值化提取蓝色区域 inRange_hsv = cv2.inRange(erode_hsv, self.color_dist['blue']['Lower'], self.color_dist['blue']['Upper']) # 透视变换校正图像 gray_img = cv2.warpPerspective( inRange_hsv, M, (640, 480), cv2.INTER_LINEAR) # 二值化 ret, origin_thr = cv2.threshold(gray_img, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) binary_warped = origin_thr histogram_x = np.sum(binary_warped[int(binary_warped.shape[0] / 2):, :], axis=0) lane_base = np.argmax(histogram_x) midpoint_x = int(histogram_x.shape[0] / 2) midpoint_x = 320 histogram_y = np.sum(binary_warped[0:binary_warped.shape[0], :], axis=1) midpoint_y = 240 # 对颜色进行处理 upper_half_histSum = np.sum(histogram_y[0:midpoint_y]) lower_half_histSum = np.sum(histogram_y[midpoint_y:]) try: hist_sum_y_ratio = (upper_half_histSum) / (lower_half_histSum) except: hist_sum_y_ratio = 1 # ================= 车道线检测阶段 ================= # 7个滑动窗口 nwindows = 7 # 设置每个滑动窗口的高度 window_height = int(binary_warped.shape[0] / nwindows) nonzero = binary_warped.nonzero() nonzeroy = np.array(nonzero[0]) nonzerox = np.array(nonzero[1]) lane_current = lane_base # 滑动窗口宽度的一半 margin = 100 minpix = 25 # print(np.mean(nonzerox) < binary_warped.shape[1] / 2) # 判断车道线在小车左边还是右边 isLeft = True if nonzerox.shape[0] != 0: isLeft = np.mean(nonzerox) < binary_warped.shape[1] / 2 # print(isLeft) lane_inds = [] # 将图像作灰度转化 img1 = cv2.cvtColor(binary_warped, cv2.COLOR_GRAY2BGR) # show_img('img1', img1) # 对监测点进行逐一分析 for window in range(nwindows): # 设置滑动窗口坐标点 win_y_low = binary_warped.shape[0] - (window + 1) * window_height # 设置坐标位置 win_y_high = binary_warped.shape[0] - window * window_height win_x_low = lane_current - margin win_x_high = lane_current + margin good_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & (nonzerox >= win_x_low) & (nonzerox < win_x_high)).nonzero()[0] lane_inds.append(good_inds) # 设置矩形显示 img1 = cv2.rectangle(img1, (win_x_low, win_y_low), (win_x_high, win_y_high), (0, 255, 0), 3) if len(good_inds) > minpix: lane_current = int(np.mean(nonzerox[good_inds])) elif window >= 3: break lane_inds = np.concatenate(lane_inds) # 矩阵进行连接并提取信息 pixelX = nonzerox[lane_inds] pixelY = nonzeroy[lane_inds] # modified part try: a2, a1, a0 = np.polyfit(pixelY, pixelX, 2) except: # print("no image! The lane is " + str(isLeft)) # self.adjustOrientation() return 0 aveX = np.average(pixelX) frontDistance = np.argsort(pixelY)[int(len(pixelY) / 8)] aimLaneP = [pixelX[frontDistance], pixelY[frontDistance]] # 构建函数进行信息统计 ploty = np.array(list(set(pixelY))) plotx = a2 * ploty ** 2 + a1 * ploty + a0 num = 0 for num in range(len(ploty) - 1): cv2.line(img1, (int(plotx[num]), int(ploty[num])), (int(plotx[num + 1]), int(ploty[num + 1])), (0, 0, 255), 8) # 计算aimLaneP处斜率,从而得到目标点的像素坐标 lanePk = 2 * a2 * aimLaneP[0] + a1 # 判断当前基线中点的相对位置 if abs(lanePk) < 0.1: if lane_base >= midpoint_x: LorR = -1 # print('right1') else: if hist_sum_y_ratio < 0.1: LorR = -1 # print('right2') else: LorR = 1 # print('left1') aP[0] = aimLaneP[0] + LorR * roadWidth / 2 aP[1] = aimLaneP[1] else: # 由截距判断当前状态以及更新 x_intertcept = a2 * 480.0 ** 2 + a1 * 480.0 + a0 if x_intertcept > 320: LorR = -1 # print("right3") else: LorR = 1 # LeftLane # print("left2") k_ver = -1 / lanePk theta = math.atan(k_ver) aP[0] = aimLaneP[0] - math.sin(theta) * (LorR) * roadWidth / 2 aP[1] = aimLaneP[1] - math.cos(theta) * (LorR) * roadWidth / 2 # 车道线上绘制蓝色圆 img1 = cv2.circle(img1, (int(aimLaneP[0]), int(aimLaneP[1])), 10, (255, 0, 0), -1) # 质心绘制绿色圆 img1 = cv2.circle(img1, (int(aP[0]), int(aP[1])), 10, (0, 255, 0), -1) show_img('img2', img1) aP[0] = (aP[0] - 320.0) * x_cmPerPixel aP[1] = (480.0 - aP[1]) * y_cmPerPixel + y_offset # 计算目标点的真实坐标 if lastP[0] > 0.001 and lastP[1] > 0.001: if (((aP[0] - lastP[0]) ** 2 + (aP[1] - lastP[1]) ** 2 > 2500) and Timer < 2): # To avoid the mislead by walkers aP = lastP[:] Timer += 1 else: Timer = 0 # 通过计算求解转向幅度 lastP = aP[:] steerAngle = k * \ math.atan(2 * I * aP[0] / (aP[0] * aP[0] + (aP[1] + D) * (aP[1] + D))) # 设置转向幅度 st = steerAngle * 4.0 / 3.1415 if st > 1: st = 1 if st < -1: st = -1 # 发布当前数据 msg = AckermannDriveStamped() msg.drive.speed = 0.3 msg.drive.steering_angle = steerAngle * 0.5 print("Speed:", msg.drive.speed) print("Steering_Angle:", msg.drive.steering_angle) self.cmd_pub.publish(msg) # 偏离太多,降速调整方向 def adjustOrientation(self): global isLeft # 偏离过程速度恒定 msg = AckermannDriveStamped() msg.drive.speed = 0.15 # 查看偏离方向 if isLeft: msg.drive.steering_angle = 0.1 * 3.14 else: msg.drive.steering_angle = -0.1 * 3.14 # 发布 self.cmd_pub.publish(msg) def show_img(title: str, img: np.ndarray): """实时可视化辅助函数 参数: title : 窗口标题 img : OpenCV图像矩阵 功能: 1. 创建可调整窗口 2. 50%缩放显示 """ cv2.namedWindow(title, cv2.WINDOW_NORMAL) cv2.setWindowProperty(title, cv2.WND_PROP_AUTOSIZE, cv2.WINDOW_AUTOSIZE) img = cv2.resize(img, (int(img.shape[1] / 2), int(img.shape[0] / 2))) cv2.imshow(title, img) def main(args=None): """ROS2节点主入口""" rclpy.init(args=args) node = Lane('get_camera') rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
3、实验操作
找到car_ws/src/car_bringup/launch/car_sim.launch.py文件,并注释其中的spawn_obstacle一行并保存文件。
在car_ws文件夹中新建终端,输入以下命令。
./lane.sh
当gazebo场景生成后,小车开始巡线行驶。
清理资源
计算巢—服务实例—复制服务实例ID,点击【删除】
在弹窗粘贴服务实例ID,并进行勾选,点击【确定删除】
完成安全验证后,即可成功释放实例。
回到云服务器ECS——实例,检查是否成功释放资源
关闭实验
在完成实验后,点击 结束实操
点击 取消 回到实验页面,点击 确定 跳转实验评分
请为本次实验评分,并给出您的建议,点击 确认,结束本次实验