一.发布hello world
首先我们看一个最简单的消息队列系统
1 #!/usr/bin/env python3 2 # -*- coding: utf-8 -*- 3 """ 4 @author: zengchunyun 5 """ 6 import pika 7 8 9 connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) 10 channel = connection.channel() 11 12 channel.queue_declare(queue="hello") # 在发送之前,我们要确保这个队列存在,如果我们发送一个消息到不存在的队列,rabbitmq会认为这个是垃圾消息 13 # 我们已经创建了一个队列名为hello,待会我们的消息都会发送到这个队列 14 15 16 # rabbitmq不允许我们直接将消息发送到队列,而是通过一个交换器,现在我们使用一个特殊对交换器,它是一个空对字符串标识对交换器,它能确保我们对消息该放到哪个 17 # 队列,这个队列需要特殊对rouning_key 18 channel.basic_publish(exchange="", 19 routing_key="hello", 20 body="hello world",) 21 22 print(" [x] Sent 'hello world") 23 24 # 在退出之前,我们需要确保网络缓冲区已经清空,且我们对消息的确已经发送到rabbitMQ,我们可以关闭这个连接 25 connection.close()
发布者
1 #!/usr/bin/env python3 2 # -*- coding: utf-8 -*- 3 """ 4 @author: zengchunyun 5 """ 6 import pika 7 import time 8 9 connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost")) 10 channel = connection.channel() 11 12 channel.queue_declare(queue="hello") # 为了确保每次订阅的队列都存在,我们先声明一个队列 13 14 # 接收队列消息需要通过回调函数来接收 15 16 17 def callback(ch, method, properties, body): 18 print(" [x] Received %r" % body) 19 time.sleep(3) 20 ch.basic_ack(delivery_tag = method.delivery_tag) 21 22 channel.basic_consume(callback, 23 queue="hello", 24 no_ack=True) # 这里我们需要告诉rabbitMQ这个回调函数会从我们的hello队列接收消息,关闭消息确认标记, 25 # 那么当worker工作中异常,如没有完成任务就关闭了连接,可能会丢失任务,使用no_ack=True默认只要rabbit分配任务给该worker了,就会将任务从队列删除 26 27 print(" [*] Waiting for messages. To exit press CTRL+C") 28 channel.start_consuming() # 我们这里进入了一个永不终止的循环等待数据状态
订阅者
上面这个系统只是单一的消息队列,