由于服务端升级、服务端重启、网络抖动等原因,服务端和客户端的网络连接可能会断开。本文介绍如何在客户端设置Connection和Topology自动恢复,使Connection和Topology在网络连接断开后自动恢复,避免断连对您的业务造成影响。

触发原因

触发Connection自动恢复的原因如下:

  • Connection的I/O抛出异常。
  • Socket读取操作超时。
  • 检测到服务端心跳丢失。

恢复方法

注意 4.0.0及以上版本Java客户端默认开启Connection和Topology自动恢复,您无需在代码中设置。

在客户端开启Connection和Topology(Queue、Exchange、Binding、Consumer)自动恢复的方法如下:

  • factory.setAutomaticRecoveryEnabled(boolean):用于开启或关闭Connection自动恢复。
  • factory.setNetworkRecoveryInterval(long):用于设置重试时间间隔。如果Connection自动恢复异常,设置了Connection自动恢复的客户端将在一段固定时间间隔(默认为5秒)后重试。
  • factory.setTopologyRecoveryEnabled(boolean):用于开启Topology自动恢复。Topology包括Queue、Exchange、Binding、Consumer。

示例代码

开启Connection和Topology自动恢复的客户端示例代码如下:

ConnectionFactory factory = new ConnectionFactory();
// 设置接入点,在消息队列AMQP版控制台实例详情页面获取。
factory.setHost("xxx.xxx.aliyuncs.com");
// ${instanceId}为实例ID,从阿里云AMQP版控制台实例详情页面获取。
// ${AccessKey}阿里云身份验证,在阿里云管理控制台创建。
// ${SecretKey}阿里云身份验证,在阿里云管理控制台创建。
factory.setCredentialsProvider(new AliyunCredentialsProvider("${AccessKey}", "${SecretKey}", "${instanceId}"));
// 设置Vhost名称,请确保已在消息队列AMQP版控制台上创建。
factory.setVirtualHost("${VhostName}");
// 默认端口,非加密端口5672,加密端口5671。
factory.setPort(5672);
// 基于网络环境设置合理的超时时间。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
// 开启Connection自动恢复。
factory.setAutomaticRecoveryEnabled(true);
// 设置Connection重试时间间隔为10秒。
factory.setNetworkRecoveryInterval(10000);
// 开启Topology自动恢复。
factory.setTopologyRecoveryEnabled(true);
Connection connection = factory.newConnection();                      

恢复限制

Connection自动恢复的限制如下:

  • Connection断开需要一定的时间检测。要确保这段时间内发送的消息不丢失,需使用Publisher Confirms实现可靠发送。
  • Channel异常导致Connection断开时,不会触发Connection自动恢复。Channel异常通常为应用级别的问题,需要使用方自行处理。
  • Connection自动恢复不会使Channel也自动恢复。