Python并发编程-线程

第1章 线程介绍

1.1 进程和线程的区别

  1. 地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
  2. 通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
  3. 调度和切换:线程上下文切换比进程上下文切换要快得多。
  4. 在多线程操作系统中,进程不是一个可执行的实体。

1.2 线程的特点

  1. 轻型实体

线程中的实体基本上不拥有系统资源,只是有一点必不可少的、能保证独立运行的资源。线程的实体包括程序、数据和TCB。线程是动态概念,它的动态特性由线程控制块TCB(Thread Control Block)描述。

Tips:TCB包括以下信息:

(1)线程状态。

(2)当线程不运行时,被保存的现场资源。

(3)一组执行堆栈。

(4)存放每个线程的局部变量主存区。

(5)访问同一个进程中的主存和其它资源。

用于指示被执行指令序列的程序计数器、保留局部变量、少数状态参数和返回地址等的一组寄存器和堆栈

  1. 独立调度和分派的基本单位。

在多线程OS中,线程是能独立运行的基本单位,因而也是独立调度和分派的基本单位。由于线程很“轻”,故线程的切换非常迅速且开销小(在同一进程中的)。

  1. 共享进程资源。

线程在同一进程中的各个线程,都可以共享该进程所拥有的资源,这首先表现在:所有线程都具有相同的进程id,这意味着,线程可以访问该进程的每一个内存资源;此外,还可以访问进程所拥有的已打开文件、定时器、信号量机构等。由于同一个进程内的线程共享内存和文件,所以线程之间互相通信不必调用内核。

  1. 可并发执行。

在一个进程中的多个线程之间,可以并发执行,甚至允许在一个进程中所有线程都能并发执行;同样,不同进程中的线程也能并发执行,充分利用和发挥了处理机与外围设备并行工作的能力。

1.3 使用线程的实际场景

图片[1]|Python并发编程-线程|leon的博客

开启一个字处理软件进程,该进程肯定需要办不止一件事情,比如监听键盘输入,处理文字,定时自动将文字保存到硬盘,这三个任务操作的都是同一块数据,因而不能用多进程。只能在一个进程里并发地开启三个线程,如果是单线程,那就只能是,键盘输入时,不能处理文字和自动保存,自动保存时又不能输入和处理文字。

第2章 在python程序中的线程操作

2.1 理论知识

2.1.1 全局解释器锁GIL

Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然 Python 解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解释器中运行。

对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。

在多线程环境中,Python 虚拟机按以下方式执行:

  1. 设置 GIL;
  2. 切换到一个线程去运行;
  3. 运行指定数量的字节码指令或者线程主动让出控制(可以调用sleep(0));
  4. 把线程设置为睡眠状态;
  5. 解锁 GIL;
  6. 再次重复以上所有步骤。

在调用外部代码(如 C/C++扩展函数)的时候,GIL将会被锁定,直到这个函数结束为止(由于在这期间没有Python的字节码被运行,所以不会做线程切换)编写扩展的程序员可以主动解锁GIL

2.1.2 python线程模块的选择

Python提供了几个用于多线程编程的模块,包括thread、threading和Queue等。thread和threading模块允许程序员创建和管理线程。thread模块提供了基本的线程和锁的支持,threading提供了更高级别、功能更强的线程管理的功能。Queue模块允许用户创建一个可以用于多个线程之间共享数据的队列数据结构。尽量选择使用threading模块。

2.2 threading模块

2.2.1 线程的创建

【示例】:第一种方法,函数

from threading import Thread
import time

def sayhi(name):
time.sleep(2)
print('%s say hi' %name)

if __name__ == '__main__':
t=Thread(target=sayhi,args=('leon',))
t.start()
print('主线程')

【运行结果】:

主线程
leon say hi

【示例】:第二种方法,类继承

from threading import Thread
import time

class Sayhi(Thread):
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
time.sleep(2)
print('%s say hi' % self.name)

if __name__ == '__main__':
t = Sayhi('leon')
t.start()
print('主线程')

【运行结果】:

主线程
leon say hi

2.2.2 多线程与多进程

【示例】:pid的比较

from threading import Thread
from multiprocessing import Process
import os

def work():
print('hello',os.getpid())

