基本概念
多任务
同一时间内执行多个任务
并发
看起来像同时运行的就可以称之为并发(交替执行)
并行
真正意义上的同时运行
任务数大于CPU的核数,表示并发的去执行多任务;任务数小于等于CPU的核数,表示并行的去执行多任务。
进程
进程是分配资源的最小单位。当应用程序运行时最少会开启一个进程,此时计算机会为这个进程开启独立的内存空间,不同的进程享有的空间,而一个CPU在同一时刻只能运行一个进程,其他进程都处于等待状态。
线程
线程是系统调度的最小单位。一个进程内部包括一个或者多个线程,这些线程共享此进程的内存空间与资源。相当于把一个任务又细分成若干个子任务,每个线程对应一个子任务
一个进程中可以有多个线程,同一个进程中的线程可以共享此进程中的资源。
多进程和多线程
对于一个CPU来说,在同一时刻只能运行一个进程或者一个线程,而单核CPU往往是在进程或者线程间切换执行,每个进程或者线程得到一定的CPU时间,由于切换的速度很快,在我们看来是多个任务在并行执行(同一时刻多个任务在执行),但实际上是在并发执行(一段时间内多个任务在执行)。
单核CPU的并发往往涉及到进程或者线程的切换,进程的切换比线程的切换消耗更多的时间与资源。在单核CPU下,CPU密集的任务采用多进程或多线程不会提升性能,而在IO密集的任务中可以提升(IO阻塞时CPU空闲)。
而多核CPU就可以做到同时执行多个进程或者多个进程,也就是并行运算。在拥有多个CPU的情况下,往往使用多进程或者多线程的模式执行多个任务。
多进程
Python中的多进程主要使用到multiprocessing
这个库中的Process
类。
使用Process
类创建多进程有两种方式:
- 直接创建
Process
类的实例对象,就可以创建一个新的进程; - 通过继承
Process
类的子类,创建实例对象,也可以创建新的进程;需要注意,继承Process
类的子类需重写父类的run()
方法。
Process
类提供了一些常用的属性和方法:
方法名或属性名 | 功能描述 |
---|---|
run() |
继承Process 类中需要对run() 方法进行重写,该run() 方法中包含的是新进程要执行的代码。 |
start() |
新创建的进程需要手动启动,该方法的功能就是启动新创建的线程。 |
join([timeout]) |
在多进程执行过程,其他进程必须等到调用join() 方法的进程执行完毕(或者执行规定的timeout 时间)后,才能继续执行; |
is_alive() |
判断当前进程是否还活着 |
terminate() |
中断该进程 |
name |
可以为该进程重命名,也可以获得该进程的名称,默认为Process-N,N为从1开始递增的整数 |
daemon |
通过设置该属性为True ,可将新建进程设置为“守护进程” |
pid |
返回进程的ID 号。大多数操作系统都会为每个进程配备唯一的ID 号。 |
创建进程的两种方法
通过Process
类创建进程
使用Process
类创建实例化对象,其本质是调用该类的构造方法创建新进程。Process
类的语法格式如下:
Process([group[, target[, name[, args[, kwargs]]]]])
其中,各个参数的含义为:
- group:该参数未进行实现,不需要传参;
- target:为新建进程指定执行任务,也就是指定一个函数;
- name:为新建进程设置名称;
- args:为 target 参数指定的参数传递非关键字参数,必须是元组类型;
- kwargs:为 target 参数指定的参数传递关键字参数;
实例:
import time
from multiprocessing import Process
def func(name):
print('%s is eating' % name)
time.sleep(2)
print('%s is sleeping' % name)
if __name__ == '__main__':
p = Process(target=func, args=('laobai',))
p.start()
print('test')
test
laobai is eating
laobai is sleeping
通常情况下(主要是Windows下),通过multiprocessing.Process
创建的进程都要写在__name__ == '__main__'
中。上面的例子中有2个进程,主进程和我们创建的进程,print('test')
是主进程输出的内容,print('%s is eating' % name)
和print('%s is sleeping' % name)
是我们创建的进程输出的内容。若不使用__name__ == '__main__'
创建进程,程序会不停的从上至下运行,进入死循环(Windows系统)。
通过Process
继承类创建进程
除了直接使用Process
类创建进程,还可以通过创建Process
的子类来创建进程。
需要注意的是,在创建Process
的子类时,需在子类内容重写run()
方法。实际上,该方法所起到的作用,就如同第一种创建方式中target
参数执行的函数。通过Process
子类创建进程,和使用Process
类一样,先创建该类的实例对象,然后调用start()
方法启动该进程。
import time
from multiprocessing import Process
class MyProcess(Process):
def run(self):
print('hello python')
time.sleep(1)
print('hello laobai')
if __name__ == '__main__':
p = MyProcess()
p.start()
print('hello main process')
hello main process
hello python
hello laobai
推荐直接用
Process
类创建进程创建进程就是在内存中申请一块内存空间将需要运行的代码放进去运行;
一个进程对应在内存中就是一块独立的内存空间,多个进程对应在内存中就是多块独立的内存空间;
进程与进程之间数据默认是无法直接交互的,需要借助于第三方的模块;
常用方法
join
方法
join
方法是让主进程等待子进程代码运行结束之后,再继续运行。不影响其他子进程的执行。
import time
from multiprocessing import Process
def func(name):
print('%s is eating' % name)
time.sleep(2)
print('%s is sleeping' % name)
if __name__ == '__main__':
p = Process(target=func, args=('laobai',))
p.start()
p.join()
print('hello main process')
laobai is eating
laobai is sleeping
hello main process
多个子进程的情况,并为func
方法传一个休眠时间的参数
import time
from multiprocessing import Process
def func(name, s):
print('%s is eating' % name)
time.sleep(s)
print('%s is sleeping' % name)
if __name__ == '__main__':
start_time = time.time()
p_list = []
for i in range(1, 4):
p = Process(target=func, args=('子进程%s' % i, i))
p.start()
p_list.append(p)
for p in p_list:
p.join()
print('hello main process', time.time() - start_time)
子进程2 is eating
子进程3 is eating
子进程1 is eating
子进程1 is sleeping
子进程2 is sleeping
子进程3 is sleeping
hello main process 3.131565809249878
子进程的运行是随机的,总耗时比最长的子进程时间略多一些。
进程对象属性及方法
- 计算机会给每一个运行的进程分配一个
PID
号 - Windows:
- 进入终端,输入
tasklist
tasklist | findstr PID
查看具体的某个进程
- 进入终端,输入
- Mac:
- 进入终端,输入
ps aux
pa aux | grep PID
查看具体的某个进程
- 进入终端,输入
- Python:
current_process().pid
查看当前进程的进程号os.getpid()
查看当前进程的进程号os.getppid()
查看当前子进程的父进程号
from multiprocessing import Process, current_process
import os
def func():
print('%s is running' % current_process().pid)
print('%s is running' % os.getpid())
if __name__ == '__main__':
p = Process(target=func)
p.start()
p.join()
print('hello main process', current_process().pid)
print('hello main process', os.getpid())
print('python', os.getppid())
55477 is running
55477 is running
hello main process 55475
hello main process 55475
python 342
terminate()
杀死当前进程
is_alive()
判断当前进程是否存活
from multiprocessing import Process, current_process
import os
def func():
print('%s is running' % current_process().pid)
print('%s is running' % os.getpid())
if __name__ == '__main__':
p = Process(target=func)
p.start()
print(p.is_alive())
p.terminate()
time.sleep(1)
print(p.is_alive())
p.join()
print('hello main process', current_process().pid)
print('hello main process', os.getpid())
print('python', os.getppid())
True
False
hello main process 55636
hello main process 55636
python 342
daemon
设置守护进程
p.daemon=True
这句代码要放在p.start()
的前面先运行,否则会报错。
守护进程当被守护的进程程序执行完关闭后,守护进程随之结束。
from multiprocessing import Process
import time
def func(name):
print('%s任务正在执行' % name)
time.sleep(3)
print('%s任务执行完毕' % name)
if __name__ == '__main__':
p = Process(target=func, args=('zzz',))
p.daemon = True
p.start()
print("yyy程序结束运行")
yyy程序结束运行
上面的代码,主进程结束了,守护进程p直接终止运行
进程间通信
Queue
模块
每个进程都各自在内存中开辟了独立空间,默认无法共享数据,可以使用multiprocessing
模块的Queue
实现多个进程间的数据传递,Queue
本身是一个消息队列程序。
- 队列:先进先出 (First In First Out,简称FIFO)
- 堆栈:先进后出 (First In Last Out,简称FILO)
Python有个queue
模块,但这个模块不能进行多进程之间的数据传递。而multiprocessing
模块的Queue
类是可以的。
Queue的实例方法 | 功能描述 |
---|---|
put() | 向队列中存放数据,有位置就直接存入,如果没有位置,就阻塞等待 |
get() | 从队列中获取数据,有数据就获取,如果没有数据,就阻塞等待。函数中可以设置timeout 参数,超过时间还是没有取到就报错 |
put_nowait() | 向队列中存放数据,有位置就直接存入,如果没有位置,就报错,不阻塞 |
get_nowait() | 从队列中获取数据,有数据就获取,如果没有数据,就报错,不阻塞 |
full() | 判断当前队列是否满了 |
empty() | 判断当前队列是否空了 |
示例1:
from multiprocessing import Queue
q = Queue(5) # 参数是队列的最大长度,默认为空,不传值为默认值32767,这里输入5
q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5)
q.put(6) # 当队列数据存满了之后,如果还有数据要存入队列会阻塞,直到有位置让出来,不会报错
示例2:
from multiprocessing import Queue
q = Queue(5) # 参数是队列的最大长度,默认为空,不传值为默认值32767,这里输入5
q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5)
# q.put(6) # 当队列数据存满了之后,如果还有数据要存入队列会阻塞,直到有位置让出来,不会报错
p1 = q.get()
p2 = q.get()
p3 = q.get()
p4 = q.get()
p5 = q.get()
p6 = q.get() # 当队列中的数据取空了,如何还要从队列中取数据会阻塞,直到有新的数据可以取,不会报错
示例3:
from multiprocessing import Queue
q = Queue(5)
q.put(1)
q.put(2)
q.put(3)
q.put(4)
print(q.full())
q.put(5)
print(q.full())
p1 = q.get()
p2 = q.get()
p3 = q.get()
p4 = q.get()
print(q.empty())
p5 = q.get()
print(q.empty())
False
True
False
True
示例4:
from multiprocessing import Queue
q = Queue(5)
q.put(1)
q.put(2)
q.put(3)
q.put(4)
print(q.full())
q.put(5)
print(q.full())
p1 = q.get()
p2 = q.get()
p3 = q.get()
p4 = q.get()
print(q.empty())
p5 = q.get()
print(q.empty())
try:
p6 = q.get(timeout=3) # 设置超时时间3秒,还是没有取到值,就报错
except Exception as e:
print("没有值可以获取了")
False
True
True
True
没有值可以获取了
注意:
q.full()
、q.empty()
、q.get_nowait()
这三个在多进程的情况下是不精确的。
通过Queue
进行通信
主进程与子进程通信
from multiprocessing import Queue, Process
def producer(q):
q.put('向队列中存入一个数据')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,))
p1.start()
print(q.get())
print("从队列中取出一个数据")
print(q.empty())
向队列中存入一个数据
从队列中取出一个数据
True
子进程与子进程通信
from multiprocessing import Queue, Process
from multiprocessing import set_start_method
def producer(q):
q.put('向队列中存入一个数据')
def consumer(q):
print(q.get())
print("从队列中取出一个数据")
if __name__ == '__main__':
set_start_method('fork') # MAC电脑默认启动进程的方式是fork,而python默认的方式是spawn,所以需要将python启动进程的方式做修改
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
生产者消费者模型
- 生产者:生产/制造东西的
- 消费者:消费/处理东西的
- 队列:作为媒介在生产者和消费者间传递数据
生产者消费者模型 = 生产者 + 消息队列 + 消费者
下面以“商城上架商品,顾客购买商品”为例来演示生产者消费者模型
from multiprocessing import Process, Queue
from multiprocessing import set_start_method
import time
import random
def producer(shops, goods, q):
# 生产者,1件1件的上架商品
for i in range(1, 6): # 共上架5个羽绒服
data = '%s上架的%s%s' % (shops, goods, i)
# 模拟延迟
time.sleep(random.randint(1, 3))
print(data)
q.put(data)
def consumer(customer, q):
# 生产者上架1件商品就买走1件商品
while True:
goods = q.get() # 从队列中获取商品
time.sleep(random.randint(1, 3))
print('%s买走%s' % (customer, goods))
if __name__ == '__main__':
set_start_method('fork')
q = Queue()
p1 = Process(target=producer, args=('波司登', '羽绒服', q))
p2 = Process(target=producer, args=('李宁', '运动套装', q))
c1 = Process(target=consumer, args=('张三', q))
c2 = Process(target=consumer, args=('李四', q))
p1.start()
p2.start()
c1.start()
c2.start()
波司登上架的羽绒服1
张三买走波司登上架的羽绒服1
波司登上架的羽绒服2
李宁上架的运动套装1
波司登上架的羽绒服3
李四买走波司登上架的羽绒服2
波司登上架的羽绒服4
张三买走李宁上架的运动套装1
波司登上架的羽绒服5
李宁上架的运动套装2
李四买走波司登上架的羽绒服3
李宁上架的运动套装3
张三买走波司登上架的羽绒服4
李宁上架的运动套装4
张三买走李宁上架的运动套装2
李四买走波司登上架的羽绒服5
张三买走李宁上架的运动套装3
李四买走李宁上架的运动套装4
李宁上架的运动套装5
张三买走李宁上架的运动套装5
上面的代码实现了2个生产者和2个消费者模拟上架商品购买商品的场景,但在生产的商品全部被消费完后,程序卡住了,没有结束。接下来要在生产者生产完毕,向队列中添加特定的结束符号
from multiprocessing import Process, Queue
from multiprocessing import set_start_method
import time
import random
def producer(shops, goods, q):
# 生产者,1件1件的上架商品
for i in range(1, 6): # 共上架5个羽绒服
data = '%s上架的%s%s' % (shops, goods, i)
# 模拟延迟
time.sleep(random.randint(1, 3))
print(data)
q.put(data)
def consumer(customer, q):
# 生产者上架1件商品就买走1件商品
while True:
goods = q.get() # 从队列中获取商品
if goods is None: break # 判断当前是否是结束标识
time.sleep(random.randint(1, 3))
print('%s买走%s' % (customer, goods))
if __name__ == '__main__':
set_start_method('fork')
q = Queue()
p1 = Process(target=producer, args=('波司登', '羽绒服', q))
p2 = Process(target=producer, args=('李宁', '运动套装', q))
c1 = Process(target=consumer, args=('张三', q))
c2 = Process(target=consumer, args=('李四', q))
p1.start()
p2.start()
c1.start()
c2.start()
p1.join()
p2.join()
# p1,p2两个生产者生产完毕后,向队列中添加特定的结束符号
# 两个消费者,就需要添加2个结束符号
q.put(None)
q.put(None)
波司登上架的羽绒服1
李宁上架的运动套装1
李四买走李宁上架的运动套装1
波司登上架的羽绒服2
李宁上架的运动套装2
张三买走波司登上架的羽绒服1
波司登上架的羽绒服3
李四买走波司登上架的羽绒服2
李四买走波司登上架的羽绒服3
张三买走李宁上架的运动套装2
李宁上架的运动套装3
波司登上架的羽绒服4
李宁上架的运动套装4
波司登上架的羽绒服5
李宁上架的运动套装5
李四买走李宁上架的运动套装3
张三买走波司登上架的羽绒服4
李四买走李宁上架的运动套装4
张三买走波司登上架的羽绒服5
李四买走李宁上架的运动套装5
Process finished with exit code 0
这次生产者和消费者处理正常,并能够正确完成程序的执行。但添加的结束符号的个数被写死了。这里引入JoinableQueue
,使用JoinableQueue
中的task_done()
方法发出信号,表示get()
的返回项已经被处理;使用JoinableQueue
中的join()
方法进行等待,直到队列中所有的项都被处理完。待定将执行到队列中的每个项均调用了task_done()
方法为止。
JoinableQueue
的实例队列q,每当往该队列中存入q.put()
数据的时候,内部会有一个计数器进行+1,每当调用了q.task_done()
的时候,计数器进行-1,q.join()
判断当计数器的值为0时,才执行后面的代码。
from multiprocessing import Process, Queue
from multiprocessing import set_start_method
from multiprocessing import JoinableQueue
import time
import random
def producer(shops, goods, q):
# 生产者,1件1件的上架商品
for i in range(1, 6): # 共上架5个羽绒服
data = '%s上架的%s%s' % (shops, goods, i)
# 模拟延迟
time.sleep(random.randint(1, 3))
print(data)
q.put(data)
def consumer(customer, q):
# 生产者上架1件商品就买走1件商品
while True:
goods = q.get() # 从队列中获取商品
time.sleep(random.randint(1, 3))
print('%s买走%s' % (customer, goods))
q.task_done() # 通知队列已经从里面取出了一个数据并处理完成
if __name__ == '__main__':
set_start_method('fork')
q = JoinableQueue()
p1 = Process(target=producer, args=('波司登', '羽绒服', q))
p2 = Process(target=producer, args=('李宁', '运动套装', q))
c1 = Process(target=consumer, args=('张三', q))
c2 = Process(target=consumer, args=('李四', q))
p1.start()
p2.start()
c1.start()
c2.start()
p1.join()
p2.join()
q.join() # 等待队列中所有的数据都被取完再执行下面的代码
波司登上架的羽绒服1
李宁上架的运动套装1
李四买走波司登上架的羽绒服1
波司登上架的羽绒服2
李宁上架的运动套装2
张三买走李宁上架的运动套装1
波司登上架的羽绒服3
李四买走波司登上架的羽绒服2
李宁上架的运动套装3
张三买走李宁上架的运动套装2
张三买走李宁上架的运动套装3
李四买走波司登上架的羽绒服3
波司登上架的羽绒服4
李宁上架的运动套装4
波司登上架的羽绒服5
张三买走波司登上架的羽绒服4
李四买走李宁上架的运动套装4
李宁上架的运动套装5
张三买走波司登上架的羽绒服5
李四买走李宁上架的运动套装5
上面的代码通过JoinableQueue
解决了判断消费者完成消费的问题,但还是消费者还是卡在了获取数据的代码q.get()
。下面引入守护进程,将消费者设置成守护进程,主进程运行结束,守护进程也就结束了。
from multiprocessing import Process, Queue
from multiprocessing import set_start_method
from multiprocessing import JoinableQueue
import time
import random
def producer(shops, goods, q):
# 生产者,1件1件的上架商品
for i in range(1, 6): # 共上架5个羽绒服
data = '%s上架的%s%s' % (shops, goods, i)
# 模拟延迟
time.sleep(random.randint(1, 3))
print(data)
q.put(data)
def consumer(customer, q):
# 生产者上架1件商品就买走1件商品
while True:
goods = q.get() # 从队列中获取商品
time.sleep(random.randint(1, 3))
print('%s买走%s' % (customer, goods))
q.task_done() # 通知队列已经从里面取出了一个数据并处理完成
if __name__ == '__main__':
set_start_method('fork')
q = JoinableQueue()
p1 = Process(target=producer, args=('波司登', '羽绒服', q))
p2 = Process(target=producer, args=('李宁', '运动套装', q))
c1 = Process(target=consumer, args=('张三', q))
c2 = Process(target=consumer, args=('李四', q))
p1.start()
p2.start()
# 将消费者设置成守护进程
c1.daemon = True
c2.daemon = True
c1.start()
c2.start()
p1.join()
p2.join()
q.join() # 等待队列中所有的数据都被取完再执行下面的代码
波司登上架的羽绒服1
波司登上架的羽绒服2
李宁上架的运动套装1
张三买走波司登上架的羽绒服1
波司登上架的羽绒服3
张三买走李宁上架的运动套装1
李四买走波司登上架的羽绒服2
波司登上架的羽绒服4
李宁上架的运动套装2
波司登上架的羽绒服5
李四买走波司登上架的羽绒服4
张三买走波司登上架的羽绒服3
李宁上架的运动套装3
李四买走李宁上架的运动套装2
张三买走波司登上架的羽绒服5
李宁上架的运动套装4
李宁上架的运动套装5
李四买走李宁上架的运动套装3
张三买走李宁上架的运动套装4
李四买走李宁上架的运动套装5
Process finished with exit code 0
多线程
python中实现多线程要模块threading
的Thread
类来完成
threading
模块提供了一些方法或属性
方法与属性 | 功能描述 |
---|---|
current_thread() | 返回当前线程 |
active_count() | 返回当前活跃的线程数,1个主线程+n个子线程 |
get_ident() | 返回当前线程 |
enumerate() | 返回当前活动 Thread 对象列表 |
main_thread() | 返回主 Thread 对象 |
settrace(func) | 为所有线程设置一个 trace 函数 |
setprofile(func) | 为所有线程设置一个 profile 函数 |
stack_size([size]) | 返回新创建线程栈大小;或为后续创建的线程设定栈大小为 size |
TIMEOUT_MAX | Lock.acquire(), RLock.acquire(), Condition.wait() 允许的最大超时时间 |
threading
模块包含下面的类:
- Thread:基本线程类;
- Lock:互斥锁;
- RLock:可重入锁,使单一进程再次获得已持有的锁(递归锁);
- Condition:条件锁,使得一个线程等待另一个线程满足特定条件,比如改变状态或某个值;
- Semaphore:信号锁,为线程间共享的有限资源提供一个”计数器”,如果没有可用资源则会被阻塞;
- Event:事件锁,任意数量的线程等待某个事件的发生,在该事件发生后所有线程被激活;
- Timer:一种计时器;
开启线程的两种方式
使用Thread
类创建多进程有两种方式:
- 直接创建
Thread
类的实例对象,就可以创建一个新的线程; - 通过继承
Thread
类的子类,创建实例对象,也可以创建新的线程;需要注意,继承Thread
类的子类需重写父类的run()
方法。
Thread
类提供了一些常用的属性和方法:
方法与属性 | 功能描述 |
---|---|
start() | 启动线程,等待CPU调度 |
run() | 线程被cpu调度后自动执行的方法 |
getName()、setName()和name | 用于获取和设置线程的名称 |
setDaemon() | 设置为后台线程或前台线程(默认是False,前台线程)。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止。如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程执行完成后,程序才停止。 |
ident | 获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。 |
is_alive() | 判断线程是否是激活的(alive)。从调用start()方法启动线程,到run()方法执行完毕或遇到未处理异常而中断这段时间内,线程是激活的。 |
isDaemon()方法和daemon属性 | 是否为守护线程 |
join([timeout]) | 调用该方法将会使主调线程堵塞,直到被调用线程运行结束或超时。参数timeout是一个数值类型,表示超时时间,如果未提供该参数,那么主调线程将一直堵塞到被调线程结束。 |
通过Thread
类创建线程
使用Thread
类创建实例化对象,其本质是调用该类的构造方法创建新进程。Thread
类的语法格式如下:
Thread([group [, target [, name [, args [, kwargs]]]]])
其中,各个参数的含义为:
- group: 线程组,目前只能使用None
- target: 指定所创建的线程要调度的目标方法
- name: 线程名,一般不用设置
- args: 以元组的方式,为 target 指定的方法传递参数
- kwargs: 以字典的方式,为 target 指定的方法传递参数
示例:
from threading import Thread
import time
def func(name):
print('%s is eating' % name)
time.sleep(2)
print('%s is sleeping' % name)
t = Thread(target=func,args=('laobai',))
t.start()
print('test')
laobai is eating
test
laobai is sleeping
from threading import Thread
import time
def func(name):
print('%s is eating' % name)
time.sleep(2)
print('%s is sleeping' % name)
if __name__ == '__main__':
t = Thread(target=func, args=('laobai',))
t.start()
print('test')
laobai is eating
test
laobai is sleeping
通过上面的代码可以看出,开启线程的代码不需要一定要书写在main
中,但习惯上都会写在main
里面。
创建线程的开销非常小,几乎是代码一执行线程就已经创建了。(创建进程需要申请内存空间,而创建线程不需要)
通过Thread
继承类创建线程
除了直接使用Thread
类创建线程,还可以通过创建Thread
的子类来创建线程。
需要注意的是,在创建Thread
的子类时,需在子类内容重写run()
方法。实际上,该方法所起到的作用,就如同第一种创建方式中target
参数执行的函数。通过Thread
子类创建线程,和使用Process
类一样,先创建该类的实例对象,然后调用start()
方法启动该线程。
from threading import Thread
import time
class MyThead(Thread):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print('%s is eating' % self.name)
time.sleep(2)
print('%s is sleeping' % self.name)
if __name__ == '__main__':
t = MyThead('laobai')
t.start()
print('test')
laobai is eating
test
laobai is sleeping
常用方法
join方法
join
方法是让主线程等待子线程代码运行结束之后,再继续运行。不影响其他子线程的执行。
from threading import Thread
import time
def func(name):
print('%s is eating' % name)
time.sleep(2)
print('%s is sleeping' % name)
if __name__ == '__main__':
t = Thread(target=func, args=('laobai',))
t.start()
t.join()
print('主线程')
laobai is eating
laobai is sleeping
主线程
线程对象属性与其方法
a. 一个进程下的多个线程拥有同一个PID
from threading import Thread
import os
def func():
print('子线程', os.getpid())
if __name__ == '__main__':
t = Thread(target=func)
t.start()
t.join()
print('主线程', os.getpid())
子线程 85535
主线程 85535
b. current_thread()
当前线程
from threading import Thread, current_thread
import os
def func():
print('子线程', current_thread().name)
if __name__ == '__main__':
t1 = Thread(target=func)
t2 = Thread(target=func)
t1.start()
t2.start()
t1.join()
t2.join()
print('主线程', current_thread().name)
子线程 Thread-1
子线程 Thread-2
主线程 MainThread
c. active_count() 当前正在活跃的进程数
from threading import Thread, current_thread,active_count
import os
def func():
print('子线程', current_thread().name)
time.sleep(1)
if __name__ == '__main__':
t1 = Thread(target=func)
t2 = Thread(target=func)
t1.start()
t2.start()
print('主线程', active_count())
子线程 Thread-1
子线程 Thread-2
主线程 3
守护线程
主线程运行结束之后,不会立刻结束,会等待所有其他非守护子线程结束后才会结束,因为主线程的结束意味着所在进程的结束。
from threading import Thread
import time
def func(name):
print('%s is eating' % name)
time.sleep(2)
print('%s is sleeping' % name)
if __name__ == '__main__':
t = Thread(target=func, args=('laobai',))
t.daemon = True
t.start()
print('主线程')
laobai is eating
主线程
下面的例子2两个子线程,其中1个是守护线程,另1个不是守护线程,注意运行结果:
from threading import Thread
import time
def func1():
print('hello python --> func1')
time.sleep(1)
print('hello world --> func1')
def func2():
print('hello python --> func2')
time.sleep(3)
print('hello world --> func2')
if __name__ == '__main__':
t1 = Thread(target=func1)
t2 = Thread(target=func2)
t1.daemon = True
t1.start()
t2.start()
print('hello world --> main')
hello python --> func1
hello python --> func2
hello world --> main
hello world --> func1
hello world --> func2
主线程运行结束之后,不会立刻结束,会等待所有其他非守护子线程结束后才会结束。
线程互斥锁
Python提供锁的机制,又称互斥锁,互斥锁为资源引入一个状态:锁定/非锁定。
某个线程要更改共享数据时,先将其锁定,此时资源的状态为”锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。
未使用锁:
from threading import Thread
import time
num = 100
def func():
global num
temp = num
time.sleep(0.1)
num = temp - 1
if __name__ == '__main__':
t_list = []
for i in range(100):
t = Thread(target=func)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(num)
99
使用锁:
from threading import Thread, Lock
import time
num = 100
mutex = Lock() # 创建锁
def func():
global num
mutex.acquire() # 锁定
temp = num
time.sleep(0.1)
num = temp - 1
mutex.release() # 解锁
if __name__ == '__main__':
t_list = []
for i in range(100):
t = Thread(target=func)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(num)
0
加锁是将并行处理变成串行处理
多进程和多线程有个最大的区别:对于多线程,同一个变量各自有一份拷贝存在于每个进程,互不影响;而多线程所有的线程公用所有的变量,任何一个变量都可以被任意的一个线程修改。
GIL全局解释器锁
GIL即全局解释器锁(global interpreter lock),每个线程在执行时候都需要先获取GIL,保证同一个时刻只有一个线程可以执行代码,即同一时刻只有一个线程使用CPU,也就是说多线程并不是真正意义上的同时执行。
- GIL不是Python的特点,而是Cpython解释器的特点;
- GIL是保证解释器级别的数据安全;
- 同一个进程下的多个线程无法同时执行,即无法利用多核优势;
- 针对不同的数据还是需要加不同的锁处理;
看如下代码,相较于上面的代码,注释掉了time.sleep(0.1)
:
from threading import Thread, Lock
import time
num = 100
def func():
global num
temp = num
# time.sleep(0.1)
num = temp - 1
if __name__ == '__main__':
t_list = []
for i in range(100):
t = Thread(target=func)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(num)
0
上面的代码执行结果为0,这是因为编辑器有默认的GIL,锁即将并行处理变成串行处理。
from threading import Thread, Lock
import time
num = 100
def func():
global num
temp = num
time.sleep(0.1)
num = temp - 1
if __name__ == '__main__':
t_list = []
for i in range(100):
t = Thread(target=func)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(num)
99
而添加了time.sleep(0.1)
的代码执行结果就变成了99,这不是预期的结果。(为什么要增加time.sleep()
?这里是为了模拟真实的场景,这里的测试代码过于简单,并且本机执行并没有复杂的处理,而真实的项目代码处理会相对复杂,再加上网络延迟、io等情况,就会有延时时间)这就需要我们自己加锁了。
from threading import Thread, Lock
import time
num = 100
mutex = Lock()
def func():
global num
mutex.acquire()
temp = num
time.sleep(0.1)
num = temp - 1
mutex.release()
if __name__ == '__main__':
t_list = []
for i in range(100):
t = Thread(target=func)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(num)
0
- 上面的代码创建了100个线程,启动后都会先去抢GIL;
- 只有1个线程抢到了GIL,向下执行代码,抢到了我们自定义的互斥锁;
- 继续执行代码到
time.sleep()
或其他io
代码块会自动释放GIL,但还继续拥有互斥锁;此时其他的某个线程抢到了GIL; - 第1个线程执行完毕释放互斥锁,第2个线程抢到互斥锁,另外某个线程抢到GIL,以此类推。
多进程与多线程的实际应用场景
目前计算机的CPU都是多核,所以这里不考虑单核的情况。
计算密集型(多核):
多进程:并行,耗时短
多线程:串行,耗时长
IO密集型(多核):
多进程:时间相仿,资源开销大
多线程:时间相仿,资源开销小
进程池和线程池
池的概念
池是用来保证计算机硬件安全的情况下最大限度的利用计算机,它降低了程序的运行效率,但保证了计算机硬件的安全,从而能够让程序正常的运行。
使用Python标准库中的concurrent.futures
模块中的ThreadPoolExecutor
和ProcessPoolExecutor
两个类,分别实现了对threading
模块和multiprocessing
模块的进一步抽象
线程池
ThreadPoolExecutor(5)
线程池的参数输入数字,这里是5,即启动5个线程,这5个线程不会出现重复创建和销毁的过程。括号内可以传数字,不传的话默认会开启当前计算机CPU个数五倍的线程。使用池子时,只需要将需要做的任务往池子里面放就可以,某个线程会自动进行处理。
任务的提交方式:
- 同步:提交任务之后等待任务返回结果,期间不做任何做事;
- 异步:提交任务之后不等待任务返回结果,执行继续往下执行;
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
pool = ThreadPoolExecutor(5)
def func(n):
print(n)
time.sleep(2)
pool.submit(func,1) # 向池子中提交任务,这里是函数func和参数1
print('test')
1
test
根据上面代码的执行结果,1和test几乎同时被输出,可以判断,这里的pool.submit()
是异步提交。
上面的代码只向池中提交了一个数据1,接下来向池中提交20个数据
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
pool = ThreadPoolExecutor(5)
def func(n):
print(n)
time.sleep(2)
for i in range(1, 21):
pool.submit(func, i) # 向池中提交20个数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
从执行结果可以看到,5个值一输出,sleep
2秒,5个值一输出,直到20个值都输出完毕
异步提交是不等结果返回先继续执行,上面的代码中没有接收和输出返回值
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
pool = ThreadPoolExecutor(5)
def func(n):
print(n)
time.sleep(2)
for i in range(1, 21):
res = pool.submit(func, i)
print(res.result())
1
None
2
None
3
None
4
None
5
None
6
None
7
None
8
None
9
None
10
None
11
None
12
None
13
None
14
None
15
None
16
None
17
None
18
None
19
None
20
None
有执行结果可以看到,程序由并发变成了串行,异步变成了同步。
res.result()
获取到的是异步提交的任务返回来的结果,这里是func
的返回值,因为没有return
语句所以返回值是None
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
pool = ThreadPoolExecutor(5)
def func(n):
print(n)
time.sleep(2)
return n + 1000
# 增加一个列表,启动的线程添加到列表中
t_list = []
for i in range(1, 21):
res = pool.submit(func, i)
t_list.append(res)
# 从列表中获取线程继续程序的执行
for t in t_list:
print(t.result())
1
2
3
4
5
6
7
8
9
10
1001
1002
1003
1004
1005
11
12
13
1006
14
15
1007
1008
1009
1010
1617
1011
18
19
1012
20
1013
1014
1015
1016
1017
1018
1019
1020
程序再次由同步变成异步。
下面使用pool.shutdown()
来实现,先全部执行线程,然后同时输出返回结果。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
pool = ThreadPoolExecutor(5)
def func(n):
print(n)
time.sleep(2)
return n + 1000
t_list = []
for i in range(1, 21):
res = pool.submit(func, i)
# print(res.result())
t_list.append(res)
pool.shutdown() # 关闭线程池,等待线程池中所有任务运行完毕
for t in t_list:
print(t.result())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
通过回调方法来获取返回的结果
pool.submit()
有个add_done_callback()
方法,即为异步提交添加回调方法,一旦有结果立刻返回。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
pool = ThreadPoolExecutor(5)
def func(n):
print(n)
time.sleep(2)
return n + 1000
# 定义一个回调函数
def call_back(n):
print(n.result())
t_list = []
for i in range(1, 21):
res = pool.submit(func, i).add_done_callback(call_back) # .add_done_callback() 为pool.submit()添加回调机制,括号内输入需要返回结果的回调函数
t_list.append(res)
1
2
3
4
5
1001
6
1003
7
1004
8
1005
9
1002
10
1006
11
1009
12
1008
13
1010
14
1007
15
1012
16
1015
17
1013
18
1014
19
1011
20
1016
1020
1019
1017
1018
进程池
ProcessPoolExecutor()
进程池的参数是数字,不传默认会开启当前计算机CPU个数的进程。池子造出来之后,里面会固定的几个进程。这几个进程不会出现重复创建和销毁的过程。
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time, os
pool = ProcessPoolExecutor(5)
def func(n):
print(n, os.getpid())
time.sleep(2)
return n + 1000
if __name__ == '__main__':
t_list = []
for i in range(1, 21):
res = pool.submit(func, i)
t_list.append(res)
1 7475
2 7474
3 7473
4 7476
5 7477
6 7478
7 7479
8 7480
9 7475
10 7474
11 7473
12 7476
13 7478
14 7477
15 7479
16 7480
17 7475
18 7474
19 7473
20 7476
从执行结果可以看到是5个进程重复执行了4次。
进程的操作都要写在__name__ == '__main__'
中,否则会报错。
下面是增加回调机制来返回结果
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time, os
pool = ProcessPoolExecutor(5)
def func(n):
print(n, os.getpid())
time.sleep(2)
return n + 1000
def call_back(n):
print(n.result())
if __name__ == '__main__':
t_list = []
for i in range(1, 21):
res = pool.submit(func, i).add_done_callback(call_back)
t_list.append(res)
1 8014
2 8015
3 8016
4 8017
5 8018
6 8014
1001
7 8016
8 8015
1003
1002
9 8018
10 8017
1005
1004
11 8014
12 8016
13 8015
1006
1007
1008
14 8017
15 8018
1010
1009
16 8014
1011
17 8015
18 8016
1013
1012
19 8017
20 8018
1014
1015
1016
1017
1018
1020
1019