up:: RabbitMQ管理后台简介
说明:
(1) 内容说明:
● 这儿我们会创建一个项目,演示RabbitMQ最基础的内容;
通过,这个最简单的例子,先了解:如何使用RabbitMQ,如何连接RabbitMQ,如何发送消息,如何接收消息等最最基础的内容;
● 然后,会演示多个消费者平均压力的内容;
一:第一个生产者和消费者;
0.创建一个maven项目rabbitmq,演示用;
RabbitMQ支持多语言,其中就包括Java;同时,RabbitMQ的API丰富,我们可以利用RabbitMQ针对Java提供的客户端的一系列API,来完成操作;
1.引入RabbitMQ的Java客户端的,依赖;
说明:
(1) 依赖说明;
2.第一个生产者;
Send类:
说明:
(0) RabbitMQ有三个所谓的“端”:这儿梳理一下;
● 服务端:就是我们安装了RabbitMQ的Linux系统;我们把RabbitMQ启动后,这个服务器中的RabbitMQ就是服务端;
● 管理后台:RabbitMQ的管理后台,就是我们在【RabbitMQ管理后台简介】中,演示的在web端查看、管理RabbitMQ的一些配置的地方;(PS:要想在网页上访问管理后台,那么部署RabbitMQ的服务器,就要开发RabbitMQ的15672端口)
● 客户端:比如,在这儿,我们在我们的Java项目中,引入RabbitMQ提供的Java客户端后,我们就可以通过客户端去操作RabbitMQ了;(PS:要想在远端服务器,通过客户端访问RabbitMQ服务,那么部署RabbitMQ的服务器,就要开发RabbitMQ的5672端口)
(1) 看注释;这儿的连接RabbitMQ的套路,都是相对固定的,后面如有需要,我们似乎也可以创建一个工具类;
(2) 我们要想在其他服务器,通过客户端访问RabbitMQ,那么部署RabbitMQ的服务器,就要开发RabbitMQ的5672端口;关于Linux防火墙设置,可以参考【Linux环境搭建与Redis应用(三)】;
(3) 如果我们的RabbitMQ是安装在本机的话,就可以设置本机地址,然后其默认会使用guest用户去登录,所以设置用户就可以省略;
(4) 运行结果:
3.第一个消费者;
Recv类:
说明:
(1) 接收消息的时候,我们不需要关闭channel和connection,这个类会一直运行,即其会一直看队列中有没有数据,有的话就拿过来消费;
(2) 看注释;
(3) 我们接收到消息后,具体消息怎么处理,写在了handleDelivery()方法中;
(4) 运行结果;
4.瞅一眼RabbitMQ管理后台;
二: 根据消息内容的不同,采取不同的处理策略;(这儿演示的是一种思路)
在实际开发中,有时可能需要根据消息内容的不同,采取不同的处理策略;本篇博客,就来演示一下;
NewTask类:
说明:
(1) 看注释;
(2) 类内容说明;
Worker类:
说明:
(1) 看注释;
(2) 类内容说明;
(3) 运行结果;
三:当消息相对较多时,多个消费者平均压力;
可以看到,在(二:根据消息内容的不同,采取不同的处理策略;(这儿演示的是一种思路))中,单靠Worker处理10条信息,需要10秒多;试想,我们再创建一个Worker,10条消息的处理速度就会提升;
1.多个消费者,平均压力:引入;
一个直接的想法就是,既然我们想让多个Worker一起来接收并处理消息,那么我们可不可以再运行一次Worker类?
进行一下配置,让Worker类可以有多个实例并行运行;
那么,此时的效果;
默认情况下,如果有多个Worker的话,那么这多个Worker会并行工作;RabbitMQ会根据已启动worker和消息的情况,按顺序把每个消息发送给下一个Worker,在消息数量上是平均分配的;即,比如上面有10条消息,两个消费者,那么无论两个消费者处理能力如何,每个消费者都会收到5条消息;>
但是,这种平均,是任务量的平均分配,而不一定是真实工作量(压力)的分配;比如,下面的案例;
此时,再发送消息,观察效果:
而,为了解决这种,纯按数量和顺序分配,却没有按工作量(压力)平均分配的问题;就是下面公平派遣的内容了;
2.公平派遣;
公平派遣是在有多个消费者,而且是循环调度的情况下,来说的;公平派遣机制下,RabbitMQ会根据消费者的压力,来决定是否派遣;
要想实现公平派遣,还需要加入消息确认机制;主要目的是,消费者处理完消息后,发送一个确认消息,这样一来,RabbitMQ就知道你处理完了,然后就会给你发送下一个消息;
修改Worker这个Consumer类的内容如下:
说明:
(1) 修改内容说明;
(2) 注意:此时,一定要记得主动发送【消息处理完成的通知】;否则,RabbitMQ就不知道,这个消息是否被处理完了,其就会认为没有被处理完,于是后续的消息就会得不到处理,越积越多,消耗内存;
(3) 运行效果;此时,我们重开两个Worker,然后运行NewTask,发送10条消息;