Python 队列(先进先出,缩写为FIFO)是一个基于列表的数据结构,允许在队列的末尾添加元素并在队列的前端移除元素。队列数据结构是很常见的,常用于编写一些高性能的并发程序。下面我们来详细讲解 Python 队列的使用方法。
创建队列
Python 的 queue 模块提供了一个队列类,我们可以使用它来创建队列。首先,我们需要导入 queue 模块:
import queue
然后,我们可以创建一个空队列:
q = queue.Queue()
上述代码创建了一个 FIFO 队列对象。需要注意的是,queue 模块也提供了 LIFO 队列对象(Stack)和优先队列对象(Priority Queue),它们在操作上有所不同。这里我们只讨论 FIFO 队列。
将元素加入队列
我们可以使用 put()
方法将元素加入队列。代码示例如下:
q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)
上述代码依次向队列中添加了数值为 1、2、3 的三个元素。
如果我们想要在添加元素时等待至少某个时间,可以使用 put()
方法的 block
和 timeout
选项。block
设置为 True 表示等待队列空间可用,而 timeout
表示等待的最长时间。如果在超过 timeout
的时间内队列仍然没有可用的空间,则会抛出 queue.Full
异常。
try:
q.put(4, block=True, timeout=1)
except queue.Full:
print('队列已满!')
上述代码尝试向队列中添加数值为 4 的元素。由于队列已经满了,会抛出 queue.Full
异常,并输出“队列已满!”。
从队列中取出元素
我们可以使用 get()
方法从队列中取出元素。代码示例如下:
q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)
while not q.empty():
print(q.get())
上述代码依次从队列中取出元素并打印出来,输出结果为:
1
2
3
如果我们想要在取出元素时等待至少某个时间,可以使用 get()
方法的 block
和 timeout
选项。block
设置为 True 表示等待队列中有元素可取,而 timeout
表示等待的最长时间。如果在超过 timeout
的时间内队列中仍然没有元素可取,则会抛出 queue.Empty
异常。
try:
print(q.get(block=True, timeout=1))
except queue.Empty:
print('队列已空!')
上述代码尝试从队列中取出元素。由于队列已经空了,会抛出 queue.Empty
异常,并输出“队列已空!”。
至此,Python 队列的使用就讲解完成了。
下面是两个完整的示例,分别展示了如何在并发程序中使用 Python 队列:
示例 1:生产者-消费者模型
在生产者-消费者模型中,生产者不停地往队列中添加元素,而消费者不停地从队列中取出元素。下面的示例展示了如何使用 Python 队列实现生产者-消费者模型:
import queue
import threading
import time
q = queue.Queue()
class Producer(threading.Thread):
def run(self):
global q
count = 0
while True:
if q.qsize() < 1000:
q.put('骆昊 Python 教程 %s' % count)
print('生产者生产了骆昊 Python 教程 %s' % count)
count += 1
time.sleep(1)
class Consumer(threading.Thread):
def run(self):
global q
while True:
if q.qsize() > 100:
print('消费者消费了', q.get())
time.sleep(1)
if __name__ == '__main__':
p = Producer()
c = Consumer()
p.start()
c.start()
p.join()
c.join()
上述代码中,我们使用了两个线程来模拟生产者和消费者,它们不停地往队列中添加和移除元素。在实际场景中,我们可能需要使用更多的线程或协程来实现更高效的并发,但这个示例展示了如何使用 Python 队列来实现生产者-消费者模型。
示例 2:多线程下载
在多线程下载场景中,我们可以将下载任务分成若干个块,然后使用多个线程并发下载这些块。下面的示例展示了如何使用 Python 队列调度多个线程来下载文件:
import queue
import threading
import requests
# 下载文件块大小
CHUNK_SIZE = 1024
class Downloader(threading.Thread):
def __init__(self, url, start, end, file):
threading.Thread.__init__(self)
self.url = url
self.start = start
self.end = end
self.file = file
def run(self):
headers = {'Range': 'bytes=%d-%d' % (self.start, self.end)}
res = requests.get(self.url, headers=headers)
self.file.seek(self.start)
self.file.write(res.content)
class DownloadManager:
def __init__(self, url, num_threads=4):
self.url = url
self.num_threads = num_threads
self.file_size = int(requests.head(self.url).headers['Content-Length'])
self.file = None
def _gen_download_tasks(self):
tasks = []
chunk_size = self.file_size // self.num_threads
for i in range(self.num_threads - 1):
start = chunk_size * i
end = start + chunk_size
tasks.append((start, end))
tasks.append((chunk_size * (self.num_threads - 1), self.file_size))
return tasks
def download(self, filename):
self.file = open(filename, 'wb')
tasks = self._gen_download_tasks()
q = queue.Queue()
for task in tasks:
q.put(task)
for i in range(self.num_threads):
worker = Downloader(self.url, q.get(), q.get(), self.file)
worker.start()
worker.join()
if self.file.tell() == self.file_size:
print('文件下载成功!')
else:
print('文件下载失败!')
if __name__ == '__main__':
url = 'https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/Anaconda3-2021.05-Windows-x86_64.exe'
downloader = DownloadManager(url, num_threads=4)
downloader.download('Anaconda3-2021.05-Windows-x86_64.exe')
上述代码中,我们使用了一个类 DownloadManager
来管理整个下载流程。首先,我们取得下载文件的大小,并根据线程数将下载任务分成若干个块。然后,我们使用一个队列来调度多个下载线程,将每个下载任务放入队列中。最后,每个下载线程从队列中取出一个下载任务,向服务器请求对应的数据块,并将数据块写入本地文件中。当所有的下载线程都结束后,我们检查文件大小是否等于文件总大小,来判断文件是否下载成功。