rabbitmq之五种消息模型

  • Post category:other

RabbitMQ 之五种消息模型

RabbitMQ 是一个开源的消息代理,它实现了 AMQP(高级消息队列协议)标准。RabbitMQ 支持五种消息模型,分别是简单模式、工作队列模式、发布/订阅模式、路由模式和主题模式。

简单模式

简单模式是 RabbitMQ 最简单的消息模型,也是最常用的模型。在简单模式中,生产者将消息发送到队列中,消费者从队列中接收消息。一个生产者可以向多个消费者发送消息,但是只有一个消费者可以接收到消息。

以下是一个简单模式的示例:

import pika

# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='hello')

# 发送消息
channel.basic_publish(exchange='', routing_key='hello', body='Hello, World!')

print(" [x] Sent 'Hello, World!'")

# 关闭连接
connection.close()
import pika

# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='hello')

# 定义回调函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# 接收消息
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上面的示例中,生产者使用 basic_publish() 方法将消息发送到名为 “hello” 的队列中。消费者使用 basic_consume() 方法从队列中接收消息,并使用回调函数处理消息。

工作队列模式

工作队列模式也称为任务队列模式。在工作队列模式中,多个消费者从同一个队列中接收消息。每个消息只能被一个消费者处理。当有多个消费者时,RabbitMQ 将消息平均分配给每个消费者。

以下是一个工作队列模式的示例:

import pika
import time

# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='task_queue', durable=True)

# 发送消息
message = ' '.join(sys.argv[1:]) or "Hello, World!"
channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties(delivery_mode=2))

print(" [x] Sent %r" % message)

# 关闭连接
connection.close()
import pika
import time

# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建队列
channel.queue_declare(queue='task_queue', durable=True)

# 定义回调函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 设置每个消费者一次只接收一个消息
channel.basic_qos(prefetch_count=1)

# 接收消息
channel.basic_consume(queue='task_queue', on_message_callback=callback)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上面的示例中,生产者使用 basic_publish() 方法将消息发送到名为 “task_queue” 的队列中,并设置消息持久化。消费者使用 basic_qos() 方法设置每个消费者一次只接收一个消息,并使用 basic_ack() 方法确认消息已被处理。

发布/订阅模式

发布/订阅模式也称为广播模式。在发布/订阅模式中,生产者将消息发送到交换机中,交换机将消息广播给所有与之绑定的队列。每个队列都有自己的消费者,它们从队列中接收消息。

以下是一个发布/订阅模式的示例:

import pika

# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 发送消息
message = ' '.join(sys.argv[1:]) or "Hello, World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)

print(" [x] Sent %r" % message)

# 关闭连接
connection.close()
import pika

# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建交换机
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 创建队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 绑定队列到交换机
channel.queue_bind(exchange='logs', queue=queue_name)

# 定义回调函数
def callback(ch, method, properties, body):
    print(" [x] %r" % body)

# 接收消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上面的示例中,生产者使用 basic_publish() 方法将消息发送到名为 “logs” 的交换机中。消费者使用 queue_bind() 方法将队列绑定到交换机上,并使用 basic_consume() 方法从队列中接收消息。

路由模式

路由模式也称为直连模式。在路由模式中,生产者将消息发送到交换机中,并指定一个路由键。交换机将消息发送到与之绑定的队列中,但只有与指定路由键相同的队列才会接收到消息。

以下是一个路由模式的示例:

import pika

# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建交换机
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 发送消息
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or "Hello, World!"
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)

print(" [x] Sent %r:%r" % (severity, message))

# 关闭连接
connection.close()
import pika

# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建交换机
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

# 创建队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 绑定队列到交换机
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

# 定义回调函数
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

# 接收消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上面的示例中,生产者使用 basic_publish() 方法将消息发送到名为 “direct_logs” 的交换机中,并指定一个路由键。消费者使用 queue_bind() 方法将队列绑定到交换机上,并指定一个路由键,只有与指定路由键相同的消息才会被接收。

主题模式

主题模式也称为通配符模式。在主题模式中,生产者将消息发送到交换机中,并指定一个主题。交换机将消息发送到与之绑定的队列中,但只有与指定主题相匹配的队列才会接收到消息。主题可以使用通配符 “*” 和 “#” 进行匹配。

以下是一个主题模式的示例:

import pika

# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 发送消息
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or "Hello, World!"
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)

print(" [x] Sent %r:%r" % (routing_key, message))

# 关闭连接
connection.close()
import pika

# 连接 RabbitMQ 服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 创建交换机
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

# 创建队列
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# 绑定队列到交换机
binding_keys = sys.argv[1:]
if not binding_keys:
    print("Usage: %s [binding_key]..." % (sys.argv[0],))
    sys.exit(1)
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)

# 定义回调函数
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

# 接收消息
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

在上面的示例中,生产者使用 basic_publish() 方法将消息发送到名为 “topic_logs” 的交换机中,并指定一个主题。消费者使用 queue_bind() 方法将队列绑定到交换机上,并指定一个或多个主题,只有与指定主题相匹配的消息才会被接收。