多线程(Thread) 也称轻量级进程,是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。线程自身不拥有系统资源,只拥有一些在运行中必不可少的资源,但它可与同属一个进程的其他线程共享进程所拥有的全部资源。一个线程可以创建和撤销另一个线程,同一个进程中的多个线程之间可以并发执行。
线程有就绪、阻塞、运行 三种基本状态:
- 就绪状态是指线程具备运行的所有条件,逻辑上可以运行,在等待处理器;
- 运行状态是指线程占有处理器,正在运行;
- 阻塞状态是指线程在等待一个事件(如某个信号量),逻辑上不可执行.
Python 多线程简介
有的读者可能会想到 Python 中的多线程,由于全局锁 GIL (Global interpreter Lock) 限制了 Python 中的多线程,同一时刻只能有一个线程运行,无法发挥多核 CPU 的优势。首先需要明确 GIL 并不是 Python 的特性,它是在实现 Python 解析器(CPython) 时所引入的一个概念。就好比 C++ 是一套语言(语法)标准,可以用不同的编译器来编译成可执行代码,比较有名的编译器如 GCC、Intel C++、Visual C++ 等。Python 也一样,同一段代码可以通过 CPython、PyPy、Psyco 等不同的 Python 执行环境来执行。像其中的 CPython 就有 GIL。然而因为 CPython 是大部分环境下默认的 Python 执行环境,所以在很多人的概念里 CPython 就是 Python,也就想当然的把 GIL 归结为 Python 语言的 缺陷。因此,这里需要明确一点: GIL 并不是 Python 的特性,Python 完全可以不依赖于 GIL。
GIL 本质就是一把互斥锁,既然是互斥锁,那么所有互斥锁的本质都是一样的,都是将并发运行编程串行,以此来控制同一时间内共享数据只能被一个任务修改,进而保证数据的安全。由于 CPython 的内存管理机制,因此需要确保共享数据 的访问安全,即加锁处理(GIL)。
有了 GIL 的存在,同一时刻同一进程中只有一个线程被执行,那么读者可能要问了,进程可以利用多核,而 Python 的多线程却无法利用多核优势,Python 的多线程是不是没用了?答案当然不是。
首先明确我们线程执行的任务是什么?是做计算(计算密集型 )还是做输入/输出(IO密集型),不同的场景使用不同的方法。多核 CPU 意味着可以有多个核并行完成计算,多核提升的是计算性能,但每个 CPU 一旦遇到 IO 阻塞,仍然需要等待,所以多核对 IO 密集型任务没什么太高提升。
示例1,计算密集型任务 - 多进程(
jsmjx_multi_process.py
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import time
import os
from multiprocessing import Process
# 计算密集型任务
def work():
res = 0
for i in range(100000000):
res *= i
if __name__ == '__main__':
l = []
print("本机为", os.cpu_count(), "核 CPU") # 本机为 8 核
start_time = time.time()
for i in range(8):
p = Process(target=work) # 多进程
l.append(p)
p.start()
for p in l:
p.join()
stop_time = time.time()
print("计算密集型任务,多进程耗时 %s" %(stop_time - start_time))运算结果如下
1
2本机为 8 核 CPU
计算密集型任务,多进程耗时 3.082679033279419示例2,计算密集型任务 - 多线程(
jsmjx_multi_thread.py
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26import os
import time
from threading import Thread
# 就算密集型任务
def work():
res = 0
for i in range(100000000):
res *= i
if __name__ == '__main__':
l = []
print("本机为", os.cpu_count(), "核 CPU")
start_time = time.time()
for i in range(8):
p = Thread(target=work)
l.append(p)
p.start()
for p in l:
p.join()
stop_time = time.time()
print("计算密集型任务,多线程耗时 %s" %(stop_time - start_time))运行结果如下
1
2本机为 8 核 CPU
计算密集型任务,多线程耗时 16.9545841217041示例3,IO密集型任务 - 多进程(
iomjx_multi_process.py
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24import os
import time
from multiprocessing import Process
# IO 密集型任务
def work():
time.sleep(2)
print("===>", file=open("tmp.txt", "w"))
if __name__ == '__main__':
l = []
print("本机为", os.cpu_count(), "核 CPU")
start_time= time.time()
for i in range(400):
p = Process(target=work)
l.append(p)
p.start()
for p in l:
p.join()
stop_time = time.time()
print("IO 密集型任务,多进程耗时 %s" %(stop_time - start_time))运行结果如下
1
2本机为 8 核 CPU
IO 密集型任务,多进程耗时 4.815764427185059示例4,IO密集型任务 - 多线程(
iomjx_multi_thread.py
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23import os
import time
from threading import Thread
def work():
time.sleep(2)
print("===>", file=open("tmp.txt", "w"))
if __name__ == "__main__":
l = []
print("本机为", os.cpu_count(), "核 CPU")
start_time = time.time()
for i in range(400):
p = Thread(target=work)
l.append(p)
p.start()
for p in l:
p.join()
stop_time = time.time()
print("IO 密集型任务,多线程耗时 %s" %(stop_time - start_time))运行结果如下
1
2本机为 8 核 CPU
IO 密集型任务,多线程耗时 2.0425140857696533
结论: 在 Python 中,对于计算密集型任务,多进程占优势;对于 IO 密集型任务,多线程占优势。
当然,对运行一个程序来说,随着 CPU 的增多执行效率肯定会有所提高 ,这是因为一个程序基本上不会是纯计算或纯 IO,所以我们只能相对的去看一个程序到底是计算密集型还是 IO 密集型。
多线程编程之 threading 模块
Python 提供多线程编程的模块有以下两个: threading
和 _thread
。其中 _thread
模块提供了低级别的基本功能来支持多线程功能,提供 简单的锁来确保同步,推荐使用 threading
模块。threading
模块对 _thread
进行了封装,提供了更高级别,功能更强,更易于使用的线程管理的功能,对线程的支持更完善,绝大多数情况下,只需要使用 threading
这个高级模块就够了。
使用 threading 进行多线程操作有以下两种方法:
方法一: 创建
threading.Thread
类的实例,调用其start()
方法(multi_thread_1.py
)。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30import time
import threading
def task_thread(counter):
print(f"线程名称: {threading.current_thread().name} 参数: {counter} 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
num = counter
while num:
time.sleep(3)
num -= 1
print(f"线程名称: {threading.current_thread().name} 参数: {counter} 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
if __name__ == '__main__':
print(f"主线程开始时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
# 初始化三个线程,传递不同的参数
t1 = threading.Thread(target=task_thread, args=(3,))
t2 = threading.Thread(target=task_thread, args=(2,))
t3 = threading.Thread(target=task_thread, args=(1,))
# 开启三个线程
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print(f"主线程结束时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")程序实例化了三个 Thread 类的实例,并向任务函数传递不同的参数,使它们运行不同的时间后结束,
start()
方法开启线程,join()
方法阻塞主线程,等待当前线程运行结束。运行结果如下
1
2
3
4
5
6
7
8主线程开始时间: 2023-03-27 14:56:18
线程名称: Thread-1 (task_thread) 参数: 3 开始时间: 2023-03-27 14:56:18
线程名称: Thread-2 (task_thread) 参数: 2 开始时间: 2023-03-27 14:56:18
线程名称: Thread-3 (task_thread) 参数: 1 开始时间: 2023-03-27 14:56:18
线程名称: Thread-3 (task_thread) 参数: 1 开始时间: 2023-03-27 14:56:21
线程名称: Thread-2 (task_thread) 参数: 2 开始时间: 2023-03-27 14:56:24
线程名称: Thread-1 (task_thread) 参数: 3 开始时间: 2023-03-27 14:56:27
主线程结束时间: 2023-03-27 14:56:27方法二: 继承 Thread 类,在子类中重写
run()
方法和__init__()
方法(multi_thread_2.py
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35import time
import threading
class MyThread(threading.Thread):
def __init__(self, counter):
super().__init__()
self.counter = counter
def run(self):
print(f"线程名称: {threading.current_thread().name} 参数: {self.counter} 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
counter = self.counter
while counter:
time.sleep(3)
counter -= 1
print(f"线程名称: {threading.current_thread().name} 参数: {self.counter} 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
if __name__ == "__main__":
print(f"主线程开始时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
# 初始化三个线程,传递不同的参数
t1 = MyThread(3)
t2 = MyThread(2)
t3 = MyThread(1)
# 开启三个线程
t1.start()
t2.start()
t3.start()
# 等待运行结束
t1.join()
t2.join()
t3.join()
print(f"主线程结束时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")以上程序自定义线程类 MyThread,继承自 threading.Thread,并重写了
__init__()
方法和run()
方法,其中 run() 方法相当于之前示例中的任务函数,运行结果如下所示1
2
3
4
5
6
7
8主线程开始时间: 2023-03-27 15:36:31
线程名称: Thread-1 参数: 3 开始时间: 2023-03-27 15:36:31
线程名称: Thread-2 参数: 2 开始时间: 2023-03-27 15:36:31
线程名称: Thread-3 参数: 1 开始时间: 2023-03-27 15:36:31
线程名称: Thread-3 参数: 1 开始时间: 2023-03-27 15:36:34
线程名称: Thread-2 参数: 2 开始时间: 2023-03-27 15:36:37
线程名称: Thread-1 参数: 3 开始时间: 2023-03-27 15:36:40
主线程结束时间: 2023-03-27 15:36:40如果继承 Thread 类,想要调用外部传入函数,可以如下所示(
multi_thread_3.py
)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39import time
import threading
def task_thread(counter):
print(f"线程名称: {threading.current_thread().name} 参数: {counter} 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
num = counter
while num:
time.sleep(3)
num -= 1
print(f"线程名称: {threading.current_thread().name} 参数: {counter} 开始时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
class MyThread(threading.Thread):
def __init__(self, target, args):
super().__init__()
self.target = target
self.args = args
def run(self):
self.target(*self.args)
if __name__ == '__main__':
print(f"主线程开始时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
# 初始化三个线程,传递不同的参数
t1 = MyThread(target=task_thread, args=(3,))
t2 = MyThread(target=task_thread, args=(2,))
t3 = MyThread(target=task_thread, args=(1,))
# 开始三个线程
t1.start()
t2.start()
t3.start()
# 等待线程运行结束
t1.join()
t2.join()
t3.join()
print(f"主线程结束时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")通过
self.target
来接收外部传入的参数,通过self.args
来接收外部函数的参数,这样就可以使用继承 Thread 的线程类调用外部传入的函数,原理和方法是相同的,运行结果不变。