调度实验
本实验将场景划分为四个区域,分别是环岛区域和通过路口与环岛连接的其他区域。当位于环岛的传感器检测到车辆密度高于阈值时,会向各区域的车辆发送类似“拥堵”的消息
场景简介
本实验将场景划分为四个区域,分别是环岛区域和通过路口与环岛连接的其他区域。当位于环岛的传感器检测到车辆密度高于阈值时,会向各区域的车辆发送类似“拥堵”的消息
实验室资源方式简介
进入实操前,请确保阿里云账号满足以下条件:
个人账号资源
使用您个人的云资源进行操作,资源归属于个人。
平台仅提供手册参考,不会对资源做任何操作。
确保已完成云工开物300元代金券领取。
已通过实名认证且账户余额≥100元。
本次实验将在您的账号下开通实操所需计算型实例规格族c7a,费用约为:17元/时(具体金额取决于实验完成的时间),需要您通过阿里云云工开物学生专属300元抵扣金兑换本次实操的云资源。
如果您调整了资源规格、使用时长,或执行了本方案以外的操作,可能导致费用发生变化,请以控制台显示的实际价格和最终账单为准。
领取专属权益及创建实验资源
在开始实验之前,请先点击右侧屏幕的“进入实操”再进行后续操作

本次实验需要您通过领取阿里云云工开物学生专属300元抵扣券兑换本次实操的云资源,如未领取请先点击领取。(若已领取请跳过)

实验产生的费用优先使用优惠券,优惠券使用完毕后需您自行承担。

实验步骤
1、服务部署
点击链接,进入部署页面
按下图设置【服务实例名称】、【地域】、【实例密码】
服务实例名称:test(可自定义命名)
地域:华东2(上海)
实例密码:Sjtu@520
说明输入实例密码时请注意大小写,请记住您设置的实例名称及对应密码,后续实验过程会用到。

完成填写后点击【下一步:确认订单】

核对实例信息及价格预览,无误请点击【立即创建】
重要领取300元优惠券后,资源应为0元/小时,且会提示【您当前账户的余额充足】!若提示余额不足等,请检查是否正确领取优惠券
创建成功,点击【去列表查看】

查看实例,并等待部署

点击左侧的图标展开目录,选择目录中的【云服务器ECS】

云服务器ECS—实例—远程连接

下拉展开更多登录方式,选择【通过VNC远程连接】

选择computenest,并输入实例密码:computenest,回车

进入镜像后,点击左侧第3个图标,打开文件夹后点击catkin_ws文件夹

进入catkin_ws文件夹,在空白处点击鼠标右键,选择Open in Terminal


