雷达-相机融合实验
本实验通过键盘控制车辆通过高架,通过毫米波雷达和相机消息融合,保存车辆定位点为本地图片
场景简介
本实验通过键盘控制车辆通过高架,通过毫米波雷达和相机消息融合,保存车辆定位点为本地图片
实验室资源方式简介
进入实操前,请确保阿里云账号满足以下条件:
个人账号资源
使用您个人的云资源进行操作,资源归属于个人。
平台仅提供手册参考,不会对资源做任何操作。
确保已完成云工开物300元代金券领取。
已通过实名认证且账户余额≥100元。
本次实验将在您的账号下开通实操所需计算型实例规格族c7a,费用约为:3元/时(具体金额取决于实验完成的时间),需要您通过阿里云云工开物学生专属300元抵扣金兑换本次实操的云资源。
如果您调整了资源规格、使用时长,或执行了本方案以外的操作,可能导致费用发生变化,请以控制台显示的实际价格和最终账单为准。
领取专属权益及创建实验资源
在开始实验之前,请先点击右侧屏幕的“进入实操”再进行后续操作

本次实验需要您通过领取阿里云云工开物学生专属300元抵扣券兑换本次实操的云资源,如未领取请先点击领取。(若已领取请跳过)

实验产生的费用优先使用优惠券,优惠券使用完毕后需您自行承担。

实验步骤
1、服务部署
点击链接,进入部署页面
按下图设置【服务实例名称】、【地域】、【实例密码】
服务实例名称:test(可自定义命名)
地域:华东2(上海)
实例密码:Sjtu@520
说明输入实例密码时请注意大小写,请记住您设置的实例名称及对应密码,后续实验过程会用到。

完成填写后点击【下一步:确认订单】

核对实例信息及价格预览,无误请点击【立即创建】
重要领取300元优惠券后,资源应为0元/小时,且会提示【您当前账户的余额充足】!若提示余额不足等,请检查是否正确领取优惠券
创建成功,点击【去列表查看】

查看实例,并等待部署

点击左侧的图标展开目录,选择目录中的【云服务器ECS】

云服务器ECS—实例—远程连接

下拉展开更多登录方式,选择【通过VNC远程连接】

点击ecs-user,并输入之前设置的实例密码:Sjtu@520,回车

进入镜像后,点击左侧第2个图标,打开文件夹后点击catkin_ws文件夹

进入catkin_ws文件夹,在空白处点击鼠标右键,选择Open in Terminal


