首页 » 编程 » Python-7.并发编程 » 正文

进程池和线程池

在刚开始学多进程或多线程时,我们迫不及待地基于多进程或多线程实现并发的套接字通信,然而这种实现方式的致命缺陷是:

服务的开启的进程数或线程数都会随着并发的客户端数目地增多而增多,这会对服务端主机带来巨大的压力,甚至于不堪重负而瘫痪,于是我们必须对服务端开启的进程数或线程数加以控制,让机器在一个自己可以承受的范围内运行,这就是进程池或线程池的用途,例如进程池,就是用来存放进程的池子,本质还是基于多进程,只不过是对开启进程的数目加上了限制。

 

介绍

官网:https://docs.python.org/dev/library/concurrent.futures.html
concurrent.futures 模块提供了高度封装的异步调用接口
ThreadPoolExecutor 线程池,提供异步调用
ProcessPoolExecutor 进程池,提供异步调用
Both implement the same interface, which is defined by the abstract Executor class.

基本方法

1、submit(fn, *args, **kwargs)
异步提交任务

2、map(func, *iterables, timeout=None, chunksize=1) 
取代for循环submit的操作

3、shutdown(wait=True) 
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

4、result(timeout=None)
取得结果

5、add_done_callback(fn)
回调函数

 

什么时候用进程池:计算密集型

什么时候用线程池:IO密集型

进程池线程池本质还是在帮你开进程或线程。

为什么要用池:池对你的进程线程加以限制保证机器可以健康稳定的运行。

 

进程池:

介绍

The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.

from multiprocessing import Process
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor  # 两个接口用法一模一样
import os, time, random


def task(name):
    print("name: %s  pid:%s run" % (name, os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    # 以前的写法
    # for i in range(10):
    #     p = Process(target=task, args=("egon",))
    #     p.start()
    # print("主")

    # 进程池的写法
    pool = ProcessPoolExecutor(4)  # 指定池最多装4个进程,不指定默认为cpu核心数
    for i in range(10):
        pool.submit(task, "egon%s" % i)  # 异步调用:提交10个任务给池,提交后不在原地等待
    print("主")

运行结果:

主
name: egon0 pid:55492 run
name: egon1 pid:55444 run
name: egon2 pid:51764 run
name: egon3 pid:50304 run

name: egon4 pid:55444 run
name: egon5 pid:50304 run

name: egon6 pid:55492 run
name: egon7 pid:55444 run
name: egon8 pid:51764 run

name: egon9 pid:55492 run

从结果可以看出:从始至终pid就只有4个,说明就只有这4个进程在工作,池里处理完一个,就马上处理等待的任务。并且先打印主进程信息“主”,说明poll.submit() 是异步提交的。

 

主进程想等待所有进程池里的任务都运行完了在执行主进程,相当于执行一个join操作,怎么做呢?

使用 pool.shutdown()

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor  # 两个接口用法一模一样
import os, time, random


def task(name):
    print("name: %s  pid:%s run" % (name, os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)  # 指定池最多装4个进程,不指定默认为cpu核心数
    for i in range(10):
        pool.submit(task, "egon%s" % i)  # 异步调用:提交10个任务给池,提交后不在原地等待
    pool.shutdown()  # 把任务提交入口先关闭,默认参数为wait=True ,池实际上就是在维护一个计数器,关掉任务入口池才能获得准确的任务数,继而保证所有任务的已结束,然后运行主进程
    print("主")

运行结果:

name: egon0 pid:51476 run
name: egon1 pid:59372 run
name: egon2 pid:55708 run
name: egon3 pid:46732 run
name: egon4 pid:51476 run
name: egon5 pid:59372 run
name: egon6 pid:55708 run
name: egon7 pid:46732 run
name: egon8 pid:59372 run
name: egon9 pid:46732 run
主

可以看到主进程打印信息在最后打印的,池里的任务全部处理完了。

 

线程池:

介绍

ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=”)
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.

Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.

New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.

用法和进程池用法一模一样,把ProcessPoolExecutor换成ThreadPoolExecutor,其余用法全部相同。

# 线程池
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor  # 两个接口用法一模一样
from threading import currentThread
import os, time, random


def task():
    print("name: %s  pid:%s run" % (currentThread().getName(), os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(5)
    for i in range(10):
        pool.submit(task)
    pool.shutdown()
    print("主")

运行结果:

name: ThreadPoolExecutor-0_0 pid:57440 run
name: ThreadPoolExecutor-0_1 pid:57440 run
name: ThreadPoolExecutor-0_2 pid:57440 run
name: ThreadPoolExecutor-0_3 pid:57440 run
name: ThreadPoolExecutor-0_4 pid:57440 run
name: ThreadPoolExecutor-0_1 pid:57440 run
name: ThreadPoolExecutor-0_0 pid:57440 run
name: ThreadPoolExecutor-0_1 pid:57440 run
name: ThreadPoolExecutor-0_4 pid:57440 run
name: ThreadPoolExecutor-0_3 pid:57440 run
主

可以看到线程这里我们线程池设置了5个,从始至终就是那5个线程在运行,由于是线程,都在一个进程下,所以他们的pid都是一样的,这里同样用了pool.shutdown()等待线程池任务运行完了才运行的主线程,如果不用pool.shutdown()的话,你可能会看到 有一部分线程先运行,然后就运行主线程任务了,因为开的是线程,速度非常快,所以会出现这样的情况。

 

发表评论

*