多进程/多线程编程
基于Python的多进程/多线程编程
参考链接
本文不会详细介绍每个类的方法,只记录主要思想。关于具体的方法,可以参考官方文档。
概览
在多进程/多进程编程中,主要关注以下几个方面:
- 进程/线程的创建、启动、停止(start & join)
- 进程间的通信(Queue、Pipe[Connection]、Manager)
- 进程/线程的共享资源管理(共享内存、共享变量)
- 进程/线程间的同步与互斥(同步原语:Lock、Event、Condition、Semaphore、Barrier)
- 守护进程/线程(Daemon)
- 进程/线程池(Pool)
关于使用上下文管理器(with
)
multiprocessing和threading模块中含有acquire
和release
方法,可以使用with
语句来确保资源的正确释放。
1
2
3
4
5
6
7
8
9
with lock:
# do something
# 等价于
lock.acquire()
try:
# do something
finally:
lock.release()
上下文管理器的实现需要实现__enter__
和__exit__
方法,这样可以确保在退出时正确关闭资源。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Queue:
def __init__(self):
self.q = []
def put(self, item):
self.q.append(item)
def get(self):
return self.q.pop(0)
def __enter__(self): # 进入上下文
return self
def __exit__(self, exc_type, exc_value, traceback): # 退出上下文
pass
进程/线程的创建、启动、停止
进程创建/启动/停止
python使用multiprocessing.Process
类来创建进程,通过start
方法启动进程,通过join
方法等待进程结束。
启动方法控制了进程是否继承父进程的资源,有三种启动方法:spawn
、fork
、forkserver
,详细见文档
1
2
3
4
5
6
7
8
9
from multiprocessing import Process
def f(name):
print(f'hello {name}')
p = Process(target=f, args=('world',))
p.start() # 启动进程
p.join() # 等待进程结束,阻塞当前进程
print('done')
或者通过继承Process
类来实现进程:
1
2
3
4
5
6
7
8
9
10
11
12
13
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self): # 重写run方法
print(f'hello {self.name}')
p = MyProcess('world')
p.start()
p.join()
线程创建/启动/停止
python使用threading.Thread
类来创建线程,通过start
方法启动线程,通过join
方法等待线程结束。
1
2
3
4
5
6
7
8
9
from threading import Thread
def f(name):
print(f'hello {name}')
t = Thread(target=f, args=('world',))
t.start() # 启动线程,会在独立的控制线程中调用run()方法
t.join() # 等待线程结束,阻塞当前线程
print('done')
或者通过继承Thread
类来实现线程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from threading import Thread
class MyThread(Thread):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print(f'hello {self.name}')
t = MyThread('world')
t.start()
t.join()
print('done')
进程间的通信(消息机制)
使用多进程时,一般使用消息机制实现进程间通信,尽可能避免使用同步原语,例如锁。
多线程中,由于线程共享内存,可以直接访问共享变量,但是需要使用同步原语来保证线程安全。
注意,Queue和Pipe的通信通过pickle模块将对象序列化再进行传递,因此需要确保对象是可序列化的。
同时Queue和Pipe默认是阻塞的,如果队列为空或者管道没有数据,get
方法会阻塞。可以通过指定block[,timeout]
来设置非阻塞模式。
Queue
Queue对象: 使用put
和get
方法来实现进程间的消息传递。
SimpleQueue对象:Queue的简化版本。
JoinableQueue对象:Queue的衍生,可以使用task_done
和join
方法来实现任务的完成状态。
Pipe
Pipe对象:其实现了一个双向管道,可以实现进程间的双向通信。Pipe返回两个Connection对象。
Connection对象:可用send
和recv
方法来实现双工通信,实质上是socket的封装。
1
2
3
4
5
6
from multiprocessing import Process, Pipe
from multiprocessing.connection import Connection
conn1, conn2 = Pipe() # Pipe返回两个Connection对象
conn1.send('hello')
print(conn2.recv())
进程/线程的共享资源管理
多进程中,由于进程间内存是独立的,因此需要使用共享内存或者共享变量来实现进程间的数据共享。 多线程中,由于线程共享内存,可以直接访问共享变量,但是需要使用同步原语来保证线程安全。
进程(Value、Array、Manager)
Value对象:共享内存中的单个值。
1
2
3
4
from multiprocessing import Value, Process
import ctypes
counter = Value(ctypes.c_int, 0) # 创建一个int类型的共享变量,第一个参数为类型,第二个参数为初始值
Array对象:共享内存中的数组。
Manager对象:管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。实现进程间的数据共享,可以使用Value
、Array
、Queue
、Lock
等。具有更高的灵活性。
1
2
3
4
5
6
7
8
9
10
from multiprocessing import Manager, Process
# 使用上下文管理器
with Manager() as manager:
counter = manager.Value('i', 0) # 创建一个int类型的共享变量,第一个参数为类型,第二个参数为初始值
dict_ = manager.dict() # 创建一个共享字典
# 或者使用Manager()创建
manager = Manager()
counter = manager.Value('i', 0)
dict_ = manager.dict()
线程(共享变量)
由于线程共享内存,可以直接访问共享变量,但是需要使用同步原语来保证线程安全。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from threading import Lock, Thread
# 共享资源
counter = 0
# 创建一个锁对象
lock = Lock()
# 线程工作的函数
def update_counter(name):
global counter # 指定counter为全局变量的引用
print(f"{name}: 准备更新计数器。")
# 请求锁
lock.acquire()
try:
print(f"{name}: 已获得锁。")
current_counter = counter
print(f"{name}: 当前计数器值为 {current_counter}。")
counter = current_counter + 1
print(f"{name}: 更新后的计数器值为 {counter}。")
finally:
# 无论如何都要释放锁
lock.release()
print(f"{name}: 已释放锁。")
# 创建线程
thread1 = Thread(target=update_counter, args=('线程1',))
thread2 = Thread(target=update_counter, args=('线程2',))
# 启动线程
thread1.start()
thread2.start()
# 等待线程完成
thread1.join()
thread2.join()
print(f"最终计数器值为 {counter}。")
进程/线程的同步原语
同步原语有助于解决进程/线程间的同步与互斥问题。 在python中多进程与多线程的同步原语十分相似,只是使用的模块不同。
Lock 锁
原始锁(非递归锁)对象,一旦一个进程或者线程拿到了锁,后续的任何其他进程或线程的其他请求都会被阻塞直到锁被释放。
锁常用于保护共享资源,防止多个进程或者线程同时访问。
1
2
3
4
5
6
from threading import Lock
lock = Lock()
lock.acquire() # 获取锁
# do something
lock.release() # 释放锁
RLock 递归锁
递归锁对象,递归锁必须由持有线程、进程亲自释放。如果某个进程或者线程拿到了递归锁,这个进程或者线程可以再次拿到这个锁而不需要等待。但是这个进程或者线程的拿锁操作和释放锁操作的次数必须相同。
Semaphore 信号量
信号量对象管理一个原子性的计数器Semaphore.value
,该计数器因acquire()
方法的调用而递减,因release()
方法的调用而递增, 计数器的值永远不会小于零;当 acquire() 方法发现计数器为零时,将会阻塞,直到其它线程调用 release() 方法。
信号量常用于控制对共享资源的访问,例如限制同时访问的进程/线程数量。
Thread BoundedSemaphore对象: 信号量的衍生,可以设置上限,防止信号量的计数器超过上限。常用于保护数量有限的资源,例如数据库服务器。
Event 事件
事件对象管理一个内部标志,可以用set()
方法将其设置为True
,clear()
方法将其设置为False
。wait()
方法将会阻塞直到标志为True
。这是线程/进程间通信的最简单机制之一:一个线程发出事件信号,而其他线程等待该信号。
事件常用于线程/进程间的通知机制,例如一个线程等待另一个线程的通知。
Condition 条件
条件对象管理一个内部锁,该锁可以从外部传入或在缺省情况下自动创建。其它方法必须在持有关联的锁的情况下调用。wait()
方法释放锁,然后阻塞直到其它线程调用notify()
方法或notify_all()
方法唤醒它。一旦被唤醒,wait()
方法重新获取锁并返回。它也可以指定超时时间。
注意: notify()
方法和 notify_all()
方法并不会释放锁,这意味着被唤醒的线程不会立即从它们的 wait() 方法调用中返回,而是会在调用了 notify() 方法或 notify_all() 方法的线程最终放弃了锁的所有权后返回。
1
2
3
4
5
6
7
8
9
10
11
12
# 消费一个条目
with condition: # 通过with语句获取锁
while not an_item_is_available(): # 由于notify()方法不会释放锁,因此wait()可能要经过不确定的时间才会返回,导致notify()的条件可能已经不再满足(比如已经被其他线程消费),因此需要循环检查条件
condition.wait() # 释放锁并等待
get_an_available_item()
# 生产一个条目
with condition: # 通过with语句获取锁
make_an_item_available()
condition.notify() # 注意:notify()方法不会释放锁
do_something() # 执行完
# 退出with语句后,锁会被释放
使用条件变量的典型编程风格是将锁用于同步某些共享状态的权限,那些对状态的某些特定改变感兴趣的线程,它们重复调用wait()方法,直到看到所期望的改变发生。如生产者-消费者问题。
Barrier 栅栏
栅栏类提供一个简单的同步原语,用于应对固定数量的进程/线程需要彼此相互等待的情况。创建Barrier
对象时,需指定一个数值,当调用wait()
方法的次数达到这个数值时,所有处于wait()
阻塞的进程/线程会被释放,而后栅栏会重置。
常用于进程/线程间的同步,例如多个进程/线程需要等待其他进程/线程完成后再继续执行。
守护进程/线程
在创建进程/线程时,可以设置daemon
属性为True
,这样创建的进程/线程会随着主进程/线程的退出而退出。
进程/线程池
进程/线程池用于管理多个进程/线程,可以通过池对象来调度进程/线程的执行。线程池可以对线程进行复用,大大减少线程创建和销毁所带来的性能消耗。
待补充
Process Pool对象:进程池对象,可以通过apply
、map
等方法来实现进程池的调度。
ThreadPool对象:线程池对象,可以通过submit
、map
等方法来实现线程池的调度。
并发编程
参考资料:
基础概念
并发编程的三要素:
- 原子性:一个操作是不可分割的,要么全部执行成功,要么全部执行失败。
- 可见性:一个线程对共享变量的修改,对其他线程是立即可见的。
- 有序性:程序执行的顺序按照代码的先后顺序执行。
线程的状态:
- 新建状态:创建线程对象。
- 就绪状态:调用start()方法,线程进入就绪状态,等待CPU调度。
- 运行状态:CPU调度线程,线程进入运行状态(Run())
- 阻塞状态:线程调用sleep()、join()、wait()等方法,线程进入阻塞状态。
- 死亡状态:线程执行完毕,或者调用stop()方法,线程进入死亡状态。
并发与并行:
- 并发:多个任务交替执行,但是在同一时间段内只有一个任务在执行,例如多线程。
- 并行:多个任务同时执行,相互不争抢资源,例如多进程。
并发编程的问题
数据竞争
多个线程同时访问同一个共享变量,至少有一个线程对共享变量进行了写操作,这种情况下就会发生数据竞争。
死锁
多个任务正在等待必须由另一线程释放的某个共享变量才能继续运行,比如A线程、B线程、C线程形成A等B、B等C、C等A,造成闭环死锁。
形成死锁必须满足的四种条件,也称为Coffman条件:
- 互斥:死锁中涉及的资源必须是不可共享的,一次只能有一个任务可以使用该资源。
- 占有并等待条件:一个任务在占有某一互斥资源时又请求另一互斥资源,且他在等待时不会释放任何资源。
- 不可剥夺:资源只能被持有他的任务释放
- 循环等待:任务1等待任务2所占用资源。。。任务n等待任务1所占用资源形成循环等待。
避免死锁的方式:
- 忽略:发生死锁重新执行
- 检测:检测程序是否有死锁,如jconsole
- 预防:根据Coffman条件进行预防,让他不能达到四个满足条件
- 规避:任务执行前,对空闲资源和任务需要的资源进行分析,是否会形成死锁来判断任务是否可以执行。
活锁
任务1和任务2两个并发线程执行需要两个资源,任务1执行先对资源1进行了加锁,任务2执行先对资源2进行了加锁,当任务1需要用资源2,因为被任务2持有,没法获取,于是他就释放了资源1,任务2也是一样,互换人质,然后下一步执行又发现任务1获取不到资源1咯,一直循环下去就形成了活锁。
活锁占CPU且占内存,因为他们是一直运行下去的,而且存在资源交换,而死锁只占内存,不占用CPU,因为他们已经没法继续执行下去了。
优先级反转
低优先权任务持有了高优先权任务所需的资源,就会发生优先权反转,低优先任务会先执行,然后释放资源,高优先权任务才能继续执行。
资源不足
某个任务在系统无法获取维持其继续执行下去的资源,就会发生资源不足问题。通过公平原则可以解决此问题,通过算法实现类似于排队原则,避免大家争抢出现资源都没抢够资源不足无法继续运行的问题,但是公平原则会导致效率降低。