if __name__ == '__main__':
# part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
t1=Thread(target=work)
t2=Thread(target=work)
t1.start()
t2.start()
print('主线程/主进程pid',os.getpid())

# part2:开多个进程,每个进程都有不同的pid
p1=Process(target=work)
p2=Process(target=work)
p1.start()
p2.start()
print('主线程/主进程pid',os.getpid())

【运行结果】:

hello 4932
hello 4932
主线程/主进程pid 4932
主线程/主进程pid 4932
hello 5752
hello 12792

【示例】:开启效率的比较

import time
from threading import Thread
from multiprocessing import Process

def func(n):
n + 1

if __name__ == '__main__':
start = time.time()
t_lst = []
for i in range(100):
t = Thread(target=func,args=(i,))
t.start()
t_lst.append(t)
for t in t_lst:t.join()
t1 = time.time() - start

start = time.time()
t_lst = []
for i in range(100):
t = Process(target=func, args=(i,))
t.start()
t_lst.append(t)
for t in t_lst: t.join()
t2 = time.time() - start

print('线程时间:%s' %t1)
print('进程时间:%s' %t2)

【运行结果】:内存数据的共享问题

线程时间:0.031244277954101562
进程时间:4.120954990386963

【示例】:内存数据的共享问题

from threading import Thread
from multiprocessing import Process

def work():
global n
n=0

if __name__ == '__main__':
# n=100
# p=Process(target=work)
# p.start()
# p.join()
# print('主',n) # 子进程p已经将自己的全局n改成了0,但改的仅是它自己的,查看父进程的n仍然为100

n=1
t=Thread(target=work)
t.start()
t.join()
print('主',n) # 查看结果为0,因为同一进程内的线程之间共享进程内的数据

【运行结果】:

# 主 100
主 0

2.2.3 多线程实现socket

【服务端】:

import threading
import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)

def action(conn):
while True:
data=conn.recv(1024)
print(data)
conn.send(data.upper())

if __name__ == '__main__':
while True:
conn,addr=s.accept()
p=threading.Thread(target=action,args=(conn,))
p.start()

【客户端】:

import socket

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(('127.0.0.1',8080))

while True:
msg=input('>>: ').strip()
if not msg:continue

s.send(msg.encode('utf-8'))
data=s.recv(1024)
print(data)

【运行结果】:

# 服务端
b'client1'
b'client2'

# 客户端1
>>: client1
b'CLIENT1'
>>:

# 客户端2
>>: client2
b'CLIENT2'
>>:

2.2.4 Thread类的其他方法

Thread实例对象的方法
# isAlive(): 返回线程是否活动的。
# getName(): 返回线程名。
# setName(): 设置线程名。

threading模块提供的一些方法:
# threading.currentThread(): 返回当前的线程变量。
# threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
# threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

【示例】:

from threading import Thread
import threading

def work():
import time
time.sleep(3)
print(threading.current_thread().getName())

if __name__ == '__main__':
# 在主进程下开启线程
t=Thread(target=work)
t.start()

print(threading.current_thread().getName())
print(threading.current_thread()) # 主线程
print(threading.enumerate()) # 连同主线程在内有两个运行的线程
print(threading.active_count())
print('主线程/主进程')

【运行结果】:

MainThread
<_MainThread(MainThread, started 956)>
[<_MainThread(MainThread, started 956)>, <Thread(Thread-1, started 10592)>]
2
主线程/主进程
Thread-1

2.2.5 守护线程

Tips:无论是进程还是线程,都遵循:守护xx会等待主xx运行完毕后被销毁。需要强调的是:运行完毕并非终止运行。

1.对主进程来说,运行完毕指的是主进程代码运行完毕

2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕

主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束。

主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。

【示例】:

import time
from threading import Thread

def func1():
while True:
print('*'*10)
time.sleep(1)

def func2():
print('in func2')
time.sleep(5)

t = Thread(target=func1,)
t.daemon = True
t.start()
t2 = Thread(target=func2,)
t2.start()
# t2.join()
print('主线程')

【运行结果】:

# **********
# in func2
# **********
# **********
# **********
# **********
# 主线程

**********
in func2
主线程
**********
**********
**********
**********

2.3 锁

2.3.1 互斥锁

