异步调用与回调机制
提交任务的两种方式:
1. 同步调用:
提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一步代码,导致程序是串行执行。
来看一个拉粑粑的例子:
from concurrent.futures import ThreadPoolExecutor
import time, random
def la(name):
print("%s 正在拉..." % name)
time.sleep(random.randint(3, 5)) # 模拟拉的时间
res = random.randint(7, 13) * "#" # 模拟拉的数量,随机的"#"字符串
return {"name": name, "res": res}
def weigh(shit):
name = shit["name"]
size = len(shit["res"]) # 获取粑粑的大小
print("%s 拉了 %s kg" % (name, size))
if __name__ == '__main__':
pool = ThreadPoolExecutor(13) # 定义一个池,可同时处理13个人一起拉
shit1 = pool.submit(la, "alex").result() # 提交alex去拉
weigh(shit1) # 将alex拉的结果去称重
shit2 = pool.submit(la, "wupeiqi").result()
weigh(shit2)
shit3 = pool.submit(la, "yuanhao").result()
weigh(shit3)
运行结果:
alex 正在拉... alex 拉了 8 kg wupeiqi 正在拉... wupeiqi 拉了 7 kg yuanhao 正在拉... yuanhao 拉了 13 kg
可以看出:上面例子其实是在串行运行,并不高效,需要等待1个人拉了之后,下一个人才能够拉。
我们使用异步调用方式来改写一下。
2. 异步调用:
提交完任务后,不在原地等待任务执行完毕。
from concurrent.futures import ThreadPoolExecutor
import time, random
def la(name):
print("%s 正在拉..." % name)
time.sleep(random.randint(3, 5)) # 模拟拉的时间
res = random.randint(7, 13) * "#" # 模拟拉的数量,随机的"#"字符串
# return {"name": name, "res": res}
weigh({"name": name, "res": res}) # 谁先拉完,就将拉的结果提交称重
def weigh(shit):
name = shit["name"]
size = len(shit["res"]) # 获取粑粑的大小
print("%s 拉了 %s kg" % (name, size))
if __name__ == '__main__':
pool = ThreadPoolExecutor(13) # 定义一个池,可同时处理13个人一起拉
pool.submit(la, "alex") # 提交alex去拉
pool.submit(la, "wupeiqi") # 提交wupeiqi去拉
pool.submit(la, "yuanhao") # 提交yuanhao去拉
运行结果:
alex 正在拉... wupeiqi 正在拉... yuanhao 正在拉... wupeiqi 拉了 8 kg #wupeiqi先拉完进行称重有8kg alex 拉了 12 kg #alex第二拉完,拉了12kg yuanhao 拉了 12 kg #yuanhao第三拉完,也拉了12kg
可以看出:程序运行就同时提交3个人去拉,谁先拉完就去称重,然后打印称重结果,变的高效起来了。
由于提交拉的请求到池后,提交的就得不到结果了,所以在拉的函数里就直接将结果传给了称重函数,逻辑没有问题,但是耦合性变高了,不好,所以需要解耦,我们再来改一下,还是要实现谁先拉完谁就自动去称重,需要用到回调函数来实现。
from concurrent.futures import ThreadPoolExecutor
import time, random
def la(name):
print("%s 正在拉..." % name)
time.sleep(random.randint(3, 5)) # 模拟拉的时间
res = random.randint(7, 13) * "#" # 模拟拉的数量,随机的"#"字符串
return {"name": name, "res": res}
def weigh(shit):
shit = shit.result() # 通过回调函数传进来的是一个future对象,需要通过result()获取到值
name = shit["name"]
size = len(shit["res"]) # 获取粑粑的大小
print("%s 拉了 %s kg" % (name, size))
if __name__ == '__main__':
pool = ThreadPoolExecutor(13) # 定义一个池,可同时处理13个人一起拉
pool.submit(la, "alex").add_done_callback(weigh) # 提交alex去拉,拉完后自动调用weigh函数并将拉的结果传给weigh
pool.submit(la, "wupeiqi").add_done_callback(weigh) # 提交wupeiqi去拉,拉完后自动调用weigh函数并将拉的结果传给weigh
pool.submit(la, "yuanhao").add_done_callback(weigh) # 提交yuanhao去拉,拉完后自动调用weigh函数并将拉的结果传给weigh
运行结果:
alex 正在拉... wupeiqi 正在拉... yuanhao 正在拉... wupeiqi 拉了 7 kg yuanhao 拉了 9 kg alex 拉了 13 kg
可以看出:运行结果和之前的一样,并且实现了解耦。
阻塞:指的是进程运行的一种状态,当进程运行时遇到IO了就会进入阻塞状态,会被剥夺走CPU的执行权限。
非阻塞:与阻塞相反。
同步不等于阻塞。因为如果提交的任务是一个纯计算的任务,就需要再原地等待,但是cpu一直在计算并没有阻塞。
共有 0 条评论