While building proofs of concept for a brand new microservices architecture based on Spring Boot and Spring Cloud, I chose a few backing technologies to test. A serious web application needs at least a database, something for messaging, and a distributed session/cache. The most common choices come, as always, from Spring-supported technologies; mine are PostgreSQL and MongoDB as databases (depending on whether a specific service needs a transactional database or not), Redis as a distributed in-memory data grid, and RabbitMQ for messaging. It would be nice to have everything working transparently under one transaction manager.
However, of the technologies listed above, MongoDB does not support transactions at all at the time of writing. I suspect this will eventually become a problem in a complex application, but we will skip this technology in this short article. Redis, on the other hand, supports a form of transactions, which is even exposed by RedisTemplate in Spring Data. However, Spring support for Redis seems too raw for the use cases I have in mind. A much better implementation for those is provided by Redisson, which does not support transactions. In the end, though, I don’t see a need for transactions in the use cases mentioned (distributed session, cache, etc.).
So what we have left is RabbitMQ and PostgreSQL. Both support transactions, and both can be driven through PlatformTransactionManager in Spring Framework.
While planning the architecture I extracted a core Java library that can serve as a base for all Java-based microservices in the application ecosystem. I assume some services will use only RabbitMQ, while others will additionally use MongoDB and/or a PostgreSQL database. It would be nice if this core library could autodetect the technologies used by a specific service and prepare the transaction manager accordingly.
To achieve this I started with a service that uses only RabbitMQ, i.e. a service with only the org.springframework.boot:spring-boot-starter-amqp dependency. If there are no other transactional resources, RabbitMQ can be transactional using its own RabbitTransactionManager. However, this manager has to be created explicitly by the application, which I do together with a RabbitAdmin bean (it can be used to define queues, exchanges and bindings), while also making sure the RabbitTemplate uses a transacted channel:
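import javax.annotation.PostConstruct;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnClass(EnableRabbit.class)
public class RabbitConfiguration {
    @Autowired protected RabbitTemplate rabbitTemplate;
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
    @PostConstruct
    protected void init() {
        // make the auto-configured RabbitTemplate use transacted channels
        rabbitTemplate.setChannelTransacted(true);
    }
}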
On the one hand I want support for RabbitMQ in the core library, but on the other hand I don’t want the library to force RabbitMQ on every service. This is why the core library, which the class above belongs to, uses the Rabbit dependencies only at compile time. With Gradle this means the following:
dependencies {
    compileOnly('org.springframework.boot:spring-boot-starter-amqp')
}
So this configuration is applied only if the EnableRabbit class is available on the classpath. The same condition is used by the RabbitMQ auto-configuration code that ships with Spring Boot. Now, let’s define a simple controller that sends an example event transactionally:
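import java.util.UUID;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/rabbit")
public class RabbitController {
    public static final Logger logger = LoggerFactory.getLogger(RabbitController.class);
    @Autowired protected RabbitTemplate rabbitTemplate;
    @Autowired protected RabbitAdmin rabbitAdmin;
    // injected only so that the log line can show which transaction manager is active
    @Autowired protected PlatformTransactionManager transactionManager;
    @PostConstruct
    protected void init() {
        // define simple exchange, queue and binding
        rabbitAdmin.declareExchange(new TopicExchange("myexchange", true, false));
        rabbitAdmin.declareQueue(new Queue("myqueue", true, false, true, null));
        rabbitAdmin.declareBinding(new Binding("myqueue", DestinationType.QUEUE, "myexchange", "myroutingkey", null));
    }
    @RequestMapping(value = "/send")
    @Transactional
    public String send() {
        String event = "Message: " + UUID.randomUUID();
        logger.info("Sending message: {} with transaction manager: {}", event, transactionManager.getClass().getSimpleName());
        rabbitTemplate.convertAndSend("myexchange", "myroutingkey", event);
        return String.format("Event sent: %s", event);
    }
}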
In the @PostConstruct part we create an exchange, a queue, and a binding between the two using a routing key. I use TopicExchange due to its flexibility. In the send() method we send a simple String event to this exchange. Note that send() is marked as @Transactional.
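The flexibility of a topic exchange comes from wildcard binding keys: in AMQP topic routing, `*` stands for exactly one dot-separated word and `#` for zero or more words. The following is a minimal plain-Java sketch of that matching rule, not the broker's implementation. The class name TopicPatternSketch and the "orders" keys are hypothetical and used only for illustration; a binding key without wildcards, such as the "myroutingkey" used above, simply has to match exactly.

```java
/** Toy matcher for AMQP-style topic binding keys (illustrative only). */
class TopicPatternSketch {

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // pattern consumed: the key must be consumed as well
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the routing key
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still has words, key does not
        }
        // '*' matches exactly one word, anything else must be equal
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("myroutingkey", "myroutingkey"));
        System.out.println(matches("orders.*", "orders.created"));
        System.out.println(matches("orders.#", "orders.created.eu"));
    }
}
```

With a queue bound using a wildcard pattern, one exchange can feed several consumers that each see only the subset of events they care about.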
To test whether everything works we also need a consumer that receives the message. Let’s keep it simple and @Transactional:
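import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.Transactional;

@Component
@Transactional
@RabbitListener(queues = {"myqueue"})
public class RabbitReceiver {
    public static final Logger logger = LoggerFactory.getLogger(RabbitReceiver.class);
    @Autowired protected PlatformTransactionManager transactionManager;
    @RabbitHandler
    public void receive(String event) {
        logger.info("Receiving message: {} with transaction manager: {}", event, transactionManager.getClass().getSimpleName());
    }
}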
Now we can test everything by opening the URL of the send() method, and check in the logs that everything works as expected:
Sending message: Message: 53348ef5-e538-4330-88d1-ed09e8455c73 with transaction manager: RabbitTransactionManager
Receiving message: Message: 53348ef5-e538-4330-88d1-ed09e8455c73 with transaction manager: RabbitTransactionManager
So, what exactly does transactional sending mean for RabbitMQ? To check this we introduce another transactional sending method, one that throws an exception:
public class RabbitController {
    // [...]
    @RequestMapping(value = "/send-error")
    @Transactional
    public String sendError() {
        String event = "Message: " + UUID.randomUUID();
        logger.info("Sending message: {} with transaction manager: {}", event, transactionManager.getClass().getSimpleName());
        rabbitTemplate.convertAndSend("myexchange", "myroutingkey", event);
        throw new RuntimeException("Test exception");
    }
}
After executing this method we can see the following in the log:
Sending message: Message: 3918c4e6-88eb-4b23-babf-5df3978bc687 with transaction manager: RabbitTransactionManager
Uncaught exception thrown
org.springframework.web.util.NestedServletException: Request processing failed; nested exception is java.lang.RuntimeException: Test exception
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:986)
[...]
As you can see, there is no Receiving message [...] line at all. The message was passed to RabbitTemplate but was never sent because of the exception. So it works as expected on the sender side.
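The sender-side behaviour described above can be pictured with a toy model: messages published inside a transaction are only buffered, and they reach the queue on commit or are discarded on rollback. This is a simplified plain-Java sketch, not the RabbitMQ client or Spring AMQP API; ToyTransactedChannel and all of its methods are hypothetical names invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a transacted channel (NOT the real RabbitMQ client API). */
class ToyTransactedChannel {
    private final List<String> pending = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>();

    /** Buffers the message; nothing is visible to consumers yet. */
    void publish(String message) {
        pending.add(message);
    }

    /** Makes all buffered messages visible to consumers. */
    void commit() {
        delivered.addAll(pending);
        pending.clear();
    }

    /** Drops all buffered messages. */
    void rollback() {
        pending.clear();
    }

    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    /** Mimics @Transactional: commit on success, rollback on a RuntimeException. */
    static void inTransaction(ToyTransactedChannel channel, Runnable work) {
        try {
            work.run();
            channel.commit();
        } catch (RuntimeException e) {
            channel.rollback();
            throw e;
        }
    }

    public static void main(String[] args) {
        ToyTransactedChannel channel = new ToyTransactedChannel();
        inTransaction(channel, () -> channel.publish("Message: ok"));
        try {
            inTransaction(channel, () -> {
                channel.publish("Message: lost");
                throw new RuntimeException("Test exception");
            });
        } catch (RuntimeException expected) {
            // the second message was rolled back and never delivered
        }
        System.out.println(channel.delivered());
    }
}
```

In the model, as in the sendError() experiment, the message handed over before the exception never becomes visible to a consumer.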
To test what @Transactional means on the receiver side we make a small change to the receiver so that it throws an exception in certain cases:
public class RabbitReceiver {
    @RabbitHandler
    public void receive(String event) {
        logger.info("Receiving message: {} with transaction manager: {}", event, transactionManager.getClass().getSimpleName());
        if (event.startsWith("Error")) {
            throw new RuntimeException("Test receiver exception");
        }
    }
}
We also introduce a third method in the controller:
public class RabbitController {
    // [...]
    @RequestMapping(value = "/send-receive-error")
    @Transactional
    public String sendReceiveError() {
        String event = "ErrorMessage: " + UUID.randomUUID();
        logger.info("Sending message: {} with transaction manager: {}", event, transactionManager.getClass().getSimpleName());
        rabbitTemplate.convertAndSend("myexchange", "myroutingkey", event);
        return String.format("Event sent: %s", event);
    }
}
After executing this method, here is what we have in the logs:
Sending message: ErrorMessage: 57b786ef-214e-4317-b485-9d584bb04f7a with transaction manager: RabbitTransactionManager
Receiving message: ErrorMessage: 57b786ef-214e-4317-b485-9d584bb04f7a with transaction manager: RabbitTransactionManager
Retries exhausted for message (Body:'[B@1a741231(byte[365])' MessageProperties [headers={}, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myexchnage, receivedRoutingKey=myroutingkey, deliveryTag=1, consumerTag=amq.ctag-DB6loookbw1OrNTpmIlqsQ, consumerQueue=myqueue])
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void RabbitReceiver.receive(java.lang.String)' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
[...]
Caused by: java.lang.RuntimeException: Test receiver exception
at RabbitReceiver.receive(RabbitReceiver.java:36)
[...]
The message was delivered, but then an exception was thrown. By default such a message is then removed from the queue: there are no further attempts to deliver it, and the only trace of the attempt is in the application log. The message disappears.
This is how @Transactional works on the receiving side by default, but we can try to make it more useful. For example, we can configure redelivery in case of exceptions. This can be done in the Spring configuration:
spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true
          initial-interval: 1000
          max-attempts: 3
          max-interval: 10000
          multiplier: 2.0
          stateless: true
With this configuration the message will be delivered three times before giving up, and each redelivery is preceded by a growing delay. However, the message still disappears after three attempts. To avoid this it can be directed to a dead-letter queue, but this requires a little more code for creating the queues with RabbitAdmin. Let’s redefine our configuration:
public class RabbitController {
    // [...]
    @PostConstruct
    protected void init() {
        // first we will define deadletter exchange and queue
        rabbitAdmin.declareExchange(new DirectExchange("deadletter-exchange", true, false));
        rabbitAdmin.declareQueue(new Queue("deadletter-queue", true, false, false, null));
        rabbitAdmin.declareBinding(new Binding("deadletter-queue", DestinationType.QUEUE, "deadletter-exchange", "deadletter-routingkey", null));
        // define simple exchange, queue with deadletter support and binding
        rabbitAdmin.declareExchange(new TopicExchange("myexchange", true, false));
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "deadletter-exchange");
        args.put("x-dead-letter-routing-key", "deadletter-routingkey");
        rabbitAdmin.declareQueue(new Queue("myqueue", true, false, true, args));
        rabbitAdmin.declareBinding(new Binding("myqueue", DestinationType.QUEUE, "myexchange", "myroutingkey", null));
    }
    // [...]
}
This changes the following in the execution of our sendReceiveError() method:
2017-12-29 19:33:43.265 Sending message: ErrorMessage: 57b786ef-214e-4317-b485-9d584bb04f7a with transaction manager: RabbitTransactionManager
2017-12-29 19:33:43.280 Receiving message: ErrorMessage: 57b786ef-214e-4317-b485-9d584bb04f7a with transaction manager: RabbitTransactionManager
2017-12-29 19:33:44.285 Receiving message: ErrorMessage: 57b786ef-214e-4317-b485-9d584bb04f7a with transaction manager: RabbitTransactionManager
2017-12-29 19:33:46.287 Receiving message: ErrorMessage: 57b786ef-214e-4317-b485-9d584bb04f7a with transaction manager: RabbitTransactionManager
2017-12-29 19:33:46.293 Retries exhausted for message (Body:'[B@1a741231(byte[365])' MessageProperties [headers={}, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myexchnage, receivedRoutingKey=myroutingkey, deliveryTag=1, consumerTag=amq.ctag-DB6loookbw1OrNTpmIlqsQ, consumerQueue=myqueue])
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public void RabbitReceiver.receive(java.lang.String)' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
[...]
Caused by: java.lang.RuntimeException: Test receiver exception
at RabbitReceiver.receive(RabbitReceiver.java:36)
[...]
Delivery of the message was attempted three times with different delays (I preserved the timestamps to show this). Furthermore, after these three failed attempts the message was routed to the dead-letter queue.
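The gaps between the three Receiving message lines (roughly 1 s, then 2 s) follow from the retry settings used here: initial-interval 1000, multiplier 2.0, max-interval 10000 and max-attempts 3. The arithmetic can be sketched in plain Java as below. This is only an approximation of an exponential back-off under those assumptions, not the retry code Spring actually runs; RetryBackoffSketch and delaysMillis are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative computation of exponential back-off delays between delivery attempts. */
class RetryBackoffSketch {

    /**
     * Returns the waiting time (in ms) before attempt 2, 3, ..., maxAttempts.
     * Each delay is the previous one times the multiplier, capped at maxInterval.
     */
    static List<Long> delaysMillis(int maxAttempts, long initialInterval,
                                   double multiplier, long maxInterval) {
        List<Long> delays = new ArrayList<>();
        double next = initialInterval;
        for (int attempt = 2; attempt <= maxAttempts; attempt++) {
            delays.add(Math.min((long) next, maxInterval));
            next = next * multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        // values taken from the YAML configuration in the article
        System.out.println(delaysMillis(3, 1000, 2.0, 10000)); // prints [1000, 2000]
    }
}
```

With more attempts allowed, the delay would keep doubling until it reaches max-interval and then stay at that cap.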
All of the above concerned RabbitTransactionManager and a microservice that uses only RabbitMQ, with no SQL database. Now we can check what happens when we add a PostgreSQL database. This can be done by adding the following dependencies to the service:
dependencies {
    compile('org.springframework.boot:spring-boot-starter-data-jpa')
    runtime('org.postgresql:postgresql')
}
However, for services using both RabbitMQ and PostgreSQL our previous RabbitTransactionManager bean definition will take the place of the JpaTransactionManager, which is provided by default by the JPA auto-configuration and which is strictly tied to database transactions. To avoid this we need to make the RabbitTransactionManager bean conditional, for example using @ConditionalOnMissingClass:
public class RabbitConfiguration {
    // [...]
    @Bean
    @ConditionalOnMissingClass("org.springframework.orm.jpa.JpaTransactionManager")
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
    // [...]
}
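The idea behind such a class-based condition can be illustrated in plain Java: try to resolve a class name on the classpath and decide based on the outcome. This is a rough sketch of the concept only, not how Spring Boot evaluates its conditions internally; ClasspathProbe and isPresent are hypothetical names.

```java
/** Illustrative classpath check, similar in spirit to a "missing class" condition. */
class ClasspathProbe {

    /** Returns true if the named class can be resolved, without initializing it. */
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        boolean jpaOnClasspath = isPresent("org.springframework.orm.jpa.JpaTransactionManager");
        // a service without JPA on the classpath would fall back to the Rabbit manager
        System.out.println(jpaOnClasspath
                ? "JPA present: keep JpaTransactionManager"
                : "JPA missing: create RabbitTransactionManager");
    }
}
```

Passing the class name as a String matters here: referring to the class directly would itself fail to load in a service that does not ship the JPA jars.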
Let’s now check what happens to our test methods in this configuration. RabbitController:send() shows that JpaTransactionManager is now active:
Sending message: Message: a416137e-6959-4049-8e35-067425a50918 with transaction manager: JpaTransactionManager
Receiving message: Message: a416137e-6959-4049-8e35-067425a50918 with transaction manager: JpaTransactionManager
RabbitController:sendError() shows that JpaTransactionManager works well with RabbitTemplate when sending messages:
Sending message: Message: a9ffb328-a6fe-46c9-b9bb-aaba91f4de19 with transaction manager: JpaTransactionManager
Uncaught exception thrown
org.springframework.web.util.NestedServletException: Request processing failed; nested exception is java.lang.RuntimeException: Test exception
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:986)
[...]
RabbitController:sendReceiveError() shows that JpaTransactionManager also works transparently on the receiver side, in the same way as RabbitTransactionManager did previously:
2017-12-29 20:23:39.833 Sending message: ErrorMessage: 86f82442-5f91-4af6-8c1a-10eca204a8ac with transaction manager: JpaTransactionManager
2017-12-29 20:23:39.838 Receiving message: ErrorMessage: 86f82442-5f91-4af6-8c1a-10eca204a8ac with transaction manager: JpaTransactionManager
2017-12-29 20:23:40.844 Receiving message: ErrorMessage: 86f82442-5f91-4af6-8c1a-10eca204a8ac with transaction manager: JpaTransactionManager
2017-12-29 20:23:42.848 Receiving message: ErrorMessage: 86f82442-5f91-4af6-8c1a-10eca204a8ac with transaction manager: JpaTransactionManager
2017-12-29 20:23:42.853 Retries exhausted for message (Body:'[B@1a741231(byte[365])' MessageProperties [headers={}, contentType=text/plain, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myexchnage, receivedRoutingKey=myroutingkey, deliveryTag=1, consumerTag=amq.ctag-D4Uq5z6FhX4nt_mXhz7xHg, consumerQueue=myqueue])
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method 'public RabbitReceiver.receive(java.lang.String)' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:190)
[...]
Caused by: java.lang.RuntimeException: Test receiver exception
at RabbitReceiver.receive(RabbitReceiver.java:36)
[...]
So nothing more needs to be done to have RabbitMQ as a transactional resource in a Spring Boot application. It works with its own RabbitTransactionManager as well as with JpaTransactionManager, and probably with any other transaction manager provided by Spring.