RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。
RabbitMQ介绍
RabbitMQ特点:
- 开源、性能优秀,稳定性保障
- 提供可靠性消息投递模式、返回模式
- 与Spring AMQP完美整合,API丰富
- 集群模式丰富,表达式配置,HA模式,镜像队列模型
- 保证数据不丢失的前提做到高可靠性、可用性
MQ典型应用场景:
- 异步处理。把消息放入消息中间件中,等到需要的时候再去处理。
- 流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。
- 日志处理
- 应用解耦。假设某个服务A需要给许多个服务(B、C、D)发送消息,当某个服务(例如B)不需要发送消息了,服务A需要改代码再次部署;当新加入一个服务(服务E)需要服务A的消息的时候,也需要改代码重新部署;另外服务A也要考虑其他服务挂掉,没有收到消息怎么办?要不要重新发送呢?是不是很麻烦,使用MQ发布订阅模式,服务A只生产消息发送到MQ,B、C、D从MQ中读取消息,需要A的消息就订阅,不需要了就取消订阅,服务A不再操心其他的事情,使用这种方式可以降低服务或者系统之间的耦合。
Docker安装
Docker提供一种安全、可重复的环境中自动部署软件的方式,鳄鱼君Ba这里使用Docker进行安装RabbitMQ。进入官方下载地址,选择使用Docker安装,跳转到dockerhub查看镜像。
选择3.8.0-beta.4-management进行安装,带有management是含有管理界面的。这里可以直接在docker中拉取镜像和启动,如果不知道如何使用docker和镜像加速的参考:Docker容器的安装以及常见错误 Docker如何进行镜像加速
查询rabbitmq镜像:docker search rabbitmq:management
拉取rabbitmq镜像:docker pull rabbitmq:management
docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3.8.0-beta.4-management
--hostname:指定容器主机名称
my-rabbit:指定容器名称
-p:将mq端口号映射到本地
15672:控制台端口号 5672:应用访问端口号
设置为自启
docker run -d --hostname my-rabbit -d --restart=unless-stopped
备选启动同时设置用户和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
查看本地镜像:docker images
打开浏览器访问:http://192.168.99.100:15672/,进行填写账号密码:默认账号密码都是guest.到此,RabbitMQ已经安装并运行起来了。记得不要关闭docker服务器哟!

这样的链接的话,host就是192.168.99.100,port就是5672,需要注意!
本地安装
安装: http://www.rabbitmq.com/install-standalone-mac.html,3.8.5版本的已经包含erlang,不需要单独安装,速度比较慢,你还是使用docker吧,这里不过多介绍!
安装python rabbitMQ module 使用:pip install pika命令即可,源码:https://pypi.python.org/pypi/pika。目前pika版本为1.1.0,所以说代码可能有些变化。
实现最简单的队列通信鳄鱼君Ba这里使用docker进行,所以host和port都需要注意,不是本地的。一个send.py用于发送,receive.py接收:
# send.py
import pika
# 建立socket
connection=pika.BlockingConnection(
pika.ConnectionParameters(
host='192.168.99.100',port=5672
))
# 声明一个管道
channel=connection.channel()
# 在管道中声明queue
channel.queue_declare(queue='eyujun')
# 通过管道
channel.basic_publish(exchange='',
routing_key='eyujun',# queue名字
body='Hello eyujun!')
print('发送 "Hello eyujun!"')
connection.close()
# receive.py
import pika
# 建立socket链接
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='192.168.99.100',port=5672
))
# 声明管道
channel = connection.channel()
# 在管道中声明queue
channel.queue_declare(queue='eyujun')
def callback(ch, method, properties, body):
print(ch,'\n',method,'\n',properties) # 可以自己打印看看是什么
print(" 收到消息: %r" % body)
channel.basic_consume('eyujun', # 队列名称
callback, # 消费消息的函数,如果收到消息,就调用callback函数处理消息
)
print(' 正在等待消息,要退出,请按CTRL+C:')
# 开始接收消息
channel.start_consuming()
在发送端send和接收端receive都需要声明管道并在管道中声明queue,我们无法确定哪一端先启动,如果没有声明管道的一段先启动就会报错。
消息队列
RabbitMQ会默认把p发的消息依次分发给各个receive端,跟负载均衡差不多。现在创建多个receive端接收send端发送的消息,启动3个receive端,1个send端,多次send消息,receive会一次接收到消息,也就是说一个receive只接收一个。代码还是上面的代码:
# send.py
import pika
# 建立socket
connection=pika.BlockingConnection(
pika.ConnectionParameters(
host='192.168.99.100',port=5672
))
# 声明一个管道
channel=connection.channel()
# 在管道中声明queue
channel.queue_declare(queue='eyujun')
# 通过管道
channel.basic_publish(exchange='',
routing_key='eyujun',# queue名字
body='Hello eyujun!')
print('发送 "Hello eyujun!"')
connection.close()
# receive.py
import pika
# 建立socket链接
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='192.168.99.100',port=5672
))
# 声明管道
channel = connection.channel()
# 在管道中声明queue
channel.queue_declare(queue='eyujun')
def callback(ch, method, properties, body):
print(ch,'\n',method,'\n',properties)
print(" 收到消息: %r" % body)
channel.basic_consume('eyujun', # 队列名称
callback, # 消费消息的函数,如果收到消息,就调用callback函数处理消息
)
print(' 正在等待消息,要退出,请按CTRL+C:')
# 开始接收消息
channel.start_consuming()
pycharm启动多个receive.py文件,需要在Run菜单中找到Edit Configurations并勾选Allow parallel run
现在假设receive端在接收消息的时候需要处理20s,可以理解为花费20s,在这个时间内宕机或者断电了,消息就没有收到。RabbitMQ默认的处理方式是会自动确认消息是否处理完。其中channel.basic_consume的第三个参数默认为False,如果修改为True,就会扔掉消息!
我们可以将channel.basic_consume的第三个参数修改为True模拟一下,只需要在receive.py文件的callback函数中sleep即可。启动send和receive,在receive等待的20内断掉,这个消息就没了!
# receive.py
import time
import pika
# 建立socket链接
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='192.168.99.100',port=5672
))
# 声明管道
channel = connection.channel()
# 在管道中声明queue
channel.queue_declare(queue='eyujun')
def callback(ch, method, properties, body): # 回调函数
time.sleep(3)
print(" 收到消息: %r" % body)
channel.basic_consume('eyujun', # 队列名称
callback, # 消费消息的函数,如果收到消息,就调用callback函数处理消息
True)
print(' 正在等待消息,要退出,请按CTRL+C:')
# 开始接收消息
channel.start_consuming()
send.py文件不变,这时候你就会发现消息会被扔掉!
2. 本站不保证所提供下载的资源的准确性、安全性和完整性,资源仅供下载学习之用!如有链接无法下载、失效或广告,请联系客服处理!
3. 您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容资源!如用于商业或者非法用途,与本站无关,一切后果请用户自负!