【示例】:同步锁的引用

import time
from threading import Lock,Thread

def func(lock):
global n
lock.acquire()
temp = n
time.sleep(0.2)
n = temp - 1
lock.release()

n = 10
t_lst = []
lock = Lock()
for i in range(10):
t = Thread(target=func,args=(lock,))
t.start()
t_lst.append(t)

for t in t_lst: t.join()
print(n)

【运行结果】:

0 # 若不加锁结果为9

Tips:互斥锁与join的区别:

  1. 不加锁:并发执行,速度快,数据不安全
  2. 不加锁:未加锁部分并发执行,加锁部分串行执行,速度慢,数据安全
  3. 如果start后立即join:任务内的所有代码都是串行执行的,而加锁,只是加锁的部分即修改共享数据的部分是串行的单从保证数据安全方面,二者都可以实现,但很明显是加锁的效率更高

2.3.2 死锁和递归锁

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

解决方法:递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

【示例】:科学家吃面死锁

import time
from threading import Lock,Thread

noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
noodle_lock.acquire()
print('%s拿到面条啦'%name)
fork_lock.acquire()
print('%s拿到叉子了'%name)
print('%s吃面'%name)
fork_lock.release()
noodle_lock.release()

def eat2(name):
fork_lock.acquire()
print('%s拿到叉子了'%name)
time.sleep(1)
noodle_lock.acquire()
print('%s拿到面条啦'%name)
print('%s吃面'%name)
noodle_lock.release()
fork_lock.release()

Thread(target=eat1,args=('leon',)).start()
Thread(target=eat2,args=('shadow',)).start()
Thread(target=eat1,args=('张三',)).start()
Thread(target=eat2,args=('李四',)).start()

【运行结果】:

leon拿到面条啦
leon拿到叉子了
leon吃面
shadow拿到叉子了
张三拿到面条啦
# 出现死锁

【示例】:解决科学家吃面死锁

import time
from threading import Thread,RLock # 递归锁

fork_lock = noodle_lock = RLock() # 一个钥匙串上的两把钥匙
def eat1(name):
noodle_lock.acquire() # 一把钥匙
print('%s拿到面条啦'%name)
fork_lock.acquire()
print('%s拿到叉子了'%name)
print('%s吃面'%name)
fork_lock.release()
noodle_lock.release()

def eat2(name):
fork_lock.acquire()
print('%s拿到叉子了'%name)
time.sleep(1)
noodle_lock.acquire()
print('%s拿到面条啦'%name)
print('%s吃面'%name)
noodle_lock.release()
fork_lock.release()

Thread(target=eat1,args=('leon',)).start()
Thread(target=eat2,args=('shadow',)).start()
Thread(target=eat1,args=('张三',)).start()
Thread(target=eat2,args=('李四',)).start()

【运行结果】:

leon拿到面条啦
leon拿到叉子了
leon吃面
shadow拿到叉子了
shadow拿到面条啦
shadow吃面
张三拿到面条啦
张三拿到叉子了
张三吃面
李四拿到叉子了
李四拿到面条啦
李四吃面

2.3.3 信号量

同进程的一样,Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

【示例】:同时只有4个线程可以获得semaphore,即可以限制最大连接数为4

import time
from threading import Semaphore,Thread
def func(sem,a,b):
sem.acquire()
time.sleep(1)
print(a+b)
sem.release()

sem = Semaphore(4)
for i in range(10):
t = Thread(target=func,args=(sem,i,i+5))
t.start()

【运行结果】:

7
5
9
11
15
17
13
19
23
21 # 4个4个显示

2.3.4 事件

同进程的一样。

event.isSet():返回event的状态值;
event.wait():如果 event.isSet()==False将阻塞线程;
event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
event.clear():恢复event的状态值为False。

【示例】:模拟数据库3次连接失败报错

import time
import random
from threading import Thread,Event

def connect_db(e):
count = 0
while count < 3:
e.wait(0.4) # 状态为False的时候,我只等待1s就结束
if e.is_set() == True:
print('连接数据库')
break
else:
count += 1
print('第%s次连接失败'%count)
else:
raise TimeoutError('数据库连接超时')

def check_web(e):
time.sleep(random.randint(0,3))
e.set()

