黑马程序员技术交流社区

标题: 【上海校区】RabbitMQ消息队列(七):适用于云计算集群的... [打印本页]

作者: 不二晨    时间: 2018-12-10 10:25
标题: 【上海校区】RabbitMQ消息队列(七):适用于云计算集群的...
在云计算环境中,很多时候需要用它其他机器的计算资源,我们有可能会在接收到Message进行处理时,会把一部分计算任务分配到其他节点来完成。那么,RabbitMQ如何使用RPC呢?在本篇文章中,我们将会通过其它节点求来斐波纳契完成示例。
1. 客户端接口 Client interface为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class。 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞知道收到RPC运算的结果。代码如下:
fibonacci_rpc = FibonacciRpcClient()result = fibonacci_rpc.call(4)print "fib(4) is %r" % (result,)复制代码
2. 回调函数队列 Callback queue总体来说,在RabbitMQ进行RPC远程调用是比较容易的。client发送请求的Message然后server返回响应结果。为了收到响应client在publish message时需要提供一个”callback“(回调)的queue地址。code如下:
result = channel.queue_declare(exclusive=True)callback_queue = result.method.queuechannel.basic_publish(exchange='',                      routing_key='rpc_queue',                      properties=pika.BasicProperties(                            reply_to = callback_queue,                            ),                      body=request)# ... and some code to read a response message from the callback_queue ...复制代码2.1 Message properties
AMQP 预定义了14个属性。它们中的绝大多很少会用到。以下几个是平时用的比较多的:

3. 相关id Correlation id在上个小节里,实现方法是对每个RPC请求都会创建一个callback queue。这是不高效的。幸运的是,在这里有一个解决方法:为每个client创建唯一的callback queue。
这又有其他问题了:收到响应后它无法确定是否是它的,因为所有的响应都写到同一个queue了。上一小节的correlation_id在这种情况下就派上用场了:对于每个request,都设置唯一的一个值,在收到响应后,通过这个值就可以判断是否是自己的响应。如果不是自己的响应,就不去处理。

4. 总结
工作流程:

5. 最终实现The code for rpc_server.py:
#!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='localhost'))channel = connection.channel()channel.queue_declare(queue='rpc_queue')def fib(n):    if n == 0:        return 0    elif n == 1:        return 1    else:        return fib(n-1) + fib(n-2)def on_request(ch, method, props, body):    n = int(body)    print " [.] fib(%s)"  % (n,)    response = fib(n)    ch.basic_publish(exchange='',                     routing_key=props.reply_to,                     properties=pika.BasicProperties(correlation_id = /                                                     props.correlation_id),                     body=str(response))    ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request, queue='rpc_queue')print " [x] Awaiting RPC requests"channel.start_consuming()复制代码The server code is rather straightforward:
The code for rpc_client.py:
#!/usr/bin/env pythonimport pikaimport uuidclass FibonacciRpcClient(object):    def __init__(self):        self.connection = pika.BlockingConnection(pika.ConnectionParameters(                host='localhost'))        self.channel = self.connection.channel()        result = self.channel.queue_declare(exclusive=True)        self.callback_queue = result.method.queue        self.channel.basic_consume(self.on_response, no_ack=True,                                   queue=self.callback_queue)    def on_response(self, ch, method, props, body):        if self.corr_id == props.correlation_id:            self.response = body    def call(self, n):        self.response = None        self.corr_id = str(uuid.uuid4())        self.channel.basic_publish(exchange='',                                   routing_key='rpc_queue',                                   properties=pika.BasicProperties(                                         reply_to = self.callback_queue,                                         correlation_id = self.corr_id,                                         ),                                   body=str(n))        while self.response is None:            self.connection.process_data_events()        return int(self.response)fibonacci_rpc = FibonacciRpcClient()print " [x] Requesting fib(30)"response = fibonacci_rpc.call(30)print " [.] Got %r" % (response,)复制代码The client code is slightly more involved:
开始rpc_server.py:
$ python rpc_server.py [x] Awaiting RPC requests复制代码通过client来请求fibonacci数:
$ python rpc_client.py [x] Requesting fib(30)复制代码现在这个设计并不是唯一的,但是这个实现有以下优势:
我们的code还是挺简单的,并没有尝试去解决更复杂和重要的问题,比如:


【转载】仅作分享,侵删
作者:AskHarries
链接:https://juejin.im/post/5ac22cb9f265da237a4d30ce




作者: 不二晨    时间: 2018-12-11 15:54

作者: 小影姐姐    时间: 2018-12-13 14:51





欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2