本文介绍如何对点云数据进行压缩。
功能简介
点云是海量点的集合,存储点云不仅会消耗大量的内存,而且不利于传输,也没有足够大的带宽支持将不经过压缩的点云直接在网络层进行传输,因此需要对点云进行压缩。智能媒体管理服务提供的点云压缩功能可以对点云数据的时空信息进行分析处理,大幅降低数据量与储存成本,实现高质量且实时的点云数据编码解码方案。
应用场景
点云压缩可用于以下场景。
场景名称 | 场景说明 |
自动驾驶 | 压缩自动驾驶场景中雷达扫描产生的点云数据。 |
数字文博 | 为文物数字化场景中产生的文物点云信息提供压缩方案。 |
智慧城市 | 对城市三维重建产生的点云信息进行有效的数据压缩,实现数据流畅的渲染展示。 |
混合现实 | 为混合现实场景中的点云数据提供实时编码解码能力。 |
前提条件
- 已创建并获取AccessKey。具体操作,请参见创建AccessKey。
- 已开通OSS服务、创建存储空间并上传文件到存储空间。具体操作,请参见控制台上传文件。
- 已开通智能媒体管理服务。具体操作,请参见开通产品。
- 已通过智能媒体管理控制台创建项目。具体操作,请参见创建项目。说明
- 您也可以调用API接口创建项目。具体操作,请参见CreateProject - 创建项目。
- 您可以调用ListProjects - 列出所有项目信息的列表接口列出指定地域下已创建的所有项目信息。
使用方法
调用CreateCompressPointCloudTask - 创建点云压缩任务接口对点云数据进行压缩。
任务开始执行后,任务信息只保存7天,超过7天则无法再获取。您可以通过以下几种方式及时获取任务信息:
调用GetTask - 获取任务信息或ListTasks - 列出任务接口获取返回的
TaskId
,查看任务信息。在与智能媒体管理相同的地域下开通MNS服务,并配置订阅关系,及时获取任务信息通知,异步通知消息格式请参见异步通知消息格式。关于MNS SDK的更多信息,请参见步骤四:接收和删除消息。
在与智能媒体管理相同的地域下开通RocketMQ服务,并创建RocketMQ 4.0实例、Topic、Group,及时获取任务信息通知,异步通知消息格式请参见异步通知消息格式。关于RocketMQ使用的更多信息,请参见调用HTTP协议的SDK收发普通消息。
在与智能媒体管理相同的地域下开通并接入事件总线EventBridge服务,及时获取任务信息通知。更多信息,请参见智能媒体管理IMM事件。
压缩信息
项目名称:test-project
点云文件的OSS地址:oss://test-bucket/test-object.pcd
压缩算法:octree
输出文件的OSS地址:oss://test-bucket/test-target-object
请求示例
{
"ProjectName": "test-project",
"SourceURI": "oss://test-bucket/test-object.pcd",
"UserData": "{\"ID\":\"testuid\",\"Name\": \"test-user\",\"Avatar\": \"http://test.com/testuid\"}",
"TargetURI": "oss://test-bucket/test-target-object",
"PointCloudFileFormat": "pcd",
"CompressMethod": "octree",
"PointCloudFields": "[\"xyz\"]",
"OctreeOption": "{\"PointResolution\": 0.001, \"OctreeResolution\": 0.01, \"DoVoxelGridDownDownSampling\": false, \"LibraryName\": \"pcl\"}",
"KdtreeOption": "{\"CompressionLevel\": 8, \"QuantizationBits\": 10, \"LibraryName\": \"draco\"}"
}
返回示例
{
"TaskId": "PointCloudCompress-091d9b4a-8726-47bf-b699-d24c7daff63c",
"RequestId": "8B0EEA2E-35FE-500F-BCDB-E2E7CA11DF7A",
"EventId": "180-1S7Q8gHbVXJf2lekgesKvlNM1VR"
}
示例代码
以Python SDK为例,点云压缩的完整示例代码如下。
# -*- coding: utf-8 -*-
# This file is auto-generated, don't edit it. Thanks.
import os
import sys
from typing import List
from alibabacloud_imm20200930.client import Client as imm20200930Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_imm20200930 import models as imm_20200930_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client(
access_key_id: str,
access_key_secret: str,
) -> imm20200930Client:
"""
使用AccessKey ID&AccessKey Secret初始化账号Client。
@param access_key_id:
@param access_key_secret:
@return: Client
@throws Exception
"""
config = open_api_models.Config(
access_key_id=access_key_id,
access_key_secret=access_key_secret
)
# 填写访问的IMM域名。
config.endpoint = f'imm.cn-beijing.aliyuncs.com'
return imm20200930Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
# 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
# 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
# 本示例通过从环境变量中读取AccessKey,来实现API访问的身份验证。如何配置环境变量,请参见https://help.aliyun.com/document_detail/2361894.html。
imm_access_key_id = os.getenv("AccessKeyId")
imm_access_key_secret = os.getenv("AccessKeySecret")
client = Sample.create_client(imm_access_key_id, imm_access_key_secret)
octree_option = imm_20200930_models.OctreeOption(
point_resolution=0.001,
octree_resolution=0.01,
do_voxel_grid_down_down_sampling=False,
library_name='pcl'
)
create_compress_point_cloud_task_request = imm_20200930_models.CreateCompressPointCloudTaskRequest(
source_uri='oss://test-bucket/test-object.pcd',
target_uri='oss://test-bucket/test-target-object',
point_cloud_file_format='pcd',
compress_method='octree',
point_cloud_fields=[
'[\"xyz\"]'
],
project_name='test-project',
octree_option=octree_option
)
runtime = util_models.RuntimeOptions()
try:
# 复制代码运行请自行打印API的返回值。
client.create_compress_point_cloud_task_with_options(create_compress_point_cloud_task_request, runtime)
except Exception as error:
# 如有需要,请打印错误信息。
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
# 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。
# 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。
# 本示例通过从环境变量中读取AccessKey,来实现API访问的身份验证。如何配置环境变量,请参见https://help.aliyun.com/document_detail/2361894.html。
imm_access_key_id = os.getenv("AccessKeyId")
imm_access_key_secret = os.getenv("AccessKeySecret")
client = Sample.create_client(imm_access_key_id, imm_access_key_secret)
octree_option = imm_20200930_models.OctreeOption(
point_resolution=0.001,
octree_resolution=0.01,
do_voxel_grid_down_down_sampling=False,
library_name='pcl'
)
create_compress_point_cloud_task_request = imm_20200930_models.CreateCompressPointCloudTaskRequest(
source_uri='oss://test-bucket/test-object.pcd',
target_uri='oss://test-bucket/test-target-object',
point_cloud_file_format='pcd',
compress_method='octree',
point_cloud_fields=[
'[\"xyz\"]'
],
project_name='test-project',
octree_option=octree_option
)
runtime = util_models.RuntimeOptions()
try:
# 复制代码运行请自行打印API的返回值。
await client.create_compress_point_cloud_task_with_options_async(create_compress_point_cloud_task_request, runtime)
except Exception as error:
# 如有需要,请打印错误信息。
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])