在 RabbitMQ实现消息持久化及广播模式中,我们已经可以对收到的消息进行过滤,但是现在我想实现双向通信。send端发送receive端接收消息并恢复,这就叫RPC,如何实现?
确切的说,send端发送一条命令,receive端接收指令并返回结果。这就需要两端即是发送端也是接收端,现在我们不叫发送和接收端,而是通过客户端和服务端来介绍,这样更有助于理解!
# client.py
import pika,uuid,time
class RpcClient(object):
def __init__(self):
# 建立socket链接
self.connection=pika.BlockingConnection(
pika.ConnectionParameters(
host='192.168.99.100',port=5672
))
# 声明一个管道
self.channel=self.connection.channel()
result=self.channel.queue_declare('',exclusive=True)
self.callback_queue=result.method.queue
self.channel.basic_consume( self.callback_queue, # 队列名称
self.on_response, # 回调函数
True) # 扔掉消息
def on_response(self,ch,method,props,body):
if self.corr_id == props.correlation_id:
self.response = body.decode() # 队列的返回
def call(self, cmd):
self.response = None
self.corr_id = str(uuid.uuid4()) # 生成一个随机的id
self.channel.basic_publish(exchange='',
routing_key='rpc_queue', # queue名称
properties=pika.BasicProperties(
reply_to=self.callback_queue, # 服务器返回命令到的queue
correlation_id=self.corr_id, # 将id传给服务器端
),
body=cmd) #消息内容
while self.response is None: # 如果response为None就一直收消息
# 没有消息就往下走,有消息就触发on_response回调函数
self.connection.process_data_events() # 非阻塞版的 start_consuming()
print('正在等待返回消息.....')
time.sleep(3)
return self.response
ci_rpc = RpcClient() # 实例化
while True:
cmd=input('准备接受指令:')
response = ci_rpc.call(cmd) # 传入指令
print(" 指令返回结果: %r" % response)
# server.py
import pika,os
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='192.168.99.100',port=5672))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(cmd):
res_cmd=os.popen(cmd).read()
return res_cmd
def on_request(ch, method, props, body): # 收到消息,返回命令执行结果
cmd = body.decode()
print(" 接收指令:%s" % cmd)
response = fib(cmd)
ch.basic_publish(exchange='',
routing_key=props.reply_to, # 获取客户端的随机生成的queue
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=response)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume( 'rpc_queue',
on_request)
print(" 等待指令中....")
channel.start_consuming()
声明:1. 本站所有资源来源于用户上传和网络,因此不包含技术服务请大家谅解!如有侵权请邮件联系客服!
2. 本站不保证所提供下载的资源的准确性、安全性和完整性,资源仅供下载学习之用!如有链接无法下载、失效或广告,请联系客服处理!
3. 您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容资源!如用于商业或者非法用途,与本站无关,一切后果请用户自负!
2. 本站不保证所提供下载的资源的准确性、安全性和完整性,资源仅供下载学习之用!如有链接无法下载、失效或广告,请联系客服处理!
3. 您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容资源!如用于商业或者非法用途,与本站无关,一切后果请用户自负!