多进程-互斥锁

一、什么是互斥锁?

就是把多个进程并发修改同一块共享数据的操作变成串行,此举会牺牲效率,但是保证了数据安全。

举个例子,在合租房里,公共使用的卫生间,有同时几个人都要上卫生间,但是卫生间不可能同时给几个人使用,是要看谁先抢到把门锁上就谁使用,等使用完了打开锁之后下一个人才能都继续使用。

在程序上就是,当几个任务并发的时候,由于共享的是同一个操作系统,或者说像打印这类的请求,共享的就是同一个终端,共享就带来了竞争,竞争就导致错乱了,所以引入互斥锁来保证输出的正确,而不会错乱,当然这回导致效率的降低,来看下面的例子。

from multiprocessing import Process
import time


def task(name):
    print("%s 1" % name)
    time.sleep(1)
    print("%s 2" % name)
    time.sleep(1)
    print("%s 3" % name)
    time.sleep(1)


if __name__ == '__main__':
    for i in range(3):
        p = Process(target=task, args=("进程%s" % i,))
        p.start()

程序同时启动3个并发,程序输出:

进程0 1 
进程0 2 
进程1 1 
进程2 1 
进程0 3 
进程1 2 
进程2 2 
进程1 3 
进程2 3

可以看到,程序执行了并发,但是整个结果显示感觉很混乱。

 

来加上 互斥锁:

from multiprocessing import Process, Lock  # Lock模块 互斥锁
import time


def task(name, mutex):
    mutex.acquire()  # 上锁
    print("%s 1" % name)
    time.sleep(1)
    print("%s 2" % name)
    time.sleep(1)
    print("%s 3" % name)
    time.sleep(1)
    mutex.release()  # 释放锁


if __name__ == '__main__':
    mutex = Lock()  # 实例化
    for i in range(3):
        p = Process(target=task, args=("进程%s" % i, mutex))  # 为了保证子进程都是使用了同一个锁,所以将对象传入到子进程
        p.start()

程序输出:

进程0 1
进程0 2
进程0 3
进程1 1
进程1 2
进程1 3
进程2 1
进程2 2
进程2 3

可以看到终端输出变的有序了,但同时也牺牲了效率。

 

二、互斥锁应用场景

模拟抢票

db.txt 内容:

{"count": 1}

程序内容:

from multiprocessing import Process
import json
import time


def search(name):
    dic = json.load(open("db.txt", "r", encoding="utf-8"))
    time.sleep(1)  # 模拟读数据的网络延迟
    print("<%s> 查到剩余票数 [%s]" % (name, dic["count"]))


def get(name):
    dic = json.load(open("db.txt", "r", encoding="utf-8"))
    time.sleep(1)  # 模拟读数据的网络延迟
    if dic["count"] > 0:
        dic["count"] -= 1
        time.sleep(3)  # # 模拟写数据的网络延迟
        json.dump(dic, open("db.txt", "w", encoding="utf-8"))
        print("<%s> 购票成功" % name)


def task(name):
    search(name)
    get(name)


if __name__ == '__main__':
    for i in range(10):  # 模拟10个人抢票
        p = Process(target=task, args=("路人%s" % i,))
        p.start()

程序输出:

<路人1> 查到剩余票数 [1]
<路人2> 查到剩余票数 [1]
<路人0> 查到剩余票数 [1]
<路人4> 查到剩余票数 [1]
<路人3> 查到剩余票数 [1]
<路人5> 查到剩余票数 [1]
<路人6> 查到剩余票数 [1]
<路人7> 查到剩余票数 [1]
<路人8> 查到剩余票数 [1]
<路人9> 查到剩余票数 [1]
<路人1> 购票成功
<路人2> 购票成功
<路人0> 购票成功
<路人4> 购票成功
<路人3> 购票成功
<路人5> 购票成功
<路人6> 购票成功
<路人7> 购票成功
<路人8> 购票成功
<路人9> 购票成功

可以看到我的库里只有1张票,10个人查票是并发查的都查到了有1张票,但是在购票的时候,10个人都显示购票成功了,这显然不对,这就需要加入互斥锁。

from multiprocessing import Process, Lock
import json
import time


def search(name):
    dic = json.load(open("db.txt", "r", encoding="utf-8"))
    time.sleep(1)  # 模拟读数据的网络延迟
    print("<%s> 查到剩余票数 [%s]" % (name, dic["count"]))


def get(name):
    dic = json.load(open("db.txt", "r", encoding="utf-8"))
    time.sleep(1)  # 模拟读数据的网络延迟
    if dic["count"] > 0:
        dic["count"] -= 1
        time.sleep(3)  # # 模拟写数据的网络延迟
        json.dump(dic, open("db.txt", "w", encoding="utf-8"))
        print("<%s> 购票成功" % name)


def task(name, mutex):
    search(name)

    mutex.acquire()   # 只给购票上锁
    get(name)
    mutex.release()


if __name__ == '__main__':
    mutex = Lock()
    for i in range(10):  # 模拟10个人抢票
        p = Process(target=task, args=("路人%s" % i, mutex))
        p.start()

程序输出:

<路人3> 查到剩余票数 [1]
<路人1> 查到剩余票数 [1]
<路人2> 查到剩余票数 [1]
<路人0> 查到剩余票数 [1]
<路人4> 查到剩余票数 [1]
<路人5> 查到剩余票数 [1]
<路人7> 查到剩余票数 [1]
<路人6> 查到剩余票数 [1]
<路人8> 查到剩余票数 [1]
<路人9> 查到剩余票数 [1]
<路人3> 购票成功

 

三、互斥锁与join的区别

join是将一个任务整体串行,而互斥锁的好处则是可以将一个任务中的某一段代码串行,比如只让task函数中的get任务串行。

def task(name,):
    search(name) # 并发执行

    lock.acquire()
    get(name) #串行执行
    lock.release()

 

四、总结

加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。

虽然可以用文件共享数据实现进程间通信,但问题是:

1、效率低(共享数据基于文件,而文件是硬盘上的数据)

2、需要自己加锁处理

因此我们最好找寻一种解决方案能够兼顾:

1、效率高(多个进程共享一块内存的数据)

2、帮我们处理好锁问题。

这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。

队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。

我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。

 

 

版权声明:
作者:admin
链接:https://www.chenxie.net/archives/1906.html
来源:蜀小陈
文章版权归作者所有,未经允许请勿转载。

THE END
分享
二维码
< <上一篇
下一篇>>