rabbitMQ 消费者线程

2023/10/06 MQ系列 共 1269 字,约 4 分钟
闷骚的程序员

1. 为什么要考虑这个问题?

我们前面学习rabbitMQ,学的好好的,为什么突然间考虑到多线程这个问题?
生产者向交换机推送了消息,交换机根据路由键将消息转发到了指定的消息队列(假设可以正常路由),消费者从消息队列消费消息。

消费者从Queue中消费消息,有两种方式:
(1)同步消费
消费者主动从Queue中去拉取消息,pull模式,拉取一次启动一个消费者线程,多次拉取的话需要启动多个消费者线程。
当然,这取决于消费者的具体操作,如果消费者建立一次连接,然后循环去主动拉取所有消息,那么此时消费者线程也是一个。
又或者说,消费者委托了一个独立的线程去Queue中去拉取消息,此时拉取消息的消费者线程也是一个。
当然,消费者也可以启动多个线程(比如线程池)然后并发地从Queue中去主动拉取消息。

同步消费的代码如下:

GetResponse getResponse = channel.basicGet(MyProducer1.QUEUE_NAME, false);

(2)异步消费
消费者启动后,主线程不能断开连接,必须一直和rabbitMQ保持连接在线。
然后消息是通过rabbitMQ服务器主动推送过来的,消费者属于被动接收消息,这种模式是push模式。
因为不需要消费者主动去rabbitMQ中去拉取消息,这样不耽误消费者线程,也不需要消费者去考虑启动其他线程一直轮询地去服务器查询是否有消息,因此这种方式解放了消费者,属于异步消费。

异步消费的代码如下:

channel.basicConsume(MyProducer1.QUEUE_NAME, consumer);

其中consumer对象就是一个服务器端定义好的回调对象,当有消息进来后,rabbitMQ服务器会主动回调该对象。

通过上面的分析,清楚了同步消费,由消费者主动去消费,至于创建几个消费者线程,是由消费者自己实现的。
但是同步消费一般要浪费消费者线程,可能服务器一直没有消息进来,但是消费者还必须保持有一些线程一直连接服务器去轮询地查看消息。
所以,同步消费场景使用的非常少。
主要是异步消费,消费者向服务端注册一个监听器,当服务端一有消息进来,服务端就会回调这个监听器去推送消息给消费者。
我们主要考虑,在异步消费时,服务端异步执行的这个线程到底是不是同一个线程?

考虑这个问题有什么意义呢?
如果异步消费场景下,消费者拿到消息后做处理,但是这个处理特别慢,是个重量级操作,这时候就需要考虑这个消息推送过来后到底是同一个线程在执行还是每个消息会拥有一个独立的线程。

2. 消费者线程

异步消费场景,实际上是消费者和rabbitMQ保持了一个TCP长连接。
这时候,消费者端实际上是TCP服务端,rabbitMQ实际上是一个TCP客户端。
rabbitMQ主动向消费者推送消息过来,实际上是和消费者建立连接后发送了一次数据。
但凡我们熟悉java socket编程的话,其实这个问题一眼就明白了。
异步消费时,消费者收到rabbitMQ推送的所有消息,其实都是消费者启动的同一个主线程去操作的。
如果不熟悉socket编程的话,请移步

文档信息

Search

    Table of Contents