RabbitMQ消息队列的环境配置和简单使用

RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。

RabbitMQ介绍

RabbitMQ特点:

  1. 开源、性能优秀,稳定性保障
  2. 提供可靠性消息投递模式、返回模式
  3. 与Spring AMQP完美整合,API丰富
  4. 集群模式丰富,表达式配置,HA模式,镜像队列模型
  5. 保证数据不丢失的前提做到高可靠性、可用性

MQ典型应用场景:

  1. 异步处理。把消息放入消息中间件中,等到需要的时候再去处理。
  2. 流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。
  3. 日志处理
  4. 应用解耦。假设某个服务A需要给许多个服务(B、C、D)发送消息,当某个服务(例如B)不需要发送消息了,服务A需要改代码再次部署;当新加入一个服务(服务E)需要服务A的消息的时候,也需要改代码重新部署;另外服务A也要考虑其他服务挂掉,没有收到消息怎么办?要不要重新发送呢?是不是很麻烦,使用MQ发布订阅模式,服务A只生产消息发送到MQ,B、C、D从MQ中读取消息,需要A的消息就订阅,不需要了就取消订阅,服务A不再操心其他的事情,使用这种方式可以降低服务或者系统之间的耦合。

Docker安装

Docker提供一种安全、可重复的环境中自动部署软件的方式,鳄鱼君Ba这里使用Docker进行安装RabbitMQ。进入官方下载地址,选择使用Docker安装,跳转到dockerhub查看镜像。

选择3.8.0-beta.4-management进行安装,带有management是含有管理界面的。这里可以直接在docker中拉取镜像和启动,如果不知道如何使用docker和镜像加速的参考:Docker容器的安装以及常见错误 Docker如何进行镜像加速

查询rabbitmq镜像:docker search rabbitmq:management
拉取rabbitmq镜像:docker pull rabbitmq:management

docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3.8.0-beta.4-management

--hostname:指定容器主机名称
my-rabbit:指定容器名称
-p:将mq端口号映射到本地
15672:控制台端口号 5672:应用访问端口号

设置为自启
docker run -d --hostname my-rabbit -d --restart=unless-stopped

备选启动同时设置用户和密码

docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

查看本地镜像:docker images

打开浏览器访问:http://192.168.99.100:15672/,进行填写账号密码:默认账号密码都是guest.到此,RabbitMQ已经安装并运行起来了。记得不要关闭docker服务器哟!

这样的链接的话,host就是192.168.99.100,port就是5672,需要注意!

本地安装

安装: http://www.rabbitmq.com/install-standalone-mac.html,3.8.5版本的已经包含erlang,不需要单独安装,速度比较慢,你还是使用docker吧,这里不过多介绍!

安装python rabbitMQ module 使用:pip install pika命令即可,源码:https://pypi.python.org/pypi/pika。目前pika版本为1.1.0,所以说代码可能有些变化。

实现最简单的队列通信

鳄鱼君Ba这里使用docker进行,所以host和port都需要注意,不是本地的。一个send.py用于发送,receive.py接收:

# send.py
import pika

# 建立socket
connection=pika.BlockingConnection(
    pika.ConnectionParameters(
        host='192.168.99.100',port=5672
    ))
# 声明一个管道
channel=connection.channel()

# 在管道中声明queue
channel.queue_declare(queue='eyujun')

# 通过管道
channel.basic_publish(exchange='',
                      routing_key='eyujun',# queue名字
                      body='Hello eyujun!')
print('发送 "Hello eyujun!"')
connection.close()
# receive.py
import pika
# 建立socket链接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
    host='192.168.99.100',port=5672
    ))
# 声明管道
channel = connection.channel()
# 在管道中声明queue
channel.queue_declare(queue='eyujun')

def callback(ch, method, properties, body):
    print(ch,'\n',method,'\n',properties) # 可以自己打印看看是什么
    print(" 收到消息: %r" % body)
channel.basic_consume('eyujun', # 队列名称
                        callback, # 消费消息的函数,如果收到消息,就调用callback函数处理消息
                      )
