2019 年 1 月,孵化 6 个月的 RocketMQ-Spring 作为 Apache RocketMQ 的子项目正式毕业,发布了第一个 Release 版本 2.0.1。该项目是把 RocketMQ 的客户端使用 Spring Boot 的方式进行了封装,可以让用户通过简单的 annotation 和标准的 Spring Messaging API 编写代码来进行消息的发送和消费。当时 RocketMQ 社区同学请 Spring 社区的同学对 RocketMQ-Spring 代码进行 review,引出一段
罗美琪(RocketMQ)和春波特(Spring Boot)的故事
。
时隔两年,RocketMQ-Spring 正式发布 2.2.0。在这期间,RocketMQ-Spring 迭代了数个版本,以 RocketMQ-Spring 为基础实现的 Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ 登上了
Spring 的官网
,Spring 布道师 baeldung 向国外同学介绍
如何使用 RocketMQ-Spring
,越来越多国内外的同学开始使用 RocketMQ-Spring 收发消息,RocketMQ-Spring 仓库的 star 数也在短短两年时间内超越了 Spring-Kafka 和 Spring-AMQP(注:两者均由 Spring 社区维护),成为 Apache RocketMQ 最受欢迎的生态项目之一。
RocketMQ-Spring 的受欢迎一方面得益于支持丰富业务场景的 RocketMQ 与微服务生态 Spring 的完美契合,另一方面也与 RocketMQ-Spring 本身严格遵循 Spring Messaging API 规范,支持丰富的消息类型分不开。
遵循 Spring Messaging API 规范
Spring Messaging 提供了一套抽象的 API,对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。
1. 发送端
RocketMQ-Spring 在遵循 Spring Messaging API 规范的基础上结合 RocketMQ 自身的功能特点提供了相应的 API。在消息的发送端,RocketMQ-Spring 通过实现 RocketMQTemplate 完成消息的发送。如下图所示,RocketMQTemplate 继承 AbstractMessageSendingTemplate 抽象类,来支持 Spring Messaging API 标准的消息转换和发送方法,这些方法最终会代理给 doSend 方法,doSend 方法会最终调用 syncSend,由 DefaultMQProducer 实现。
// Build a SpringMessage for sending in transaction
Message msg = MessageBuilder.withPayload(..)...;
// In sendMessageInTransaction(), the first parameter transaction name ("test")
// must be same with the @RocketMQTransactionListener's member field 'transName'
rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null);
// Define transaction listener with the annotation @RocketMQTransactionListener
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// ... local transaction process, return bollback, commit or unknown
return RocketMQLocalTransactionState.UNKNOWN;
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check transaction status and return bollback, commit or unknown
return RocketMQLocalTransactionState.COMMIT;
}
}
@Service
@RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer")
public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> {
@Override
public String onMessage(String message) {
……
return "reply string";
}
}
1
2
3
4
5
6
7
8
1
2
3
4
5
6
7
8
RocketMQ-Spring 遵循 Spring 约定大于配置(Convention over configuration)的理念,通过启动器(Spring Boot Starter)的方式,在 pom 文件引入依赖(groupId:org.apache.rocketmq,artifactId:rocketmq-spring-boot-starter)便可以在 Spring Boot 中集成所有 RocketMQ 客户端的所有功能,通过简单的注解使用即可完成消息的收发。在
RocketMQ-Spring Github Wiki
中有更加详细的用法和常见问题解答。