2、代码部分
生成gazebo仿真环境需要调用spawn.launch
<launch> <!-- 定义启动文件中的参数 --> <arg name="paused" default="false"/> <!-- 是否暂停仿真,默认为false --> <arg name="use_sim_time" default="true"/> <!-- 是否使用仿真时间,默认为true --> <arg name="gui" default="true"/> <!-- 是否启动GUI,默认为true --> <arg name="headless" default="false"/> <!-- 是否无头模式,默认为false --> <arg name="debug" default="false"/> <!-- 是否调试模式,默认为false --> <arg name="verbose" default="false"/> <!-- 是否详细输出,默认为false --> <arg name="model" value="$(find site_model)/urdf/car.urdf"/> <!-- 机器人的URDF模型路径 --> <!-- Start gazebo and load the world --> <include file="$(find gazebo_ros)/launch/empty_world.launch" > <arg name="paused" value="$(arg paused)"/> <arg name="use_sim_time" value="$(arg use_sim_time)"/> <arg name="gui" value="$(arg gui)"/> <arg name="headless" value="$(arg headless)"/> <arg name="debug" value="$(arg debug)"/> <arg name="verbose" value="$(arg verbose)"/> </include> <!-- Spawn the site --> <node name="tf_footprint_base" pkg="tf" type="static_transform_publisher" args="0 0 0 0 0 0 base_link base_footprint 40" /> <arg name="gpu" default="false"/> <arg name="organize_cloud" default="false"/> <param name="robot_description" command="$(find xacro)/xacro --inorder '$(find site_model)/urdf/site_model.urdf.xacro' organize_cloud:=$(arg organize_cloud) gpu:=$(arg gpu)" /> <node pkg="gazebo_ros" type="spawn_model" name="spawn_model" args="-urdf -param /robot_description -model site_model"/> <node pkg="robot_state_publisher" type="robot_state_publisher" name="robot_state_publisher"> <param name="publish_frequency" type="double" value="30.0" /> </node> <node name="fake_joint_calibration" pkg="rostopic" type="rostopic" args="pub /calibrated std_msgs/Bool true"/> <!-- 从yaml文件加载联合控制器的参数 --> <rosparam file="$(find pkg)/config/ctrl.yaml" command="load"/> <!-- 加载机器人模型描述参数 --> <group ns="car1"> <param name="robot_description" textfile="$(find pkg)/urdf/car1.urdf"/> <node name= "robot_state_publisher" pkg= "robot_state_publisher" type= "robot_state_publisher"> <param name="publish_frequency" type="double" value="20.0" /> </node> <node name="urdf_spawner" pkg="gazebo_ros" type="spawn_model" respawn="false" output="screen" args="-urdf -model car1 -param robot_description -x -1.5 -y -3 -z 0.05"/> <node name="controller_manager" pkg="controller_manager" type="spawner" respawn="true" output="screen" ns="/car1" args="left_rear_wheel_velocity_controller right_rear_wheel_velocity_controller left_front_wheel_velocity_controller right_front_wheel_velocity_controller left_steering_hinge_position_controller right_steering_hinge_position_controller joint_state_controller"/> </group> <group ns="car2"> <param name="robot_description" textfile="$(find pkg)/urdf/car2.urdf"/> <node name= "robot_state_publisher" pkg= "robot_state_publisher" type= "robot_state_publisher"> <param name="publish_frequency" type="double" value="20.0" /> </node> <node name="urdf_spawner" pkg="gazebo_ros" type="spawn_model" respawn="false" output="screen" args="-urdf -model car2 -param robot_description -x -1.5 -y -4 -z 0.05"/> <node name="controller_manager" pkg="controller_manager" type="spawner" respawn="true" output="screen" ns="/car2" args="left_rear_wheel_velocity_controller right_rear_wheel_velocity_controller left_front_wheel_velocity_controller right_front_wheel_velocity_controller left_steering_hinge_position_controller right_steering_hinge_position_controller joint_state_controller"/> </group> <node pkg="site_model" type="ServoCommands.py" name="ServoCommands" output="screen"/> </launch>
场景生成后调用model.launch整理数据文件。
<launch> <!-- 启动文件,用于启动ROS节点 --> <!-- 启动名为Set_param的节点,该节点属于site_model包,执行的脚本为Set_param.py,输出信息将显示在屏幕上 --> <node pkg="site_model" type="Set_param.py" name="Set_param" output="screen"/> <!-- 启动名为Car01的节点,该节点属于site_model包,执行的脚本为Car01.py,输出信息将显示在屏幕上 --> <node pkg="site_model" type="Car01.py" name="Car01" output="screen"/> <!-- 启动名为Car02的节点,该节点属于site_model包,执行的脚本为Car02.py,输出信息将显示在屏幕上 --> <node pkg="site_model" type="Car02.py" name="Car02" output="screen"/> <!-- 启动名为Car03的节点,该节点属于site_model包,执行的脚本为Car03.py,输出信息将显示在屏幕上 --> <node pkg="site_model" type="Car03.py" name="Car03" output="screen"/> <!-- 启动名为Main的节点,该节点属于site_model包,执行的脚本为Main.py,输出信息将显示在屏幕上 --> <node pkg="site_model" type="Main.py" name="Main" output="screen"/> </launch>model.launch运行了多个py文件,其中Set_param.py如下:
#! /usr/bin/env python # 指定脚本使用Python解释器执行 import rospy # 导入ROS的Python库,用于与ROS系统进行交互 if __name__=='__main__': # 确保脚本在被直接运行时执行以下代码 rospy.init_node('Set_param') # 初始化一个名为'Set_param'的ROS节点 rospy.set_param('p_strict_1', 0.7) # 设置ROS参数'p_strict_1'的值为0.7 rospy.set_param('p_strict_2', 0.8) # 设置ROS参数'p_strict_2'的值为0.8 rospy.set_param('p_strict_3', 0.6) # 设置ROS参数'p_strict_3'的值为0.6 rospy.set_param('strict_selected', 0.) # 设置ROS参数'strict_selected'的值为0.0 rospy.set_param('use_mab', 1) # 设置ROS参数'use_mab'的值为1以Car01.py为例:
#! /usr/bin/env python # 指定脚本使用Python解释器执行 from Car import Car # 导入自定义的Car类 import time # 导入时间库 import rospy # 导入ROS Python库 if __name__=='__main__': # 确保脚本作为主程序运行时执行以下代码 # 定义数据文件路径 path = '/home/computenest/catkin_ws/src/site_model/src/scripts_wx/RMMDet_MAB/data_MAB_car01_04.txt' # 创建Car对象,传入模型名称、起始索引和数据文件路径 car1 = Car('car1', 1, path) # 调用Car对象的run方法,启动节点并开始运行 car1.run()其中调用的Car.py内容如下:
#! /usr/bin/env python # 指定脚本使用Python解释器执行 import rospy, cv2, cv_bridge # 导入ROS、OpenCV和cv_bridge库 import numpy as np # 导入NumPy库 from sensor_msgs.msg import Image # 导入ROS图像消息类型 from ackermann_msgs.msg import AckermannDriveStamped # 导入Ackermann驱动消息类型 from math import * # 导入数学函数 from gazebo_msgs.msg import ModelStates # 导入Gazebo模型状态消息类型 import message_filters # 导入消息过滤器库 import time # 导入时间库 from msgs.msg import StrictIndex # 导入自定义的StrictIndex消息类型 class Car: def __init__(self, model_name, start_index, data_path): # 初始化Car类,设置发布者、模型名称、严格索引、数据文件路径等 self.cmd_vel_pub = rospy.Publisher("/"+model_name+"/ackermann_cmd_mux/output", AckermannDriveStamped, queue_size=10) self.strict_index_pub = rospy.Publisher('/'+model_name+'/strict_index', StrictIndex, queue_size=10) self.model_index = None # 车辆索引 self.model_name = model_name # 车辆名称 self.strict_index = start_index # 开始时的严格索引 self.data_file = open(data_path, 'a+') # 打开数据文件以追加写入 self.circle_start = 0 # 圆形区域开始时间 self.circle_end = 0 # 圆形区域结束时间 self.time_in_circle = 0 # 在圆形区域的时间 self.time_start = 0 # 启动时间 self.print_flag = 1 # 打印标志 def run(self): # 运行节点,初始化ROS节点并订阅相机和模型状态 self.time_start = round(time.time(), 2) rospy.init_node("RunningCar_"+self.model_name) m1 = message_filters.Subscriber("/"+self.model_name+"/"+self.model_name+"/camera/zed_left/image_rect_color_left", Image) m2 = message_filters.Subscriber("/gazebo/model_states", ModelStates) ts = message_filters.ApproximateTimeSynchronizer([m1,m2], 1, 1, allow_headerless=True) ts.registerCallback(self.callback) rospy.spin() def callback(self, image_msg, model_states_msg): # 回调函数,处理接收到的图像和模型状态消息 now = round(time.time(), 2) time_used = now-self.time_start self.data_file.write('Time used: '+str(time_used)+', Time in circle: '+str(self.time_in_circle)+'\n') # 从参数服务器获取参数 self.p_strict_1 = rospy.get_param('p_strict_1', 0.7) self.p_strict_2 = rospy.get_param('p_strict_2', 0.8) self.p_strict_3 = rospy.get_param('p_strict_3', 0.6) # 将自己的位置写进参数服务器 rospy.set_param(self.model_name+'/strict_index', self.strict_index) # 判断自己是否应该降低概率 use_mab = rospy.get_param('use_mab') if use_mab: tensive_strict = rospy.get_param('strict_selected', 0) if tensive_strict==self.strict_index: if self.print_flag: print(self.model_name+': Parameters changed!') self.print_flag = 0 self.adjust_param() bridge = cv_bridge.CvBridge() frame = bridge.imgmsg_to_cv2(image_msg, 'bgr8') if not self.model_index: self.model_index = self.get_model_index(model_states_msg) model_pos = model_states_msg.pose[self.model_index].position self.running(frame, model_pos) def reset_param(self): # 重置参数 self.print_flag = 1 rospy.set_param('p_strict_1', 0.7) rospy.set_param('p_strict_2', 0.8) rospy.set_param('p_strict_3', 0.6) def adjust_param(self): # 调整参数 rospy.set_param('p_strict_1', 0.5) rospy.set_param('p_strict_2', 0.5) rospy.set_param('p_strict_3', 0.5) pass def running(self, image, pos): # 根据严格索引调用不同的运行逻辑 pos_x = round(pos.x,3) pos_y = round(pos.y,3) if self.strict_index == 1: self.runningcar_1(image, pos_x, pos_y) # 执行区域1的逻辑 elif self.strict_index == 2: self.runningcar_2(image, pos_x, pos_y) # 执行区域2的逻辑 elif self.strict_index == 3: self.runningcar_3(image, pos_x, pos_y) # 执行区域3的逻辑 elif self.strict_index == 0: self.runningcar_0(image, pos_x, pos_y) # 执行默认逻辑 def record(self): # 记录时间 self.circle_end = round(time.time(), 2) self.time_in_circle += self.circle_end-self.circle_start self.circle_end = 0 self.circle_start = 0 def set_roi_forward(self, h, w, mask): # 设置感兴趣区域 search_top = 400 search_bot = search_top + 80 mask[0:search_top, 0:w] = 0 # 上部区域置为0 mask[search_bot:h, 0:w] = 0 # 下部区域置为0 return mask def get_model_index(self, msg): # 获取模型索引 for i,name in enumerate(msg.name): if self.model_name==name: return i def roll(self, p): # 根据概率p返回True或False if np.random.rand()<=p: return True else: return False def follow_line(self, image): # 跟随白色线 hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV) # 转换颜色空间 lower_white = np.array([0, 0, 200]) # 白色的下界 upper_white = np.array([180, 30, 255]) # 白色的上界 mask = cv2.inRange(hsv, lower_white, upper_white) # 创建白色掩膜 h,w = mask.shape mask = self.set_roi_forward(h, w, mask) # 设置ROI M = cv2.moments(mask) # 计算图像矩 if M['m00']>0: cx = int(M['m10'] / M['m00'])-0 cy = int(M['m01'] / M['m00']) cv2.circle(image, (cx, cy), 20, (0, 0, 255), -1) # 在图像上绘制中心点 err = cx - w / 2 - 50 # 计算偏差 x = 0.1 # 前进速度 z = -float(err / 2.0) / 50 # 转向角度 else: # 如果未检测到白线,根据严格索引调整 if self.strict_index==2: print('Adjust '+self.model_name+' in strict 2...') x = 0.1 z = 0.7 elif self.strict_index==0: print('Adjust '+self.model_name+' in strict 0...') x = 0.0 z = -0.0 else: x = 0. z = 0. akm = AckermannDriveStamped() # 创建AckermannDriveStamped消息 akm.drive.speed = x*1.80 # 设置速度 if x==0.0: akm.drive.steering_angle = 0 # 停止时转向角度为0 else: akm.drive.steering_angle = atan(z/x*0.133) # 计算转向角度 self.cmd_vel_pub.publish(akm) # 发布控制命令 def keep_running(self, x, a, duration): # 保持运行状态 akm = AckermannDriveStamped() akm.drive.speed = x akm.drive.steering_angle = a self.cmd_vel_pub.publish(akm) time.sleep(duration) # 等待指定时间 # 环内的逻辑 def turn_0_1(self): # 在环内执行转弯动作 self.keep_running(0.2, 0.6, 5) # 向左转 self.keep_running(0.1, -0.7, 1) # 向后转 self.keep_running(0.1, 0.7, 2) # 向右转 def turn_0_2(self): # 在环内执行转弯动作 self.keep_running(0.3, -0.25, 1) # 向左轻微转 def turn_0_2_(self): # 在环内执行转弯动作 self.keep_running(0.3, 0.25, 3) # 向右轻微转 self.keep_running(0.2, -0.06, 3) # 向后微调 def turn_0_3(self): # 在环内执行转弯动作 self.keep_running(0.3, 0.5, 4) # 向左转 def runningcar_0(self, image, pos_x, pos_y): # 在环内运行的逻辑 err_x_1 = pos_x-0.69 err_y_1 = pos_y+1.27 err_x_2 = pos_x+0.32 err_y_2 = pos_y+0.33 err_x_3 = pos_x+0.12 err_y_3 = pos_y+1.77 # 根据位置误差判断是否进入新区域 if abs(err_x_2)<0.03 and abs(err_y_2<0.03): if self.roll(0.5): self.turn_0_2() else: self.turn_0_2_() self.record() self.strict_index = 3 print(self.model_name+" enters strict 3!") elif abs(err_x_1)<0.03 and abs(err_y_1)<0.03: if self.roll(0.5): self.turn_0_1() self.record() self.strict_index = 2 print(self.model_name+" enters strict 2!") else: self.follow_line(image) elif abs(err_x_3)<0.03 and abs(err_y_3)<0.03: if self.roll(0.5): self.turn_0_3() self.record() self.strict_index = 1 print(self.model_name+" enters strict 1!") else: pass else: self.follow_line(image) # 继续跟踪线 # 第一个路口的逻辑 def turn_1(self): # 在第一个路口执行转弯动作 self.keep_running(0.3, 0.7, 1.2) # 向左转 self.keep_running(0.3, 0.0, 1) # 直行 self.keep_running(0.3, 0.8, 1.6) # 向右转 def keep_1(self): # 在第一个路口保持直行 self.keep_running(0.3, 0.7, 1.1) # 继续向左转 self.keep_running(0.3, 0.0, 2.7) # 直行 def runningcar_1(self, image, pos_x, pos_y): # 在第一个路口运行的逻辑 err_x = pos_x-1.46 err_y = pos_y+2.68 if abs(err_x)<0.05 and abs(err_y)<0.03: if self.roll(self.p_strict_1): self.turn_1() self.circle_start = round(time.time(), 2) print(self.model_name+" enters strict 0!") self.strict_index = 0 else: self.keep_1() print(self.model_name+" enters strict 2!") self.strict_index = 2 self.reset_param() else: self.follow_line(image) # 继续跟踪线 # 第二个路口的逻辑 def turn_2(self): # 在第二个路口执行转弯动作 self.keep_running(0.3, -0.3, 2.5) # 向左转 self.keep_running(0.3, 0.7, 1.5) # 向右转 self.keep_running(0.3, 0., 1) # 直行 self.keep_running(0.3, 0.7, 1.0) # 向右转 def keep_2(self): # 在第二个路口保持直行 self.keep_running(0.3, -0.5, 4.5) def runningcar_2(self, image, pos_x, pos_y): # 在第二个路口运行的逻辑 err_x_2 = pos_x-0.8 err_y_2 = pos_y-1.1 if abs(err_x_2)<0.03 and abs(err_y_2)<0.03: if self.roll(self.p_strict_2): self.turn_2() self.circle_start = round(time.time(), 2) print(self.model_name+" enters strict 0!") self.strict_index = 0 else: self.keep_2() print(self.model_name+" enters strict 3!") self.strict_index = 3 self.reset_param() else: self.follow_line(image) # 继续跟踪线 # 第三个路口的逻辑 def keep_3(self): # 在第三个路口保持直行 self.keep_running(0.2, -0.02, 6) # 向左转 def turn_3(self): # 在第三个路口执行转弯动作 self.keep_running(0.3, 0.6, 3.5) # 向左转 self.keep_running(0.3, -0.5, 1) # 向右转 self.keep_running(0.3, 0.7, 1.5) # 向左转 def runningcar_3(self, image, pos_x, pos_y): # 在第三个路口运行的逻辑 err_x_1 = pos_x+1.74 err_y_1 = pos_y-1.65 err_x_2 = pos_x+1.77 err_y_2 = pos_y+1.83 if abs(err_x_1)<0.03 and abs(err_y_1)<0.03: self.keep_3() elif abs(err_x_2)<0.03 and abs(err_y_2)<0.03: if self.roll(self.p_strict_3): self.turn_3() self.circle_start = round(time.time(), 2) print(self.model_name+" enters strict 0!") self.strict_index = 0 else: print(self.model_name+" enters strict 1!") self.strict_index = 1 self.reset_param() else: self.follow_line(image) # 继续跟踪线Main.py内容如下:
#! /usr/bin/env python # 指定脚本使用Python解释器执行 import rospy, cv2, cv_bridge # 导入ROS、OpenCV和cv_bridge库 import numpy as np # 导入NumPy库 from sensor_msgs.msg import Image # 导入ROS图像消息类型 from ackermann_msgs.msg import AckermannDriveStamped # 导入Ackermann驱动消息类型 from math import * # 导入数学函数 from gazebo_msgs.msg import ModelStates # 导入Gazebo模型状态消息类型 import message_filters # 导入消息过滤器库 import time # 导入时间库 ''' 区域1下降0.2 区域2下降0.3 区域3下降0.1 ''' def roll(p): # 定义一个函数,用于模拟概率事件 if np.random.rand()<=p: # 如果随机数小于等于p return 1 # 返回1 else: return 0 # 返回0 def count(indexes): # 定义一个函数,用于统计索引分布 density = [0,0,0,0] # 初始化密度数组 for i in indexes: # 遍历索引 density[int(i)] += 1 # 统计每个索引的出现次数 return np.array(density) # 返回统计结果 if __name__=='__main__': # 确保脚本作为主程序运行时执行以下代码 rospy.init_node('Main') # 初始化ROS节点,命名为'Main' beta_a = np.ones(3) # 初始化Beta分布的参数a beta_b = np.ones(3) # 初始化Beta分布的参数b probility = np.array([0.2, 0.3, 0.1]) # 定义每个区域的概率 regret = 0 # 初始化懊悔值 regret_file = open('/home/computenest/catkin_ws/src/site_model/src/scripts_wx/RMMDet_MAB/Regret_Record_04.txt', 'w') # 打开文件用于记录懊悔值 try: while True: # 进入主循环 time.sleep(1) # 每次循环暂停1秒 # MAB算法部分 indexes = [] # 初始化索引列表 for i in range(3): # 遍历3个区域 indexes.append(int(rospy.get_param('car'+str(i+1)+'/strict_index'))) # 获取每个区域的索引 indexes = np.array(indexes) # 将索引列表转换为NumPy数组 density = np.delete(count(indexes)*1000,0) # 计算每个区域的密度并删除第一个元素 award_predicted = density*np.random.beta(beta_a, beta_b) # 计算预测奖励 strict_selected = np.argmax(award_predicted)+1 # 选择奖励最大的区域 # 将参数设置为选中的区域 rospy.set_param('strict_selected', float(strict_selected)) # 设置ROS参数为选中的区域 award = roll(probility[strict_selected-1]) # 模拟奖励事件 beta_a[strict_selected-1] = beta_a[strict_selected-1]+award # 更新Beta分布的参数a beta_b[strict_selected-1] = beta_b[strict_selected-1]+1-award # 更新Beta分布的参数b # 计算懊悔 real_award = probility*density # 计算实际奖励 regret += np.max(real_award)-real_award[strict_selected-1] # 更新懊悔值 regret = round(regret, 2) # 四舍五入懊悔值 print('Regret now: ', regret) # 打印当前懊悔值 regret_file.write(str(regret)+'\n') # 将懊悔值写入文件 except: regret_file.close() # 关闭文件 pass # 捕获异常并忽略运行完model.launch文件后,分别调用ServoCommands.py和KeyControl.py控制小车行驶。
ServoCommand.py如下:
#!/usr/bin/python3 # 指定脚本使用 Python 3 解释器执行 import rospy # 导入 ROS Python 客户端库,用于与 ROS 系统交互 from std_msgs.msg import Float64 # 导入 ROS 标准消息类型 Float64,用于发布浮点数数据 from ackermann_msgs.msg import AckermannDriveStamped # 导入 AckermannDriveStamped 消息类型,用于接收速度和转向命令 def set_throttle_steer(data): # 定义回调函数,用于处理接收到的 AckermannDriveStamped 消息 # 创建发布者对象,用于向各个车轮速度控制器发布速度命令 pub_vel_left_rear_wheel = rospy.Publisher("/car1/left_rear_wheel_velocity_controller/command", Float64, queue_size=10) pub_vel_right_rear_wheel = rospy.Publisher("/car1/right_rear_wheel_velocity_controller/command", Float64, queue_size=10) pub_vel_left_front_wheel = rospy.Publisher("/car1/left_front_wheel_velocity_controller/command", Float64, queue_size=10) pub_vel_right_front_wheel = rospy.Publisher("/car1/right_front_wheel_velocity_controller/command", Float64, queue_size=10) # 创建发布者对象,用于向转向控制器发布转向角度命令 pub_pos_left_steering_hinge = rospy.Publisher("/car1/left_steering_hinge_position_controller/command", Float64, queue_size=10) pub_pos_right_steering_hinge = rospy.Publisher("/car1/right_steering_hinge_position_controller/command", Float64, queue_size=10) # 从接收到的消息中提取速度值,并乘以 28 进行缩放 throttle = data.drive.speed * 28 # 从接收到的消息中提取转向角度值 steer = data.drive.steering_angle # 向四个车轮的速度控制器发布速度命令 pub_vel_left_rear_wheel.publish(throttle) pub_vel_right_rear_wheel.publish(throttle) pub_vel_left_front_wheel.publish(throttle) pub_vel_right_front_wheel.publish(throttle) # 向两个转向控制器发布转向角度命令 pub_pos_left_steering_hinge.publish(steer) pub_pos_right_steering_hinge.publish(steer) def servo_commands(): # 定义主函数,用于初始化 ROS 节点并订阅消息 # 初始化 ROS 节点,节点名称为 'servo_commands1',anonymous=True 表示节点名称会自动添加唯一后缀 rospy.init_node('servo_commands1', anonymous=True) # 订阅 "/car1/ackermann_cmd_mux/output" 主题,接收 AckermannDriveStamped 消息,并调用 set_throttle_steer 回调函数处理 rospy.Subscriber("/car1/ackermann_cmd_mux/output", AckermannDriveStamped, set_throttle_steer) # 保持节点运行,等待消息并调用回调函数 rospy.spin() if __name__=='__main__': # 主程序入口,确保脚本直接运行时执行以下代码 try: # 调用 servo_commands 函数,启动 ROS 节点 servo_commands() except rospy.ROSInternalException: # 捕获 ROS 内部异常并忽略,防止程序崩溃 passKeyControl.py如下:
#! /usr/bin/env python # 指定脚本使用系统环境中的 Python 解释器执行 import rospy # 导入 ROS Python 客户端库,用于与 ROS 系统交互 import sys, select, termios, tty # 导入系统、终端设置和终端输入输出相关的库 from ackermann_msgs.msg import AckermannDriveStamped # 导入 AckermannDriveStamped 消息类型,用于发布速度和转向命令 def getKey(): # 定义函数,用于从终端获取单个按键输入 tty.setraw(sys.stdin.fileno()) # 设置终端为原始模式,禁用行缓冲和特殊字符处理 select.select([sys.stdin], [], [], 0) # 监听标准输入,等待用户按键 key = sys.stdin.read(1) # 读取用户按下的单个字符 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings) # 恢复终端原始设置 return key # 返回用户按下的字符 settings = termios.tcgetattr(sys.stdin) # 获取当前终端设置并保存到变量 settings 中 def pub_cmd(): # 定义主函数,用于发布速度和转向命令 index = 1 # 定义车辆索引,默认为 1 rospy.init_node("pub_cmd") # 初始化 ROS 节点,节点名称为 "pub_cmd" # rospy.set_param("Car Index", 1) # 注释掉的代码:设置 ROS 参数 "Car Index" 为 1 pub = rospy.Publisher("/car"+str(index)+"/ackermann_cmd_mux/output", AckermannDriveStamped, queue_size=10) # 创建发布者对象,用于向指定主题发布 AckermannDriveStamped 消息 akm = AckermannDriveStamped() # 创建 AckermannDriveStamped 消息对象 while 1: # 进入无限循环,持续监听键盘输入 x=0 # 初始化速度变量,默认为 0 a=0 # 初始化转向角度变量,默认为 0 key = getKey() # 调用 getKey 函数获取用户按下的按键 rospy.loginfo("键盘已录入:%s", key) # 记录日志,显示用户按下的按键 if key == 'w': # 如果按下 'w' 键,设置前进速度 x=0.3 a=0 elif key == 's': # 如果按下 's' 键,设置后退速度 x=-0.3 a=0 elif key == 'a': # 如果按下 'a' 键,设置左转 x=0.3 a=0.7 elif key == 'd': # 如果按下 'd' 键,设置右转 x=0.3 a=-0.7 elif key == 'x': # 如果按下 'x' 键,停止车辆 x=0 a=0 elif key == 'o': # 如果按下 'o' 键,退出循环 break else: # 如果按下其他键,跳过当前循环 continue # akm.drive.speed = x*6.97674 # 注释掉的代码:将速度值乘以 6.97674 进行缩放 akm.drive.speed = x # 设置消息中的速度值 akm.drive.steering_angle = a*0.7 # 设置消息中的转向角度值,并乘以 0.7 进行缩放 # akm.drive.jerk = 2 # 注释掉的代码:设置消息中的 jerk 值 pub.publish(akm) # 发布消息到指定主题 if __name__=="__main__": # 主程序入口,确保脚本直接运行时执行以下代码 pub_cmd() # 调用 pub_cmd 函数,启动 ROS 节点 pass # 占位符,无实际作用3、实验操作
在~/catkin_ws文件夹中新建终端,输出以下命令开启程序。
source devel/setup.bash roslaunch site_model spawn.launch
在~/catkin_ws文件夹中新建终端,输出以下命令开启程序。
source devel/setup.bash roslaunch site_model model.launch
在~/catkin_ws文件夹中新建两个终端,分别输出以下命令开启程序。
cd src/site_model/src/scripts_wx/ python3 ServoCommands.pycd src/site_model/src/scripts_wx/ python3 KeyControl.py
运行一段时间后退出所有程序,在/catkin_ws/src/site_model/src/scripts_wx/RMMDet_MAB路径下找到data_MAB_car01_04.txt文件打开可查看小车运行的总时间和在环岛区域里面的时间数据

清理资源
计算巢—服务实例—复制服务实例ID,点击【删除】

在弹窗粘贴服务实例ID,并进行勾选,点击【确定删除】

完成安全验证后,即可成功释放实例。

回到云服务器ECS——实例,检查是否成功释放资源

关闭实验
在完成实验后,点击 结束实操

点击 取消 回到实验页面,点击 确定 跳转实验评分

请为本次实验评分,并给出您的建议,点击 确认,结束本次实验






















