PHP 接入 AMQP
更新时间:
由于 PHP 三方库没有可用的支持 AMQP 1.0 的方案,为了支持 PHP 开发者更方便地接入 AMQP,我们提供了 STOMP 的扩展支持。
前置配置,可参考父文档,本篇仅介绍运行客户端部分。
运行客户端
下载 SDK
本示例提供基于Stomp PHP库的代码示例,使用STOMP协议和物联网平台云端通信。请访问Stomp PHP下载客户端和查看使用说明。
Stomp PHP SDK适用的PHP版本,请参见Stomp PHP SDK中composer.json中require
声明。
因Stomp PHP 5.0.0以下版本存在SDK断开后可能无法重连问题,建议您下载Stomp PHP 5.0.0或其以上版本SDK。详细说明,请参见Issues。
您可在PHP项目文件目录下,执行以下命令,下载Stomp PHP 5.0.0版本的SDK。
composer require stomp-php/stomp-php 5.0.0
代码示例
STOMP 客户端
<?php
require __DIR__ . '/vendor/autoload.php';
use Stomp\Client;
use Stomp\Network\Observer\Exception\HeartbeatException;
use Stomp\Network\Observer\ServerAliveObserver;
use Stomp\StatefulStomp;
use Stomp\Transport\Frame;
//参数说明,请参见AMQP客户端接入说明文档。
// 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考
$accessKey = "${YourAccessKey}""; // 建议使用: getenv("ACCESS_KEY")
$accessSecret = "${YourAccessSecret}""; // 建议使用: getenv("ACCESS_SECRET")
$consumerGroupId = "${YourConsumerGroupId}"";
$clientId = "${YourClientID}""; //自定义
//iotInstanceId:实例ID,可为空。
$iotInstanceId = "";
$timeStamp = round(microtime(true) * 1000);
//签名方法:支持hmacmd5,hmacsha1和hmacsha256。
$signMethod = "hmacsha1";
//userName组装方法,请参见AMQP客户端接入说明文档。
//若使用二进制传输,则userName需要添加encode=base64参数,服务端会将消息体base64编码后再推送。具体添加方法请参见下一章节"二进制消息体说明"。
$userName = $clientId . "|authMode=aksign"
. ",signMethod=" . $signMethod
. ",timestamp=" . $timeStamp
. ",authId=" . $accessKey
. ",iotInstanceId=" . $iotInstanceId
. ",consumerGroupId=" . $consumerGroupId
. "|";
$signContent = "authId=" . $accessKey . "×tamp=" . $timeStamp;
//计算签名,password组装方法,请参见AMQP客户端接入说明文档。
$password = base64_encode(hash_hmac("sha1", $signContent, $accessSecret, $raw_output = TRUE));
//接入域名,请参见实例控制台-开发配置-AMQP。
$client = new Client('tcp://${YourHost}":61613');
//服务端心跳监听。
$observer = new ServerAliveObserver();
$client->getConnection()->getObservers()->addObserver($observer);
//心跳设置,不需要云端发送心跳包
$client->setHeartbeat(30000, 0);
//设置 read 为非阻塞模式
$client->getConnection()->setReadTimeout(0);
$client->setLogin($userName, $password);
try {
$client->connect();
echo "连接成功!", PHP_EOL;
}
catch(Exception $e) {
echo "连接服务器失败,错误信息: " . $e->getMessage() , PHP_EOL;
exit(1);
}
// 使用直接的Frame方式订阅,而不是StatefulStomp
$frame = new Frame('SUBSCRIBE');
$frame->addHeaders([
'destination' => $consumerGroupId, // 使用您配置的消费组 ID
'id' => $consumerGroupId,
'ack' => 'auto'
]);
$client->sendFrame($frame);
$currentTime = date('Y-m-d H:i:s');
echo "[{$currentTime}]订阅成功!", PHP_EOL;
// 初始化心跳相关变量
$lastHeartbeatTime = time();
while (true) {
try {
// 检查连接状态
if (!$client->isConnected()) {
$currentTime = date('Y-m-d H:i:s');
echo "[{$currentTime}] 连接不存在,10秒后重新连接", PHP_EOL;
sleep(10);
$client->connect();
$frame = new Frame('SUBSCRIBE');
$frame->addHeaders([
'destination' => $consumerGroupId,
'id' => $consumerGroupId,
'ack' => 'auto'
]);
$client->sendFrame($frame);
$currentTime = date('Y-m-d H:i:s');
echo "[{$currentTime}] 重新连接并订阅成功", PHP_EOL;
}
// 直接从客户端读取消息帧
$frame = $client->readFrame();
if ($frame) {
$currentTime = date('Y-m-d H:i:s');
echo "[{$currentTime}] 收到消息:", PHP_EOL;
echo "Headers: ", print_r($frame->getHeaders(), true), PHP_EOL;
echo "Body: ", $frame->getBody(), PHP_EOL;
}
else {
// 每1秒发送一次心跳包
$now = time();
if ($now - $lastHeartbeatTime >= 30) {
$heartbeatFrame = new Frame('STOMP');
$client->sendFrame($heartbeatFrame);
$lastHeartbeatTime = $now;
echo "[{$currentTime}] 发送心跳包", PHP_EOL;
}
}
}
catch(HeartbeatException $e) {
$currentTime = date('Y-m-d H:i:s');
echo "[{$currentTime}] 服务器未在指定时间内发送心跳包", PHP_EOL;
$client->disconnect();
} catch(Exception $e) {
$currentTime = date('Y-m-d H:i:s');
echo "[{$currentTime}] 处理消息出错: ". $e->getMessage() , PHP_EOL;
$client->disconnect();
}
}
参数说明如下:
参数 | 说明 |
accessKey | 平台颁发给开发者的,用于调用接口的密钥ID和密钥,在系统管理>密钥管理获取。 |
accessSecret | |
consumerGroupId | 当前物联网平台对应实例中的消费组ID。 登录物联网平台控制台,在消息转发 > 服务端订阅 > 消费组管理查看您的消费组ID。 |
clientId | 表示客户端ID,用户自定义,长度不可超过64个字符。建议使用您的AMQP客户端所在服务器UUID、MAC地址、IP等唯一标识。 |
signMethod | 签名方式。支持如下三种:
|
host | AMQP接入域名。
|
该文章对您有帮助吗?