RabbitMQ 之五种消息模型
RabbitMQ 是一个开源的消息代理,它实现了 AMQP(高级消息队列协议)标准。RabbitMQ 支持五种消息模型,分别是简单模式、工作队列模式、发布/订阅模式、路由模式和主题模式。
简单模式
简单模式是 RabbitMQ 最简单的消息模型,也是最常用的模型。在简单模式中,生产者将消息发送到队列中,消费者从队列中接收消息。一个生产者可以向多个消费者发送消息,但是只有一个消费者可以接收到消息。
以下是一个简单模式的示例:
import pika
# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='hello')
# 发送消息
channel.basic_publish(exchange='', routing_key='hello', body='Hello, World!')
print(" [x] Sent 'Hello, World!'")
# 关闭连接
connection.close()
import pika
# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='hello')
# 定义回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# 接收消息
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上面的示例中,生产者使用 basic_publish()
方法将消息发送到名为 “hello” 的队列中。消费者使用 basic_consume()
方法从队列中接收消息,并使用回调函数处理消息。
工作队列模式
工作队列模式也称为任务队列模式。在工作队列模式中,多个消费者从同一个队列中接收消息。每个消息只能被一个消费者处理。当有多个消费者时,RabbitMQ 将消息平均分配给每个消费者。
以下是一个工作队列模式的示例:
import pika
import time
# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='task_queue', durable=True)
# 发送消息
message = ' '.join(sys.argv[1:]) or "Hello, World!"
channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties(delivery_mode=2))
print(" [x] Sent %r" % message)
# 关闭连接
connection.close()
import pika
import time
# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 创建队列
channel.queue_declare(queue='task_queue', durable=True)
# 定义回调函数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
# 设置每个消费者一次只接收一个消息
channel.basic_qos(prefetch_count=1)
# 接收消息
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上面的示例中,生产者使用 basic_publish()
方法将消息发送到名为 “task_queue” 的队列中,并设置消息持久化。消费者使用 basic_qos()
方法设置每个消费者一次只接收一个消息,并使用 basic_ack()
方法确认消息已被处理。
发布/订阅模式
发布/订阅模式也称为广播模式。在发布/订阅模式中,生产者将消息发送到交换机中,交换机将消息广播给所有与之绑定的队列。每个队列都有自己的消费者,它们从队列中接收消息。
以下是一个发布/订阅模式的示例:
import pika
# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 创建交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 发送消息
message = ' '.join(sys.argv[1:]) or "Hello, World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
# 关闭连接
connection.close()
import pika
# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 创建交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# 创建队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 绑定队列到交换机
channel.queue_bind(exchange='logs', queue=queue_name)
# 定义回调函数
def callback(ch, method, properties, body):
print(" [x] %r" % body)
# 接收消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上面的示例中,生产者使用 basic_publish()
方法将消息发送到名为 “logs” 的交换机中。消费者使用 queue_bind()
方法将队列绑定到交换机上,并使用 basic_consume()
方法从队列中接收消息。
路由模式
路由模式也称为直连模式。在路由模式中,生产者将消息发送到交换机中,并指定一个路由键。交换机将消息发送到与之绑定的队列中,但只有与指定路由键相同的队列才会接收到消息。
以下是一个路由模式的示例:
import pika
# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 创建交换机
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# 发送消息
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or "Hello, World!"
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
# 关闭连接
connection.close()
import pika
# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 创建交换机
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# 创建队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 绑定队列到交换机
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
# 定义回调函数
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
# 接收消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上面的示例中,生产者使用 basic_publish()
方法将消息发送到名为 “direct_logs” 的交换机中,并指定一个路由键。消费者使用 queue_bind()
方法将队列绑定到交换机上,并指定一个路由键,只有与指定路由键相同的消息才会被接收。
主题模式
主题模式也称为通配符模式。在主题模式中,生产者将消息发送到交换机中,并指定一个主题。交换机将消息发送到与之绑定的队列中,但只有与指定主题相匹配的队列才会接收到消息。主题可以使用通配符 “*” 和 “#” 进行匹配。
以下是一个主题模式的示例:
import pika
# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 创建交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# 发送消息
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or "Hello, World!"
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
# 关闭连接
connection.close()
import pika
# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 创建交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# 创建队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# 绑定队列到交换机
binding_keys = sys.argv[1:]
if not binding_keys:
print("Usage: %s [binding_key]..." % (sys.argv[0],))
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)
# 定义回调函数
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
# 接收消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
在上面的示例中,生产者使用 basic_publish()
方法将消息发送到名为 “topic_logs” 的交换机中,并指定一个主题。消费者使用 queue_bind()
方法将队列绑定到交换机上,并指定一个或多个主题,只有与指定主题相匹配的消息才会被接收。