异步调用与回调机制

提交任务的两种方式:

1. 同步调用:

提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一步代码,导致程序是串行执行。

来看一个拉粑粑的例子:

from concurrent.futures import ThreadPoolExecutor
import time, random


def la(name):
    print("%s 正在拉..." % name)
    time.sleep(random.randint(3, 5))  # 模拟拉的时间
    res = random.randint(7, 13) * "#"  # 模拟拉的数量,随机的"#"字符串
    return {"name": name, "res": res}


def weigh(shit):
    name = shit["name"]
    size = len(shit["res"])  # 获取粑粑的大小
    print("%s 拉了 %s kg" % (name, size))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)  # 定义一个池,可同时处理13个人一起拉

    shit1 = pool.submit(la, "alex").result()  # 提交alex去拉
    weigh(shit1)  # 将alex拉的结果去称重

    shit2 = pool.submit(la, "wupeiqi").result()
    weigh(shit2)

    shit3 = pool.submit(la, "yuanhao").result()
    weigh(shit3)

运行结果:

alex 正在拉...
alex 拉了 8 kg
wupeiqi 正在拉...
wupeiqi 拉了 7 kg
yuanhao 正在拉...
yuanhao 拉了 13 kg

可以看出:上面例子其实是在串行运行,并不高效,需要等待1个人拉了之后,下一个人才能够拉。

我们使用异步调用方式来改写一下。

 

2. 异步调用:

提交完任务后,不在原地等待任务执行完毕。

from concurrent.futures import ThreadPoolExecutor
import time, random


def la(name):
    print("%s 正在拉..." % name)
    time.sleep(random.randint(3, 5))  # 模拟拉的时间
    res = random.randint(7, 13) * "#"  # 模拟拉的数量,随机的"#"字符串
    # return {"name": name, "res": res}
    weigh({"name": name, "res": res})  # 谁先拉完,就将拉的结果提交称重


def weigh(shit):
    name = shit["name"]
    size = len(shit["res"])  # 获取粑粑的大小
    print("%s 拉了 %s kg" % (name, size))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)  # 定义一个池,可同时处理13个人一起拉

    pool.submit(la, "alex")  # 提交alex去拉
    pool.submit(la, "wupeiqi")  # 提交wupeiqi去拉
    pool.submit(la, "yuanhao")  # 提交yuanhao去拉

运行结果:

alex 正在拉...
wupeiqi 正在拉...
yuanhao 正在拉...
wupeiqi 拉了 8 kg  #wupeiqi先拉完进行称重有8kg
alex 拉了 12 kg #alex第二拉完,拉了12kg
yuanhao 拉了 12 kg #yuanhao第三拉完,也拉了12kg

可以看出:程序运行就同时提交3个人去拉,谁先拉完就去称重,然后打印称重结果,变的高效起来了。

 

由于提交拉的请求到池后,提交的就得不到结果了,所以在拉的函数里就直接将结果传给了称重函数,逻辑没有问题,但是耦合性变高了,不好,所以需要解耦,我们再来改一下,还是要实现谁先拉完谁就自动去称重,需要用到回调函数来实现。

from concurrent.futures import ThreadPoolExecutor
import time, random


def la(name):
    print("%s 正在拉..." % name)
    time.sleep(random.randint(3, 5))  # 模拟拉的时间
    res = random.randint(7, 13) * "#"  # 模拟拉的数量,随机的"#"字符串
    return {"name": name, "res": res}


def weigh(shit):
    shit = shit.result()  # 通过回调函数传进来的是一个future对象,需要通过result()获取到值
    name = shit["name"]
    size = len(shit["res"])  # 获取粑粑的大小
    print("%s 拉了 %s kg" % (name, size))


if __name__ == '__main__':
    pool = ThreadPoolExecutor(13)  # 定义一个池,可同时处理13个人一起拉

    pool.submit(la, "alex").add_done_callback(weigh)  # 提交alex去拉,拉完后自动调用weigh函数并将拉的结果传给weigh
    pool.submit(la, "wupeiqi").add_done_callback(weigh)  # 提交wupeiqi去拉,拉完后自动调用weigh函数并将拉的结果传给weigh
    pool.submit(la, "yuanhao").add_done_callback(weigh)  # 提交yuanhao去拉,拉完后自动调用weigh函数并将拉的结果传给weigh

运行结果:

alex 正在拉...
wupeiqi 正在拉...
yuanhao 正在拉...
wupeiqi 拉了 7 kg
yuanhao 拉了 9 kg
alex 拉了 13 kg

可以看出:运行结果和之前的一样,并且实现了解耦。

 

阻塞:指的是进程运行的一种状态,当进程运行时遇到IO了就会进入阻塞状态,会被剥夺走CPU的执行权限。

非阻塞:与阻塞相反。

 

同步不等于阻塞。因为如果提交的任务是一个纯计算的任务,就需要再原地等待,但是cpu一直在计算并没有阻塞。

 

 

版权声明:
作者:admin
链接:https://www.chenxie.net/archives/1945.html
来源:蜀小陈
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
< <上一篇
下一篇>>