多线程基础概念
并行与并发
- 并行:同时处理多个任务,必须在多核环境下
- 一段时间内同时处理多个任务,单核也可以并发
并发手段
- 线程:内核空间的调度
- 进程:内核空间的调度
- 协程:用户空间的调度
线程可以允许程序在同一进程空间中并发运行多个操作。本次主要介绍Python标准库中的多线程模块threading。
threading模块
线程初始化
使用threading模块的Thread类初始化对象然后调用start方法启动线程。
初始化的五个线程的执行逻辑中的print方法打印字符串及换行符出现了随机分布,即出现了资源竞争。
给线程传递参数
args传递位置参数,kwargs传递关键字参数。
Thread常用参数和方法
可以看到Thread函数的初始化方法中的参数如下:
name
表示线程名称,默认情况下,线程名称是Thread-N
,N是一个较小的十进制数。我们可以传递name参数,控制线程名称。
以下会导入logging模块来显示线程的名称等详细信息
其中logging模块的basicConfig函数的format中的%(threadName)s就是用来输出当前线程的名称的。
线程可以重名, 线程名并不是线程的唯一标识,但是通常应该避免线程重名,通常的处理手段是加前缀
daemon
Daemon:守护
和Daemon线程相对应的还有Non-Daemon线程,在此Thread初始化函数中的daemon参数即表示线程是否是Daemon线程。
- Daemon线程:会伴随主线程结束而结束(可以理解为主线程结束,守护线程结束)
- Non-Daemon线程:不会随着主线程结束而结束,主线程需要等待Non-Daemon结束
Thread.join()
如果想等Daemon线程执行完成之后主线程再退出,可以使用线程对象的join()
方法
使用join函数只有主线程就需要等待Daemon线程执行完成在推出。
join函数的原型:join(self, timeout=None)
join方法会阻塞直到线程退出或者超时, timeout 是可选的,如果不设置timeout, 会一直等待线程退出。如果设置了timeout,会在超时之后退出或者线程执行完成退出。
因为join函数总是返回None,因此在超时时间到达之后如果要知道线程是否还是存活的,可以调用is_alive()方法判断线程是否存活。
threading常用方法
enumerate()
列出当前所有的存活的线程
local()
线程共享内存、状态和资源。但是thread模块的local类的对象的属性, 只在当前线程可见。
Thread类的派生
Python中可以通过继承 Thread
类并重写 run
方法来编写多线程的逻辑,此时逻辑函数就是run。
通过继承方式派生而来的子类对象可以同时执行start方法和run方法,结果是一样的,都是执行子类的run方法。但是非继承的方式不能同时使用start方法和run方法,会报错。
派生时逻辑函数的参数传递
Timer类
Timer类:Thread类的派生类,也在threading模块中。意为定时器,用作线程的延迟执行。
Timer类的初始化方法:__init__(self, interval, function, args=None, kwargs=None)
- interval:时间间隔,即几秒之后开始执行function
- function:线程执行的逻辑函数
- args:位置参数
- kwargs:关键字参数
代码
**Timer.cancel()**:取消仍然存活的定时器,如果定时器已经开始执行function,则无法取消。
**Timer.setDaemon(True)**:设置定时器为守护线程
线程同步
当使用多个线程来访问同一个数据时,会经常出现资源争用等线程安全问题(比如多个线程都在操作同一数据导致数据不一致),这时候我们就可以使用一些同步技术来解决这类问题。比如Event,Lock,Condition,Barrier,Semaphore等等。
Event
Event对象内置一个标志,这个标志可以由**set()方法和clear()方法设定。线程可以使用wait()**方法进行阻塞等待,知道Event对象内置标志被set。
- **clear(self)**:设置内置标志为False
- **set(self)**:设置内置标志为True
- **wait(self, timeout=None)**:开始阻塞,直到内置标志被设置为True(即wait会阻塞线程直到set方法被调用或者超时)
- **is_set(self)**:当且仅当内置标志为True的时候返回True
代码
以下代码实现的逻辑是:一个boss和五个睡觉工人,只要有一个工人完成了睡觉任务,那么就唤醒boss和其他工人。
执行start()方法,测试结果
可以看到:worker-2退出之后,boss和另外四个worker也瞬间就退出了。所以event对象的内置状态被set之后,相关线程就不再wait了。
- event:在线程之间发送信号,通常用于某个线程需要等待其他线程处理完成某些动作之后才能启动
wait()方法的timeout参数
程序每隔3s就会输出一次结果,直到执行set()方法才会停止。因此我们可以写一个定时器(类似于Thread类的派生类Timer)。
代码
延迟5s之后执行了逻辑函数,也可以使用cancel函数取消。(要注意参数的传递,此处Timer初始化不能使用关键字参数)
Lock
event是用来同步线程之间的操作的,但是如果要控制共享资源的访问那就需要用到锁机制了,在Python标准库中的实现就是内置的lock类。
**threading.Lock()**函数会创建一个lock类的对象。
锁对象是一个同步原语(synchronization primitive),lock对象主要有以下三个方法:
- acquire(): acquire(blocking=True, timeout=-1) -> bool 获得锁(即锁定锁)。成功获得锁返回True,没有获得锁则返回False。
- release(): release() 释放锁
- locked(): locked() -> bool 检查锁是否被锁住
代码
以下代码实现了在多个进程同时对资源进行访问时,进行加锁和解锁的操作,保证加减操作和赋值操作组合之后的原子性。
测试输出
可见,各项操作之间保持相互原子性,没有出现干扰。
因为lock类实现了__enter__
和__exit__
两个魔术方法,因此支持上下文管理器,可以修改以上Counter类的实现方法如下:
即使用上下文管理器来代替try...finally...
语句,测试输出应该以以上结果一致。
acquire方法的blocking参数
当blocking=True时,A线程中执行了lock.acquire()方法之后并且没有执行到lock.release()方法,如果在B线程中再次执行lock.acquire()方法,则B线程阻塞。
- 正如以上代码实现,当有n个线程需要修改一个共享资源的时候,其他线程在获取锁之前都处于阻塞状态。(python的阻塞都会让出cpu的时间片,因此不是忙等待)
当blocking=Fasle时,A线程中执行了lock.acquire()方法之后并且没有执行到lock.release()方法,如果在B线程中再次执行lock.acquire()方法,则B线程不会阻塞,并且acquire函数返回False。
acquire方法的timeout参数
当blocking=True并且timeout>0时,acquire会一直阻塞到超时或者锁被释放。
acquire(0)的参数传递
模拟acquire方法的默认参数,编写一下函数进行模拟参数传递的过程:
可见第一个位置参数,替代了blocking。也就是说lock.acquire(0)等效于lock.acquire(blocking=False)
RLock
正常的lock对象是不能多次调用acquire
的,但是可重用锁RLock
可以多次调用 acquire
而不阻塞,而且 release
时也要执行和 acquire
一样的次数。
Condition
除了Event对象之外,线程同步还可以使用条件同步机制Condition。一类线程等待特定条件,而另一类线程发出特定条件满足的信号。
在Condition的帮助中有以下几个方法:
- 初始化方法:**init(self, lock=None)**。如果给定了lock参数,那么必须是Lock或者Rlock对象,并且被当做底层锁来使用。如果没有指定,那么会创建一个RLock对象的锁,也被当做底层锁来使用。
- 实现了
__enter__
和__exit__
方法,支持上下文管理器。 - notify(self, n=1):唤醒一个或多个在当前Condition上等待的其他线程,如果此方法的调用线程没有获得锁,那么在调用的时候就会报错RuntimeError
- notify_all(self):唤醒所有线程
- wait(self, timeout=None):一直等待着知道被notifyed或者发生超时
实例代码
以下代码实现的是:有一个生产者线程,会生产若干次,每次生产结束后需要通知所有的消费者线程来消费,因此下面代码使用的是notify_all方法。
测试结果(一个生产者,三个消费者)
可见,生产者每生产一次,所有的消费者就会去消费。如果想控制每次生产之后通知几个消费者来消费,那么就可以使用notify方法,指定消费者线程个数。
代码如下
测试结果(一个生产者,三个消费者,每次生产之后只通知一个消费者去消费)
- Condition 通常用于生产者消费者模式, 生产者生产消息之后, 使用notify 或者 notify_all 通知消费者消费。
- 消费者使用wait方法阻塞等待生产者通知
- notify通知指定个wait的线程, notify_all通知所有的wait线程
- 无论notify/notify_all还是wait 都必须先acqurie, 完成后必须确保release, 通常使用with语法
Barrier
Barrier类存在于threading模块中,中文可以翻译成栅栏
可以看到Barrier的主要方法和属性:
__init__(self, parties, action=None, timeout=None)
:初始化方法,创建一个Barrierparties
:所有参与的线程的数量action
:所有的线程都wait之后并且在线程释放之前就会执行这个action函数,相当于集结之后要做的事情。timeout
:相当于给需要等待的每个线程的wait方法加上timeout参数,超时则barrier不再生效
abort(self)
:将Barrier设置成broken状态reset(self)
:将Barrier重置为最初状态wait(self, timeout=None)
:在Barrier前等待,返回在Barrier前等待的下标,从0到parties-1broken
:如果Barrier处于broken状态则返回Truen_waiting
:当前已经在Barrier处等待的线程的数量parties
:需要在Barrier处等待的线程的数量
示例代码
测试结果
可见,所有的线程都会一直等待,知道所有的线程都到期了,然后就通过barrier,继续执行后续操作。
Barrier会建立一个控制点,所有参与的线程都会阻塞,直到所有参与的“各方”达到这一点。 它让线程分开启动,然后暂停,直到它们都准备好再继续。因此,这一点可以理解为各个线程的一个集结点。
abort函数的使用
将Barrier设置成broken状态。所有线程在参与集结过程中,只要执行了barrier.abort方法,那么正在等待的线程都会抛出threading.BrokenBarrierError异常。可以理解为,只要有一个线程确定已经到不了Barrier并且通知了Barrier,那么Barrier就会执行abort方法,通知所有正在wait的线程放弃集结。
实例代码
测试结果
Semaphore
Semaphore类存在于threading模块中
信号量内部管理者一个计数器,这个计数器的值等于release()方法调用的次数减去acquire()方法调用的次数然后再加上初始值value,value默认为1。
可以看到Semaphore的主要方法:
__init__(self, value=1)
:初始化一个信号量,value为内部计数器赋初值,默认为1acquire(self, blocking=True, timeout=None)
:获取信号量,内部计数器减一release(self)
:释放信号量,内部计数器加一
示例代码
测试结果
这个测试结果显示:三个线程获取连接池中的两个连接,结果出现了一个线程等待其他线程执行完成之后再获取连接的过程。
Queue
Condition线程同步部分用来传递数据的是一个封装在生产者消费者模型中的元素data(正常使用情况下一般封装的都是一个列表,类似与Barrier部分的连接池中的conns列表)。
Python的queue模块中提供了同步的、线程安全的队列类,包括三种队列:
- FIFO(先入先出)队列Queue
- LIFO(后入先出)队列LifoQueue
- 优先级队列PriorityQueue
这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。因此我们可以使用queue模块来替换掉生产者消费者中的全局元素,代码如下:
测试结果
每生产一次,消费者就会消费一次。当消费者线程,读取Queue则调用Queue.get()方法,若Queue为空时消费者线程获取不到内容,就会阻塞在这里,直到成功获取内容。
##线程同步总结
- Event:主要用于线程之间的事件通知
- Lock,Rlock:主要用于保护共享资源
- Condition:主要用于生产者消费者模型,可以理解为Event和Lock的结合体
- Barrier:同步指定个等待的线程
- Semaphore:主要用于保护资源,和Lock的区别在于可以多个线程访问共享资源,而锁一次只能一个线程访问到共享资源,即锁是value=1的信号量
- Queue:使用FIFO队列进行同步,适用于生产者消费者模型
GIL
GIL(Global Interpreter Lock):全局解释器锁
Python代码的执行由Python 主循环来控制,Python 在设计之初就考虑到要在解释器的主循环中,同时只有一个线程在执行,即在任意时刻,只有一个线程在解释器中运行。对Python 主循环的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。
因此Python多线程程序的执行顺序如下:
- 设置GIL
- 切换到一个线程去运行
- 运行
- 结束线程
- 解锁GIL
- 重复以上步骤
因此,Python的多线程并没有实现并行,只是实现了并发而已。如果要实现真正的并行,那就需要使用Python的多进程模块multiprocessing(multiprocessing模块的宗旨是像管理线程一样来管理进程)。
参考资料
- threading — Manage Concurrent Operations Within a Process
- Python线程同步机制: Locks, RLocks, Semaphores, Conditions, Events和Queues