多进程-生产者消费者模型,JoinableQueue的使用

一、生产者消费者模型

为什么要使用生产者消费者模型

生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者和消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。

 

来看例子:

例子说明:本示例模拟了生产者(厨师)生产包子,给顾客吃包子的过程,模拟了3个厨师同时生产包子,每个厨师生产5个,每个包子生产时间是0.5秒,生产好之后放到队列里,然后模拟了2个顾客吃包子,每次都总队列里拿包子,吃每个包子需要花费1秒的时间。

为什么要给p1, p2, p3加上join,是为了保证所有的包子都已经生产完成放到队列里了。

为什么要加q.put(None)  None这里把它当做一个信号,当所有的包子都生产完成了之后就在队列里最后加上这个信号,当顾客从队列里拿到这个信号的时候,就表示已经没有包子了,进程退出

为什么要加两个 q.put(None)  因为这里模拟2个顾客,如果只有1个None信号,但顾客1拿到第一个None之后,他知道没有了就会退出进程,但是顾客2就拿不到了,所以就会一直在哪里等待 就会导致程序卡死,所以需要2个信号,有多少个顾客就需要多少个信号。

from multiprocessing import Process, Queue
import time


def producer(q):
    for i in range(5):  # 模拟5包子
        res = "包子%s" % i
        time.sleep(0.5)
        print("生产者生产了%s" % res)
        q.put(res)


def consumer(q):
    while True:
        res = q.get()
        if res is None: break  # 如果消费者得到了None表示队列里没有包子了,直接退出
        time.sleep(1)
        print("消费者吃了%s" % res)


if __name__ == '__main__':
    # 容器
    q = Queue()

    # 生产者们
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))

    # 消费者们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()  # 保证生产者生产完成
    p2.join()
    p3.join()
    q.put(None)  # 当生产者生产完成时,在队列里追加一个None信号表示完成了
    q.put(None)  # 有多少个消费者就需要put多少个None信号

    print("主")

运行输出:

生产者生产了包子0
生产者生产了包子0
生产者生产了包子0
生产者生产了包子1
生产者生产了包子1
生产者生产了包子1
消费者吃了包子0
生产者生产了包子2
消费者吃了包子0
生产者生产了包子2
生产者生产了包子2
生产者生产了包子3
生产者生产了包子3
生产者生产了包子3
消费者吃了包子0
生产者生产了包子4
消费者吃了包子1
生产者生产了包子4
生产者生产了包子4
消费者吃了包子1
消费者吃了包子1
消费者吃了包子2
消费者吃了包子2
消费者吃了包子2
消费者吃了包子3
主
消费者吃了包子3
消费者吃了包子3
消费者吃了包子4
消费者吃了包子4
消费者吃了包子4

 

二、JoinableQueue的使用

这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

方法介绍:

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

 

上面的示例其实比较low,有多少个消费者就要发送多少个信号,我们可以使用JoinableQueue来改写一下:

from multiprocessing import Process, JoinableQueue
import time


def producer(q):
    for i in range(2):  # 模拟10厨师
        res = "包子%s" % i
        time.sleep(0.5)
        print("生产者生产了%s" % res)
        q.put(res)
    q.join()  # 等待消费者把包子都取走了之后再结束


def consumer(q):
    while True:
        res = q.get()
        if res is None: break  # 如果消费者得到了None表示队列里没有包子了,直接退出
        time.sleep(1)
        print("消费者吃了%s" % res)
        q.task_done()  # 给队列发送信号告诉队列已经吃了1个了


if __name__ == '__main__':
    # 容器
    q = JoinableQueue()

    # 生产者们
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))

    # 消费者们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))
    c1.daemon = True  # 吃完后就c1进程就没有必要存在了,随父进程一起死
    c2.daemon = True  # 吃完后就c1进程就没有必要存在了,随父进程一起死

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()  # 保证生产者生产完成
    p2.join()
    p3.join()
    #1、主进程等生产者p1、p2、p3结束
    #2、而p1、p2、p3是在消费者把所有数据都取干净之后才会结束
    #3、所以一旦p1、p2、p3结束了,证明消费者也没必要存在了,应该随着主进程一块死掉,因而需要将生产者们设置成守护进程

    print("主")

运行输出:

生产者生产了包子0
生产者生产了包子0
生产者生产了包子0
生产者生产了包子1
生产者生产了包子1
生产者生产了包子1
消费者吃了包子0
消费者吃了包子0
消费者吃了包子0
消费者吃了包子1
消费者吃了包子1
消费者吃了包子1
主

 

三、生产者消费者模型总结

1、程序中有两类角色

一类负责生产数据(生产者)
一类负责处理数据(消费者)

2、引入生产者消费者模型为了解决的问题是

平衡生产者与消费者之间的速度差
程序解开耦合

3、如何实现生产者消费者模型

生产者<--->队列<--->消费者

 

版权声明:
作者:admin
链接:https://www.chenxie.net/archives/1910.html
来源:蜀小陈
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
< <上一篇
下一篇>>