多线程-信号量 Semaphore, Event, 定时器 Timer
一、信号量 Semaphore
信号量也是一把锁,可以指定信号量为5,对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有5个任务拿到锁去执行,如果说互斥锁是合租房屋的人去抢一个厕所,那么信号量就相当于一群路人争抢公共厕所,公共厕所有多个坑位,这意味着同一时间可以有多个人上公共厕所,但公共厕所容纳的人数是一定的,这便是信号量的大小.
如下例子:
from threading import Thread, Semaphore
import threading
import time, random
def task():
# sm.acquire()
# print("%s 抢到厕所" % threading.current_thread().getName())
# time.sleep(3)
# sm.release()
# 上面可以写成下面的形式,通过with sm进行上下文管理,可以自动acquire和release
with sm:
print("%s 抢到厕所" % threading.current_thread().getName())
time.sleep(random.randint(1, 3))
if __name__ == '__main__':
sm = Semaphore(3) # 设置有3个厕所位置
for i in range(10):
t = Thread(target=task)
t.start()
运行如下:换行通过我人为换行的方式,可以看到抢的状态
Thread-1 抢到厕所 Thread-2 抢到厕所 Thread-3 抢到厕所 Thread-4 抢到厕所 Thread-5 抢到厕所 Thread-6 抢到厕所 Thread-7 抢到厕所 Thread-8 抢到厕所 Thread-9 抢到厕所 Thread-10 抢到厕所
二、Event
线程的一个关键特性是每个线程都是独立运行且状态不可预测,如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。
为了解决这些问题,我们需要使用threading库中的Event对象,对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。
在初始情况下,Event对象中的信号标志被设置为False,如果有线程等待一个Event对象, 而这个Event对象的标志为False,那么这个线程将会被一直阻塞直至该标志为True。
一个线程如果将一个Event对象的信号标志设置为True,它将唤醒所有等待这个Event对象的线程。
如果一个线程等待一个已经被设置为True的Event对象,那么它将忽略这个事件, 继续执行
from threading import Event event.isSet():返回event的状态值,初始为False event.wait():如果 event.isSet()==False将阻塞线程; event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度; event.clear():恢复event的状态值为False。
例如:老师上课,学生听课,
from threading import Thread, Event
import time
event = Event()
def student(name):
print("学生 %s 正在听课" % name)
event.wait()
print("学生 %s 课间活动" % name)
def teacher(name):
print("老师 %s 正在授课" % name)
time.sleep(7)
event.set()
if __name__ == '__main__':
stu1 = Thread(target=student, args=("alex",))
stu2 = Thread(target=student, args=("wxx",))
stu3 = Thread(target=student, args=("yxx",))
stu1.start()
stu2.start()
stu3.start()
t1 = Thread(target=teacher, args=("egon",))
t1.start()
运行结果:
学生 alex 正在听课 学生 wxx 正在听课 学生 yxx 正在听课 老师 egon 正在授课 <------ 这里等待了7秒,老师在上课,然后7秒过后老师执行event.set()宣布下课,将event的状态值为True,学生才能开始课间活动 学生 yxx 课间活动 学生 wxx 课间活动 学生 alex 课间活动
以上是中学的情况,但是在大学期间,学生想上就上,想出去就出去,不用非得老师说可以活动了才行,所以在学生处可以设置 event.wait(3) ,表示学生等3秒,如果3秒老师还没有发号施令,学生就自己走了,不管老师了。
event应用场景:
客户端连接服务器端的时候除了开启连接的线程,还应该开启一个检查是否可连接的线程,当检查通过后才可以连接:
from threading import Thread, Event, currentThread
import time
event = Event()
def conn(): # 连接的方法
print("%s is connecting ..." % currentThread().getName())
event.wait() # 等待通过的信号,才执行连接
print("%s is connected." % currentThread().getName())
def check(): # 检查是否可连接的方法
print("%s is checking..." % currentThread().getName())
time.sleep(5) # 模拟检测连接
event.set() # 可以连接,发送信号
if __name__ == '__main__':
for i in range(3):
t = Thread(target=conn)
t.start()
ch = Thread(target=check)
ch.start()
运行结果:
Thread-1 is connecting ... Thread-2 is connecting ... Thread-3 is connecting ... Thread-4 is checking... Thread-2 is connected. Thread-1 is connected. Thread-3 is connected.
但是上面的例子假如检查不通过就会一直等,所以要设置一个超时时间,但是由不能让它一直连接,所以设置超过3次就返回:
from threading import Thread, Event, currentThread
import time
event = Event()
def conn():
n = 0
while not event.is_set():
if n == 3: # 连接了3次时,返回
print("%s try too many times" % currentThread().getName())
return
print("%s is try %s" % (currentThread().getName(), n))
event.wait(0.5) # 设置等待检查的超时时间
n += 1
print("%s is connected." % currentThread().getName())
def check(): # 检查是否可连接的方法
print("%s is checking..." % currentThread().getName())
time.sleep(5) # 模拟检测连接
event.set() # 可以连接,发送信号
if __name__ == '__main__':
for i in range(3):
t = Thread(target=conn)
t.start()
ch = Thread(target=check)
ch.start()
运行结果:
Thread-1 is try 0 Thread-2 is try 0 Thread-3 is try 0 Thread-4 is checking... Thread-3 is try 1 Thread-1 is try 1 Thread-2 is try 1 Thread-1 is try 2 Thread-2 is try 2 Thread-3 is try 2 Thread-2 try too many times Thread-1 try too many times Thread-3 try too many times
再例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作:
from threading import Thread, Event
import threading
import time, random
def conn_mysql():
count = 1
while not event.is_set():
if count > 3:
raise TimeoutError('链接超时')
print('<%s>第%s次尝试链接' % (threading.current_thread().getName(), count))
event.wait(0.5)
count += 1
print('<%s>链接成功' % threading.current_thread().getName())
def check_mysql():
print('\033[45m[%s]正在检查mysql\033[0m' % threading.current_thread().getName())
time.sleep(random.randint(2, 4))
event.set()
if __name__ == '__main__':
event = Event()
conn1 = Thread(target=conn_mysql)
conn2 = Thread(target=conn_mysql)
check = Thread(target=check_mysql)
conn1.start()
conn2.start()
check.start()
运行结果如下:
<Thread-1>第1次尝试链接 <Thread-2>第1次尝试链接 [Thread-3]正在检查mysql <Thread-2>第2次尝试链接 <Thread-1>第2次尝试链接 <Thread-1>第3次尝试链接 <Thread-2>第3次尝试链接 Exception in thread Thread-2: Traceback (most recent call last): File "C:\Python36\lib\threading.py", line 916, in _bootstrap_inner self.run() File "C:\Python36\lib\threading.py", line 864, in run self._target(*self._args, **self._kwargs) File "D:/PycharmProjects/python_fullstack_middle/第四模块·网络编程进阶&数据库开发/第1章·网络编程进阶/02 多线程/09 Event事件.py", line 100, in conn_mysql raise TimeoutError('链接超时') TimeoutError: 链接超时 Exception in thread Thread-1: Traceback (most recent call last): File "C:\Python36\lib\threading.py", line 916, in _bootstrap_inner self.run() File "C:\Python36\lib\threading.py", line 864, in run self._target(*self._args, **self._kwargs) File "D:/PycharmProjects/python_fullstack_middle/第四模块·网络编程进阶&数据库开发/第1章·网络编程进阶/02 多线程/09 Event事件.py", line 100, in conn_mysql raise TimeoutError('链接超时') TimeoutError: 链接超时
三、定时器 Timer
就是定时做事情。
最简单的例子:
from threading import Timer
def task(name):
print("Hello %s" % name)
t = Timer(5, task, args=("egon",))
t.start()
5秒后打印 Hello egon
来看个验证码的例子:
没有定时器的情况下:
import random
def make_code(n=4):
res = ""
for i in range(n):
s1 = str(random.randint(0, 9))
s2 = chr(random.randint(65, 90)) # 60-90取出的正好是ASCII码中a-z和A-Z
res += random.choice([s1, s2])
return res
print(make_code())
每执行一次输出4位的验证码。
来看加上定时器的验证码:5秒之内没有输入正确的验证码,验证码自动刷新。
from threading import Timer
import random
class Code:
def __init__(self):
self.make_cache()
def make_cache(self, interval=5):
self.cache = self.make_code()
print(self.cache)
self.t = Timer(interval, self.make_cache)
self.t.start()
def make_code(self, n=4):
res = ""
for i in range(n):
s1 = str(random.randint(0, 9))
s2 = chr(random.randint(65, 90)) # 60-90取出的正好是ASCII码中a-z和A-Z
res += random.choice([s1, s2])
return res
def check(self):
while True:
code = input("请输入你的验证码>>:").strip()
if code.upper() == self.cache:
print("验证码输入正确")
self.t.cancel()
break
obj = Code()
obj.check()
运行结果:
FH22 请输入你的验证码>>:sdfg 请输入你的验证码>>:fnhsd 请输入你的验证码>>:2Z76 <--- 5秒之后出现的新验证码 dfdgs 请输入你的验证码>>:2z76 验证码输入正确
共有 0 条评论