• 收藏
  • 加入书签
添加成功
收藏成功
分享

消息队列与oslo_messaging实现原理

曹挺伟
  
安家(建筑与工程)
2021年1期

打开文本图片集

中图分类号:TU 文献标识码:A 文章编号:(2021)-01-252

前言

OpenStack遵循这样的设计原则:项目之间通过 RESTful API 进行通信,项目内部,不同服务进程之间通过消息队列进行通信,通信的代码实现借由 oslo_messaging 库完成

oslo_messaging 是对 Python 库 kombu 的封装,kombu 是对 AMQP 的封装,所以在讨论 oslo_messaging 前有必要对 AMQP 进行说明

AMQP

Openstack 所支持的消息队列类型中,大部分都是基于AMQP,基于 AMQP 标准,有很多具体的实现,比如 OpenStack 默认的 RabbitMQ,Kafka 等,所以本文着重介绍 AMQP 的架构,RabbitMQ 架构与之类似

架构图如下所示:

在对上图进行说明之前需要先对其中的一些名词进行说明:

·Server/Broker:AMQP 的具体实现,如 RabbitMQ,Kafka

·Producer:生产者

·Consumer:消费者

·Virtual Host:虚拟主机,一个 broker 里可以有多个 vhost,用作不同用户的权限分离

·Exchange:接收 Producers 发送过来的消息,按照一定规则转发到相应的 Message Queues 中

·Queue:将接收到的消息转发到相应的 Consumers

·Channel:消息通道,在客户端的每个连接里,可建立多个 Channel

·Routing Key:Exchange 根据这个关键字将消息转发至不同的 Queue

流程说明:生产者将消息发送给 Exchange,由 Exchange 来决定消息的路由,即决定将消息发送到那个 Queue,然后消费者从 Queue 中取出消息,进行处理,至于 Exchange 将消息转发给哪一个 Queue,这将依赖于 Routing Key,每一个发送的消息都有一个 Routing Key,同样,每一个 Queue 也有一个 Binding Key,Exchange 在进行消息路由时,会查询每一个 Queue,如果某个 Queue 的 Binding Key 与某个消息的 Routing Key 匹配,这个消息会被转发到那个 Queue

简单来说,Exchange 依据 Routing Key 进行匹配,将消息转发给匹配成功的 Message Queue,总共有三种匹配模式:

?1?Direct:Routing Key 为一字符串,匹配规则为全值匹配

?2?Topic:Routing Key 为由 . 分隔的一个个子串组成的字符串,匹配规则为模式匹配,具有两种通配符,星号 '*' 表示任意一个子字符串,井号,'#' 表示任意多个子字符串,Producer 发送的 Routing Key 没有通配符,Message Queue 绑定的 Routing Key 可以有通配符

?3?Fanout Exchange:广播,没有 Routing Key,所有绑定到 Fanout Exchange 的 Message Queue 都能收到来自 Producer 发送的消息

Notifiy 与 oslo.messaging

oslo.messaging 库通过以下两种方式来完成项目各个服务进程之间的通信

?1?远程过程调用(Remote Procedure Call,RPC)通过远程过程调用,一个服务进程可以调用其他远程服务进程的方法,并且有两种方法:call 和 cast,通过 call 的方式调用,远程方法会被同步执行,调用者会被阻塞直到结果返回,通过 cast 的方式调用,远程方法会被异步执行,结果不会立即返回,调用者也不会被阻塞,但是调用者需要用其他方式查询这次远程调用的结果

?2?事件通知(Event Notification)某个服务进程可以把事件通知发送到消息总线上,该消息总线上所有对此类事件感兴趣的服务进程,都可以获得此事件通知并进行进一步的处理,处理的结果并不会发送给事件发送者,这种通信方式,不但可以在同一个项目内部的各个服务进程之间发送通知,还可以实现跨项目之间的通知发送,Ceilometer 就通过这种方式大量获取其他 OpenStack 项目的事件通知,从而进行计量和监控

编程实现

1.创建连接 Channel

2.创建队列,在队列中指明回调函数

3.创建回调函数

4.使用某一角色收发信息

关于回调函数,为了保证数据不被丢失,RabbitMQ 支持消息确认机制,为了保证数据能被正确处理而不仅仅是被 Consumer 收到,那么我们不能采用 no-ack,而应该是在处理完数据之后发送 ack

def process_media(body, message):

print(' ')

print(body)

message.ack()

OpenStack 组件发送消息的格式如下:

{

'message_id': six.text_type(uuid.uuid4()), #消息id号

'publisher_id': 'compute.hos1',#发送者id

'timestamp': timeutils.utcnow(),#时间戳

'priority': 'WARN',#通知优先级

'event_type': 'compute.create_instance', #通知类型

'payload': {'instance_id': 12, ...} #通知内容

}

使用 Python 的测试代码如下:

from kombu.entity import Queue

from kombu.messaging import Consumer

from kombu.connection import Connection

# 处理消息

def process_media(body, message):

print(' ')

print(body)

message.ack()

connection = Connection('amqp://openstack:123456@10.2.36.112:5672//')

queue = Queue('notifications.info', durable=False)

with connection.Consumer(queues=[queue], callbacks=[process_media]) as consumer:

while True:

# 等待消息传入

connection.drain_events()

临沂大学 山东 临沂 276000

*本文暂不支持打印功能

monitor