使用Exactly-Once投递语义收发消息
本文主要介绍如何使用云消息队列 RocketMQ 版的Exactly-Once投递语义收发消息,以保证消息的最终处理结果写入到数据库有且仅有一次。
背景信息
目前Exactly-Once投递语义仅在Java SDK中支持。相关SDK下载,请参见版本说明。
云消息队列 RocketMQ 版的Exactly-Once投递语义适用于接收消息>处理消息>结果持久化到数据库的流程,能够保证您的每一条消息消费的最终处理结果写入到您的数据库有且仅有一次,保证消息消费的幂等。
更多Exactly-Once投递语义的概念和典型使用场景,请参见Exactly-Once投递语义。
操作步骤
若要使用该语义,请按照以下步骤进行操作:
在应用中添加SDK包依赖和Spring 3.0以上版本的依赖。更多信息,请参见步骤一:添加依赖。
在用于存储消息消费结果的数据库中创建transaction_record表。更多信息,请参见步骤二:创建消费事务表。
重要存储消息消费结果的数据库系统必须支持本地事务。
在消息生产端使用PropertyKeyConst.EXACTLYONCE_DELIVERY属性设置打开Exactly-Once投递语义。更多信息,请参见步骤三:生产端开启Exactly-Once投递语义。
在消息消费端创建ExactlyOnceConsumer,并开启Exactly-Once的消费模式。更多信息,请参见步骤四:消费端开启Exactly-Once投递语义。
步骤一:添加依赖
云消息队列 RocketMQ 版的ExactlyOnceConsumer在客户端SDK ons-client-ext-1.8.4.Final中发布,若要使用Exactly-Once投递语义,需在应用中依赖该SDK。
另外,ExactlyOnceConsumer基于Spring实现了通过注解@MQTransaction开启Exactly-Once消费的方式,因此还需要在应用中增加Spring 3.0以上版本的依赖。
完整的依赖内容如下所示。
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client-ext</artifactId>
<version>1.8.4.Final</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring-version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring-version}</version>
</dependency>
步骤二:创建消费事务表
若要使用云消息队列 RocketMQ 版的Exactly-Once投递语义,您需要在业务处理结果持久化的数据库中创建一张transaction_record表,保证此表与存储业务处理结果的表在同一个数据库内,且该数据库支持本地事务。
目前,云消息队列 RocketMQ 版的Exactly-Once投递语义支持您的业务访问MySQL和SQLServer两种类型的数据源。这两种类型的数据源下transaction_record表的建表语句如以下所示。
MySQL
CREATE TABLE `transaction_record` ( `consumer_group` varchar(128) NOT NULL DEFAULT '', `message_id` varchar(255) NOT NULL DEFAULT '', `topic_name` varchar(255) NOT NULL DEFAULT '', `ctime` bigint(20) NOT NULL, `queue_id` int(11) NOT NULL, `offset` bigint(20) NOT NULL, `broker_name` varchar(255) NOT NULL DEFAULT '', `id` bigint(20) NOT NULL AUTO_INCREMENT, PRIMARY KEY (`id`), UNIQUE KEY `message_id_unique_key` (`message_id`), KEY `ctime_key` (`ctime`), KEY `load_key` (`queue_id`,`broker_name`,`topic_name`,`ctime`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
SQLServer
CREATE TABLE transaction_record ( [consumer_group] varchar(128) NOT NULL , [message_id] varchar(255) NOT NULL , [topic_name] varchar(255) NOT NULL , [ctime] bigint NOT NULL , [queue_id] int NOT NULL , [offset] bigint NOT NULL , [broker_name] varchar(50) NOT NULL , [id] bigint IDENTITY(1,1) PRIMARY KEY ) CREATE NONCLUSTERED INDEX load_key ON transaction_record (queue_id, broker_name, topic_name, ctime); CREATE UNIQUE NONCLUSTERED INDEX message_id_uniq_key ON transaction_record (message_id); CREATE NONCLUSTERED INDEX ctime_key ON transaction_record (ctime);
对于企业版SQL Server数据库,建议您通过
ALTER DATABASE [USERDB] SET PARTNER SAFETY OFF;
打开异步模式,提高数据库读写性能。SQL Server数据库同时还可以通过
ALTER DATABASE [USERDB] SET DELAYED_DURABILITY=FORCED
的方式开启Delayed Durability选项,通过开启该选项可以降低对数据库的IOPS。
步骤三:生产端开启Exactly-Once投递语义
在生产端,您仅需要在创建Producer时,将PropertyKeyConst.EXACTLYONCE_DELIVERY
属性设置为true,即可打开Exactly-Once投递语义,示例代码如下。
/**
* TestExactlyOnceProducer启动
* 通过PropertyKeyConst.EXACTLYONCE_DELIVERY开启exactly-once投递语义。
*/
public class TestExactlyOnceProducer {
public static void main(String[] args) {
Properties producerProperties = new Properties();
producerProperties.setProperty(PropertyKeyConst.GROUP_ID,"{GROUP_ID}");
producerProperties.setProperty(PropertyKeyConst.AccessKey,"{accessKey}");
producerProperties.setProperty(PropertyKeyConst.SecretKey,"{secretKey}");
producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR,"{NAMESRV_ADDR}");
producerProperties.setProperty(PropertyKeyConst.EXACTLYONCE_DELIVERY,"true");
Producer producer = ExactlyOnceONSFactory.createProducer(producerProperties);
producer.start();
System.out.println("Producer Started");
for (int i = 0; i < 10; i++) {
Message message = new Message("{topic}", "{tag}", "mq send transaction message test".getBytes());
try {
SendResult sendResult = producer.send(message);
assert sendResult != null;
System.out.println(new Date() + " Send mq message success! msgId is: " + sendResult.getMessageId());
} catch (ONSClientException e) {
System.out.println("发送失败");
//出现异常意味着发送失败,为避免消息丢失,建议缓存该消息然后进行重试。
}
}
producer.shutdown();
}
}
步骤四:消费端开启Exactly-Once投递语义
使用云消息队列 RocketMQ 版的Exactly-Once投递语义进行消费时,消费端需要使用ExactlyOnceONSFactory调用createExactlyOnceConsumer接口创建ExactlyOnceConsumer,然后通过使用ExactlyOnceConsumer进行Exactly-Once模式的消费。
在使用ExactlyOnceConsumer时,需要注意以下两点:
创建ExactlyOnceConsumer时,可以通过设置PropertyKeyConst.EXACTLYONCE_DELIVERY属性打开或者关闭Exactly-Once投递语义。ExactlyOnceConsumer默认打开Exactly-Once投递语义。
使用ExactlyOnceConsumer消费时,在消息监听器MessageListener的consume方法中,您的业务处理逻辑需要使用MQDataSource对数据库的进行读写操作。
您可以选择以下任一方式在消费端开启Exactly-Once投递语义:
以非Spring方式开启Exactly-Once投递语义
示例如下。
/** * ExactlyOnceConsumer启动。 * 通过PropertyKeyConst.EXACTLYONCE_DELIVERY开启Exactly-Once投递语义。 */ public class TestExactlyOnceConsumer { private ExactlyOnceConsumer consumer; private TxMessageListener listener; public TestExactlyOnceConsumer() { Properties consumerProperties = new Properties(); consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, "{GID}"); consumerProperties.setProperty(PropertyKeyConst.AccessKey, "{accessKey}"); consumerProperties.setProperty(PropertyKeyConst.SecretKey, "{secretKey}"); consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, "{NAMESRV_ADDR}"); this.consumer = ExactlyOnceONSFactory.createExactlyOnceConsumer(consumerProperties); this.consumer.subscribe("{topic}", "", new TestExactlyOnceListener()); consumer.start(); System.out.println("Consumer start success."); } } /** * SimpleListener为使用ExactlyOnceConsumer消费的简单示例。 * 实现了一个简单的消息记录到数据库的过程,保证每条消息持久化到数据库有且仅有一次生效。 */ public class SimpleListener implements MessageListener { private MQDataSource dataSource; public SimpleListener() { this.dataSource = new MQDataSource("{url}", "{user}", "{passwd}", "{driver}"); } @Override public Action consume(Message message, ConsumeContext context) { Connection connection = null; PreparedStatement statement = null; try { /** * 在此处对消费到的消息message做业务计算处理,使用MQDataSource将处理结果持久化到数据库系统。 * 此示例演示了消费并记录消息到数据库系统的场景,实际的业务处理按照:接收消息->业务处理->结果持久化的过程。 * Exactly-Once投递语义保证消息处理的持久化过程有且仅有一次。 */ connection = dataSource.getConnection(); statement = connection.prepareStatement("INSERT INTO app(msg, ctime) VALUES(?, ?)"); statement.setString(1, new String(message.getBody())); statement.setLong(2, System.currentTimeMillis()); statement.execute(); System.out.println("consume message success"); return Action.CommitMessage; } catch (Throwable e) { System.out.println("consume message fail:" + e.getMessage()); return Action.ReconsumeLater; } finally { if (statement != null) { try { statement.close(); } catch (Exception e) { } } if (connection != null) { try { connection.close(); } catch (Exception e) { } } } } }
MessageListener中以事务方式实现多项数据库操作和消息消费的事务性
示例如下。
/** * TestExactlyOnceListener实现。 * 实现了一个事务中对多个业务表进行更新的场景,保证事务内的操作有且仅有一次生效。 */ public class SimpleTxListener implements MessageListener { private MQDataSource dataSource; public SimpleTxListener() { this.dataSource = new MQDataSource("{url}", "{user}", "{passwd}", "{driver}"); } @Override public Action consume(Message message, ConsumeContext context) { Connection connection = null; Statement statement = null; try { /** * 在此处对消费到的消息message做业务计算处理,使用MQDataSource将处理结果持久化到数据库系统。 * 此范例演示了在一个事务内对多个表进行更新的业务场景,Exactly-Once投递语义保证事务内的操作有且仅有一次。 * 实际的业务处理按照:接收消息->业务处理->结果持久化的流程来设计。 */ connection = dataSource.getConnection(); connection.setAutoCommit(false); String insertSql = String.format("INSERT INTO app(msg, message_id, ctime) VALUES(\"%s\", \"%s\", %d)", new String(message.getBody()), message.getMsgID(), System.currentTimeMillis()); String updateSql = String.format("UPDATE consume_count SET cnt = count + 1 WHERE consumer_group = \"%s\"", "GID_TEST"); statement = connection.createStatement(); statement.execute(insertSql); statement.execute(updateSql); connection.commit(); System.out.println("consume message :" + message.getMsgID()); return Action.CommitMessage; } catch (Throwable e) { try { connection.rollback(); } catch (Exception e1) { } System.out.println("consume message fail"); return Action.ReconsumeLater; } finally { if (statement != null) { try { statement.close(); } catch (Exception e) { } } if (connection != null) { try { connection.close(); } catch (Exception e) { } } } } }
MessageListener中通过Springboot注解方式实现开启Exactly-Once投递语义
创建MessageListener,如下所示。
/** * MessageListener通过注解方式开启Exactly-Once。 * 只需在MessageListener的consume方法加上@MQTransaction即可开启。 * 适用于springboot微服务中使用。 */ public class TestMessageListener implements MessageListener { private final static String INSERTSQLFORMAT = "INSERT INTO app(message_id, ctime) VALUES(\"%s\", %d)"; private MQDataSource dataSource; @Override @MQTransaction public Action consume(Message message, ConsumeContext context) { Connection connection = null; Statement statement = null; try { String insertSql = String.format(INSERTSQLFORMAT, message.getMsgID(), System.currentTimeMillis()); connection = this.dataSource.getConnection(); statement = connection.createStatement(); statement.execute(insertSql); return Action.CommitMessage; } catch (Throwable e) { return Action.ReconsumeLater; } finally { if (statement != null) { try { statement.close(); } catch(Exception e) { } } if (connection != null) { try { connection.close(); } catch (Exception e) { } } } } public void setDataSource(MQDataSource dataSource) { this.dataSource = dataSource; } }
在consumer.xml中定义消费者Bean等信息。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="mqDataSource" class="com.aliyun.openservices.ons.api.impl.rocketmq.exactlyonce.datasource.MQDataSource" init-method="init" destroy-method="close"> <property name="url" value="{url}" /> <property name="username" value="{user}" /> <property name="password" value="{passwd}" /> <property name="driverClass" value="{driver}" /> </bean> <bean id="msgListener" class="com.aliyun.openservices.ons.api.impl.rocketmq.exactlyonce.spring.TestMessageListener"> <property name="dataSource" ref="mqDataSource"> <!--消费者配置信息--> </property> </bean> <!--Listener配置--> <!--多GID订阅同一个Topic,可以创建多个ConsumerBean--> <bean id="consumer" class="com.aliyun.openservices.ons.api.bean.ExactlyOnceConsumerBean" init-method="start" destroy-method="shutdown"> <property name="properties" > <!--消费者配置信息--> <props> <prop key="GROUP_ID">{gid}</prop> <prop key="AccessKey">{accessKey}</prop> <prop key="SecretKey">{secretKey}</prop> <!--将消费者线程数固定为50个 <prop key="ConsumeThreadNums">50</prop> --> </props> </property> <property name="subscriptionTable"> <map> <entry value-ref="msgListener"> <key> <bean class="com.aliyun.openservices.ons.api.bean.Subscription"> <property name="topic" value="{topic}"/> <property name="expression" value="{subExpression}"/><!--expression 即 Tag,可以设置成具体的 Tag,如 taga||tagb||tagc,也可设置成*。 *仅代表订阅所有 Tag,不支持通配--> </bean> </key> </entry> </map> </property> </bean> </beans>
运行已经与Spring集成好的消费者,如下所示。
public class ConsumeWithSpring { public static void main(String[] args) { /** * 消费者Bean配置在consumer.xml中,可通过ApplicationContext获取或者直接注入到其他类(例如具体的Controller)中。 */ ApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml"); System.out.println("Consumer Started"); }
MessageListener中通过MyBatis方式实现Exactly-Once投递语义
设计业务数据库写入操作DAO。
package com.aliyun.openservices.tcp.example.mybatis; public interface AppDAO { Integer insertMessage(String msgId); }
在mapper.xml文件中编写插入操作SQL语句。
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.aliyun.openservices.tcp.example.mybatis.AppDAO"> <insert id="insertMessage"> INSERT INTO app (message_id, ctime) VALUES (#{msgId}, now()) </insert> </mapper>
利用MQDataSource实现自定义的DataSourceFactory。
public class MQDataSourceFactoty extends DruidDataSourceFactory implements DataSourceFactory { protected Properties properties; @Override public void setProperties(Properties properties) { this.properties = properties; } @Override public DataSource getDataSource() { try { DruidDataSource druidDataSource = (DruidDataSource) createDataSource(this.properties); return new MQDataSource(druidDataSource); } catch (Exception e) { System.out.println("err:" + e.getMessage()); } return null; } }
在mybatis-config.xml中注册datasource为MQDataSourceFactory类型。
<configuration> <environments default="development"> <environment id="development"> <transactionManager type="JDBC"/> <!-- 配置数据库连接信息 --> <dataSource type="com.aliyun.openservices.tcp.example.mybatis.MQDataSourceFactoty"> <property name="driverClass" value="com.mysql.jdbc.Driver"/> <property name="url" value="{url}"/> <property name="username" value="{username}"/> <property name="password" value="{password}"/> <property name="initialSize" value="10" /> <property name="maxActive" value="20"/> </dataSource> </environment> </environments> <mappers> <mapper resource="mapper.xml"/> </mappers> </configuration>
在监听器中使用MyBatis的方式访问数据库,实现Exactly-Once的消费方式。
public class TestMybatisListener implements MessageListener { private static SqlSessionFactory sqlSessionFactory; static { String resource = "mybatis-config.xml"; Reader reader = null; try { reader= Resources.getResourceAsReader(resource); } catch (IOException e) { e.printStackTrace(); } sqlSessionFactory = new SqlSessionFactoryBuilder().build(reader); } @Override public Action consume(Message message, ConsumeContext context) { long begion = System.currentTimeMillis(); SqlSession sqlSession = null; try { sqlSession = sqlSessionFactory.openSession(); AppDAO appDAO = sqlSession.getMapper(AppDAO.class); appDAO.insertMessage(message.getMsgID()); System.out.println("consume : " + message.getMsgID()); sqlSession.commit(); return Action.CommitMessage; } catch (Exception e) { e.printStackTrace(); sqlSession.rollback(); return Action.ReconsumeLater; } finally { sqlSession.close(); } } }
注意事项
在使用云消息队列 RocketMQ 版的ExactlyOnceConsumer消费消息的过程中,请注意以下两点:
不可在云消息队列 RocketMQ 版控制台手动重置消费位点。若您主动重置位点到一个已经消费过的位点,则会失去Exactly-Once的投递语义。
每次数据库的INSERT或UPDATE操作会带来一次额外的更新操作,同时云消息队列 RocketMQ 版的ExactlyOnceConsumer也会有定时的查询或删除操作,会对数据库的IOPS带来一定增长。
- 本页导读 (1)