多线程同步之 Lock(锁)
如果多个线程共同对某个数据修改,则可能出现不可预料的结果,这个时候就需要使用互斥锁来进行同步。例如,在三个线程对共同变量 num 进行 100 万次加减操作之后,其 num 的结果不为 0
示例,不加锁的意外情况(thread_no_lock.py)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24import threading
num = 0
def task_thread(n):
global num
for i in range(1000000):
num = num + n
num = num - n
t1 = threading.Thread(target=task_thread, args=(6,))
t2 = threading.Thread(target=task_thread, args=(17,))
t3 = threading.Thread(target=task_thread, args=(11,))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print(num)运行结果如下
1
-19
每次执行的结果是随机的,为 0 的概率非常小,之所以会出现不为 0 的情况,是因为这里同时有许多语句修改 num 的值,当一个线程正在执行 num+n,另一个线程正在执行 num-n,从而导致之前的线程执行 num-n 时 num 的值已经不是之前的值,因此最终结果不为 0。为了保证数据的正确性,需要使用互斥锁对多个线程进行同步,限制当一个线程正在访问数据时,其他线程只能等待,直到前面的线程释放锁。
使用
threading.Thread
对象的Lock
和Rlock
可以实现简单的线程同步,这两个对象都有 acquire 方法和 release 方法,对于那些每次只允许一个线程操作的数据,可以将其操作放到 acquire 方法和 release 方法之间。示例,加互斥锁后运行结果始终一致(thread_sync_lock.py)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29import threading
num = 0
lock = threading.Lock()
def task_thread(n):
global num
# 获取锁,用于线程同步
lock.acquire()
for i in range(1000000):
num = num + n
num = num - n
# 释放锁,开启下一个线程
lock.release()
t1 = threading.Thread(target=task_thread, args=(6,))
t2 = threading.Thread(target=task_thread, args=(17,))
t3 = threading.Thread(target=task_thread, args=(11,))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print(num)无论执行多少次,执行结果都为 0
多线程同步之 Semaphore(信号量)
互斥锁是只允许一个线程访问共享数据,而信号量是同时允许一定数量的线程访问共享数据,比如 银行柜台有 5 个窗口,允许同时有 5 个人办理业务,后面的人只能等待,待柜台有人办理完业务后才可以进入相应的柜台办理。
示例,使用信号量控制并发(thread_sync_Semaphore.py)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import threading
import time
# 同时只有5个人能办理业务
semaphore = threading.BoundedSemaphore(5)
# 模拟银行业务办理
def yewubanli(name):
semaphore.acquire()
time.sleep(3)
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} {name} 正在办理业务")
semaphore.release()
thread_list = []
for i in range(12):
t = threading.Thread(target=yewubanli, args=(i,))
thread_list.append(t)
for thread in thread_list:
thread.start()
for thread in thread_list:
thread.join()运行结果如下
1
2
3
4
5
6
7
8
9
10
11
122023-03-28 14:23:10 0 正在办理业务
2023-03-28 14:23:10 4 正在办理业务
2023-03-28 14:23:10 1 正在办理业务
2023-03-28 14:23:10 2 正在办理业务
2023-03-28 14:23:10 3 正在办理业务
2023-03-28 14:23:13 6 正在办理业务
2023-03-28 14:23:13 9 正在办理业务
2023-03-28 14:23:13 8 正在办理业务
2023-03-28 14:23:13 7 正在办理业务
2023-03-28 14:23:13 5 正在办理业务
2023-03-28 14:23:16 10 正在办理业务
2023-03-28 14:23:16 11 正在办理业务可以看出,同一时刻只有5个人正在办理业务,即同一时刻只有 5 个线程获得资源运行。可以通过信号量来控制多线程的并发数。
多线程同步之 Condition
条件对象 Condition 能让一个线程 A 停下来,等待其他线程 B,线程 B 满足了某个条件后通知线程 A 继续运行。线程首先获取一个条件变量锁,如果条件不足,则该线程等待并释放条件变量锁;如果条件满足,就继续执行线程,执行完成后可以通知其他状态为 wait 的线程执行。其他处于 wait 状态的线程接收到通知后会重新判断条件以确定是否继续执行。
示例,使用条件对象 Condition 同步多线程(thread_sync_condition.py)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43import threading
class Boy(threading.Thread):
def __init__(self, cond, name):
super(Boy, self).__init__()
self.cond = cond
self.name = name
def run(self):
self.cond.acquire()
print(self.name + ": 嫁给我吧!?")
self.cond.notify() # 唤醒一个挂起的线程,让 HanMeiMei 表态
self.cond.wait() # 释放内部所占用的锁,同时线程被挂起,直到接收到通知被唤醒或超时,等待 HanMeiMei 回答
print(self.name + ": 我单膝下跪,送上戒指!")
self.cond.notify()
self.cond.wait()
print(self.name + ": Li 太太,你的选择太明智了。")
self.cond.release()
class Girl(threading.Thread):
def __init__(self, cond, name):
super(Girl, self).__init__()
self.cond = cond
self.name = name
def run(self):
self.cond.acquire()
self.cond.wait() # 等待 Lilei 求婚
print(self.name + ": 没有情调,不够浪漫,不答应")
self.cond.notify()
self.cond.wait()
print(self.name + ": 好吧,答应你了")
self.cond.notify()
self.cond.release()
cond = threading.Condition()
boy = Boy(cond, "LiLei")
girl = Girl(cond, "HanMeiMei")
girl.start()
boy.start()程序实例化了一个 Condition 对象 cond,一个 Boy 对象 boy,一个 Girl 对象 girl;
程序首先启动了 girl 线程,girl 虽然获取到了条件变量锁 cond,但又执行了 wait 并释放条件变量锁,自身进入阻塞状态;
boy 线程启动后,就获得了条件变量锁 cond 并发出了消息,之后通过 notify 唤醒另一个挂起的线程,并释放条件变量锁等待 girl 的回答;
后面的过程都是重复这些步骤。最后通过 release 程序释放资源.
多线程同步之 Event
Event 用于多线程之间的通信。一个线程发出一个信号,其他一个或多个线程等待,调用 Event 对象的 wait 方法,线程则会阻塞等待,直到别的线程 set 之后才会被唤醒。
示例,使用 Event 实现多线程同步(thread_sync_event.py)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45import threading
import time
class Boy(threading.Thread):
def __init__(self, cond, name):
super().__init__()
self.cond = cond
self.name = name
def run(self):
print(self.name + ": 嫁给我吧!?")
self.cond.set() # 唤醒一个挂起的线程,让 HanMeiMei 表态
time.sleep(0.5)
self.cond.wait()
print(self.name + ": 我单膝下跪,送上戒指!")
self.cond.set()
time.sleep(0.5)
self.cond.wait()
self.cond.clear()
print(self.name + ": Li 太太,你的选择太明智了。")
class Girl(threading.Thread):
def __init__(self, cond, name):
super().__init__()
self.cond = cond
self.name = name
def run(self):
self.cond.wait() # 等待 LiLei 的求婚
self.cond.clear()
print(self.name + ": 没有情调,不够浪漫,不答应")
self.cond.set()
time.sleep(0.5)
self.cond.wait()
print(self.name + ": 好吧,答应你了")
self.cond.set()
cond = threading.Event()
boy = Boy(cond, "LiLei")
girl = Girl(cond, "HanMeiMei")
boy.start()
girl.start()运行结果如下
1
2
3
4
5LiLei: 嫁给我吧!?
HanMeiMei: 没有情调,不够浪漫,不答应
LiLei: 我单膝下跪,送上戒指!
HanMeiMei: 好吧,答应你了
LiLei: Li 太太,你的选择太明智了。Event 内部默认内置了一个标志,初始值为 False。上述代码中对象 girl 通过
wait()
方法进入等待状态,直到对象 boy 调用该 Event 的 set() 方法将内置标志设置为 True 时,对象 girl 再继续运行。对象 boy 最后调用 Event 的clear()
方法将内置标志设置为 False,恢复初始状态。
多线程优先级队列 queue
python 的 queue 模块中提供了同步的,线程安全的队列类,包括先进先出队列 Queue、后进先出队列 LifoQueue 和优先级队列 PriorityQueue。这些队列都实现了锁原语,可以直接使用来实现线程之间的同步。
示例,生产者消费者模式(thread_queue.py)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30import time
import threading
import queue
# 先进先出
q = queue.Queue(maxsize=5)
# q = queue.LifoQueue(maxsize=3)
# q = queue.PriorityQueue(maxsize=3)
def ProducerA():
count = 1
while True:
q.put(f"冷饮 {count}")
print(f"{time.strftime('%H:%M:%S')} A 放入: [冷饮 {count}]")
count += 1
time.sleep(1)
def ConsumerB():
while True:
print(f"{time.strftime('%H:%M:%S')} B 取出: [{q.get()}]")
time.sleep(5)
p = threading.Thread(target=ProducerA)
c = threading.Thread(target=ConsumerB)
c.start()
p.start()运行结果如下
1
2
3
4
5
6
7
8
9
10
1116:46:32 A 放入: [冷饮 1]
16:46:32 B 取出: [冷饮 1]
16:46:33 A 放入: [冷饮 2]
16:46:34 A 放入: [冷饮 3]
16:46:35 A 放入: [冷饮 4]
16:46:36 A 放入: [冷饮 5]
16:46:37 B 取出: [冷饮 2]
16:46:37 A 放入: [冷饮 6]
16:46:38 A 放入: [冷饮 7]
16:46:42 B 取出: [冷饮 3]
16:46:42 A 放入: [冷饮 8]以上代码是实现生产者消费者模型的一个比较简单的例子。在并发编程中,使用生产者和消费者模式能够解决大多数并发问题。如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续产生数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。
生产者和消费者模式是通过一个容器(队列)来解决生产者和消费者的强耦合问题。因为生产者和消费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者产生完数据之后不用等待消费者处理,可以直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列中取。阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
多线程之线程池 pool
在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象需要获取内存资源或其他更多资源。虚拟机也将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收。同样的道理,多任务情况下每次都会生成一个新线程,执行任务后资源再被回收就显得非常低效,因此线程池就是解决这个问题的办法。类似的例子还有连接池,进程池等。
将任务添加到线程池中,现成池会自动指定一个空闲的线程去执行任务,当超过线程池的最大线程数时,任务需要等待有新的空闲线程后才会被执行。
我们可以使用 threading 模块以及 queue 模块定制线程池,也可以用 multiprocessing 模块。from multiprocessing import Pool
这样导入的 Pool 表示的是进程池,from multiprocessing.dummy import Pool
这样导入的 Pool 表示的是线程池。
示例,线程池实例(thread_pool.py)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22import time
from multiprocessing.dummy import Pool as ThreadPool
def fun(n):
time.sleep(2)
start_time = time.time()
for i in range(5):
fun(i)
print(f"单线程执行程序耗时: {time.time() - start_time}")
start_time2 = time.time()
# 开启 5 个 worker,没有参数时默认是 CPU的核心数
pool = ThreadPool(processes=5)
# 在线程中执行 urllib2.urlopen(url) 并返回执行结果
results2 = pool.map(fun, range(5))
pool.close()
pool.join()
print(f"线程池(5) 并发执行耗时: {time.time() - start_time2}")上述代码模拟运行一个耗时 2 秒的任务,比较顺序执行 5 次和线程池(并发数为5)执行的耗时,运行结果如下
1
2单线程执行程序耗时: 10.018553018569946
线程池(5) 并发执行耗时: 2.02367901802063显然并发执行效率更高,接近单词执行的时间
总结: Python 多线程适合用在 IO 密集型任务中。对于 IO 密集型任务来说,较少时间用在 CPU 计算上,较多时间用在 IO 上,如文件读写,web 请求,数据库请求等;而对于计算密集型任务,应该使用多进程。