多进程-互斥锁
一、什么是互斥锁?
就是把多个进程并发修改同一块共享数据的操作变成串行,此举会牺牲效率,但是保证了数据安全。
举个例子,在合租房里,公共使用的卫生间,有同时几个人都要上卫生间,但是卫生间不可能同时给几个人使用,是要看谁先抢到把门锁上就谁使用,等使用完了打开锁之后下一个人才能都继续使用。
在程序上就是,当几个任务并发的时候,由于共享的是同一个操作系统,或者说像打印这类的请求,共享的就是同一个终端,共享就带来了竞争,竞争就导致错乱了,所以引入互斥锁来保证输出的正确,而不会错乱,当然这回导致效率的降低,来看下面的例子。
from multiprocessing import Process
import time
def task(name):
print("%s 1" % name)
time.sleep(1)
print("%s 2" % name)
time.sleep(1)
print("%s 3" % name)
time.sleep(1)
if __name__ == '__main__':
for i in range(3):
p = Process(target=task, args=("进程%s" % i,))
p.start()
程序同时启动3个并发,程序输出:
进程0 1 进程0 2 进程1 1 进程2 1 进程0 3 进程1 2 进程2 2 进程1 3 进程2 3
可以看到,程序执行了并发,但是整个结果显示感觉很混乱。
来加上 互斥锁:
from multiprocessing import Process, Lock # Lock模块 互斥锁
import time
def task(name, mutex):
mutex.acquire() # 上锁
print("%s 1" % name)
time.sleep(1)
print("%s 2" % name)
time.sleep(1)
print("%s 3" % name)
time.sleep(1)
mutex.release() # 释放锁
if __name__ == '__main__':
mutex = Lock() # 实例化
for i in range(3):
p = Process(target=task, args=("进程%s" % i, mutex)) # 为了保证子进程都是使用了同一个锁,所以将对象传入到子进程
p.start()
程序输出:
进程0 1 进程0 2 进程0 3 进程1 1 进程1 2 进程1 3 进程2 1 进程2 2 进程2 3
可以看到终端输出变的有序了,但同时也牺牲了效率。
二、互斥锁应用场景
模拟抢票
db.txt 内容:
{"count": 1}
程序内容:
from multiprocessing import Process
import json
import time
def search(name):
dic = json.load(open("db.txt", "r", encoding="utf-8"))
time.sleep(1) # 模拟读数据的网络延迟
print("<%s> 查到剩余票数 [%s]" % (name, dic["count"]))
def get(name):
dic = json.load(open("db.txt", "r", encoding="utf-8"))
time.sleep(1) # 模拟读数据的网络延迟
if dic["count"] > 0:
dic["count"] -= 1
time.sleep(3) # # 模拟写数据的网络延迟
json.dump(dic, open("db.txt", "w", encoding="utf-8"))
print("<%s> 购票成功" % name)
def task(name):
search(name)
get(name)
if __name__ == '__main__':
for i in range(10): # 模拟10个人抢票
p = Process(target=task, args=("路人%s" % i,))
p.start()
程序输出:
<路人1> 查到剩余票数 [1] <路人2> 查到剩余票数 [1] <路人0> 查到剩余票数 [1] <路人4> 查到剩余票数 [1] <路人3> 查到剩余票数 [1] <路人5> 查到剩余票数 [1] <路人6> 查到剩余票数 [1] <路人7> 查到剩余票数 [1] <路人8> 查到剩余票数 [1] <路人9> 查到剩余票数 [1] <路人1> 购票成功 <路人2> 购票成功 <路人0> 购票成功 <路人4> 购票成功 <路人3> 购票成功 <路人5> 购票成功 <路人6> 购票成功 <路人7> 购票成功 <路人8> 购票成功 <路人9> 购票成功
可以看到我的库里只有1张票,10个人查票是并发查的都查到了有1张票,但是在购票的时候,10个人都显示购票成功了,这显然不对,这就需要加入互斥锁。
from multiprocessing import Process, Lock
import json
import time
def search(name):
dic = json.load(open("db.txt", "r", encoding="utf-8"))
time.sleep(1) # 模拟读数据的网络延迟
print("<%s> 查到剩余票数 [%s]" % (name, dic["count"]))
def get(name):
dic = json.load(open("db.txt", "r", encoding="utf-8"))
time.sleep(1) # 模拟读数据的网络延迟
if dic["count"] > 0:
dic["count"] -= 1
time.sleep(3) # # 模拟写数据的网络延迟
json.dump(dic, open("db.txt", "w", encoding="utf-8"))
print("<%s> 购票成功" % name)
def task(name, mutex):
search(name)
mutex.acquire() # 只给购票上锁
get(name)
mutex.release()
if __name__ == '__main__':
mutex = Lock()
for i in range(10): # 模拟10个人抢票
p = Process(target=task, args=("路人%s" % i, mutex))
p.start()
程序输出:
<路人3> 查到剩余票数 [1] <路人1> 查到剩余票数 [1] <路人2> 查到剩余票数 [1] <路人0> 查到剩余票数 [1] <路人4> 查到剩余票数 [1] <路人5> 查到剩余票数 [1] <路人7> 查到剩余票数 [1] <路人6> 查到剩余票数 [1] <路人8> 查到剩余票数 [1] <路人9> 查到剩余票数 [1] <路人3> 购票成功
三、互斥锁与join的区别
join是将一个任务整体串行,而互斥锁的好处则是可以将一个任务中的某一段代码串行,比如只让task函数中的get任务串行。
def task(name,):
search(name) # 并发执行
lock.acquire()
get(name) #串行执行
lock.release()
四、总结
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1、效率低(共享数据基于文件,而文件是硬盘上的数据)
2、需要自己加锁处理
因此我们最好找寻一种解决方案能够兼顾:
1、效率高(多个进程共享一块内存的数据)
2、帮我们处理好锁问题。
这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
共有 0 条评论