自动驾驶场景设计ROS2 2025 ——键盘控制

更新时间:

实现键盘控制gazebo中小车行驶。

场景简介

  • 实现键盘控制gazebo中小车行驶。

背景知识

  • 学生需要有一定的python基础来完成此实验。

实验室资源方式简介

进入实操前,请确保阿里云账号满足以下条件:

  • 个人账号资源

    • 使用您个人的云资源进行操作,资源归属于个人。

    • 平台仅提供手册参考,不会对资源做任何操作。

  • 确保已完成云工开物300元代金券领取。

  • 已通过实名认证且账户余额≥100元。

本次实验将在您的账号下开通实操所需计算型实例规格族c7a,费用约为:3元/时(具体金额取决于实验完成的时间),需要您通过阿里云云工开物学生专属300元抵扣金兑换本次实操的云资源。

如果您调整了资源规格、使用时长,或执行了本方案以外的操作,可能导致费用发生变化,请以控制台显示的实际价格和最终账单为准。

领取专属权益及创建实验资源

在开始实验之前,请先点击右侧屏幕的“进入实操”再进行后续操作

image

领取300元高校专属权益优惠券(若已领取请跳过)

image

重要

实验产生的费用优先使用优惠券,优惠券使用完毕后需您自行承担。

学生认证

