自动驾驶场景设计ROS2 2025 ——键盘控制
实现键盘控制gazebo中小车行驶。
场景简介
实现键盘控制gazebo中小车行驶。
背景知识
学生需要有一定的python基础来完成此实验。
实验室资源方式简介
进入实操前,请确保阿里云账号满足以下条件:
个人账号资源
使用您个人的云资源进行操作,资源归属于个人。
平台仅提供手册参考,不会对资源做任何操作。
确保已完成云工开物300元代金券领取。
已通过实名认证且账户余额≥100元。
本次实验将在您的账号下开通实操所需计算型实例规格族c7a,费用约为:3元/时(具体金额取决于实验完成的时间),需要您通过阿里云云工开物学生专属300元抵扣金兑换本次实操的云资源。
如果您调整了资源规格、使用时长,或执行了本方案以外的操作,可能导致费用发生变化,请以控制台显示的实际价格和最终账单为准。
领取专属权益及创建实验资源
在开始实验之前,请先点击右侧屏幕的“进入实操”再进行后续操作
领取300元高校专属权益优惠券(若已领取请跳过)
实验产生的费用优先使用优惠券,优惠券使用完毕后需您自行承担。
实验步骤
1、服务部署
点击链接,进入部署页面
按下图设置【服务实例名称】、【地域】、【实例密码】
服务实例名称:test(可自定义命名)
地域:华东2(上海)
实例密码:Sjtu@520
说明输入实例密码时请注意大小写,请记住您设置的实例名称及对应密码,后续实验过程会用到。
完成填写后点击【下一步:确认订单】
核对实例信息及价格预览,无误请点击【立即创建】
重要领取300元优惠券后,资源应为0元/小时,且会提示【您当前账户的余额充足】!若提示余额不足等,请检查是否正确领取优惠券
创建成功,点击【去列表查看】
查看实例,并等待部署
点击左侧的图标展开目录,选择目录中的【云服务器ECS】
云服务器ECS—实例—远程连接
下拉展开更多登录方式,选择【通过VNC远程连接】
点击 ecs—user,并输入Sjtu@520(请输入您设置的密码)后回车
进入镜像后,点击左侧第3个图标,打开文件夹后点击car_ws文件夹
进入car_ws文件夹,在空白处点击鼠标右键,选择Open in Terminal
2、代码部分
在launch文件生成小车模型时,会同时调用servo_command.py文件。该文件中订阅其他文件发布的AckermannDriveStamped类型消息,并将其转化为Float64MultiArray类型消息,并发布给gazebo以控制场景中小车的行驶。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 阿克曼控制指令转换节点 功能:将上层控制指令转换为各独立执行器的控制命令 消息流: /cmd_vel (AckermannDriveStamped) → 转换处理 → 6个执行器控制器命令 """ # ROS2核心库 import rclpy from rclpy.node import Node # ROS消息类型 from std_msgs.msg import Float64MultiArray from ackermann_msgs.msg import AckermannDriveStamped class ServoCommand(Node): """执行器控制指令转换节点 主要功能: 1. 订阅阿克曼控制指令(速度+转向角) 2. 转换为6个独立执行器的控制命令: - 4个轮速控制器(前后左右车轮) - 2个转向铰链控制器(左右转向机构) """ def __init__(self, node_name): """节点初始化""" super().__init__(node_name) # -------------------------- # 创建消息订阅者(输入接口) # -------------------------- # 参数说明: # msg_type : AckermannDriveStamped 标准阿克曼控制消息 # topic_name : /cmd_vel 标准控制话题 # callback : self.set_throttle_steer 消息处理回调 # qos_profile : 10 队列深度(平衡实时性与可靠性) self.subscription = self.create_subscription( AckermannDriveStamped, '/cmd_vel', self.set_throttle_steer, 10) # -------------------------- # 创建命令发布者(输出接口) # -------------------------- # 控制器命名规范:<位置>_<类型>_controller/commands # 队列深度设为1保证最新指令优先 # 左后轮速度控制器 self.pub_vel_left_rear_wheel = self.create_publisher( Float64MultiArray, "/left_rear_wheel_velocity_controller/commands", 1) # 右后轮速度控制器 self.pub_vel_right_rear_wheel = self.create_publisher( Float64MultiArray, "/right_rear_wheel_velocity_controller/commands", 1) # 左前轮速度控制器 self.pub_vel_left_front_wheel = self.create_publisher( Float64MultiArray, "/left_front_wheel_velocity_controller/commands", 1) # 右前轮速度控制器 self.pub_vel_right_front_wheel = self.create_publisher( Float64MultiArray, "/right_front_wheel_velocity_controller/commands", 1) # 左转向铰链位置控制器 self.pub_pos_left_steering_hinge = self.create_publisher( Float64MultiArray, "/left_steering_hinge_position_controller/commands", 1) # 右转向铰链位置控制器 self.pub_pos_right_steering_hinge = self.create_publisher( Float64MultiArray, "/right_steering_hinge_position_controller/commands", 1) def set_throttle_steer(self, msg): """控制指令回调函数 参数: msg (AckermannDriveStamped): 输入控制指令,包含: - drive.speed: 纵向速度(m/s) - drive.steering_angle: 前轮转向角(rad) 处理逻辑: 1. 速度信号处理:放大28倍(根据电机特性调整) 2. 转向信号处理:直接传递角度值 3. 全轮驱动模式:四个车轮接收相同速度指令 4. 转向对称处理:左右转向机构接收相同角度 """ # 速度指令转换 throttle = Float64MultiArray() throttle.data = [msg.drive.speed * 28] # 转向指令转换 steer = Float64MultiArray() steer.data = [msg.drive.steering_angle] # -------------------------- # 发布到各执行器控制器 # -------------------------- # 速度指令发布(全轮相同速度) self.pub_vel_left_rear_wheel.publish(throttle) self.pub_vel_right_rear_wheel.publish(throttle) self.pub_vel_left_front_wheel.publish(throttle) self.pub_vel_right_front_wheel.publish(throttle) # 转向指令发布(对称转向) self.pub_pos_left_steering_hinge.publish(steer) self.pub_pos_right_steering_hinge.publish(steer) # 日志记录(INFO级别,带数据格式化) self.get_logger().info( f"指令已发布 | 速度: {throttle.data[0]:.1f} RPM | 转向角: {steer.data[0]:.2f} rad", throttle_once=True # 节流功能防止高频日志 ) def main(args=None): """ROS2节点主入口""" rclpy.init(args=args) # 节点初始化 servo_node = ServoCommand("servo_control_node") try: # 保持节点运行 rclpy.spin(servo_node) except KeyboardInterrupt: # 捕获Ctrl+C信号 servo_node.get_logger().info("节点被手动终止") finally: # 资源清理 servo_node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
键盘控制由key_operator.py实现,通过WASD键发布AckermannDriveStamped消息到/cmd_vel话题,控制车辆前进、后退、左转和右转,并支持紧急停止与退出功能。
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ ROS2键盘控制节点 功能:通过键盘输入控制车辆运动(阿克曼模型) 键位映射: w - 前进 s - 后退 a - 左转 d - 右转 x - 停止 o - 退出 CTRL+C - 强制终止 """ # ROS2核心库 import rclpy from rclpy.node import Node # 系统库(用于终端输入处理) import sys import select import termios # POSIX终端控制接口 import tty # 终端模式设置 # ROS消息类型 from geometry_msgs.msg import Twist from ackermann_msgs.msg import AckermannDriveStamped # 获取终端原始设置 settings = termios.tcgetattr(sys.stdin) def getKey(): """非阻塞式获取单个键盘输入 技术要点: 1. 设置终端为原始模式(不缓冲输入) 2. 使用select实现非阻塞读取 3. 恢复终端原始设置 """ tty.setraw(sys.stdin.fileno()) # 设置原始模式 select.select([sys.stdin], [], [], 0) key = sys.stdin.read(1) # 恢复终端设置(避免异常状态) termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings) return key class KeyOperator(Node): """键盘控制节点主类 功能架构: 1. 创建阿克曼指令发布者 2. 持续监听键盘输入 3. 转换键位为控制指令 4. 发布车辆控制指令 """ def __init__(self, name): super().__init__(name) # -------------------------- # 创建指令发布者(输出接口) # -------------------------- # 参数说明: # msg_type : AckermannDriveStamped 标准车辆控制消息 # topic_name : /cmd_vel 标准控制话题 # qos_profile : 10 队列深度 self.pub = self.create_publisher( AckermannDriveStamped, "/cmd_vel", 10) # 初始化控制指令对象 self.akm = AckermannDriveStamped() self.twist = Twist() # 备用Twist消息 # 启动键盘监听循环 self.loop() def loop(self): """主控制循环 处理流程: 1. 持续获取键盘输入 2. 根据键位设置速度/转向参数 3. 发布控制指令 4. 处理退出信号 """ while True: # 初始化控制参数 temp_x, temp_a = 0.0, 0.0 # 速度(m/s)和转向角(rad) # 获取键盘输入(非阻塞) key = getKey() # 记录输入日志(DEBUG级别) self.get_logger().debug("get Key : [{}]".format(key.upper())) # -------------------------- # 键位映射处理 # -------------------------- if key == "w": # 前进 self.get_logger().info("move => Front") temp_x = 0.3 # 0.3 m/s elif key == "s": # 后退 self.get_logger().info("move => Back") temp_x = -0.2 # -0.2 m/s elif key == "a": # 左转 self.get_logger().info("move => Front Left") temp_x = 0.2 # 0.2 m/s temp_a = 2.0 # 2 rad(原始值) elif key == "d": # 右转 self.get_logger().info("move => Front right") temp_x = 0.2 # 0.2 m/s temp_a = -2.0 # -2 rad(原始值) elif key == "x": # 停止 self.get_logger().info("move => Stop") temp_x, temp_a = 0.0, 0.0 elif key == "o": # 退出程序 self.get_logger().info("move => Exit") break elif key == '\x03': # CTRL+C break else: continue # 忽略无效输入 # -------------------------- # 指令转换与发布 # -------------------------- # 设置阿克曼指令参数 self.akm.drive.speed = temp_x # 纵向速度 self.akm.drive.steering_angle = temp_a / 5 # 归一化转向角 # 发布控制指令 self.pub.publish(self.akm) def main(args=None): """ROS2节点主入口""" rclpy.init(args=args) # 节点初始化 node = KeyOperator("pub_cmd") # 保持节点运行 rclpy.spin(node) node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
3、实验操作
找到car_ws/src/car_bringup/launch/car_sim.launch.py文件,并注释其中的spawn_obstacle一行并保存文件。
在car_ws文件夹中新建终端,输入以下命令。
./keyboard_control.sh
当gazebo场景生成后,在终端内输入特定按键即可控制小车行驶。
清理资源
计算巢—服务实例—复制服务实例ID,点击【删除】
在弹窗粘贴服务实例ID,并进行勾选,点击【确定删除】
完成安全验证后,即可成功释放实例。
回到云服务器ECS——实例,检查是否成功释放资源
关闭实验
在完成实验后,点击 结束实操
点击 取消 回到实验页面,点击 确定 跳转实验评分
请为本次实验评分,并给出您的建议,点击 确认,结束本次实验