rabbitTemplate rabbitmq 发布模式与消息确认
2、组成
- 生产者
- 消息队列
- 消费者
- 交换机:
- 隔离生产者和消息队列,充当二者中间体。
- 接受相应的消息并绑定到指定队列
3、三种发布模式
-
根据交换机类型不同,分为 3 种:
-
Direct <直接>:1 对 1----- 一个消息只能被一个消费者消费
-
Topic <主题>:1 对多 ----- 一个消息可以被多个消费者消费 (纠正:一个消息可被多个队列接收,若多个消费者监听同一个队列,会以轮询方式被多个消费者接收,本质依然是 1 对 1,类 Direct。详述见下方评论!)
将路由和某个模式匹配,# 匹配一个或者多个,* 匹配一个。例如 Good.insert Good.delete,则 Good.# 都能获得
-
Fanout <分列>:广播
-
4、Direct 发布模式
-
核心依赖及 application.properties(除端口外不变)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件
spring.rabbitmq.host=101.132.43.162
spring.rabbitmq.port=5672
spring.rabbitmq.username=rebbitmq
spring.rabbitmq.password=3321
-
生产者工程
- 4.1、创建消息队列
两种方式:代码创建
/**
* 模式为direct,直接发送到对应的消息队列
* @return
*/
@Bean
public Queue directQueue(){
return new Queue("direct");
}
(2)RabbitMQ 管理界面手动添加队列
- 4.2、发送端
API:amqpTemplate.convertAndSend(“队列名”,“消息内容”)
此处队列名必须与创建的队列一致。
public void send(String str) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message(str.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("direct",message);
}
- 4.3、测试类
rabbitMqService.send("direct消息");
-
消费者工程
-
4.1、创建消息队列
同上:略
- 4.2、接收端
@RabbitListener(queues = “direct”):监听器监听指定队列
@RabbitHandler
@RabbitListener(queues = "direct")
public void Handler(Message message, Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息编号:"+messageId);
String s = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("消息消费:"+s);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
}
5、Topic 发布模式
-
生产者工程
-
5.1、创建消息队列
API:BindingBuilder.bind(指定队列).to(交换机).with(路由键);
路由键相当于队列名
@Bean(name = "message")
public Queue queue(){
return new Queue("topic.message");
}
@Bean(name="messages")
public Queue queue1(){
return new Queue("topic.messages");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("exchange");
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
@Bean
public Binding binding(@Qualifier("message") Queue message, TopicExchange topicExchange){
return BindingBuilder.bind(message).to(topicExchange).with("topic.message");
}
@Bean
public Binding binding1(@Qualifier("messages") Queue queue,TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("topic.#");
}
![img](mq 使用. assets/20190312154751707.png
- 5.2、发送端:
API:amqpTemplate.convertAndSend(“交换机名”,“路由键”,“消息内容”)
RabbitMQ 将会根据第二个参数去寻找有没有匹配此规则的队列, 如果有, 则把消息给它, 如果有不止一个, 则把消息分发给匹配的队列 (每个队列都有消息!)
@Override
public void send1(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message(str.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("exchange","topic.message",message);
}
- 5.3、测试类
rabbitMqService.send1("topic.message消息");
-
消费者工程
@RabbitHandler
@RabbitListener(queues = "topic.message")
public void Handler1(Message message,Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("topic.message消息编号" + messageId);
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("topic.message消息体" + str);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
@RabbitHandler
@RabbitListener(queues = "topic.messages")
public void Handler2(Message message,Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("topic.messages消息编号" + messageId);
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("topic.messages消息体" + str);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
注意:topic 模式中只有消费端监听的路由键符合发送端的路由规则(路由键决定)的队列才会收到消息。
6、Fanout 发布模式
-
广播:发送到路由器的消息会使得绑定到该路由器的每一个 Queue 接收到消息, 这个时候就算指定了 Key, 或者规则 (即上文中 convertAndSend 方法的参数 2), 也会被忽略!
-
交换机类型:FanoutExchange
-
API:amqpTemplate.convertAndSend(“交换机名”,“ ”,“消息内容”);// 路由键被忽略
-
消费端:只要是绑定到该交换机上的都能收到消息。
在配置类中增加
@Bean(name = "fanoutMessage")
public Queue queueC(){
return new Queue("fanoutMessage");
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
@Bean
public Binding binding2(@Qualifier("fanoutMessage") Queue queue,FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
生产者服务
@Override
public void send2(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message(str.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("fanoutExchange","",message);
}
消费者服务
@RabbitHandler
@RabbitListener(queues = "fanoutMessage")
public void Handler3(Message message,Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("fanoutMessage消息编号" + messageId);
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("fanoutMessage消息体" + str);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
人工确认消息发送和消费成功
在 application.properties 中增加配置
#设置消息确认成功确认后进行回调
#NONE值是禁用发布确认模式,是默认值
#CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例
#SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
#调用confirmCallback接口
spring.rabbitmq.publisher-confirm-type=correlated
#设置成功到达路由器后不会调用returnsCallback 方法,否则则会调用returnsCallback
spring.rabbitmq.publisher-returns=true
消息队列配置
package cn.atiaozao.springbootmqprovider.config;
import com.rabbitmq.client.AMQP;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqConfig {
/**
* 模式为direct,直接发送到对应的消息队列
* @return
*/
@Bean
public Queue directQueue(){
return new Queue("direct");
}
@Bean(name = "message")
public Queue queue(){
return new Queue("topic.message");
}
@Bean(name="messages")
public Queue queue1(){
return new Queue("topic.messages");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("exchange");
}
//消息绑定
@Bean
public Binding binding(@Qualifier("message") Queue message, TopicExchange topicExchange){
return BindingBuilder.bind(message).to(topicExchange).with("topic.message");
}
//表达式绑定
@Bean
public Binding binding1(@Qualifier("messages") Queue queue,TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("topic.#");
}
}
消息服务方法
package cn.atiaozao.springbootmqprovider.service.impl;
import cn.atiaozao.springbootmqprovider.service.RabbitMqService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.UUID;
@Component
public class RabiitmqServiceImpl implements RabbitMqService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmMessageImpl confirmMessageImpl;
/**
* 使用@PostConstruct 在注入完成之后进行设置消息push 确认的回调方法
*
*/
@PostConstruct
public void init(){
//消息成功到达指定队列的回调方法
rabbitTemplate.setConfirmCallback(confirmMessageImpl);
//消息如果未成功到达队列的
rabbitTemplate.setReturnCallback(confirmMessageImpl);
}
@Override
public void send(String str) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message(str.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("direct",message);
}
@Override
public void send1(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message(str.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("exchange","topic.message",message);
}
@Override
public void send2(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message(str.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("exchange","",message);
}
}
消息成功到达路由器后的确认回调方法
package cn.atiaozao.springbootmqprovider.service.impl;
import lombok.extern.java.Log;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Log
@Component
public class ConfirmMessageImpl implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("消息确认成功"+b+"|返回错误消息"+s+"|附带数据"+correlationData);
}
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("消息发送失败|"+message.getBody());
}
}
消息消费服务
#设置消息队列使用手工确认的方式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
消息消费监听
package cn.atiaozao.springbootmqconsumer.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Handler;
@Component
public class DirectConsumerHander {
@RabbitHandler
@RabbitListener(queues = "direct")
public void Handler(Message message, Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息编号:"+messageId);
String s = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("消息消费:"+s);
try {
//消息人工确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
}
@RabbitHandler
@RabbitListener(queues = "topic.message")
public void Handler1(Message message,Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("topic.message消息编号" + messageId);
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("topic.message消息体" + str);
//消息人工确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
@RabbitHandler
@RabbitListener(queues = "topic.messages")
public void Handler2(Message message,Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("topic.messages消息编号" + messageId);
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("topic.messages消息体" + str);
//消息人工确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}