e = Event()
t1 = Thread(target=connect_db,args=(e,))
t2 = Thread(target=check_web,args=(e,))
t1.start()
t2.start()

【运行结果】:

第1次连接失败
第2次连接失败
连接数据库

2.3.5 条件

Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

简单地说:使得线程等待,只有满足某条件时,才释放n个线程。

【示例】:

from threading import Thread,Condition

def func(con,i):
con.acquire()
con.wait() # 等钥匙
print('在第%s个循环里'%i)
con.release()
con = Condition()

for i in range(10):
Thread(target=func,args = (con,i)).start()

while True:
num = int(input('>>>'))
con.acquire()
con.notify(num) # 造钥匙,钥匙是一次性的
con.release()

【运行结果】:

>>>2
>>>在第0个循环里
在第1个循环里
3
>>>在第4个循环里
在第3个循环里
在第2个循环里
3
>>>在第7个循环里
在第5个循环里
在第6个循环里
4
>>>在第9个循环里
在第8个循环里

2.3.6 定时器

定时器,指定n秒后执行某个操作。

【示例】:时间同步

import time
from threading import Timer
def func():
print('时间同步') # 1-3

while True:
t = Timer(5,func).start() # 非阻塞的
time.sleep(5)

【运行结果】:

时间同步
时间同步
时间同步
时间同步
时间同步

2.4 线程队列

2.4.1 队列

【示例】:队列,先进先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

【运行结果】:

first
second
third

2.4.2 栈

【示例】:栈,先进后出

import queue

q=queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

【运行结果】:

third
second
first

2.4.3 优先级队列

【示例】:优先级队列,按照ASCII码顺序取出

import queue

q = queue.PriorityQueue() # 优先级队列
q.put((20,'a'))
q.put((10,'b'))
q.put((30,'c'))
q.put((1,'d'))
q.put((1,'e'))

print(q.get())
print(q.get())
print(q.get())

【运行结果】:

(1, 'd')
(1, 'e')
(10, 'b')

2.5 线程池

2.5.1 线程池介绍

concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用

2.5.2 基本方法

# 异步提交任务
submit(fn, *args, **kwargs)

# 取代for循环submit的操作
map(func, *iterables, timeout=None, chunksize=1)

# 相当于进程池的pool.close()+pool.join()操作
# wait=True,等待池内所有任务执行完毕回收完资源后才继续
# wait=False,立即返回,并不会等待池内的任务执行完毕
# 但不管wait参数为何值,整个程序都会等到所有任务执行完毕submit和map必须在shutdown之前
shutdown(wait=True)

# 取得结果
result(timeout=None)

# 回调函数
add_done_callback(fn)

2.5.3 回调函数

【示例】:

import time
from concurrent.futures import ThreadPoolExecutor

def func(n):
time.sleep(2)
print(n)
return n*n

def call_back(m):
print('结果是 %s'%m.result())

tpool = ThreadPoolExecutor(max_workers=5) # 默认不要超过cpu个数*5
for i in range(8):
tpool.submit(func,i).add_done_callback(call_back)

# tpool.map(func,range(8)) # 拿不到返回值
# t_lst = []
# for i in range(8):
# t = tpool.submit(func,i)
# t_lst.append(t)
# tpool.shutdown() # close+join
#
# print('主线程')
# for t in t_lst:print('***',t.result())

【运行结果】:

2
结果是 4
3
结果是 9
4
结果是 16
1
结果是 1
0
结果是 0
5
结果是 25
6
结果是 36
7
结果是 49

# 2
# 1
# 0
# 3
# 4
# 7
# 5
# 8
# 6
# 9
# 12
# 10
# 13
# 14
# 11
# 15
# 19
# 16
# 17
# 18
# 0
# 1
# 4
# 2
# 3
# 5
# 6
# 7
# 主线程
# *** 0
# *** 1
# *** 4
# *** 9
# *** 16
# *** 25
# *** 36
# *** 49
温馨提示:本文最后更新于2022-12-20 20:57:45,已超过494天没有更新。某些文章具有时效性,若文章内容或图片资源有错误或已失效,请联系站长。谢谢!
转载请注明本文链接:https://blog.leonshadow.cn/763482/1323.html
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享