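Official documentation: https://docs.python.org/dev/library/concurrent.futures.html
The concurrent.futures module provides a high-level interface for asynchronously executing callables.
ThreadPoolExecutor: a thread pool that executes calls asynchronously.
ProcessPoolExecutor: a process pool that executes calls asynchronously.
Both implement the same interface, which is defined by the abstract Executor class.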
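1. submit(fn, *args, **kwargs): submits a task asynchronously and returns a Future.
2. map(func, *iterables, timeout=None, chunksize=1): replaces a for loop of submit() calls.
3. shutdown(wait=True): roughly equivalent to pool.close() followed by pool.join() on a multiprocessing pool.
   wait=True: block until every task in the pool has finished and its resources have been reclaimed.
   wait=False: return immediately without waiting for the tasks in the pool to finish.
   Whatever the value of wait, the program as a whole does not exit until all pending tasks have finished.
   submit() and map() must be called before shutdown().
4. result(timeout=None): called on the Future returned by submit(); returns the task's result.
5. add_done_callback(fn): called on the Future; registers a callback that runs when the task finishes.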
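The methods listed above can be sketched together in one short example. This is a minimal illustration, not a reference implementation: square and demo are hypothetical names, and ThreadPoolExecutor is used only to keep the sketch light, since both executors expose the same interface.

```python
from concurrent.futures import ThreadPoolExecutor


def square(n):
    return n * n


def demo():
    callback_results = []
    with ThreadPoolExecutor(max_workers=2) as pool:
        # submit() returns a Future immediately.
        future = pool.submit(square, 3)
        # The callback receives the finished Future as its only argument.
        future.add_done_callback(lambda f: callback_results.append(f.result()))
        # result() blocks until the task has finished (or the timeout expires).
        single = future.result(timeout=5)
        # map() replaces a loop of submit() calls and yields results in input order.
        mapped = list(pool.map(square, range(5)))
    # Leaving the with block calls shutdown(wait=True), so all work is done here.
    return single, mapped, callback_results


if __name__ == "__main__":
    print(demo())
```

Note that result() and add_done_callback() belong to the Future object, while submit(), map() and shutdown() belong to the executor.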
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.
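The max_workers rule quoted above can be checked with a small sketch. The helper name accepts_max_workers is hypothetical; no task is submitted, so the sketch only exercises the constructor's argument validation.

```python
from concurrent.futures import ProcessPoolExecutor


def accepts_max_workers(max_workers):
    """Return True if ProcessPoolExecutor accepts this max_workers value."""
    try:
        pool = ProcessPoolExecutor(max_workers=max_workers)
    except ValueError:
        # Raised for values less than or equal to 0.
        return False
    pool.shutdown()  # nothing was submitted, so there is nothing to wait for
    return True


if __name__ == "__main__":
    for value in (2, 0, -1):
        print(value, accepts_max_workers(value))
```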
from multiprocessing import Process
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor  # both executors share the same interface
import os, time, random


def task(name):
    print("name: %s pid:%s run" % (name, os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    # The old way: one Process per task
    # for i in range(10):
    #     p = Process(target=task, args=("egon",))
    #     p.start()
    # print("主")

    # The process pool way
    pool = ProcessPoolExecutor(4)  # at most 4 worker processes; defaults to the number of CPU cores
    for i in range(10):
        pool.submit(task, "egon%s" % i)  # asynchronous call: submit 10 tasks without waiting for any of them
    print("主")  # "主" means "main"; usually printed first because submit() does not block
主
name: egon0 pid:55492 run
name: egon1 pid:55444 run
name: egon2 pid:51764 run
name: egon3 pid:50304 run
name: egon4 pid:55444 run
name: egon5 pid:50304 run
name: egon6 pid:55492 run
name: egon7 pid:55444 run
name: egon8 pid:51764 run
name: egon9 pid:55492 run
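The output shows only four distinct pids from start to finish, so only those four worker processes are doing the work: as soon as a worker finishes one task, it picks up the next waiting one. The main process's "主" ("main") line is printed first, which shows that pool.submit() submits tasks asynchronously.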
Using pool.shutdown()
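from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor  # both executors share the same interface
import os, time, random


def task(name):
    print("name: %s pid:%s run" % (name, os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    pool = ProcessPoolExecutor(4)  # at most 4 worker processes; defaults to the number of CPU cores
    for i in range(10):
        pool.submit(task, "egon%s" % i)  # asynchronous call: submit 10 tasks without waiting for any of them
    # Close the submission entry first (wait=True by default). Once no new tasks
    # can arrive, the set of pending tasks is fixed, so the pool can wait for all
    # of them to finish before the main process continues.
    pool.shutdown()
    print("主")  # "主" means "main"; printed only after every task has finished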
name: egon0 pid:51476 run
name: egon1 pid:59372 run
name: egon2 pid:55708 run
name: egon3 pid:46732 run
name: egon4 pid:51476 run
name: egon5 pid:59372 run
name: egon6 pid:55708 run
name: egon7 pid:46732 run
name: egon8 pid:59372 run
name: egon9 pid:46732 run
主
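Executors can also be used as context managers, in which case leaving the with block has the same effect as calling shutdown(wait=True). The sketch below is illustrative only: run_with_context_manager is a hypothetical name, and a thread pool stands in for the process pool because the two share the same interface.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def task(name):
    time.sleep(0.05)
    return "%s done" % name


def run_with_context_manager():
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(task, "egon%s" % i) for i in range(10)]
    # The with block has ended, so shutdown(wait=True) has already run.
    all_done = all(f.done() for f in futures)
    try:
        pool.submit(task, "late")  # submitting after shutdown is rejected
        rejected = False
    except RuntimeError:
        rejected = True
    return all_done, rejected


if __name__ == "__main__":
    print(run_with_context_manager())
```

The RuntimeError on the late submit() illustrates why submit() and map() have to come before shutdown().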
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.
Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor. Changed in version 3.8: the default value of max_workers became min(32, os.cpu_count() + 4).
New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.
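As a small sketch of the thread_name_prefix argument described above, the following collects the names of the worker threads that actually ran tasks. The prefix "downloader" and the helper names are made up for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def worker_name(_):
    # Report the name of the thread that runs this task.
    return threading.current_thread().name


def names_with_prefix(prefix, tasks=6, max_workers=3):
    with ThreadPoolExecutor(max_workers=max_workers,
                            thread_name_prefix=prefix) as pool:
        # A set removes duplicates: the same worker may run several tasks.
        return sorted(set(pool.map(worker_name, range(tasks))))


if __name__ == "__main__":
    print(names_with_prefix("downloader"))
```

How many distinct names appear depends on scheduling, but it never exceeds max_workers.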
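# Thread pool
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor  # both executors share the same interface
from threading import current_thread
import os, time, random


def task():
    print("name: %s pid:%s run" % (current_thread().name, os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(5)
    for i in range(10):
        pool.submit(task)  # submit 10 tasks to the 5 worker threads
    pool.shutdown()  # wait for every task to finish
    print("主")  # "主" means "main"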
name: ThreadPoolExecutor-0_0 pid:57440 run
name: ThreadPoolExecutor-0_1 pid:57440 run
name: ThreadPoolExecutor-0_2 pid:57440 run
name: ThreadPoolExecutor-0_3 pid:57440 run
name: ThreadPoolExecutor-0_4 pid:57440 run
name: ThreadPoolExecutor-0_1 pid:57440 run
name: ThreadPoolExecutor-0_0 pid:57440 run
name: ThreadPoolExecutor-0_1 pid:57440 run
name: ThreadPoolExecutor-0_4 pid:57440 run
name: ThreadPoolExecutor-0_3 pid:57440 run
主
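The thread pool here is set to 5 workers, and the same 5 threads run from start to finish. Because they are threads inside a single process, they all report the same pid. As before, pool.shutdown() makes the main thread wait until every task in the pool has finished before it prints "主" ("main"). Without pool.shutdown(), you may see a few task lines first and then the main thread's line in the middle of the output: threads start very quickly, so some tasks begin running before the main thread reaches its print call.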
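The difference between waiting and not waiting can be made deterministic with a small sketch. It assumes a hypothetical helper, demo_shutdown_without_waiting, and uses a threading.Event to hold the task open so the outcome does not depend on timing.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_shutdown_without_waiting():
    gate = threading.Event()

    def task():
        gate.wait(timeout=5)  # stay "running" until the main thread opens the gate
        return "finished"

    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(task)
    pool.shutdown(wait=False)  # returns at once; the submitted task keeps running
    done_right_after_shutdown = future.done()  # the task is still blocked on the gate
    gate.set()  # let the task finish
    result = future.result(timeout=5)
    return done_right_after_shutdown, result


if __name__ == "__main__":
    print(demo_shutdown_without_waiting())
```

With wait=False the main thread moves on while the task is still in progress, which is the interleaving described above; with the default wait=True, shutdown() itself would block until the task had finished.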
