首页 » 编程 » Python » Python学习 » 网络编程进阶 » 正文

生产者消费者模型

为什么要使用生产者消费者模型

生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者和消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。

 

来看例子:

例子说明:本示例模拟了生产者(厨师)生产包子,给顾客吃包子的过程,模拟了3个厨师同时生产包子,每个厨师生产5个,每个包子生产时间是0.5秒,生产好之后放到队列里,然后模拟了2个顾客吃包子,每次都总队列里拿包子,吃每个包子需要花费1秒的时间。

为什么要给p1, p2, p3加上join,是为了保证所有的包子都已经生产完成放到队列里了。

为什么要加q.put(None)  None这里把它当做一个信号,当所有的包子都生产完成了之后就在队列里最后加上这个信号,当顾客从队列里拿到这个信号的时候,就表示已经没有包子了,进程退出

为什么要加两个 q.put(None)  因为这里模拟2个顾客,如果只有1个None信号,但顾客1拿到第一个None之后,他知道没有了就会退出进程,但是顾客2就拿不到了,所以就会一直在哪里等待 就会导致程序卡死,所以需要2个信号,有多少个顾客就需要多少个信号。

from multiprocessing import Process, Queue
import time


def producer(q):
    for i in range(5):  # 模拟5包子
        res = "包子%s" % i
        time.sleep(0.5)
        print("生产者生产了%s" % res)
        q.put(res)


def consumer(q):
    while True:
        res = q.get()
        if res is None: break  # 如果消费者得到了None表示队列里没有包子了,直接退出
        time.sleep(1)
        print("消费者吃了%s" % res)


if __name__ == '__main__':
    # 容器
    q = Queue()

    # 生产者们
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))

    # 消费者们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()  # 保证生产者生产完成
    p2.join()
    p3.join()
    q.put(None)  # 当生产者生产完成时,在队列里追加一个None信号表示完成了
    q.put(None)  # 有多少个消费者就需要put多少个None信号

    print("主")

运行输出:

生产者生产了包子0
生产者生产了包子0
生产者生产了包子0
生产者生产了包子1
生产者生产了包子1
生产者生产了包子1
消费者吃了包子0
生产者生产了包子2
消费者吃了包子0
生产者生产了包子2
生产者生产了包子2
生产者生产了包子3
生产者生产了包子3
生产者生产了包子3
消费者吃了包子0
生产者生产了包子4
消费者吃了包子1
生产者生产了包子4
生产者生产了包子4
消费者吃了包子1
消费者吃了包子1
消费者吃了包子2
消费者吃了包子2
消费者吃了包子2
消费者吃了包子3

消费者吃了包子3
消费者吃了包子3
消费者吃了包子4
消费者吃了包子4
消费者吃了包子4

 

JoinableQueue的使用

上面的示例其实比较low,有多少个消费者就要发送多少个信号,我们可以使用JoinableQueue来改写一下:

from multiprocessing import Process, JoinableQueue
import time


def producer(q):
    for i in range(2):  # 模拟10厨师
        res = "包子%s" % i
        time.sleep(0.5)
        print("生产者生产了%s" % res)
        q.put(res)
    q.join()  # 等待消费者把包子都取走了之后再结束


def consumer(q):
    while True:
        res = q.get()
        if res is None: break  # 如果消费者得到了None表示队列里没有包子了,直接退出
        time.sleep(1)
        print("消费者吃了%s" % res)
        q.task_done()  # 给队列发送信号告诉队列已经吃了1个了


if __name__ == '__main__':
    # 容器
    q = JoinableQueue()

    # 生产者们
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))

    # 消费者们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True  # 吃完后就c1进程就没有必要存在了,随父进程一起死
    c2.daemon = True  # 吃完后就c1进程就没有必要存在了,随父进程一起死

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()  # 保证生产者生产完成
    p2.join()
    p3.join()
    # q.put(None)  # 当生产者生产完成时,在队列里追加一个None信号表示完成了
    # q.put(None)  # 有多少个消费者就需要put多少个None信号

    print("主")

运行输出:

生产者生产了包子0
生产者生产了包子0
生产者生产了包子0
生产者生产了包子1
生产者生产了包子1
生产者生产了包子1
消费者吃了包子0
消费者吃了包子0
消费者吃了包子0
消费者吃了包子1
消费者吃了包子1
消费者吃了包子1

 

 

发表评论

*