1. 消息队列?
消息队列(MQ,Message Queue),全称是Message Queue,很直白。
是一种应用程序和应用程序进行消息通信的中间件。
说白了就是在应用之间放置一个消息中间件,然后应用双方通过这个中间件进行消息通信。
为啥平白无故的要放一个消息中间件呢?
一般来说,小系统是不需要使用消息中间件的,只有分布式系统才会引入消息中间件,因为分布式系统需要应对高并发。
在分布式系统中,一般会面临三个问题:解耦、异步、削峰。
分布式系统需要多个系统之间解耦,也需要对用户比较友好的响应速度,更需要系统之间异步响应。
而消息中间件天然对各个应用系统之间进行解耦、异步发送消息和消费消息、并且通过队列缓存消息可以起到削峰填谷的作用。
因此,消息中间件可以完美的解决上面的三个问题。
然万物抱阳负阴,系统之间突然加了个中间件,提高系统复杂度的同时也增加了很多问题:
消息丢失怎么办?
消息重复消费怎么办?
某些任务需要消息的顺序消息,顺序消费怎么保证?
消息队列组件的可用性如何保证?
这些都是使用消息队列过程中需要思考需要考虑的地方,消息队列能给你带来很大的便利,也能给你带来一些对应的麻烦。
上面都是引入消息中间件后需要考虑的问题,后续文章会逐步分析。
2. RabbitMQ概览
RabbitMQ是一个消息组件,是一个通过erLang语言开发的AMQP(Advanced Message Queue)的开源实现。
RabbitMQ服务端使用ErLang语言实现,支持多种客户端。
其用于在分布式系统中存储转发信息,在易用性,扩展性,高可用性等方面表现不俗,消息队列是一种在应用系统之间进行通信的方法,是通过读写如队列的信息来通信,而RPC是直接调用彼此应用系统来通信。
消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
AMQP的主要特征是面向消息,队列,路由(包括点对点,发布/订阅)、可靠性、安全等的一个协议标准。
RabbitMQ采用了AMQP协议,这个协议到底是怎么一回事,我们不用关系,我们主要关心RabbitMQ在实战中到底如何使用。
我们先来看看RabbitMQ的一些概念,因为RabbitMQ实现了AMQP协议,所以这些概念也是AMQP中共有的。
盗用其他博主的一个架构图: Broker: 中间件本身。接收和分发消息的应用,这里指的就是RabbitMQ Server。
Virtual host: 虚拟主机。出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange、queue等。host 是权限控制的基本单位,用户只能访问与之绑定的 vhost,默认 vhost:”/” ,默认用户”guest” 密码“guest”,来访问默认的vhost。
ConnectionFactory:连接工厂,生产Connection的工厂。
Connection: 连接。publisher/consumer和broker之间的TCP连接。断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题。是RabbitMQ的socket长连接,它封装了socket协议相关部分逻辑。
Channel: 渠道,也叫做频道、信道。如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销会比较大且效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。它是建立在Collection连接之上的一种轻量级的连接,我们大部分的业务操作是在Channel这个接口中完成的,包括定义队列的声明queueDeclare、交换机的声明exchangeDeclare、队列的绑定queueBind、发布消息basicPublish、消费消息basicConsume等。如果把Connection比作一条光纤电缆的话,那么Chanel管道就比作光纤电缆中的其中一束光纤。一个Connection上可以创建任意数量的Channel。
Exchange: 交换机,也叫做分发器。根据分发规则,匹配查询表中的routing key,分发消息到queue中去。
Queue: 消息的队列。消息最终被送到这里等待消费,一个message可以被同时拷贝到多个queue中。
Binding: 绑定。Binding并不是一个概念,而是一个操作。exchange和queue之间的虚拟连接,binding中可以包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。RabbitMQ中通过绑定,以Routing key作为桥梁将Exchange与Queue关联起来(Exchange->Routing key->Queue),这样RabbitMQ就知道如何正确地将消息路由到指定的队列了,通过queueBind方法将Exchange、Routing key、Queue绑定起来。
上面就是RabbitMQ消息中间件中的一些概念,其大致流程如下:
当我们的生产者端(Producer生产者应用)往Broker(RabbitMQ)中发送了一条消息,Broker会根据其消息的标识送往不同的Virtual host,然后Exchange(交换机)会根据消息携带的路由key(Routing key)和交换器类型(Exchange type)将消息(Message)分发到自己所属的Queue(消息队列)中去。
然后消费者端(Consumer消费者应用)会通过Connection(Rabbit连接)中的Channel(Rabbit通道)获取生产者刚刚推送到Queue中的消息,这个消息有可能是Queue主动推动给消费者(Push模式,异步消费),也可能是消费者主动从Queue中拉取(Pull模式,同步消费)。
注意:某个Exchange交换机有哪些属于自己的Queue,是通过Binding绑定关系决定的。
3. RabbitMQ和MQ的关系
RabbitMQ其实质是对MQ(Message Queue)规范的一种具体实现。
MQ规范实际上就是JMS(JAVA Message Service Java消息服务)API规范的扩展。
因此可以这么说,RabbitMQ实际上就是对JMS规范的具体实现。
当然,有很多厂商都对JMS规范做了实现,比如ActiveMQ、RocketMQ、Kafka等,都必须符合JMS规范,RabbitMQ也一样。
RabbitMQ在实现JMS规范的基础上扩展了自己的一些组件概念,其所有组件大致如下:
1.消息生产者Producer:负责生产消息,将消息发送到MQ中。
2.消息队列Queue:MQ中负责缓存消息的容器,可以看成是一个有序的数组,生产者生产的消息会发送到交换机中,最终交换机将消息存储到某个队列或者某些队列中(这取决于交换机根据Routing key匹配到了绑定的哪些队列),队列可被消费者订阅,也可被消费者主动拉取消息,不论是消费者订阅(订阅后消息将被MQ主动push,异步消费)还是消费者主动从MQ中拉取(pull,同步消费)消息,消费者都将从这些队列中拿到消息进行消费。
3.消息消费者Consumer:负责消费消息,从MQ的Queue中获取消息进行后续处理。消费者只需要关注队列即可,不需要关注交换机和路由键,消费者可以通过 basicConsume (订阅模式可以从队列中一直持续的自动的接收消息)或者 basicGet (先订阅消息,然后获取单条消息,再然后取消订阅,也就是说 basicGet 一次只能获取一条消息,如果还想再获取下一条还要再次调用basicGet)来从队列中获取消息。
4.交换机Exchange:这个是RabbitMQ自己扩展出来的一个组件,用来隔离生产者和消息队列,充当二者之间的桥梁;它负责从生产者接受消息,然后将消息通过Routing key分发到它绑定的消息Queue中去。Routing key就是交换机和Queue绑定时指定的一个路由策略键。
整体上来讲,在RabbitMQ中:
(1)生产者生产消息,将消息发送到Exchange,就结束了。生产者只关系它要往哪个Exchange发送消息。
(2)Exchange会通过Routing key和某个Queue或者和某一些Queue进行绑定,Routing key就是路由键,Exchange和每个队列之间都通过Routing key进行映射,只要能拿到Routing key,就能知道将消息分发到绑定的哪个Queue中去。
(3)消费者从Queue中消费消息,消费者不关心Exchange,也不关心Routing key,它只关心Queue,只需要知道从哪个Queue中消费消息就行。
4. RabbitMQ投递消息的架构场景
盗用其他博主的一个Exchange的架构图:
生产者、交换机、队列,这三者的组合最终决定了一条消息是如何流转并存储的。
基于这三者的维度来看,消息的运行周期应该考虑如下几种场景(下面都是基于一个Exchange的情况):
我们约束一个前提:那就是我们默认认为一个生产者生产的消息是同一种类型的消息,因为一个生产者携带的routing key是确定的。所以我们可以认为一个生产者,就对应生产同一种类型的消息。下面的消息源指的就是同一种类型的消息,即同一个生产者生产的消息,每个消息源携带的roting key是确定。
(1)单消息源定向单播投递:只有一个生产者生产消息,将这条消息定向投递到一个队列中,即所谓单播投递。
(2)单消息源定向多播投递:只有一个生产者生产消息,将这条消息定向投递到一组队列中,即所谓多播投递。
(3)单消息源广播投递:只有一个生产者生产消息,将这条消息广播投递到所有的队列中。
(4)多消息源定向单播投递:多个生产者生产消息,将这些消息定向投递到一个队列中。
(5)多消息源定向多播投递:多个生产者生产消息,将这些消息定向投递到一组队列中。
(6)多消息源广播投递:多个生产者生产消息,将这些消息广播投递到所有的队列中。
(7)多消息源模糊单播投递:多个生产者生产消息,将这些消息模糊投递到一个队列中。
(8)多消息源模糊多播投递:多个生产者生产消息,将这些消息模糊投递到一组队列中。
在实际操作中,上面几种场景,还存在多个Exchange存在的情况。这样一来,这种组合关系就更复杂了!
5. Exchange交换机类型
RabbitMQ架构中的核心组件就是交换机,通过交换机来实现消息的各种分发策略。
反复强调,在rabbitMQ中,生产者从来不会将消息推送到Queue中,甚至生产者根本不需要知道将消息发送给哪些Queue,生产者只能推送消息到交换机中,再由交换机根据Routing key的分发策略将消息推送到Queue中,如果没有定义交换机则消息默认会推送到default Exchange中,再由default Exchange根据Routing key推送到它绑定的Queue中。
交换机收到消息后,会根据交换机的类型和Routing key的映射配置来进一步决定消息该如何转发:
比如将消息推送到特定的队列。
也有可能推送到多个队列。
也有可能推送到某一个队列。
也有可能丢弃消息(当Routing key没有匹配到队列时)。
……
有了交换机后,我们就可以根据交换机和队列、以及路由key之间的映射关系实现很多复杂的能力。
比如:
(1)实现消息定向多播投递的能力,将一条消息同时推送到多个Queue中,从而被多个消费者同时消费。
(2)实现消息定向单播投递的能力,将一条消息只推送到某一个Queue中,从而只能被订阅该Queue的消费者消费,其他消费者无法消费。
(3)实现多消息源模糊单播或者多播投递的能力,将多条消息推送到某一个Queue中(单播)或者某一批Queue中(多播),只要订阅这一个Queue或者这一批Queue中的消费者都可以消费到这条消息。
……
盗用其他博主的一个Exchange的架构图:
由上图可知,生产者将消息发送到Exchange,再有Exchange根据Routing key推送到一个或者多个队列中,如果没有匹配的队列,将丢弃消息。
下来我们就详细分析下rabbitMQ提供的各种交换机类型,以及它们之间的区别和联系。
它提供了如下几种交换机类型:
Direct类型(直连交换机)
Topic类型(主题交换机)
Fanout类型(扇形交换机)
Headers类型(标题交换机)
4.1 Direct exchange(直连交换机)
直连交换机的路由算法非常简单:根据消息携带的routing key、与交换机和队列创建绑定关系时指定的routing key进行比对,如果两者相同,就将该消息投递到对应的队列中。否则丢弃该消息。
直连交换机X上绑定了两个队列:
第一个队列绑定了一个路由键: orange;
第二个队列绑定了两个路由键: black和green。
在这种场景下,一个消息在发布时指定了路由键为orange将会只被路由到队列Q1;
路由键为black 和green的消息都将被路由到队列Q2。
其他的消息都将被丢弃。
一个携带Routing key为black的消息,被Exchange同时推送到了队列Q1和Q2中。
分析场景:
(1)单消息源定向单播投递
这是Direct交换机的专属特性,一个Direct Exchange,通过一个Routing key绑定一个Queue,这时候Exchange 和 Queue 就是一对一的关系,实现单消息源定向单播投递。
(2)单消息源定向多播投递
Direct Exchange也可以使用同一个Routing key绑定到不同的Queue中去,这种场景下,一个交换机绑定了多个Queue,实现了单消息源定向多播投递的行为,这时候和Fanout交换机的行为类似。
(3)单消息源广播投递
广播投递的意思是不区分routing key,只要这个消息推送到了交换机,交换机就将这个消息广播给它绑定的所有Queue。
很显然,Direct Exchange无法实现广播投递,因为不论怎么设计,它都依旧需要识别消息的Routing key。
它只能将单消息源进行定向多播投递,但没法真正做到广播投递。
(4)多消息源定向单播投递
多个生产者连接一个Direct Exchange,那这个直连交换机要维护这些生产者携带的所有routing key。
也就是Direct Exchange需要维护多个Routing key,然后这些Routing key都绑定同一个Queue。
这样就实现了多消息源定向单播投递。
(5)多消息源定向多播投递
和(4)类似。
需要Direct Exchange维护所有多消息源携带的Routing key,然后通过第一个Routing key和一组Queue建立绑定关系;再通过第二个Routing key和同样这组Queue建立绑定关系;以此类推…
这样一个,这个Direct Exchange中就维护了很庞大的一张映射表。
Direct Exchange Routing key Queue
eric test.01 Q1
eric test.01 Q2
eric test.01 Q3
eric test.02 Q1
eric test.02 Q2
eric test.02 Q3
eric test.03 Q1
eric test.03 Q2
eric test.03 Q3
eric test.04 Q1
eric test.04 Q2
eric test.04 Q3
如上面的映射所示:
直连交换机eric,维护了一张很大的映射表。
其中,消息test.01会被投递到Q1,Q2,Q3。
test.02夜会被投递到Q1,Q2,Q3。
test.03和test.04也是一样。
这样就实现了多消息源定向多播投递:多个消息都会被统一投递到多个消息队列。
(6)多消息源广播投递
同上面的分析,Direct Exchange没法实现广播行为,只能实现多消息源定向多播投递。
(7)多消息源模糊单播投递
Direct Exchange无法实现模糊投递的能力。
(8)多消息源模糊多播投递
同(7)
补充:Exchange是如何维护Binding映射表的啊?
Binding映射表是在Exchange中维护的,可以想象它是一个Map,只不过键可以重复,相同的键不会覆盖彼此的值,而是各自维护各自的映射。
map.put("routing key1","queue1");
map.put("routing key1","queue2");
map.put("routing key2","queue2");
map.put("routing key3","queue2");
map.put("routing key4","queue4");
如上,routing key1可以同时映射两个队列queue1和queue2,这种实现了广播投递行为;
routing key1、routing key2、routing key3映射了同一个队列queue2,这种实现了消息的模糊投递;
routing key4只映射了queue4,这种实现了消息的定向投递。
4.2 Topic exchange(主题交换机)
直连交换机的缺点:
(1)假如一个生产者将消息投递到多个队列,那么直连交换机需要使用同一个Routing key分别绑定多个Queue才能实现。
(2)假如多个生产者将消息投递到同一个队列,那么直连交换机还必须使用多个Routing key去绑定同一个Queue。
(3)假如多个生产者将消息投递到多个队列,那么直连交换机必须使用多个Routing key,去分别绑定一批Queue。
不论上面的哪种行为,都导致直连交换机种维护的routing key和queue的映射特别复杂。
我们看看主题交换机吧。
这种交换机最灵活,它有Routing key,但是这个key支持模糊匹配。
当这个key是确定值(没有和#)的时候,就和Direct交换机的功能一样,实现定向投递;
当这个key是任意匹配(只有#)的时候,就和Fanout交换机的功能一样,广播所有接收到的所有消息到绑定的Queue中去,实现广播投递;
当这个key是模糊匹配(有正常的单词和,#等)的时候,它就能够将消息分发到它所匹配到的对应的Queue中去,实现模糊投递。
Direct交换机和Topic交换机中的Routing key的转发规则大致相同,都是直接比较消息中携带的Routing key是否和交换机中维护的Routing key相等,如果相等,就将消息投送到绑定的队列中,否则丢弃。
但Topic交换机中,我们说了它的路由键支持模糊匹配,因此Topic交换机的路由键增加了一些特殊的匹配规则。
Topic交换机中有两个特殊的匹配符号:
*:匹配任意一个字母
#:匹配零个或者多个字母
Topic交换机中的Routing key如果不带上面的特殊模糊匹配字符,它就和Direct交换机一样,直接进行精确匹配。
如果只带一个“#”的话,那意味着消息中的Routing key中没有任何字符或者有任何字符都进行投送,这表示Topic交换机会将消息无条件的投送到它绑定的Queue中去,这时就和Fanout扇形交换机一样,类似于广播投递。
带特殊符号的Routing key比如:topic.#.#,topic.route.#.#,topic.route.#,topic.route.one.#,*,#等。
使用场景如下:
Producer生产一个消息X,这个消息携带Routing key info.test,根据Topic交换机和Queue的Routing key模糊匹配关系,这个消息将会被同时推送到topic.queue.a和topic.queue.b队列。
然后消费者可以同时从这两个队列消费消息。
举个例子:
生产者发送消息道topic交换机上面,队列A和队列B绑定一个topoc交换机,对于队列a来说,它绑定的key为info.#,对于队列b来说,它绑定的key为info.*。
如果发送两条消息,其中第一条消息对应的路由key为info.xxxooo,第二条消息对于的路由key为info.xxxoo.kkk。根据匹配规则第一条跟第二条都会转到队列A中,而第一条则会转发队列B中去。
再看一个案例:
routing key quick.orange.rabbit-> queue Ql, Q2 routing key lazy.orange.elephant-> queue Ql,Q2
好了,接下来继续分析场景:
假如只有一个Topic exchange存在。
分析场景:
(1)单消息源定向单播投递
Topic交换机此时只需要通过一个精确的routing key和一个Queue进行绑定即可,此时Topic Exchange和Direct Exchange实现的能力一样。
(2)单消息源定向多播投递
Topic Exchange通过一个精确的routing key和一批Queue进行绑定。
(3)单消息源广播投递
Topic Exchange通过模糊符“#”与一批Queue进行绑定,此时消息携带任何Routing key都可以推送到这批Queue中,此时和Fanout Exchange实现的能力一样。
(4)多消息源定向单播投递
Topic Exchange通过支持模糊匹配的Routing key与某个Queue建立绑定关系,这个模糊匹配格式应该涵盖多消息源携带的routing key,这样就实现了多消息源定向单播投递。
(5)多消息源定向多播投递
Topic Exchange通过支持模糊匹配的Routing key与某一批Queue建立绑定关系,这样多消息源中携带的routing key只要匹配到了都会被定向投递到这一堆Queue中。
(6)多消息源广播投递
Topic Exchange通过模糊符“#”与一批Queue进行绑定,此时消息携带任何Routing key都可以推送到这批Queue中,此时和Fanout Exchange实现的能力一样。
(7)多消息源模糊单播投递
Topic Exchange通过支持模糊匹配的Routing key与某个Queue建立绑定关系,这个模糊匹配格式应该涵盖多消息源携带的routing key,这样就实现了多消息源定向单播投递。
(8)多消息源模糊多播投递
Topic Exchange通过支持模糊匹配的Routing key与某一批Queue建立绑定关系,这样多消息源中携带的routing key只要匹配到了都会被定向投递到这一堆Queue中。
通过分析可以看出,Topic exchange最灵活,能够实现各种交换机的行为,也能够实现它们不能够实现的行为(多消息多播)。
4.3 Fanout exchange(扇形交换机)
扇形交换机也叫做广播交换机,它是最基本的交换机类型。它能做的事情非常简单,就是广播投递消息。
它像一把扇子一样,从扇把处将消息广播给它所有连接的扇叶。
扇形交换机会把接收到的消息全部发送给与它绑定的所有队列上,因为广播不需要“思考”(即匹配路由算法),因此扇形交换机处理的速度是所有交换机里面最快的。
这种交换机没有Routing key的概念,即便指定了Routing key也不生效,一条消息发送到该交换机,会被转发到它绑定的所有Queue中。
扇形交换机没有Routing 的概念,没法实现我们上面定义的问题场景。
5. Routing key 路由键
交换机负责连接生产者和消息队列:
生产者->交换机->消息队列
(1)创建一个交换机 (2)创建一个队列 (3)绑定交换机和队列,这时候如果交换机是Direct、Topic类型,在绑定时,就需要指定一个Routing key,这个key维护了这个交换机和这个队列之间的绑定关系。
(4)生产者向交换机发送消息时,需要指定Routing key,交换机根据消息中携带的这个Routing key将消息分发到绑定的Queue中去。
Routing key:一个String值,用于定义Exchange到Queue之间的路由规则。
在队列和交换机进行绑定的时候需要指定Routing Key;
在生产者发布消息到交换机的时候需要指定Routing key,当消息的Routing key和队列绑定时指定的Routing key匹配时,消息就会发送到该队列。
其通常由零个或者多个有意义的单词通过点号(.)分隔拼接而成,这些单词可以是任意的,但一般都是与消息相关的一些特征,比如:topic.route.one,topic.route.two,topic等,路由键最多只能有255个字节。
6. 默认交换机
安装好rabbitMQ后,啥都不做,其服务器内部就有一个默认交换机(default exchange),它实际上是一个由rabbitMQ预先创建好的名字为空字符串(”“)的Direct exchange。
默认交换机,有个特征:
隐式的将所有队列都自动绑定到该交换机上,并且绑定的routing key即为队列的名字。
需要注意,即便手动为某个队列绑定了一个新的交换机,这个队列依旧绑定在默认交换机上,routing key就是该队列的名字。
详情请参考下一篇文章rabbitMQ 管理台操作
文档信息
- 本文作者:Marshall