本文记录通过SDK收发消息时常见的问题。
开源客户端是否可以直接访问云上服务?
消息队列RabbitMQ版完全兼容开源RabbitMQ。开源RabbitMQ可以直接访问云上服务。您需要通过消息队列RabbitMQ版控制台生成静态用户名密码之后,通过静态账户直接访问云上服务。如何创建静态用户名密码,请参见创建静态用户名密码。
支持哪些语言的开源SDK?
开源RabbitMQ提供的多语言或框架SDK消息队列RabbitMQ版全部都支持。具体信息,请参见表 1。
是否可以直接使用开源RabbitMQ JMS Client?
消息队列RabbitMQ版不支持直接使用开源RabbitMQ JMS Client。您需要通过Maven配置依赖库才能接入消息队列RabbitMQ版收发消息。如何通过Maven配置依赖库来收发消息,请参见JMS概述。
JMS标准有哪些接口不支持?
如果是自动ACK,是否支持通过Reject来触发消息重新入队列?
不可以。您可以使用basicReject方法否定应答单条消息,或者使用basicNack方法否定应答一条或多条消息,并使消息重入队列。
参数 | 说明 |
---|---|
deliveryTag | Channel的消息投递的唯一标识符。 |
requeue | 被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;如果设置为false,则消息被丢弃或发送到死信Exchange。更多信息,请参见死信Exchange。 |
channel.basicConsume("test", false, "consumertag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Rejected: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId()); channel.basicReject(envelope.getDeliveryTag(), true); } });
参数 | 说明 |
---|---|
deliveryTag | Channel的消息投递的唯一标识符。 |
multiple | 是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;如果设置为false,则仅否定应答带指定deliveryTag的单条消息。 |
requeue | 被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;如果设置为false,则消息被丢弃或发送到死信Exchange。更多信息,请参见死信Exchange。 |
channel.basicConsume("test", false, "consumertag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Rejected: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId()); channel.basicNack(envelope.getDeliveryTag(), true, true); } });
如何设置Message ID?
如果您需追踪和识别消息,可以在消息队列RabbitMQ版的Producer客户端设置Message ID属性,为每条消息设置唯一标识符。
在消息队列RabbitMQ版的Producer客户端设置Basic.Properties
的message-id
属性。示例代码如下:
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId("messageid").build(); channel.basicPublish("${ExchangeName}", "BindingKey", true, props, ("消息发送Body").getBytes(StandardCharsets.UTF_8));
properties = pika.BasicProperties(app_id='example-publisher', content_type='application/json', 'message_id'='messageid')
$msg = new AMQPMessage($msgBody, ['application_headers'=>$amqpTable,'content_type' => 'text/plain', 'delivery_mode' => 2,'message_id' => 'messageid',]);
err = ch.Publish( "helloExchange", "hello", false, false, amqp.Publishing { ContentType: "text/plain", Body: []byte(body), MessageId: "messageId", })
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
如何删除队列消息?
您可以使用Java客户端库中queuePurge
方法删除某个队列的所有消息。示例代码如下:
channel.queuePurge("queue-name");