昆明网站制作在线,怎样做知道网站,石家庄做公司网站,网站开发建设公司地址前序博客中已经介绍了基于多进程的并发编程#xff0c;本篇主要介绍基于多线程的并发编程。 1 全局解释锁
1.1 定义 全局解释锁(Global Interpreter Lock#xff0c;简称GIL)是Python(特别是CPython)解释器中的一个机制#xff0c;这个机制会限制同一时间只有一个线程执行P… 前序博客中已经介绍了基于多进程的并发编程本篇主要介绍基于多线程的并发编程。 1 全局解释锁
1.1 定义 全局解释锁(Global Interpreter Lock简称GIL)是Python(特别是CPython)解释器中的一个机制这个机制会限制同一时间只有一个线程执行Python字节码。GIL的好处主要有以下两点
保护解释器的全局状态在多线程环境中Python解释器内部的许多数据结构是全局共享的GIL通过确保同一时刻只有一个线程执行字节码防止了多个线程同时修改这些数据结构而导致的数据不一致或崩溃。简化内存管理Python的内存管理系统需要在对象分配和释放时更新引用计数。GIL使得这一操作变得线程安全而无需使用复杂的锁机制。 但GIL也会带来一些负面影响。比如使Python多线程无法充分利用多核CPU的优势。所以本篇要介绍的Python多线程并没有实现真正的并行(Parallel)执行而是并发(Concurrent)执行。 Tips: 并发是指在同一时间段内多个任务交替执行可能是单核处理器通过时间片轮转实现也可能是在多核处理器上通过操作系统的调度实现。虽然多个任务看起来是同时进行的但实际上每个任务在某个时间点上并没有真正同时执行。并行是指在同一时刻多个任务真正同时执行。并行通常需要多核处理器或多个处理器核心每个任务在不同的处理器核心上运行从而实现真正的同时执行。
2 多线程 Python中的threading模块可以实现多线程。本篇主要基于该模块介绍python中多线程的一般用法。
2.1 创建和启动线程 Python中可以通过两种方法创建线程。一种是通过重写threading.Thread中的run方法而是直接把目标函数传递给threading.Thread类。具体代码举例
import threading
import time
import datetime# 方式1: 继承Thread类
class MyThread(threading.Thread):def run(self):print(fThread {self.name} is running:)time.sleep(2)print(fThread {self.name} is completed.)# 方式2: 直接传递目标函数
def thread_function(name):print(fThread {name} is running:)time.sleep(2)print(fThread {name} is completed.)# 创建和启动线程
threads []
for i in range(3):t1 MyThread(namefThread-{i1})t2 threading.Thread(targetthread_function, args(fThread-{i4},))threads.append(t1)threads.append(t2)
print(Start all threads:,datetime.datetime.now())
# 启动所有线程
for t in threads:t.start()
# 等待所有线程完成
for t in threads:t.join()print(All threads completed:,datetime.datetime.now())其代码执行结果如下
Start all threads: 2024-05-24 17:18:28.677731
Thread Thread-1 is running:
Thread Thread-4 is running:
Thread Thread-2 is running:
Thread Thread-5 is running:
Thread Thread-3 is running:
Thread Thread-6 is running:
All threads completed: 2024-05-24 17:18:30.682527从代码运行结果中可以看到6个线程总共运行了2s这似乎与前文说的“Python多线程只能并发无法真正并行执行”的结论违背。出现这种情况是因为Python线程在执行time.sleep时释放了GIL使得其他线程可以继续执行。在这种情况下所有6个线程几乎同时启动并进入休眠状态而不是一个接一个地进行所以总的执行时间将是最长的单个线程的执行时间即2秒而不是所有线程执行时间的总和。 将上述代码中的每个线程的任务改成CPU密集型任务再来看代码运行结果
import threading
import datetimedef target_function():sum0for i in range(10000):for j in range(10000):sumi*j# 创建和启动线程
threads []
for i in range(3):t1 threading.Thread(targettarget_function)threads.append(t1)print(验证执行一次target_function的时间)
print(验证开始:,datetime.datetime.now())
target_function()
print(验证结束:,datetime.datetime.now())
print(3个线程执行开始:,datetime.datetime.now())
# 启动所有线程
for t in threads:t.start()
# 等待所有线程完成
for t in threads:t.join()print(3个线程执行结束:,datetime.datetime.now())其执行结果如下
验证执行一次target_function的时间
验证开始: 2024-05-24 18:14:40.580747
验证结束: 2024-05-24 18:14:46.259962
3个线程执行开始: 2024-05-24 18:14:46.260006
3个线程执行结束: 2024-05-24 18:15:02.237260从运行结果中可以看到单次target_function的执行时间不到6s而3个线程总的执行时间大致16s和3个线程顺序执行的时间差不多。
2.2 线程同步 线程同步可以防止多个线程在同一时间访问或修改共享数据从而导致数据错误或程序行为异常。Python提供了多种线程同步机制常见的包括锁、条件变量、信号量和事件等。
2.2.1 锁 在Python多线程编程中锁可以用于同步对多种共享资源的访问确保线程安全和数据一致性。这些资源包括全局变量和实例变量、数据结构(如列表、字典等)、文件和I/O资源、数据库连接、线程之间的通信资源以及网络资源。目前threading模块中有两种锁类
LockLock对象是最基本的同步原语有两种状态锁定和非锁定。线程可以通过acquire()方法获得锁如果锁已经被其他线程获得则调用线程会被阻塞直到锁被释放。线程通过release()方法释放锁允许其他线程获得锁。RLock(可重入锁)RLock对象允许同一个线程多次获得同一个锁而不会引起死锁。每次调用acquire()方法都会增加锁的计数直到调用相应次数的release()方法锁才会被释放。
具体代码举例如下
import threading
import queuecounter 0
def increment_counter():#累加器global counterfor _ in range(1000):with counter_lock: # 使用with语句简化锁的获取和释放counter 1def producer():#生产者线程for i in range(10):#手动获取或释放锁queue_lock.acquire()shared_queue.put(i)queue_lock.release()def consumer():#消费者线程while True:with queue_lock:if not shared_queue.empty():item shared_queue.get()else:breakclass SharedResource:def __init__(self):self.value 0def increment(self):with rlock:self.value 1self.double()def double(self):with rlock:self.value * 2
def task():for _ in range(10):resource.increment()
if __name____main__:#1.全局变量锁counter_lock threading.Lock()threads [threading.Thread(targetincrement_counter) for _ in range(10)]for thread in threads:thread.start()for thread in threads:thread.join()print(全局变量的最终值:,counter)#2.共享队列锁shared_queue queue.Queue()queue_lock threading.Lock()producer_thread threading.Thread(targetproducer)consumer_thread threading.Thread(targetconsumer)producer_thread.start()producer_thread.join()consumer_thread.start()consumer_thread.join()#3.可重入锁rlock threading.RLock()resource SharedResource()threads [threading.Thread(targettask, namefThread-{i1}) for i in range(3)]for thread in threads:thread.start()for thread in threads:thread.join()print(fFinal value: {resource.value})其代码运行结果如下
全局变量的最终值: 10000
Final value: 21474836462.2.2 信号量 信号量(Semaphore)是用于控制资源访问数量的同步原语。信号量内部有一个计数器每次调用acquire()方法时计数器减1每次调用release()方法时计数器加1。如果计数器为0调用acquire()的线程将被阻塞直到其他线程调用release()。其用法举例如下
import threading
import time
import datetime
semaphore threading.Semaphore(2) # 最多允许两个线程同时执行
def task(name):semaphore.acquire()print(fThread {name} starting:,datetime.datetime.now())time.sleep(5)print(fThread {name} finished:,datetime.datetime.now())semaphore.release()threads [threading.Thread(targettask, args(fThread-{i1},)) for i in range(5)]
for thread in threads:thread.start()
for thread in threads:thread.join()
print(All tasks completed)其执行结果如下
Thread Thread-1 starting: 2024-05-25 16:23:12.605692
Thread Thread-2 starting: 2024-05-25 16:23:12.605832
Thread Thread-1 finished: 2024-05-25 16:23:17.608979
Thread Thread-2 finished: 2024-05-25 16:23:17.608901
Thread Thread-3 starting: 2024-05-25 16:23:17.609524
Thread Thread-4 starting: 2024-05-25 16:23:17.609970
Thread Thread-3 finished: 2024-05-25 16:23:22.613643
Thread Thread-4 finished: 2024-05-25 16:23:22.613916
Thread Thread-5 starting: 2024-05-25 16:23:22.615044
Thread Thread-5 finished: 2024-05-25 16:23:27.618141
All tasks completed从代码运行结果可以看到限定资源数量为2之后线程3、4是等到线程1、2结束才开始执行的。
2.3 线程通信 线程通信指的是线程之间传递信息和协调行为的机制。线程通信可以通过多种方式实现包括队列、事件、条件变量、信号量等。这里仅介绍事件和的条件变量。
2.3.2 事件
事件(Event)是一种用于线程间通信和同步的机制。事件对象允许一个或多个线程等待某个事件的发生并在事件发生时被唤醒。threading类中的Event类主要包括以下几个方法
set()将内部标志设置为True并唤醒所有等待的线程。clear()将内部标志设置为False。wait(timeoutNone)如果内部标志为True则立即返回否则阻塞线程直到内部标志被设置为True或超时。is_set()返回内部标志的当前状态。
其用法举例如下
import threading
import timedef waiter(event):print(Waiter: Waiting for the event to be set...)event.wait() # 等待事件被设置为Trueprint(Waiter: Event has been set, proceeding...)def setter(event):time.sleep(3) # 模拟一些工作print(Setter: Setting the event)event.set() # 将事件设置为True# 创建事件对象
event threading.Event()# 创建并启动线程
waiter_thread threading.Thread(targetwaiter, args(event,))
setter_thread threading.Thread(targetsetter, args(event,))waiter_thread.start()
setter_thread.start()waiter_thread.join()
setter_thread.join()其执行结果如下
Waiter: Waiting for the event to be set...
Setter: Setting the event
Waiter: Event has been set, proceeding...2.3.2 条件变量 条件变量(Condition)允许一个线程等待特定条件的发生并在条件满足时通知其他线程。它通常与锁一起使用。threading类中的Condition类一般包括以下方法
wait()当前线程等待直到其他线程调用notify()或notify_all()方法唤醒它。notify()唤醒一个等待中的线程。notify_all()唤醒所有等待中的线程。
其具体用法举例如下
import threading
import time
import randomcondition threading.Condition()
queue []class Producer(threading.Thread):def run(self):global queuewhile True:item random.randint(1, 10)with condition:queue.append(item)print(fProduced {item})condition.notify() # 唤醒消费者线程time.sleep(random.random())class Consumer(threading.Thread):def run(self):global queuewhile True:with condition:while not queue:condition.wait() # 等待生产者线程的通知item queue.pop(0)print(fConsumed {item})time.sleep(random.random())# 创建并启动线程
producer Producer()
consumer Consumer()producer.start()
consumer.start()producer.join()
consumer.join()补充
补充1: 释放GIL的操作
目前Python中会导致线程释放GIL的操作主要包括以下几种
I/O操作例如文件读写、网络请求等。某些内置函数和标准库函数例如 time.sleep()、select.select()、socket 模块中的操作。使用C语言编写的扩展模块可以显式地释放和重新获取GIL。