print(' 正在等待消息,要退出,请按CTRL+C:')

# 开始接收消息
channel.start_consuming()

在发送端send和接收端receive都需要声明管道并在管道中声明queue,我们无法确定哪一端先启动,如果没有声明管道的一段先启动就会报错。

消息队列

RabbitMQ会默认把p发的消息依次分发给各个receive端,跟负载均衡差不多。现在创建多个receive端接收send端发送的消息,启动3个receive端,1个send端,多次send消息,receive会一次接收到消息,也就是说一个receive只接收一个。代码还是上面的代码:

# send.py
import pika

# 建立socket
connection=pika.BlockingConnection(
    pika.ConnectionParameters(
        host='192.168.99.100',port=5672
    ))
# 声明一个管道
channel=connection.channel()

# 在管道中声明queue
channel.queue_declare(queue='eyujun')

# 通过管道
channel.basic_publish(exchange='',
                      routing_key='eyujun',# queue名字
                      body='Hello eyujun!')
print('发送 "Hello eyujun!"')
connection.close()
# receive.py

import pika
# 建立socket链接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
    host='192.168.99.100',port=5672
    ))
# 声明管道
channel = connection.channel()
# 在管道中声明queue
channel.queue_declare(queue='eyujun')

def callback(ch, method, properties, body):
    print(ch,'\n',method,'\n',properties)
    print(" 收到消息: %r" % body)
channel.basic_consume('eyujun', # 队列名称
                        callback, # 消费消息的函数,如果收到消息,就调用callback函数处理消息
                     )
print(' 正在等待消息,要退出,请按CTRL+C:')

# 开始接收消息
channel.start_consuming()

pycharm启动多个receive.py文件,需要在Run菜单中找到Edit Configurations并勾选Allow parallel run

现在假设receive端在接收消息的时候需要处理20s,可以理解为花费20s,在这个时间内宕机或者断电了,消息就没有收到。RabbitMQ默认的处理方式是会自动确认消息是否处理完。其中channel.basic_consume的第三个参数默认为False,如果修改为True,就会扔掉消息!

我们可以将channel.basic_consume的第三个参数修改为True模拟一下,只需要在receive.py文件的callback函数中sleep即可。启动send和receive,在receive等待的20内断掉,这个消息就没了!

# receive.py
import time
import pika
# 建立socket链接
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
    host='192.168.99.100',port=5672
    ))
# 声明管道
channel = connection.channel()
# 在管道中声明queue
channel.queue_declare(queue='eyujun')

def callback(ch, method, properties, body): # 回调函数
    time.sleep(3)
    print(" 收到消息: %r" % body)

channel.basic_consume('eyujun', # 队列名称
                        callback, # 消费消息的函数,如果收到消息,就调用callback函数处理消息
                      True)
print(' 正在等待消息,要退出,请按CTRL+C:')

# 开始接收消息
channel.start_consuming()

send.py文件不变,这时候你就会发现消息会被扔掉!

1. 本网站禁止以任何形式的转载和刊发传播本网站内容!你下载学习了解调试原理之后应在24小时内删除!!!
2. 本网站资源仅供在本网站范围内学习和交流不得超出本范围,不得用于任何其他用途,商用请购买发行版。
3. 转载.刊发或截屏录制等任何形式的传播留存本站内容,本站将追究其法律和赔偿责任,造成本站或第三方损失由你本人赔偿。
4. 本网站提供的任何资源,只是原样采集,原版BUG缺陷如常存在,本站不是开发商请大家谅解,其他任何问题请联系客服处理。
5. 本网站发布的内容若侵犯到您的权益,请联系站长并提供你的版权证明,本站收到后我们将及时删除处理。
6. 请认真阅读本站注册公告和相关协议后使用本网站,你继续浏览代表你已同意本站所有条款。
7. 请勿相信下载后资源文件里的网址链接、QQ、微信、邮箱、电话等联系方式,你主动联系需自行承担全部后果。谨防诈骗!
模板君 » RabbitMQ消息队列的环境配置和简单使用

发表评论