up:: RabbitMQ入门生产者消费者演示
说明:
(1) 本篇博客主要介绍RabbitMQ交换机的内容,包括fanout模式,direct模式,topic模式,headers模式等;
一:RabbitMQ的交换机,在前面粗浅的介绍其概念,但我们没有详细介绍:
(1) 在【RabbitMQ简介】中,简单介绍了交换机的概念;
● 交换机的主要作用是,Producer在发送消息时候,是通过交换机向对应的队列中发送消息的;
● 然后,可以使用RoutingKey来指定,交换机和哪些队列绑定;即,Routing Key路由键就是专用于决定交换机和队列是如何进行绑定的;
简单来说,消息发送方即Producer生产者,在发布消息时候,可以设置交换机和路由键,以把消息发送到指定的队列中去;
即,交换机的主要作用是,Producer生产者发布消息用的;
(2) 在【RabbitMQ入门生产者消费者演示】中,演示通过客户端连接使用RabbitMQ的时候,也涉及到了交换机的一点内容;
●在这儿,我们发布消息的时候,没有指定交换机(即使用的是RabbitMQ的默认交换机);如果我们没有指定交换机,那么我们就需要设置RoutingKey,以让RabbitMQ知道,Producer发送的消息要发送到那个(些)队列上去;
二:RabbitMQ中,交换机的四种工作模式:简述;
(1)fanout:广播模式的交换机;
● 即,如果Producer生产者在发布消息的时候,采用的是fanout模式的交换机,那么RabbitMQ会向【所有与这个交换机绑定的队列】都发送这个(同样的)消息;
● 所以,如果Producer生产者在发布消息的时候,采用的是fanout模式的交换机;其发布消息的时候,就不用设置RoutingKey了;因为,其会向【所有与这个交换机绑定的队列】都发送这个(同样的)消息,就不需要RoutingKey来指定向哪些队列发送消息了;
(2)direct:直接模式;
● 其会根据,RoutingKey路由键,去决定交换机把消息发送到哪个(些)队列上去;
● Producer在发送消息时,如果使用direct模式;那么,我们在发送消息的时候,就需要指定一个Routing
Key(否则交换机,将不知道把消息发送到哪个(些)队列上去);
● 然后Consumer在接收消息的时候,需要创建一个队列;然后,Consumer要想使用队列接收到某个消息,那么在通过队列接受消息时,就需要设置对应的Routing
Key;
(3)topic模式;
● Topic模式可以认为是direct模式的一个威力加强版;
● Topic之所以,可以应对“多个条件”的复杂逻辑;主要是其可以进行模糊匹配;
● 其中,*可以匹配任何一个单词(不包含”.”的任意字符串);例如,item. ***** 可以匹配item.pikapika,或者item. &gdygj。diu,或者item. ji。。joidjfi%djl 但不能匹配item. ji。。joidjf.i%djl 实测如此;貌似在RabbitMQ的Topic模式中,”.”是一种特殊的存在,其可以作为分隔符,每隔一个”.”就表示多了一个单词);
可以匹配0至多个单词;例如:item.# 能够匹配 item. insert.abc 或者 item. insert ;(实测如此)
● 关于,topic模式,可以看下下面的案例;
三:交换机类型之:fanout广播模式;
0.问题引入;
(1) 一个适合使用fanout模式交换机的,场景;
试想一下这种场景:我们要构建一个日志记录系统,其情况和需求如下:
● 其中,比如我们的【业务代码】会发出日志消息,然后我们的【日志记录代码】负责接收并存储日志消息;
● 然后,同时既希望把日志存到磁盘上,同时也希望把日志打印到屏幕上;
● 那么,对于这种需求,我们就需要有多个接收日志的地方;即,也就是,我们有一个发送日志消息的,但却有多个接收日志消息的(对于同样的一条消息,这个多个接收日志消息的地方,都需收到);
那么,对于这种需求,那么Producer在发布消息的时候,使用fanout模式的交换机,就很适合;
● 如果Producer生产者在发布消息的时候,使用fanout模式交换机,会有如下特点:如果Producer生产者,在发布消息的时候,采用fanout模式的交换机;那么,在发布消息时,那么所有与这个交换机绑定的队列(一般这个队列声明在Consumer消费者处),都会收到这个消息;
● 即,Producer生产者在发布消息的时候,如果采用fanout模式的交换机;那么,其发布消息的时候,会向RabbitMQ中、所有、与这个交换机绑定的,队列中,都发送这个消息;
● 如果,RabbitMQ中、当前、没有与这个交换机绑定的队列,那么消息就相当于是一个无的之矢,这个消息就会在设定的存活时间内容,静静的等待;一旦有【与这个交换机绑定的队列】被创建后,这个消息就会把发送到那个队列中去;(PS:这一点,可能存在不准确、甚至错误的地方;但是,这也是自己经过实测的……)
(2) 关于日志存储,还有第二个需求;
● 需求:由于日志存储这个业务的特殊性,我们只想记录当前产生的日志,即【日志记录代码】只需要记录【日志记录代码启动后,业务代码产生的日志】;(因为,以前产生的日志,自有当时的日志记录代码负责;;;;同时,日志也有时效性,记录一些旧日志的意义不是很大;;;);
● 那么,针对这个需求,Consumer在接收消息的时候,就可以使用临时队列;(有关临时队列的内容,在下面会介绍)
1.创建EmitLog类,这个类可以认为是一个产生日志的业务类;其角色是【Producer生产者】;
EmitLog类:
说明:
(1) 类内容说明;
(2) 关于fanout交换机,和,RabbitMQ默认交换机,在使用时的区别;
● 在【RabbitMQ入门生产者消费者演示】中,我们一开始演示RabbitMQ的时候,并没有主动去关心交换机的内容,当时使用的是默认交换机;;;所以,在当时我们发送消息的时候,为了能让RabbitMQ知道要被这个消息发送到哪个(些)队列中,我们需要指定Routing Key;;;然后,当时,我们在Producer生产者中就定义了一个队列,然后把这个队列作为RoutingKey,就相当于告诉RabbitMQ,我们发送的这个消息就是发送到这个队列中的;;;自然,RabbitMQ通过默认交换机,把消息发送到那个队列中后,这个消息就在队列中了;;;如果,当前没有任何Consumer消费者连接这个队列,那么这个队列中的消息,就会默默的等待;;;如果,有Consumer消费者连接了这个队列,那么Consumer就可以从这个队列中获取消息;;;而且,使用默认交换机的时候,RabbitMQ会采用分发的方式,把队列中的消息,分发给所有(连接了这个队列的)的Consumer,每个消息只会发送给一个Consumer(比如,有1,2,3,4四条消息;然后,有两个队列;那么第一个队列会分到1,3两条消息,第二个队列会分到2,4两条消息);;;这在 【RabbitMQ入门生产者消费者演示】有着很明显的体现;
● 但是,Producer生产者在发送消息的时候,如果采用的是fanout模式的交换机;;;那么,其发布消息的时候,会向RabbitMQ中、所有、与这个交换机绑定的,队列中,都发送这个消息;;;如果,RabbitMQ中、当前、没有与这个交换机绑定的队列,那么消息就相当于是一个无的之矢,这个消息就会在设定的存活时间内容,静静的等待;一旦有【与这个交换机绑定的队列】被创建后,这个消息就会把发送到那个队列中去;(PS:这一点,可能存在不准确、甚至错误的地方;但是,这也是自己经过实测的……);;;同时,如果Producer生产者在发布消息的时候,采用的是fanout模式的交换机;其发布消息的时候,就不用设置RoutingKey了;因为,其会向【所有与这个交换机绑定的队列】都发送这个(同样的)消息,就不需要RoutingKey来指定向哪些队列发送消息了;
(3) 运行结果;发现,我们成功了发送了消息;(但其实,因为当前在RabbitMQ中,没有在任何Consumer中定义队列,来与这个交换机绑定;;;所以,这个消息就相当于是一个无的之矢,这个消息就会在设定的存活时间内容,静静的等待;一旦有【与这个交换机绑定的队列】被创建后,这个消息就会把发送到那个队列中去;(PS:这一点,可能存在不准确、甚至错误的地方;但是,这也是自己经过实测的……);;)
2.创建ReceiveLog类,这个类可以认为是一个存储日志的类;其角色是【Consumer消费者】;
ReceiveLog类:
说明:
(1.1) 类内容说明:
(1.2) 针对我们这儿的【日志存储】业务来说,因为我们有个需求是【日志记录代码只用记录当前生成的日志信息,以前产生的日志就没必要记录了;(因为,以前产生的日志,自有当时的日志记录代码负责)】;所以,Consumer消费者,在接收日志消息时候,选用的是一种临时队列;
● 临时队列的声明方式;
● 上面我们使用临时队列的方式:我们每次启动一次Consumer消费者的时候,都会创建一个全新的(因为临时队列的名字是RabbitMQ随机、自动生成的)临时队列;
● 临时队列的本质特点,自己并没有很好的掌握;
● 有关临时队列,在接收消息时的特点,在下面会介绍;
(1.3) 然后,就可以把队列绑定到交换机上,然后去队列中拿消息了;
(2)强调说明:
接收消息的时候,Consumer是与队列绑定的,和交换机是不绑定的;这是一种解耦的关系;即,Producer通过交换机把消息发送到队列中去,然后Consumer从队列中拿消息;
(3) 运行结果;
● 情况1:Producer在Consumer未启动前(即,临时队列未创建前)发送的消息,在Consumer启动后(即,临时队列创建后),临时队列是接收不到的消息的;
● 情况2:我们先启动Consumer(即,先创建临时队列),然后Produce发送的消息,临时队列是可以收到的;
● 情况3:启动多个Consumer(自然,每个Consumer的队列是不同的),然后Producer再发送消息;
这正好可以满足日志系统的,第一个需求;
● 情况4(情况1的变种):如果Consumer中使用的不是临时队列,Producer在Consumer未启动前(其队列未创建前)发送的消息,在Consumer启动后(即,临时队列创建后),队列是可以接受的到的;
所以,可以感受到,RabbitMQ的内容,其实还是有一些的,对于这些自己暂时不了解的内容,暂时搁置吧;以后有精力再了解;;而且,以后的项目中,万一消息中间件没有使用RabbitMQ,而是使用的RockerMQ呐,对吧;
四:交换机类型之:direct直接模式;
0.direct模式,简介;
(1)direct模式简介;
● Producer在发送消息时,如果使用direct模式;那么,我们在发送消息的时候,就需要指定一个Routing
Key(否则交换机,将不知道把消息发送到哪个(些)队列上去);
● 然后Consumer在接收消息的时候,需要创建一个队列;然后,Consumer要想使用队列接收到某个消息,那么在通过队列接受消息时,就需要设置对应的Routing Key;
(2)direct模式的一个应用场景;
● 比如,日志分为info,warning,error等等级;对于【把日志打印在控制台】来说,info,warning,error这三个等级都可以打印;对于【把日志写在日志文件中】来说,有时我们只需要写error等级的即可;
● 所以,对于这种场景,使用fanout广播模式的交换机,就不太合适了;
● 对于这种场景,我们可以使用direct模式;业务代码通过交换机发送日志消息的时候,给日志消息设置上对应的RoutingKey(此时,Routing
Key就可以设置为info,warning或error这三个日志等级);
● 那么此时,【把日志打印在控制台】在通过队列去接收消息的时候,这个队列接收消息的时候Routing
Key就可以设置为info,warning或error这三个日志等级;;;【把日志写在日志文件中】在通过队列去接收消息的时候,就可以设置Routing
key为error这一个日志等级
Tips:
定义(声明)一个交换机的时候,如果RabbitMQ已经存在叫这个名字的交换机时(PS:RabbitMQ创建交换机的时候,貌似可以设置过期时间等;RabbitMQ这种更复杂的内容,后续再了解吧),如果交换机类型不一致,就会报IO异常;
1.创建EmitLog类,这个类可以认为是一个产生日志的业务类;其角色是【Producer生产者】;
EmitLog类:
说明:
(1) 使用direct类型的交换机;
(2) 通过交换机发送消息的时候,设置对应的Routing Key;
2.创建ReceiveLog1类,这个类可以认为是【把日志打印在控制台】那个接收日志信息的类;其角色是【Consumer消费者】;
ReceiveLog1类:
说明:
(1) 定义交换机,声明一个临时队列;
(2.1) 然后,将队列绑定到交换机上去;同时,因为这儿想要接收info,warning,error类型的(而Producer在发送消息的时候,正好也对应的把routing
key设为了info,warning,error);;;所以,这儿在接收消息的时候,该队列的routingkey也可以设为info,warning,error;
(2.2) PS:经过实测,这儿绑定三次,其是同一个队列;
3.创建ReceiveLog2类,这个类可以认为是【把日志写在日志文件中】那个接收日志信息的类;其角色是【Consumer消费者】;
ReceiveLog2类:
说明:
(1) 这儿在接收消息是时候,只接受error类型的日志,所以其接收消息的时候routingKey设为了error;
4.运行效果;
先启动ReceiveLog1和ReceiveLog2两个Consumer(其本质,就是先创建两个接收消息的临时队列);
然后,启动EmitLog这个Producer,去发送消息;
那么,此时ReceiveLog1和ReceiveLog2两个Consumer,接收消息的情况为;
发现结果OK,符合预期效果;
五:交换机类型之:topic模式;
Topic模式可以认为是direct模式的一个威力加强版: 逻辑阐述如下;
(1) 先回顾一下,direct模式:
(2) 但是,对于一些业务比较复杂的情况,为了能够更加灵活的应对,就需要引入Topic模式;
● 比如,在实际开发中,对于日志,我们不仅要根据日志的严重性(info,warning,error)来区分日志,还要根据日志的来源(日志是来源于【商品模块】,还是【用户模块】);
● 此时,我们有一个日志记录系统,其只想记录来自于【用户模块】的error级别的日志信息;那么,对于这种需求,使用direct模式的交换机自然也是可以完成的,只是不太灵活,不简洁;
● 那么,对于这种“多个条件”的复杂逻辑,使用topic就可以比较好的应对;
(3) Topic模式简介;
● Topic之所以,可以应对“多个条件”的复杂逻辑;主要是其可以进行模糊匹配;
● 其中,*可以匹配任何一个单词(不包含”.”的任意字符串);例如,item. ***** 可以匹配item.pikapika,或者item. & gdygj。diu,或者item. ji。。joidjfi%djl 但不能匹配item. ji。。joidjf.i%djl (实测如此;貌似在RabbitMQ的Topic模式中,”.”是一种特殊的存在,其可以作为分隔符,每隔一个”.”就表示多了一个单词);
可以匹配0至多个单词;例如:item.# 能够匹配 item. insert.abc 或者 item. insert ;(实测如此)
● 关于,topic模式,可以看下下面的案例;
1.创建EmitLog类,这个类可以认为是一个产生日志的业务类;其角色是【Producer生产者】;
EmitLog类:
说明:
(1) 这儿我们使用topic模式的交换机;
(2) 然后,这儿我们发送11条消息,分别设置不同的Routing Key;(然后,为了方便观察效果,我们把消息的内容,设置成和Routing
Key的内容一样了);
2.创建Receive1类,其角色是【Consumer消费者】;然后,其接收消息时,设置的Routing Key是【.orange.】;
Receive1类:
说明:
(1) 类内容说明;
3.创建Receive2类,其角色是【Consumer消费者】;然后,其接收消息时,设置的Routing
Key是【..rabbit】和【lazy.#】;
Receive2类:
说明:
(1) 类内容说明;
4.运行效果;
(1) 先启动Receive1和Receive2两个Consumer(本质上就是先创建对应的两个临时队列);
(2) 然后,启动EmitLog发送消息;
(3) Receive1和Receive2接收消息的情况是;(结果就不详细分析了,稍微一瞅,就能明白)
● Receive1,接收消息,队列绑定交换机时,Routing Key设置的是【.orange.】;
● Receive2,接收消息,队列绑定交换机时,Routing Key设置的是【..rabbit】和【lazy.#】;
(4)可以看到,RabbitMQ的Topic模式的交换机的提供的这个功能,能够帮助我们灵活的应对实际业务开发中的复杂需求;;;;;但是,在实际开发中具体使用RabbitMQ的规范和常用写法,需要积累和总结;
● 如果,我们在发送消息的时候,设置了properties,并且传入了指定的规则;那么,我们就可以使用设置的规则发送给指定的队列;
● 只是,在实际开中,很少使用这种模式;
以下内容参考自【RabbitMQ的常见队列模型,simple模式、work模式、fanout模式、direct模式、topic模式、headers模式、RPC】
,该文的作者是【毅大师】;
不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。