服务器建设网站,政务网站建设 发言,网站发展趋势和前景,百度广告联盟网站Python并发编程#xff1a;从入门到实践 #x1f680;
1. 多线程编程基础 #x1f9f5;
多线程是实现并发的重要方式#xff0c;Python提供了threading模块来支持多线程编程。
1.1 基本线程操作
import threading
import time
from typing import List, Callableclass …Python并发编程从入门到实践
1. 多线程编程基础
多线程是实现并发的重要方式Python提供了threading模块来支持多线程编程。
1.1 基本线程操作
import threading
import time
from typing import List, Callableclass ThreadManager:线程管理器用于创建和管理线程def __init__(self):self.threads: List[threading.Thread] []def create_thread(self, target: Callable, args: tuple ()) - threading.Thread:创建新线程:param target: 线程执行的目标函数:param args: 传递给目标函数的参数:return: 创建的线程对象thread threading.Thread(targettarget, argsargs)self.threads.append(thread)return threaddef start_all(self) - None:启动所有线程for thread in self.threads:thread.start()def join_all(self) - None:等待所有线程完成for thread in self.threads:thread.join()def thread_demo():def worker(name: str, sleep_time: int) - None:线程工作函数print(f线程 {name} 开始工作)time.sleep(sleep_time)print(f线程 {name} 完成工作)# 创建线程管理器manager ThreadManager()# 创建多个线程for i in range(3):manager.create_thread(targetworker, args(fThread-{i}, i))# 启动并等待完成manager.start_all()manager.join_all()1.2 线程同步机制
from threading import Lock, RLock, Condition, Event, Semaphore
from typing import Anyclass ThreadSafe:线程安全的资源访问演示def __init__(self):self._lock Lock()self._rlock RLock()self._condition Condition()self._event Event()self._semaphore Semaphore(2)self._resource 0def lock_example(self) - None:使用Lock进行互斥访问with self._lock:self._resource 1print(f资源值: {self._resource})def rlock_example(self) - None:使用RLock进行可重入锁定with self._rlock:with self._rlock: # 可以重复获取锁self._resource 1def condition_example(self) - None:使用Condition进行条件同步with self._condition:# 等待条件满足self._condition.wait_for(lambda: self._resource 5)print(条件满足继续执行)def event_example(self) - None:使用Event进行事件同步self._event.wait() # 等待事件print(事件被触发)def semaphore_example(self) - None:使用Semaphore限制并发访问with self._semaphore:print(获得信号量)time.sleep(1)print(释放信号量)## 2. 多进程编程 多进程适合CPU密集型任务Python的multiprocessing模块提供了强大的多进程支持。python
from multiprocessing import Process, Pool, Queue, Manager
import osclass ProcessManager:进程管理器用于创建和管理进程staticmethoddef process_worker(name: str) - None:进程工作函数:param name: 进程名称print(f进程 {name} (PID: {os.getpid()}) 开始工作)time.sleep(2)print(f进程 {name} 完成工作)staticmethoddef pool_example() - None:进程池示例with Pool(processes4) as pool:# 使用进程池并行处理任务results pool.map(lambda x: x * x, range(10))print(f进程池计算结果: {results})staticmethoddef queue_example() - None:进程间通信示例def producer(queue: Queue) - None:生产者进程for i in range(5):queue.put(fitem-{i})time.sleep(1)def consumer(queue: Queue) - None:消费者进程while True:item queue.get()print(f消费: {item})# 创建队列和进程q Queue()p1 Process(targetproducer, args(q,))p2 Process(targetconsumer, args(q,))# 启动进程p1.start()p2.start()# 等待生产者完成p1.join()## 3. 异步IO编程 Python的asyncio模块提供了异步编程的支持特别适合IO密集型任务。python
import asyncio
from typing import List
import aiohttpclass AsyncIOManager:异步IO管理器staticmethodasync def async_task(name: str) - str:异步任务示例:param name: 任务名称:return: 任务结果print(f任务 {name} 开始)await asyncio.sleep(1) # 模拟IO操作print(f任务 {name} 完成)return fResult-{name}async def run_tasks(self) - List[str]:运行多个异步任务:return: 任务结果列表tasks [self.async_task(fTask-{i})for i in range(3)]results await asyncio.gather(*tasks)return results## 4. 实战案例并发下载器 结合上述所有概念实现一个高效的并发下载器。python
import asyncio
import aiohttp
import os
from typing import List, Dict
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
import threadingdataclass
class DownloadTask:下载任务数据类url: strfilename: strsize: int 0downloaded: int 0status: str pendingclass ConcurrentDownloader:并发下载器支持多线程、多进程和异步IOdef __init__(self, save_dir: str ./downloads):self.save_dir save_dirself.tasks: Dict[str, DownloadTask] {}self.lock threading.Lock()os.makedirs(save_dir, exist_okTrue)async def async_download(self, url: str, filename: str) - None:异步下载单个文件:param url: 下载URL:param filename: 保存的文件名task DownloadTask(urlurl, filenamefilename)self.tasks[url] tasktry:async with aiohttp.ClientSession() as session:async with session.get(url) as response:if response.status 200:task.size int(response.headers.get(content-length, 0))filepath os.path.join(self.save_dir, filename)with open(filepath, wb) as f:while True:chunk await response.content.read(8192)if not chunk:breakf.write(chunk)task.downloaded len(chunk)task.status completedelse:task.status failedexcept Exception as e:print(f下载错误 {url}: {str(e)})task.status failedasync def batch_download(self, urls: List[Dict[str, str]]) - None:批量下载文件:param urls: URL和文件名的字典列表tasks [self.async_download(item[url], item[filename])for item in urls]await asyncio.gather(*tasks)def get_progress(self) - Dict[str, Dict]:获取下载进度:return: 所有任务的进度信息with self.lock:return {url: {filename: task.filename,size: task.size,downloaded: task.downloaded,status: task.status,progress: (task.downloaded / task.size * 100) if task.size 0 else 0}for url, task in self.tasks.items()}# 使用示例
async def download_demo():下载器示例downloader ConcurrentDownloader()# 准备下载任务urls [{url: http://example.com/file1.zip, filename: file1.zip},{url: http://example.com/file2.pdf, filename: file2.pdf},]# 开始下载await downloader.batch_download(urls)# 获取进度progress downloader.get_progress()print(下载进度:, progress)if __name__ __main__:# 运行异步下载示例asyncio.run(download_demo())5. 最佳实践与注意事项 ⚠️ 线程使用建议 GIL限制了Python多线程在CPU密集型任务上的表现适合IO密集型任务注意线程安全和死锁问题 进程使用建议 适合CPU密集型任务注意进程间通信的开销合理使用进程池 异步IO使用建议 适合大量IO操作场景避免在协程中使用阻塞操作使用异步库而不是同步库 性能优化 根据任务特点选择合适的并发方式避免过度并发合理设置超时和重试机制
6. 拓展学习方向
深入研究Python的GIL机制探索分布式计算框架学习反应式编程研究并发设计模式了解更多异步框架如Trio
通过本文的学习你已经掌握了Python并发编程的核心概念和实践技巧。继续探索和实践你将能够构建更高效的并发应用✨ 如果你觉得这篇文章有帮助欢迎点赞转发也期待在评论区看到你的想法和建议
咱们下一期见