2、代码部分
生成gazebo仿真环境需要调用spawn.launch
<launch> <!-- 定义启动文件中的参数 --> <arg name="paused" default="false"/> <!-- 是否暂停仿真,默认为false --> <arg name="use_sim_time" default="true"/> <!-- 是否使用仿真时间,默认为true --> <arg name="gui" default="true"/> <!-- 是否启动GUI,默认为true --> <arg name="headless" default="false"/> <!-- 是否无头模式,默认为false --> <arg name="debug" default="false"/> <!-- 是否调试模式,默认为false --> <arg name="verbose" default="false"/> <!-- 是否详细输出,默认为false --> <arg name="model" value="$(find site_model)/urdf/car.urdf"/> <!-- 机器人的URDF模型路径 --> <!-- Start gazebo and load the world --> <include file="$(find gazebo_ros)/launch/empty_world.launch" > <arg name="paused" value="$(arg paused)"/> <arg name="use_sim_time" value="$(arg use_sim_time)"/> <arg name="gui" value="$(arg gui)"/> <arg name="headless" value="$(arg headless)"/> <arg name="debug" value="$(arg debug)"/> <arg name="verbose" value="$(arg verbose)"/> </include> <!-- Spawn the site --> <node name="tf_footprint_base" pkg="tf" type="static_transform_publisher" args="0 0 0 0 0 0 base_link base_footprint 40" /> <arg name="gpu" default="false"/> <arg name="organize_cloud" default="false"/> <param name="robot_description" command="$(find xacro)/xacro --inorder '$(find site_model)/urdf/site_model.urdf.xacro' organize_cloud:=$(arg organize_cloud) gpu:=$(arg gpu)" /> <node pkg="gazebo_ros" type="spawn_model" name="spawn_model" args="-urdf -param /robot_description -model site_model"/> <node pkg="robot_state_publisher" type="robot_state_publisher" name="robot_state_publisher"> <param name="publish_frequency" type="double" value="30.0" /> </node> <node name="fake_joint_calibration" pkg="rostopic" type="rostopic" args="pub /calibrated std_msgs/Bool true"/> <!-- 从yaml文件加载联合控制器的参数 --> <rosparam file="$(find pkg)/config/ctrl.yaml" command="load"/> <!-- 加载机器人模型描述参数 --> <group ns="car1"> <param name="robot_description" textfile="$(find pkg)/urdf/car1.urdf"/> <node name= "robot_state_publisher" pkg= "robot_state_publisher" type= "robot_state_publisher"> <param name="publish_frequency" type="double" value="20.0" /> </node> <node name="urdf_spawner" pkg="gazebo_ros" type="spawn_model" respawn="false" output="screen" args="-urdf -model car1 -param robot_description -x -1.5 -y -3 -z 0.05"/> <node name="controller_manager" pkg="controller_manager" type="spawner" respawn="true" output="screen" ns="/car1" args="left_rear_wheel_velocity_controller right_rear_wheel_velocity_controller left_front_wheel_velocity_controller right_front_wheel_velocity_controller left_steering_hinge_position_controller right_steering_hinge_position_controller joint_state_controller"/> </group> <group ns="car2"> <param name="robot_description" textfile="$(find pkg)/urdf/car2.urdf"/> <node name= "robot_state_publisher" pkg= "robot_state_publisher" type= "robot_state_publisher"> <param name="publish_frequency" type="double" value="20.0" /> </node> <node name="urdf_spawner" pkg="gazebo_ros" type="spawn_model" respawn="false" output="screen" args="-urdf -model car2 -param robot_description -x -1.5 -y -4 -z 0.05"/> <node name="controller_manager" pkg="controller_manager" type="spawner" respawn="true" output="screen" ns="/car2" args="left_rear_wheel_velocity_controller right_rear_wheel_velocity_controller left_front_wheel_velocity_controller right_front_wheel_velocity_controller left_steering_hinge_position_controller right_steering_hinge_position_controller joint_state_controller"/> </group> <node pkg="site_model" type="ServoCommands.py" name="ServoCommands" output="screen"/> </launch>
场景生成后调用radar_listener.py运行雷达消息滤波器。
#!/usr/bin/env python3 # 指定脚本使用系统环境中的 Python 3 解释器执行 """ Subscribe the raw radar topics and publish filtered messages. """ # 脚本功能描述:订阅原始的雷达话题并发布过滤后的消息 import rospy # 导入 ROS Python 客户端库,用于与 ROS 系统交互 import message_filters # 导入 message_filters 库,用于同步多个话题的消息 from per_msgs.msg._SensorMsgsRadar import SensorMsgsRadar # 导入 SensorMsgsRadar 消息类型,用于接收雷达数据 from per_msgs.msg._GeometryMsgsRadarObject import GeometryMsgsRadarObject # 导入 GeometryMsgsRadarObject 消息类型,用于处理雷达检测到的目标 from msgs.msg._MsgRadar import MsgRadar # 导入 MsgRadar 消息类型,用于发布过滤后的雷达数据 from msgs.msg._MsgRadarObject import MsgRadarObject # 导入 MsgRadarObject 消息类型,用于封装雷达检测到的目标信息 def append_obj(t: GeometryMsgsRadarObject, obj_list: list): # 定义函数,将 GeometryMsgsRadarObject 对象转换为 MsgRadarObject 并添加到列表中 obj = MsgRadarObject() # 创建 MsgRadarObject 对象 obj.distance = t.range # 设置目标的距离 obj.velocity = t.range_rate # 设置目标的速度 obj.angle_centroid = t.angle_centroid # 设置目标的中心角度 obj.pos_x = t.obj_vcs_posex # 设置目标在 X 轴上的位置 obj.pos_y = t.obj_vcs_posey # 设置目标在 Y 轴上的位置 obj.track_id = t.track_id # 设置目标的跟踪 ID obj_list.append(obj) # 将目标对象添加到列表中 def radar_listener(radar2: SensorMsgsRadar, radar3: SensorMsgsRadar): # 定义回调函数,处理从两个雷达话题接收到的消息 global pub # 使用全局变量 pub,即发布者对象 msg_radar = MsgRadar() # 创建 MsgRadar 对象,用于封装过滤后的雷达数据 # left msg_radar.num_left = int(radar2.total_front_left_esr_tracks) # 设置左侧雷达检测到的目标数量 for t in radar2.front_left_esr_tracklist: # 遍历左侧雷达检测到的目标列表 append_obj(t, msg_radar.objects_left) # 将每个目标转换为 MsgRadarObject 并添加到左侧目标列表中 # right msg_radar.num_right = int(radar3.total_front_right_esr_tracks) # 设置右侧雷达检测到的目标数量 for t in radar3.front_right_esr_tracklist: # 遍历右侧雷达检测到的目标列表 append_obj(t, msg_radar.objects_right) # 将每个目标转换为 MsgRadarObject 并添加到右侧目标列表中 # Add time stamp msg_radar.header.stamp = rospy.Time.now() # 设置消息的时间戳为当前时间 pub.publish(msg_radar) # 发布处理后的雷达消息 if __name__ == '__main__': # 主程序入口,确保脚本直接运行时执行以下代码 # initialize ROS node rospy.init_node("radar_listener", anonymous=True) # 初始化 ROS 节点,节点名称为 "radar_listener",允许匿名 # initialize publisher pub = rospy.Publisher("/radar_msgs_combined", MsgRadar, queue_size=10) # 创建发布者对象,用于向 "/radar_msgs_combined" 主题发布 MsgRadar 消息 # subscribe messages sub_radar_2 = message_filters.Subscriber("/ARS_408_21_2_Topic", SensorMsgsRadar) # 创建订阅者对象,订阅 "/ARS_408_21_2_Topic" 主题的 SensorMsgsRadar 消息 sub_radar_3 = message_filters.Subscriber("/ARS_408_21_3_Topic", SensorMsgsRadar) # 创建订阅者对象,订阅 "/ARS_408_21_3_Topic" 主题的 SensorMsgsRadar 消息 # syncronize time stamps sync = message_filters.ApproximateTimeSynchronizer([sub_radar_2, sub_radar_3], 1, 1) # 创建时间同步器,用于同步两个雷达话题的消息,允许 1 秒的时间差 sync.registerCallback(radar_listener) # 注册回调函数,当两个雷达话题的消息同步时调用 radar_listener 函数 print("\033[0;32mRadar Listener Initialized successfully.\033[0m") # 打印绿色文本,提示雷达监听器初始化成功 rospy.spin() # 进入 ROS 事件循环,保持节点运行分别调用ServoCommands.py和KeyControl.py控制小车行驶。
ServoCommand.py如下:
#!/usr/bin/python3 # 指定脚本使用 Python 3 解释器执行 import rospy # 导入 ROS Python 客户端库,用于与 ROS 系统交互 from std_msgs.msg import Float64 # 导入 ROS 标准消息类型 Float64,用于发布浮点数数据 from ackermann_msgs.msg import AckermannDriveStamped # 导入 AckermannDriveStamped 消息类型,用于接收速度和转向命令 def set_throttle_steer(data): # 定义回调函数,用于处理接收到的 AckermannDriveStamped 消息 # 创建发布者对象,用于向各个车轮速度控制器发布速度命令 pub_vel_left_rear_wheel = rospy.Publisher("/car1/left_rear_wheel_velocity_controller/command", Float64, queue_size=10) pub_vel_right_rear_wheel = rospy.Publisher("/car1/right_rear_wheel_velocity_controller/command", Float64, queue_size=10) pub_vel_left_front_wheel = rospy.Publisher("/car1/left_front_wheel_velocity_controller/command", Float64, queue_size=10) pub_vel_right_front_wheel = rospy.Publisher("/car1/right_front_wheel_velocity_controller/command", Float64, queue_size=10) # 创建发布者对象,用于向转向控制器发布转向角度命令 pub_pos_left_steering_hinge = rospy.Publisher("/car1/left_steering_hinge_position_controller/command", Float64, queue_size=10) pub_pos_right_steering_hinge = rospy.Publisher("/car1/right_steering_hinge_position_controller/command", Float64, queue_size=10) # 从接收到的消息中提取速度值,并乘以 28 进行缩放 throttle = data.drive.speed * 28 # 从接收到的消息中提取转向角度值 steer = data.drive.steering_angle # 向四个车轮的速度控制器发布速度命令 pub_vel_left_rear_wheel.publish(throttle) pub_vel_right_rear_wheel.publish(throttle) pub_vel_left_front_wheel.publish(throttle) pub_vel_right_front_wheel.publish(throttle) # 向两个转向控制器发布转向角度命令 pub_pos_left_steering_hinge.publish(steer) pub_pos_right_steering_hinge.publish(steer) def servo_commands(): # 定义主函数,用于初始化 ROS 节点并订阅消息 # 初始化 ROS 节点,节点名称为 'servo_commands1',anonymous=True 表示节点名称会自动添加唯一后缀 rospy.init_node('servo_commands1', anonymous=True) # 订阅 "/car1/ackermann_cmd_mux/output" 主题,接收 AckermannDriveStamped 消息,并调用 set_throttle_steer 回调函数处理 rospy.Subscriber("/car1/ackermann_cmd_mux/output", AckermannDriveStamped, set_throttle_steer) # 保持节点运行,等待消息并调用回调函数 rospy.spin() if __name__=='__main__': # 主程序入口,确保脚本直接运行时执行以下代码 try: # 调用 servo_commands 函数,启动 ROS 节点 servo_commands() except rospy.ROSInternalException: # 捕获 ROS 内部异常并忽略,防止程序崩溃 passKeyControl.py如下:
#! /usr/bin/env python # 指定脚本使用系统环境中的 Python 解释器执行 import rospy # 导入 ROS Python 客户端库,用于与 ROS 系统交互 import sys, select, termios, tty # 导入系统、终端设置和终端输入输出相关的库 from ackermann_msgs.msg import AckermannDriveStamped # 导入 AckermannDriveStamped 消息类型,用于发布速度和转向命令 def getKey(): # 定义函数,用于从终端获取单个按键输入 tty.setraw(sys.stdin.fileno()) # 设置终端为原始模式,禁用行缓冲和特殊字符处理 select.select([sys.stdin], [], [], 0) # 监听标准输入,等待用户按键 key = sys.stdin.read(1) # 读取用户按下的单个字符 termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings) # 恢复终端原始设置 return key # 返回用户按下的字符 settings = termios.tcgetattr(sys.stdin) # 获取当前终端设置并保存到变量 settings 中 def pub_cmd(): # 定义主函数,用于发布速度和转向命令 index = 1 # 定义车辆索引,默认为 1 rospy.init_node("pub_cmd") # 初始化 ROS 节点,节点名称为 "pub_cmd" # rospy.set_param("Car Index", 1) # 注释掉的代码:设置 ROS 参数 "Car Index" 为 1 pub = rospy.Publisher("/car"+str(index)+"/ackermann_cmd_mux/output", AckermannDriveStamped, queue_size=10) # 创建发布者对象,用于向指定主题发布 AckermannDriveStamped 消息 akm = AckermannDriveStamped() # 创建 AckermannDriveStamped 消息对象 while 1: # 进入无限循环,持续监听键盘输入 x=0 # 初始化速度变量,默认为 0 a=0 # 初始化转向角度变量,默认为 0 key = getKey() # 调用 getKey 函数获取用户按下的按键 rospy.loginfo("键盘已录入:%s", key) # 记录日志,显示用户按下的按键 if key == 'w': # 如果按下 'w' 键,设置前进速度 x=0.3 a=0 elif key == 's': # 如果按下 's' 键,设置后退速度 x=-0.3 a=0 elif key == 'a': # 如果按下 'a' 键,设置左转 x=0.3 a=0.7 elif key == 'd': # 如果按下 'd' 键,设置右转 x=0.3 a=-0.7 elif key == 'x': # 如果按下 'x' 键,停止车辆 x=0 a=0 elif key == 'o': # 如果按下 'o' 键,退出循环 break else: # 如果按下其他键,跳过当前循环 continue # akm.drive.speed = x*6.97674 # 注释掉的代码:将速度值乘以 6.97674 进行缩放 akm.drive.speed = x # 设置消息中的速度值 akm.drive.steering_angle = a*0.7 # 设置消息中的转向角度值,并乘以 0.7 进行缩放 # akm.drive.jerk = 2 # 注释掉的代码:设置消息中的 jerk 值 pub.publish(akm) # 发布消息到指定主题 if __name__=="__main__": # 主程序入口,确保脚本直接运行时执行以下代码 pub_cmd() # 调用 pub_cmd 函数,启动 ROS 节点 pass # 占位符,无实际作用调用src.RadCamFusion.fusion运行相机-激光雷达融合程序。
#!/usr/bin/env python3 # 指定脚本使用系统环境中的 Python 3 解释器执行 """ Get the radar and camera messages and do Radar-camera fusion. """ # 脚本功能描述:获取雷达和相机消息并进行雷达-相机融合 from time import perf_counter, sleep # 导入时间模块,用于性能计时和延时 from pathlib import Path # 导入路径处理模块,用于处理文件路径 import argparse # 导入命令行参数解析模块 import yaml # 导入 YAML 模块,用于解析 YAML 配置文件 import numpy as np # 导入 NumPy 模块,用于数值计算 from typing import List, Tuple # 导入类型提示模块,用于类型注解 import cv2 # 导入 OpenCV 模块,用于图像处理 from cv_bridge import CvBridge # 导入 ROS 的 OpenCV 桥接模块,用于 ROS 图像消息与 OpenCV 图像之间的转换 import rospy # 导入 ROS Python 客户端库,用于与 ROS 系统交互 import message_filters # 导入 message_filters 库,用于同步多个话题的消息 from sensor_msgs.msg import Image # Camera message # 导入 ROS 的 Image 消息类型,用于接收相机数据 from msgs.msg._MsgRadarObject import MsgRadarObject # 导入 MsgRadarObject 消息类型,用于处理雷达检测到的目标 from msgs.msg._MsgRadar import MsgRadar # Radar message # 导入 MsgRadar 消息类型,用于接收雷达数据 from msgs.msg._MsgRadCam import MsgRadCam # fusion message # 导入 MsgRadCam 消息类型,用于发布融合后的数据 from nav_msgs.msg import Odometry # 导入 Odometry 消息类型,用于接收里程计数据 from tf.transformations import euler_from_quaternion # 导入欧拉角转换函数,用于将四元数转换为欧拉角 from ..utils.yolo.yolo import YOLO # 导入自定义的 YOLO 模块,用于目标检测 from ..utils.kalman import Kalman # 导入自定义的卡尔曼滤波模块,用于目标跟踪 from ..utils.poi_and_roi import image_roi # 导入自定义的 ROI(感兴趣区域)处理模块 from ..utils.visualization import VisualAssistant # 导入自定义的可视化助手模块,用于结果可视化 from ..utils.sensor_and_obs import ObsBundle, RadarSensor, ImageSensor, SensorCluster # 导入自定义的传感器和观测模块,用于传感器数据管理和融合 import warnings # 导入警告模块,用于忽略警告信息 warnings.filterwarnings("ignore") # 忽略所有警告信息 time_counter = 0 # 全局变量,用于记录上一帧的时间 frame_counter = 0 # 全局变量,用于记录帧数 geometry = {} # 全局变量,用于存储几何配置信息 CUDA = False # 全局变量,指示是否使用 CUDA 加速 YOLO_DIR = Path() # 全局变量,存储 YOLO 模型的路径 OUTPUT_DIR = Path() # 全局变量,存储输出目录路径 SAVE_DIR = Path() # 全局变量,存储保存目录路径 LOAD_DIR = Path() # 全局变量,存储加载目录路径 BASE_IMAGE_FILE = Path() # 全局变量,存储基础图像文件路径 MODEL_SIZE = 0 # 全局变量,存储卡尔曼滤波模型的大小 A = np.empty((0, 0)) # 全局变量,存储卡尔曼滤波的状态转移矩阵 Q = np.empty((0, 0)) # 全局变量,存储卡尔曼滤波的过程噪声协方差矩阵 MAX_AGE = 0 # 全局变量,存储卡尔曼滤波的最大跟踪寿命 THRES_IOU = 0 # 全局变量,存储 IOU(交并比)阈值 THRES_SCENE = 0 # 全局变量,存储场景阈值 def my_timer() -> float: # 定义计时函数,用于计算帧率和帧间隔时间 global time_counter, frame_counter print('+------------------------+') frame_counter += 1 my_now = perf_counter() del_time = my_now - time_counter print("\033[0;36mFrame {}, FPS: {:.2f}\033[0m".format(frame_counter, 1.0 / del_time)) time_counter = my_now return del_time def my_file_loader() -> None: # 定义文件加载函数,用于加载配置文件和几何文件 global YOLO_DIR, OUTPUT_DIR, BASE_IMAGE_FILE, SAVE_DIR, LOAD_DIR global CUDA, A, Q, THRES_IOU, THRES_SCENE, MAX_AGE, MODEL_SIZE global geometry ROOT_DIR = Path(__file__).resolve().parents[2] YOLO_DIR = ROOT_DIR.joinpath("src/utils/yolo") # 加载配置文件 config_file = ROOT_DIR.joinpath("config/config.yaml") with open(config_file, 'r') as f: config = yaml.load(f, Loader=yaml.FullLoader) CUDA = config['use_cuda'] OUTPUT_DIR = ROOT_DIR.joinpath(config['output']['RadCamFusion_dir']) SAVE_DIR = OUTPUT_DIR.joinpath("save") SAVE_DIR.mkdir(exist_ok=True) LOAD_DIR = OUTPUT_DIR.joinpath("load") BASE_IMAGE_FILE = ROOT_DIR.joinpath(config['visual_assistant']['base_image']) # 加载几何文件 geometry_file = ROOT_DIR.joinpath("config/geometry.yaml") with open(geometry_file, 'r') as f: geometry = yaml.load(f, Loader=yaml.FullLoader) MODEL_SIZE = geometry['scene']['kalman']['model_size'] A = np.array(geometry['scene']['kalman']['A']).reshape((MODEL_SIZE, MODEL_SIZE)) Q = np.array(geometry['scene']['kalman']['Q']).reshape((MODEL_SIZE, MODEL_SIZE)) MAX_AGE = geometry['scene']['kalman']['max_age'] THRES_IOU = geometry['scene']['kalman']['iou_thres'] THRES_SCENE = geometry['scene']['kalman']['scene_thres'] def raw_radar_process(radar: List[MsgRadarObject]) -> np.ndarray: """ 将雷达对象转换为NumPy数组格式。 :param radar: 雷达对象列表 :return: 包含距离、角度和速度的数组 """ if len(radar) == 0: return np.empty((0, 3)) else: radar_data = np.array([np.array([obj.distance, obj.angle_centroid, obj.velocity]) for obj in radar]) return radar_data def msg2data(raw_radar_data: List[List[MsgRadarObject]], images: List[Image]) -> Tuple[List[np.ndarray], List[np.ndarray]]: """ 将原始雷达和图像数据转换为处理后的格式。 :param raw_radar_data: 原始雷达数据列表 :param images: 图像数据列表 :return: 处理后的雷达数据和图像数据 """ global yolo radar_data, image_data = [], [] for rd in raw_radar_data: radar_data.append(raw_radar_process(rd)) for img in images: image_data.append(image_roi(img, yolo)) return radar_data, image_data def msg2save(frame: int, save_path: Path, raw_radar_data: List[List[MsgRadarObject]], images: List[Image], odoms: List[Odometry], sensor_cluster: SensorCluster) -> None: """ 保存当前帧的雷达和图像数据。 :param frame: 当前帧编号 :param save_path: 保存路径 :param raw_radar_data: 原始雷达数据 :param images: 图像数据 :param odoms: 里程计数据 :param sensor_cluster: 传感器集群 """ radar_data = list(map(raw_radar_process, raw_radar_data)) p = save_path.joinpath(str(frame)) p.mkdir(exist_ok=True) for r, s in zip(radar_data, sensor_cluster.radar_sensors): np.savetxt(str(p.joinpath("{}.txt".format(s.name))), r) for i, s in zip(images, sensor_cluster.image_sensors): i = CvBridge().imgmsg_to_cv2(i, 'bgr8') cv2.imwrite(str(p.joinpath("{}.png".format(s.name))), i) for i, o in enumerate(odoms): pos = o.pose.pose.position ori = o.pose.pose.orientation ar, ap, ay = euler_from_quaternion([ori.x, ori.y, ori.z, ori.w]) data = np.array([pos.x, pos.y, pos.z, ar, ap, ay]) np.savetxt(str(p.joinpath("odom_{}.txt".format(i))), data) print("\033[0;32mSaved radar and image data of frame {} sucessfully.\033[0m".format(frame)) def save2data(frame: int, load_path: Path, sensor_cluster: SensorCluster) -> Tuple[List[np.ndarray], List[np.ndarray]]: """ 从存储中加载指定帧的雷达和图像数据。 :param frame: 帧编号 :param load_path: 加载路径 :param sensor_cluster: 传感器集群 :return: 加载的雷达和图像数据 """ p = load_path.joinpath(str(frame)) if not p.exists: raise FileNotFoundError("The frame doesn't exist.") radar_data, image_data = [], [] for s in sensor_cluster.radar_sensors: d = np.loadtxt(str(p.joinpath("{}.txt".format(s.name))), dtype=float) d = d.reshape((-1, s.box_size)) radar_data.append(d) for s in sensor_cluster.image_sensors: d = np.loadtxt(str(p.joinpath("{}.txt".format(s.name))), dtype=int) d = d.reshape((-1, s.box_size)) image_data.append(d) print("\033[0;32mLoaded radar and image data of frame {} sucessfully.\033[0m".format(frame)) return radar_data, image_data def fusion(radar: MsgRadar, image_2: Image, image_3: Image, image_5: Image, image_6: Image, image_7: Image, odom_1: Odometry, odom_2: Odometry) -> None: """ 执行雷达和图像的融合处理。 :param radar: 雷达消息 :param image_2: 第二张图像 :param image_3: 第三张图像 :param image_5: 第五张图像 :param image_6: 第六张图像 :param image_7: 第七张图像 :param odom_1: 第一辆车的里程计消息 :param odom_2: 第二辆车的里程计消息 """ global frame_counter global sensor_cluster, kf, va, args # Output FPS and frame info _ = my_timer() # Off-YOLO mode (only save radar and raw image data) if args.mode == 'off-yolo': msg2save(frame_counter, SAVE_DIR, [radar.objects_left, radar.objects_right], [image_2, image_3, image_5, image_6, image_7], [odom_1, odom_2], sensor_cluster) sleep(0.05) # fps ~ 20 return # Acquire radar and image data radar_data, image_data = msg2data([radar.objects_left, radar.objects_right], [image_2, image_3, image_5, image_6, image_7]) zs = fusion_core(radar_data, image_data, sensor_cluster) # Visual assistant if args.save: va.scene_output(frame_counter, zs, kf) # Publish msg_rad_cam = MsgRadCam() msg_rad_cam.num_overpass = zs.total_objs msg_rad_cam.header.stamp = rospy.Time.now() pub.publish(msg_rad_cam) def fusion_core(radar_data: List[np.ndarray], image_data: List[np.ndarray], sensor_cluster: SensorCluster) -> ObsBundle: """ 执行融合核心处理,包括传感器更新和观测。 :param radar_data: 雷达数据 :param image_data: 图像数据 :param sensor_cluster: 传感器集群 :return: 观测数据 """ global frame_counter global kf, va, args global A # 更新传感器并获取观测 sensor_cluster.update(radar_data, image_data) zs = sensor_cluster.observe() # 获取观测数据 print("\033[0;36mDetection\033[0m", zs, sep='\n') # 卡尔曼滤波 kf.flush(A, zs) # 更新卡尔曼滤波器 print("\033[0;36mKalman Filter\033[0m", kf, sep='\n') return zs if __name__ == '__main__': # 设置命令行参数 parser = argparse.ArgumentParser() parser.add_argument("-m", "--mode", choices=['normal', 'off-yolo', 'from-save'], type=str, default='normal', required=False, help="Mode.") parser.add_argument("-s", "--save", action='store_true', default=False, required=False, help="Save the track of the targets as images.") args = parser.parse_args() my_file_loader() # 加载配置和几何信息 # 初始化 # YOLO if args.mode == 'normal': yolo = YOLO(YOLO_DIR, cuda=CUDA) print("\033[0;32mYOLO initialized successfully.\033[0m") else: yolo = None print("\033[0;33mRunning in off-YOLO mode.\033[0m") # Radars rad_2 = RadarSensor("Radar_2", geometry['radars']['radar_2']) rad_3 = RadarSensor("Radar_3", geometry['radars']['radar_3']) # Cameras cam_2 = ImageSensor("Image_2", geometry['cameras']['camera_2'], rad_2.offset[2]) cam_3 = ImageSensor("Image_3", geometry['cameras']['camera_3'], rad_3.offset[2]) cam_5 = ImageSensor("Image_5", geometry['cameras']['camera_5'], rad_2.offset[2]) cam_6 = ImageSensor("Image_6", geometry['cameras']['camera_6'], rad_3.offset[2]) cam_7 = ImageSensor("Image_7", geometry['cameras']['camera_7'], rad_2.offset[2]) # 传感器集群 sensor_cluster = SensorCluster([rad_2, rad_3], [cam_2, cam_3, cam_5, cam_6, cam_7]) # 卡尔曼滤波器初始化 kf = Kalman(MODEL_SIZE, Q, THRES_SCENE, MAX_AGE) # 可视化助手初始化 va = VisualAssistant(BASE_IMAGE_FILE, OUTPUT_DIR) # From-Save 模型 if args.mode == 'from-save': frame = 0 while True: try: frame += 1 radar_data, image_data = save2data(frame, LOAD_DIR, sensor_cluster) except FileNotFoundError: print("\033[0;32mTraversed all {} frames.\033[0m".format(frame)) exit(0) finally: zs = fusion_core(radar_data, image_data, sensor_cluster) # 可视化助手 if args.save: va.scene_output(frame, zs, kf) # 初始化发布者 pub = rospy.Publisher('/radar_camera_fused', MsgRadCam, queue_size=10) # 初始化ROS节点 rospy.init_node('radar_camera_fusion', anonymous=True) # 订阅消息 msg_radar = message_filters.Subscriber('/radar_msgs_combined', MsgRadar) msg_image_2 = message_filters.Subscriber('/image_raw_2', Image) msg_image_3 = message_filters.Subscriber('/image_raw_3', Image) msg_image_5 = message_filters.Subscriber('/image_raw_5', Image) msg_image_6 = message_filters.Subscriber('/image_raw_6', Image) msg_image_7 = message_filters.Subscriber('/image_raw_7', Image) msg_odom_1 = message_filters.Subscriber('/car1/base_pose_ground_truth', Odometry) msg_odom_2 = message_filters.Subscriber('/car2/base_pose_ground_truth', Odometry) # 同步时间戳 sync = message_filters.ApproximateTimeSynchronizer( [msg_radar, msg_image_2, msg_image_3, msg_image_5, msg_image_6, msg_image_7, msg_odom_1, msg_odom_2], 1, 1) sync.registerCallback(fusion) # 注册回调函数 print("\033[0;32mRadar-camera Fusion Initialized Sucessfully.\033[0m") rospy.spin()3、实验操作
在~/catkin_ws文件夹中新建终端,输出以下命令开启仿真环境。
source devel/setup.bash roslaunch site_model spawn.launch
在~/catkin_ws文件夹中新建终端,输出以下命令运行雷达消息滤波器。
source devel/setup.bash rosrun site_model radar_listener.py
在~/catkin_ws文件夹中新建两个终端,分别输出以下命令开启小车控制。
cd src/site_model/src/scripts_wx/ python3 ServoCommands.pycd src/site_model/src/scripts_wx/ python3 KeyControl.py
在~/catkin_ws文件夹中新建终端,输出以下命令运行雷达-相机融合程序。
cd src/site_model/ python3 -m src.RadCamFusion.fusion --save
结果保存在/catkin_ws/src/site_model/output/RadCamFusion/scene文件夹中。

清理资源
计算巢—服务实例—复制服务实例ID,点击【删除】

在弹窗粘贴服务实例ID,并进行勾选,点击【确定删除】

完成安全验证后,即可成功释放实例。

回到云服务器ECS——实例,检查是否成功释放资源

关闭实验
在完成实验后,点击 结束实操

点击 取消 回到实验页面,点击 确定 跳转实验评分

请为本次实验评分,并给出您的建议,点击 确认,结束本次实验























