详解Python 队列(先进先出)

  • Post category:Python

Python 队列(先进先出,缩写为FIFO)是一个基于列表的数据结构,允许在队列的末尾添加元素并在队列的前端移除元素。队列数据结构是很常见的,常用于编写一些高性能的并发程序。下面我们来详细讲解 Python 队列的使用方法。

创建队列

Python 的 queue 模块提供了一个队列类,我们可以使用它来创建队列。首先,我们需要导入 queue 模块:

import queue

然后,我们可以创建一个空队列:

q = queue.Queue()

上述代码创建了一个 FIFO 队列对象。需要注意的是,queue 模块也提供了 LIFO 队列对象(Stack)和优先队列对象(Priority Queue),它们在操作上有所不同。这里我们只讨论 FIFO 队列。

将元素加入队列

我们可以使用 put() 方法将元素加入队列。代码示例如下:

q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)

上述代码依次向队列中添加了数值为 1、2、3 的三个元素。

如果我们想要在添加元素时等待至少某个时间,可以使用 put() 方法的 blocktimeout 选项。block 设置为 True 表示等待队列空间可用,而 timeout 表示等待的最长时间。如果在超过 timeout 的时间内队列仍然没有可用的空间,则会抛出 queue.Full 异常。

try:
    q.put(4, block=True, timeout=1)
except queue.Full:
    print('队列已满!')

上述代码尝试向队列中添加数值为 4 的元素。由于队列已经满了,会抛出 queue.Full 异常,并输出“队列已满!”。

从队列中取出元素

我们可以使用 get() 方法从队列中取出元素。代码示例如下:

q = queue.Queue()
q.put(1)
q.put(2)
q.put(3)
while not q.empty():
    print(q.get())

上述代码依次从队列中取出元素并打印出来,输出结果为:

1
2
3

如果我们想要在取出元素时等待至少某个时间,可以使用 get() 方法的 blocktimeout 选项。block 设置为 True 表示等待队列中有元素可取,而 timeout 表示等待的最长时间。如果在超过 timeout 的时间内队列中仍然没有元素可取,则会抛出 queue.Empty 异常。

try:
    print(q.get(block=True, timeout=1))
except queue.Empty:
    print('队列已空!')

上述代码尝试从队列中取出元素。由于队列已经空了,会抛出 queue.Empty 异常,并输出“队列已空!”。

至此,Python 队列的使用就讲解完成了。

下面是两个完整的示例,分别展示了如何在并发程序中使用 Python 队列:

示例 1:生产者-消费者模型

在生产者-消费者模型中,生产者不停地往队列中添加元素,而消费者不停地从队列中取出元素。下面的示例展示了如何使用 Python 队列实现生产者-消费者模型:

import queue
import threading
import time

q = queue.Queue()

class Producer(threading.Thread):
    def run(self):
        global q
        count = 0
        while True:
            if q.qsize() < 1000:
                q.put('骆昊 Python 教程 %s' % count)
                print('生产者生产了骆昊 Python 教程 %s' % count)
                count += 1
            time.sleep(1)

class Consumer(threading.Thread):
    def run(self):
        global q
        while True:
            if q.qsize() > 100:
                print('消费者消费了', q.get())
            time.sleep(1)

if __name__ == '__main__':
    p = Producer()
    c = Consumer()
    p.start()
    c.start()
    p.join()
    c.join()

上述代码中,我们使用了两个线程来模拟生产者和消费者,它们不停地往队列中添加和移除元素。在实际场景中,我们可能需要使用更多的线程或协程来实现更高效的并发,但这个示例展示了如何使用 Python 队列来实现生产者-消费者模型。

示例 2:多线程下载

在多线程下载场景中,我们可以将下载任务分成若干个块,然后使用多个线程并发下载这些块。下面的示例展示了如何使用 Python 队列调度多个线程来下载文件:

import queue
import threading
import requests

# 下载文件块大小
CHUNK_SIZE = 1024

class Downloader(threading.Thread):
    def __init__(self, url, start, end, file):
        threading.Thread.__init__(self)
        self.url = url
        self.start = start
        self.end = end
        self.file = file

    def run(self):
        headers = {'Range': 'bytes=%d-%d' % (self.start, self.end)}
        res = requests.get(self.url, headers=headers)
        self.file.seek(self.start)
        self.file.write(res.content)

class DownloadManager:
    def __init__(self, url, num_threads=4):
        self.url = url
        self.num_threads = num_threads
        self.file_size = int(requests.head(self.url).headers['Content-Length'])
        self.file = None

    def _gen_download_tasks(self):
        tasks = []
        chunk_size = self.file_size // self.num_threads
        for i in range(self.num_threads - 1):
            start = chunk_size * i
            end = start + chunk_size
            tasks.append((start, end))
        tasks.append((chunk_size * (self.num_threads - 1), self.file_size))
        return tasks

    def download(self, filename):
        self.file = open(filename, 'wb')
        tasks = self._gen_download_tasks()
        q = queue.Queue()
        for task in tasks:
            q.put(task)
        for i in range(self.num_threads):
            worker = Downloader(self.url, q.get(), q.get(), self.file)
            worker.start()
            worker.join()
        if self.file.tell() == self.file_size:
            print('文件下载成功!')
        else:
            print('文件下载失败!')

if __name__ == '__main__':
    url = 'https://mirrors.tuna.tsinghua.edu.cn/anaconda/archive/Anaconda3-2021.05-Windows-x86_64.exe'
    downloader = DownloadManager(url, num_threads=4)
    downloader.download('Anaconda3-2021.05-Windows-x86_64.exe')

上述代码中,我们使用了一个类 DownloadManager 来管理整个下载流程。首先,我们取得下载文件的大小,并根据线程数将下载任务分成若干个块。然后,我们使用一个队列来调度多个下载线程,将每个下载任务放入队列中。最后,每个下载线程从队列中取出一个下载任务,向服务器请求对应的数据块,并将数据块写入本地文件中。当所有的下载线程都结束后,我们检查文件大小是否等于文件总大小,来判断文件是否下载成功。