实验步骤

  • 1、服务部署

    • 点击链接,进入部署页面

    • 按下图设置【服务实例名称】、【地域】、【实例密码】

      • 服务实例名称:test(可自定义命名)

      • 地域华东2(上海)

      • 实例密码:Sjtu@520

        说明

        输入实例密码时请注意大小写,请记住您设置的实例名称及对应密码,后续实验过程会用到。

        image

    • 完成填写后点击【下一步:确认订单】

      image

    • 核对实例信息及价格预览,无误请点击【立即创建】

      image

      重要

      领取300元优惠券后,资源应为0元/小时,且会提示【您当前账户的余额充足】!若提示余额不足等,请检查是否正确领取优惠券

    • 创建成功,点击【去列表查看】

      image

    • 查看实例,并等待部署

      image

      点击左侧的图标展开目录,选择目录中的【云服务器ECS】

      image

    • 云服务器ECS—实例—远程连接

      image

    • 下拉展开更多登录方式,选择【通过VNC远程连接】

      image

    • 点击 ecs—user,并输入Sjtu@520(请输入您设置的密码)后回车

      image

      image

    • 进入镜像后,点击左侧第3图标,打开文件夹后点击car_ws文件夹

      image

    • 进入car_ws文件夹,在空白处点击鼠标右键,选择Open in Terminal

      image

      image

  • 2、代码部分

    launch文件生成小车模型时,会同时调用servo_command.py文件。该文件中订阅其他文件发布的AckermannDriveStamped类型消息,并将其转化为Float64MultiArray类型消息,并发布给gazebo以控制场景中小车的行驶。

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    
    """
    阿克曼控制指令转换节点
    功能:将上层控制指令转换为各独立执行器的控制命令
    消息流:
    /cmd_vel (AckermannDriveStamped) → 转换处理 → 6个执行器控制器命令
    """
    
    # ROS2核心库
    import rclpy
    from rclpy.node import Node
    
    # ROS消息类型
    from std_msgs.msg import Float64MultiArray
    from ackermann_msgs.msg import AckermannDriveStamped
    
    class ServoCommand(Node):
        """执行器控制指令转换节点
        主要功能:
        1. 订阅阿克曼控制指令(速度+转向角)
        2. 转换为6个独立执行器的控制命令:
           - 4个轮速控制器(前后左右车轮)
           - 2个转向铰链控制器(左右转向机构)
        """
        
        def __init__(self, node_name):
            """节点初始化"""
            super().__init__(node_name)
            
            # --------------------------
            # 创建消息订阅者(输入接口)
            # --------------------------
            # 参数说明:
            # msg_type : AckermannDriveStamped 标准阿克曼控制消息
            # topic_name : /cmd_vel 标准控制话题
            # callback : self.set_throttle_steer 消息处理回调
            # qos_profile : 10 队列深度(平衡实时性与可靠性)
            self.subscription = self.create_subscription(
                AckermannDriveStamped,
                '/cmd_vel',
                self.set_throttle_steer,
                10)
            
            # --------------------------
            # 创建命令发布者(输出接口)
            # --------------------------
            # 控制器命名规范:<位置>_<类型>_controller/commands
            # 队列深度设为1保证最新指令优先
            
            # 左后轮速度控制器
            self.pub_vel_left_rear_wheel = self.create_publisher(
                Float64MultiArray, 
                "/left_rear_wheel_velocity_controller/commands", 
                1)
            
            # 右后轮速度控制器
            self.pub_vel_right_rear_wheel = self.create_publisher(
                Float64MultiArray,
                "/right_rear_wheel_velocity_controller/commands",
                1)
            
            # 左前轮速度控制器 
            self.pub_vel_left_front_wheel = self.create_publisher(
                Float64MultiArray,
                "/left_front_wheel_velocity_controller/commands",
                1)
            
            # 右前轮速度控制器
            self.pub_vel_right_front_wheel = self.create_publisher(
                Float64MultiArray,
                "/right_front_wheel_velocity_controller/commands",
                1)
            
            # 左转向铰链位置控制器
            self.pub_pos_left_steering_hinge = self.create_publisher(
                Float64MultiArray,
                "/left_steering_hinge_position_controller/commands",
                1)
            
            # 右转向铰链位置控制器
            self.pub_pos_right_steering_hinge = self.create_publisher(
                Float64MultiArray,
                "/right_steering_hinge_position_controller/commands",
                1)
    
        def set_throttle_steer(self, msg):
            """控制指令回调函数
            参数:
                msg (AckermannDriveStamped): 输入控制指令,包含:
                    - drive.speed: 纵向速度(m/s)
                    - drive.steering_angle: 前轮转向角(rad)
            处理逻辑:
                1. 速度信号处理:放大28倍(根据电机特性调整)
                2. 转向信号处理:直接传递角度值
                3. 全轮驱动模式:四个车轮接收相同速度指令
                4. 转向对称处理:左右转向机构接收相同角度
            """
            # 速度指令转换
            throttle = Float64MultiArray()
            throttle.data = [msg.drive.speed * 28]
            
            # 转向指令转换
            steer = Float64MultiArray() 
            steer.data = [msg.drive.steering_angle]
            
            # --------------------------
            # 发布到各执行器控制器
            # --------------------------
            # 速度指令发布(全轮相同速度)
            self.pub_vel_left_rear_wheel.publish(throttle)
            self.pub_vel_right_rear_wheel.publish(throttle)
            self.pub_vel_left_front_wheel.publish(throttle)
            self.pub_vel_right_front_wheel.publish(throttle)
            
            # 转向指令发布(对称转向)
            self.pub_pos_left_steering_hinge.publish(steer)
            self.pub_pos_right_steering_hinge.publish(steer)
            
            # 日志记录(INFO级别,带数据格式化)
            self.get_logger().info(
                f"指令已发布 | 速度: {throttle.data[0]:.1f} RPM | 转向角: {steer.data[0]:.2f} rad",
                throttle_once=True  # 节流功能防止高频日志
            )
    
    def main(args=None):
        """ROS2节点主入口"""
        rclpy.init(args=args)
        
        # 节点初始化
        servo_node = ServoCommand("servo_control_node")
        
        try:
            # 保持节点运行
            rclpy.spin(servo_node)
        except KeyboardInterrupt:
            # 捕获Ctrl+C信号
            servo_node.get_logger().info("节点被手动终止")
        finally:
            # 资源清理
            servo_node.destroy_node()
            rclpy.shutdown()
    
    if __name__ == '__main__':
        main()
    

    键盘控制由key_operator.py实现,通过WASD键发布AckermannDriveStamped消息到/cmd_vel话题,控制车辆前进、后退、左转和右转,并支持紧急停止与退出功能。

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    
    """
    ROS2键盘控制节点
    功能:通过键盘输入控制车辆运动(阿克曼模型)
    键位映射:
    w - 前进    s - 后退
    a - 左转    d - 右转
    x - 停止    o - 退出
    CTRL+C - 强制终止
    """
    
    # ROS2核心库
    import rclpy
    from rclpy.node import Node
    
    # 系统库(用于终端输入处理)
    import sys
    import select
    import termios  # POSIX终端控制接口
    import tty      # 终端模式设置
    
    # ROS消息类型
    from geometry_msgs.msg import Twist
    from ackermann_msgs.msg import AckermannDriveStamped
    
    # 获取终端原始设置
    settings = termios.tcgetattr(sys.stdin)
    
    def getKey():
        """非阻塞式获取单个键盘输入
        技术要点:
        1. 设置终端为原始模式(不缓冲输入)
        2. 使用select实现非阻塞读取
        3. 恢复终端原始设置
        """
        tty.setraw(sys.stdin.fileno())  # 设置原始模式
        select.select([sys.stdin], [], [], 0)
        key = sys.stdin.read(1)
        # 恢复终端设置(避免异常状态)
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
        return key
    
    class KeyOperator(Node):
        """键盘控制节点主类
        功能架构:
        1. 创建阿克曼指令发布者
        2. 持续监听键盘输入
        3. 转换键位为控制指令
        4. 发布车辆控制指令
        """
        def __init__(self, name):
            super().__init__(name)
            
            # --------------------------
            # 创建指令发布者(输出接口)
            # --------------------------
            # 参数说明:
            # msg_type : AckermannDriveStamped 标准车辆控制消息
            # topic_name : /cmd_vel 标准控制话题
            # qos_profile : 10 队列深度
            self.pub = self.create_publisher(
                AckermannDriveStamped,
                "/cmd_vel",
                10)
            
            # 初始化控制指令对象
            self.akm = AckermannDriveStamped()
            self.twist = Twist()  # 备用Twist消息
            
            # 启动键盘监听循环
            self.loop()
    
        def loop(self):
            """主控制循环
            处理流程:
            1. 持续获取键盘输入
            2. 根据键位设置速度/转向参数
            3. 发布控制指令
            4. 处理退出信号
            """
            while True:
                # 初始化控制参数
                temp_x, temp_a = 0.0, 0.0  # 速度(m/s)和转向角(rad)
                
                # 获取键盘输入(非阻塞)
                key = getKey()
                
                # 记录输入日志(DEBUG级别)
                self.get_logger().debug("get Key : [{}]".format(key.upper()))
                
                # --------------------------
                # 键位映射处理
                # --------------------------
                if key == "w":  # 前进
                    self.get_logger().info("move => Front")
                    temp_x = 0.3  # 0.3 m/s
                elif key == "s":  # 后退
                    self.get_logger().info("move => Back")
                    temp_x = -0.2  # -0.2 m/s
                elif key == "a":  # 左转
                    self.get_logger().info("move => Front Left")
                    temp_x = 0.2   # 0.2 m/s
                    temp_a = 2.0    # 2 rad(原始值)
                elif key == "d":  # 右转
                    self.get_logger().info("move => Front right")
                    temp_x = 0.2   # 0.2 m/s
                    temp_a = -2.0   # -2 rad(原始值)
                elif key == "x":  # 停止
                    self.get_logger().info("move => Stop")
                    temp_x, temp_a = 0.0, 0.0
                elif key == "o":  # 退出程序
                    self.get_logger().info("move => Exit")
                    break
                elif key == '\x03':  # CTRL+C
                    break
                else:
                    continue  # 忽略无效输入
                
                # --------------------------
                # 指令转换与发布
                # --------------------------
                # 设置阿克曼指令参数
                self.akm.drive.speed = temp_x  # 纵向速度
                self.akm.drive.steering_angle = temp_a / 5  # 归一化转向角
                
                # 发布控制指令
                self.pub.publish(self.akm)
    
    def main(args=None):
        """ROS2节点主入口"""
        rclpy.init(args=args)
        
        # 节点初始化
        node = KeyOperator("pub_cmd")
        # 保持节点运行
        rclpy.spin(node)
        node.destroy_node()
        rclpy.shutdown()
    
    if __name__ == '__main__':
        main()
    
  • 3、实验操作

    找到car_ws/src/car_bringup/launch/car_sim.launch.py文件,并注释其中的spawn_obstacle一行并保存文件。

    image

    car_ws文件夹中新建终端,输入以下命令。

    ./keyboard_control.sh

    image

    gazebo场景生成后,在终端内输入特定按键即可控制小车行驶。

    image

清理资源

  • 计算巢—服务实例—复制服务实例ID,点击【删除】

    image

  • 在弹窗粘贴服务实例ID,并进行勾选,点击【确定删除】

    image

  • 完成安全验证后,即可成功释放实例。

    image

  • 回到云服务器ECS——实例,检查是否成功释放资源

    image

关闭实验

  • 在完成实验后,点击 结束实操

    image

  • 点击 取消 回到实验页面,点击 确定 跳转实验评分

    image

  • 请为本次实验评分,并给出您的建议,点击 确认,结束本次实验

    image