网站开发首选,北京装饰公司十大排名榜,做交流网站有哪些,影视网站cpa 如何做1. Python子进程模块subprocess
subprocess 模块允许我们启动一个新进程#xff0c;并连接到它们的输入/输出/错误管道#xff0c;从而获取返回值。
#xff08;1#xff09;run 方法
首先我们来看看 run 方法的使用#xff0c;该方法的参数如下#xff1a;
args并连接到它们的输入/输出/错误管道从而获取返回值。
1run 方法
首先我们来看看 run 方法的使用该方法的参数如下
args表示要执行的命令。必须是一个字符串或字符串参数列表。stdin、stdout 和 stderr子进程的标准输入、输出和错误。其值可以是 subprocess.PIPE、subprocess.DEVNULL、一个已经存在的文件描述符、已经打开的文件对象或者 None。subprocess.PIPE 表示为子进程创建新的管道。subprocess.DEVNULL 表示使用 os.devnull。默认使用的是 None表示什么都不做。另外stderr 可以合并到 stdout 里一起输出。timeout设置命令超时时间。如果命令执行时间超时子进程将被杀死并抛出 TimeoutExpired 异常。check如果该参数设置为 True并且进程退出状态码不是 0则抛出 CalledProcessError 异常。encoding如果指定了该参数则 stdin、stdout 和 stderr 可以接收字符串数据并以该编码方式编码。否则只接收 bytes 类型的数据。shell如果该参数为 True将通过操作系统的 Shell 执行指定的命令。capture_output如果 capture_output True则将捕获 stdout 和 stderr调用时内部的 Popen 对象将自动使用 stdout PIPE 和 stderr PIPE 创建标准输出和标准错误对象传递 stdout 和 stderr 参数时不能同时传递 capture_output 参数。如果希望捕获并将两个 stream 合并为一个使用 stdout PIPE 和 stderr STDOUT。
下面我们来看一个例子run 方法调用方式返回 CompletedProcess 实例
import subprocessargs1 [python, src/Python子进程测试程序1.py]ret subprocess.run(argsargs1, capture_outputTrue, encodingutf-8) # 相当于在命令行执行python src/Python子进程测试程序1.py
print(ret) # CompletedProcess(args[python, src/Python子进程测试程序1.py], returncode0, stdout子进程输出: Hello World!\n, stderr)if ret.returncode 0:print(Success, stdout:, ret.stdout) # Success, stdout: 子进程输出: Hello World!
else:print(Error, stderr:, ret.stderr) # Error, stderr: python: cant open file D:\xxx\src\Python子进程测试程序1_Wrong.py: [Errno 2] No such file or directory其中Python子进程测试程序1.py 内容如下
print(子进程输出: Hello World!)2Popen 方法
Popen 是 subprocess 的核心子进程的创建和管理都靠它处理。
Popen 方法的参数如下
argsShell 命令可以是字符串或者序列类型如列表、元组。bufsize缓冲区大小。当创建标准流的管道对象时使用默认为 -1。0 表示不使用缓冲区1 表示行缓冲仅当 universal_newlines True 时可用也就是文本模式。正数表示缓冲区大小负数表示使用系统默认的缓冲区大小。stdin、stdout、stderr分别表示程序的标准输入、输出、错误句柄。preexec_fn只在 Unix 平台下有效用于指定一个可执行对象callable object它将在子进程运行之前被调用。shell如果该参数为 True将通过操作系统的 Shell 执行指定的命令。cwd用于设置子进程的当前目录。env用于指定子进程的环境变量。如果 env None子进程的环境变量将从父进程中继承。
该方法会创建一个 Popen 对象 Popen 对象有以下几种方法
poll()检查进程是否终止如果终止返回 returncode否则返回 None。wait(timeout)等待子进程终止。communicate(inputNone, timeoutNone)和子进程交互向子进程发送和读取数据。将 input 指定数据发送到 stdin从 stdout 和 stderr 读取数据直到到达文件末尾等待进程终止。所以返回值是一个元组(stdout_data, stderr_data)。如果 timeout 时间内子进程不结束则会抛出 TimeoutExpired 异常。其中需要注意的是捕获异常之后可以再次调用该函数因为子进程并没有被 KILL。因此如果超时结束程序的话需要现正确 KILL 子进程。send_signal(singnal)发送信号到子进程。terminate()停止子进程也就是发送 SIGTERM 信号到子进程。kill()杀死子进程发送 SIGKILL 信号到子进程。
Popen 方法的样例如下
args2 [python, src/Python子进程测试程序2.py]proc subprocess.Popen(argsargs2,stdinsubprocess.PIPE,stdoutsubprocess.PIPE,stderrsubprocess.PIPE,encodingutf-8)
print(proc) # Popen: returncode: None args: [python, src/Python子进程测试程序2.py]stdout, stderr proc.communicate(inputAsanoSaki)
print(stdout:, stdout) # stdout: 子进程输出: AsanoSaki
print(stderr:, stderr) # stderr: 空其中Python子进程测试程序2.py 内容如下
s input()
print(子进程输出: , s)现在我们来看一下 communicate 的用法我们将测试程序修改为运行时间超过一秒
args2 [python, src/Python子进程测试程序2.py]proc subprocess.Popen(argsargs2,stdinsubprocess.PIPE,stdoutsubprocess.PIPE,stderrsubprocess.PIPE,encodingutf-8)try:stdout, stderr proc.communicate(inputAsanoSaki, timeout1) # 设置1s超时时间
except subprocess.TimeoutExpired:print(子进程运行超时)proc.kill() # 需要KILL子进程stdout, stderr proc.communicate() # 捕获异常之后可以再次调用该函数print(stdout:, stdout) # stdout: 子进程输出: AsanoSakiprint(stderr:, stderr) # stderr: 空现在的 Python子进程测试程序2.py 内容如下
s input()
print(子进程输出: , s)for i in range(10**9):pass2. ThreadPoolExecutor线程池
concurrent.futures 模块是 Python3.2 中引入的新模块用于支持异步执行以及在多核 CPU 和网络 I/O 中进行高效的并发编程。线程池的基类是 concurrent.futures 模块中的 ExecutorExecutor 提供了两个子类即 ThreadPoolExecutor 和 ProcessPoolExecutor 简化了跨平台异步编程的实现。其中 ThreadPoolExecutor 用于创建线程池而 ProcessPoolExecutor 用于创建进程池。如果使用线程池/进程池来管理并发编程那么只要将相应的 Task 函数提交给线程池/进程池剩下的事情就由线程池/进程池来搞定。
首先让我们先来理解多进程和多线程两种并发编程的方式
多进程当通过多进程来实现并发编程时程序会将任务分配给多个进程这些进程可以在不同的 CPU 上同时运行。进程之间是独立的各自有自己的内存空间等可以实现真正的并行执行。不过进程之间的通信比较耗时需要使用 IPC进程间通信机制而且进程之间的切换比线程之间的切换耗时所以创建进程的代价较高。多线程当通过多线程来实现并发编程时程序会将任务分配给多个线程这些线程可以在同一个进程中的不同 CPU 核上同时运行。线程之间共享进程的内存空间因此开销比较小。但是需要注意在 Python 解释器中线程是无法实现真正的并行执行因为 Python 有 GIL全局解释器锁它确保同时只有一个线程运行 Python 代码。因此一个 Python 进程中的多个线程并不能并行执行在使用多线程编程时不能完全利用多核 CPU。
ThreadPoolExecutor 创建一个线程池任务可以提交到这个线程池中执行。ThreadPoolExecutor 比 ProcessPoolExecutor 更容易使用且没有像进程那样的开销。它可以让我们在一个 Python 解释器中进行跨线程异步编程因为它规避了 GIL。
Exectuor 提供了如下常用方法
submit(fn, *args, **kwargs)将 fn 函数提交给线程池。*args 代表传给 fn 函数的参数**kwargs 代表以关键字参数的形式为 fn 函数传入参数。map(func, *iterables, timeoutNone, chunksize1)该函数类似于全局函数 map(func, *iterables)只是该函数将会启动多个线程以异步方式立即对 iterables 执行 map 处理。shutdown(waitTrue)关闭线程池。
程序将 fn 函数 submit 给线程池后submit 方法会返回一个 Future 对象Future 类主要用于获取线程任务函数的返回值。由于线程任务会在新线程中以异步方式执行因此线程执行的函数相当于一个“将来完成”的任务所以 Python 使用 Future 来代表。
Future 对象提供了如下方法
cancel()取消该 Future 代表的线程任务。如果该任务正在执行不可取消则该方法返回 False否则程序会取消该任务并返回 True。cancelled()返回 Future 代表的线程任务是否被成功取消。running()如果该 Future 代表的线程任务正在执行、不可被取消该方法返回 True。done()如果该 Funture 代表的线程任务被成功取消或执行完成则该方法返回 True。result(timeoutNone)获取该 Future 代表的线程任务最后返回的结果。如果 Future 代表的线程任务还未完成该方法将会阻塞当前线程其中 timeout 参数指定最多阻塞多少秒。exception(timeoutNone)获取该 Future 代表的线程任务所引发的异常。如果该任务成功完成没有异常则该方法返回 None。add_done_callback(fn)为该 Future 代表的线程任务注册一个“回调函数”当该任务成功完成时程序会自动触发该 fn 函数。
在用完一个线程池后应该调用该线程池的 shutdown() 方法该方法将启动线程池的关闭序列。调用 shutdown() 方法后的线程池不再接收新任务但会将以前所有的已提交任务执行完成。当线程池中的所有任务都执行完成后该线程池中的所有线程都会死亡。
使用线程池来执行线程任务的步骤如下
调用 ThreadPoolExecutor 类的构造器创建一个线程池。定义一个普通函数作为线程任务。调用 ThreadPoolExecutor 对象的 submit() 方法来提交线程任务。当不想提交任何任务时调用 ThreadPoolExecutor 对象的 shutdown() 方法来关闭线程池。
下面我们来看一个例子
from concurrent.futures import ThreadPoolExecutordef thread(num):print(Threads:, num)def getResult(): # 有返回结果的函数return Get Result# 新建ThreadPoolExecutor对象并指定最大的线程数量
with ThreadPoolExecutor(max_workers3) as executor:# 提交多个任务到线程池中executor.submit(thread, 1) # Threads: 1executor.submit(thread, 2) # Threads: 2t executor.submit(getResult)print(t.result()) # Get Result# 或者按如下方式实现
threadPool ThreadPoolExecutor(max_workers3)
for i in range(3):threadPool.submit(thread, i)
threadPool.shutdown(waitTrue)
# Threads: 0
# Threads: 1
# Threads: 23. 系统信息获取模块Psutil
现在可能会有人在想那我们如何获取子进程/线程在运行时的时间开销或者内存占用等信息呢Python 有一个第三方模块 psutil专门用来获取操作系统以及硬件相关的信息比如CPU、磁盘、网络、内存等等。
首先我们需要安装 psutil直接通过 pip 命令安装即可
pip install psutil1查看 CPU 相关信息
import psutilprint(psutil.cpu_count()) # CPU的逻辑数量12print(psutil.cpu_count(logicalFalse)) # CPU的物理核心数量6print(psutil.cpu_times()) # CPU的用户/系统/空闲时间
# scputimes(user26860.4375, system10963.515624999884, idle676060.796875, interrupt740.875, dpc477.75)for _ in range(3):# interval表示每隔0.5s刷新一次percpu表示查看所有的CPU使用率print(psutil.cpu_percent(interval0.5, percpuTrue))
# [21.2, 0.0, 32.4, 0.0, 16.1, 0.0, 0.0, 0.0, 3.2, 0.0, 0.0, 0.0]
# [25.8, 0.0, 3.1, 0.0, 0.0, 0.0, 3.1, 3.1, 0.0, 0.0, 0.0, 18.8]
# [32.4, 0.0, 21.9, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 6.2, 0.0]print(psutil.cpu_stats()) # CPU的统计信息包括上下文切换、中断、软中断以及系统调用次数等
# scpustats(ctx_switches3399680458, interrupts1365489476, soft_interrupts0, syscalls2283205750)print(psutil.cpu_freq()) # CPU的频率
# scpufreq(current2592.0, min0.0, max2592.0)2查看内存及磁盘相关信息
print(psutil.virtual_memory()) # 内存使用情况分别为总内存、可用内存、内存占用率、已使用的内存大小、剩余的内存大小
# svmem(total17022177280, available7125008384, percent58.1, used9897168896, free7125008384)print(psutil.swap_memory()) # 交换内存信息专门用来临时存储数据
# sswap(total6030352384, used5409898496, free620453888, percent89.7, sin0, sout0)print(psutil.disk_partitions()) # 磁盘分区、磁盘使用率和磁盘IO信息
# [sdiskpart(deviceC:\\, mountpointC:\\, fstypeNTFS, optsrw,fixed, maxfile255, maxpath260),
# sdiskpart(deviceD:\\, mountpointD:\\, fstypeNTFS, optsrw,fixed, maxfile255, maxpath260),
# sdiskpart(deviceE:\\, mountpointE:\\, fstypeNTFS, optsrw,fixed, maxfile255, maxpath260)]print(psutil.disk_usage(C:\\)) # 某个磁盘使用情况
# sdiskusage(total160253673472, used101791543296, free58462130176, percent63.5)print(psutil.disk_io_counters()) # 磁盘IO统计信息分别为读次数、写次数、读的字节数、写的字节数、读时间、写时间
# sdiskio(read_count1833834, write_count1831471, read_bytes69098376704, write_bytes59881958400, read_time17952, write_time2323)3查看网络相关信息
print(psutil.net_io_counters()) # 网卡的网络IO统计信息
# snetio(bytes_sent629698806, bytes_recv1756588411, packets_sent1280472, packets_recv2023810, errin0, errout0, dropin0, dropout0)print(psutil.net_io_counters(pernicTrue)) # 列出所有网卡的信息
# {Ethernet: snetio(bytes_sent0, bytes_recv0, packets_sent0, packets_recv0, errin0, errout0, dropin0, dropout0),
# Local Area Connection* 3: snetio(bytes_sent0, bytes_recv0, packets_sent0, packets_recv0, errin0, errout0, dropin0, dropout0),
# Local Area Connection* 4: snetio(bytes_sent0, bytes_recv0, packets_sent0, packets_recv0, errin0, errout0, dropin0, dropout0),
# ......]print(psutil.net_if_addrs()) # 网络接口信息
# {Ethernet: [snicaddr(familyAddressFamily.AF_LINK: -1, address04-D4-C4-74-A4-F0, netmaskNone, broadcastNone, ptpNone), snicaddr(familyAddressFamily.AF_INET: 2, address169.254.216.112, netmask255.255.0.0, broadcastNone, ptpNone)],
# Local Area Connection* 3: [snicaddr(familyAddressFamily.AF_LINK: -1, address38-00-25-26-9C-70, netmaskNone, broadcastNone, ptpNone), snicaddr(familyAddressFamily.AF_INET: 2, address169.254.169.242, netmask255.255.0.0, broadcastNone, ptpNone), snicaddr(familyAddressFamily.AF_INET6: 23, addressfe80::5e4e:63b6:7416:787b, netmaskNone, broadcastNone, ptpNone)],
# ......]print(psutil.net_if_stats()) # 网卡的详细信息包括是否启动、通信类型、传输速度、mtu
# {Ethernet: snicstats(isupFalse, duplexNicDuplex.NIC_DUPLEX_FULL: 2, speed0, mtu1500),
# vEthernet (Default Switch): snicstats(isupTrue, duplexNicDuplex.NIC_DUPLEX_FULL: 2, speed4294, mtu1500),
# Loopback Pseudo-Interface 1: snicstats(isupTrue, duplexNicDuplex.NIC_DUPLEX_FULL: 2, speed1073, mtu1500),
# ......]print(psutil.net_connections()) # 当前机器的网络连接里面接受一个参数默认是inet当然我们也可以指定为其它比如tcp
# [sconn(fd-1, familyAddressFamily.AF_INET: 2, typeSocketKind.SOCK_DGRAM: 2, laddraddr(ip127.0.0.1, port1309), raddr(), statusNONE, pid7516),
# sconn(fd-1, familyAddressFamily.AF_INET: 2, typeSocketKind.SOCK_STREAM: 1, laddraddr(ip127.0.0.1, port9100), raddr(), statusLISTEN, pid6004),
# sconn(fd-1, familyAddressFamily.AF_INET6: 23, typeSocketKind.SOCK_STREAM: 1, laddraddr(ip::, port49667), raddr(), statusLISTEN, pid3768),
# ......]print(psutil.users()) # 当前登录的用户信息
# [suser(nameAsanoSaki, terminalNone, hostNone, started1694966965.3425539, pidNone)]import datetime
print(psutil.boot_time()) # 系统的启动时间1694912508.6818905
print(datetime.datetime.fromtimestamp(psutil.boot_time())) # 2023-09-17 09:01:48.6818904查看进程相关信息
print(psutil.pids()) # 当前存在的所有进程的PID
# [0, 4, 8, 140, 212, 584, 756, 1052, 1160, 1188, 1292, 1364, 1384, ...]print(psutil.pid_exists(0)) # 某个进程是否存在
# Trueprint(psutil.process_iter()) # 所有进程对象Process组成的迭代器
# generator object process_iter at 0x00000263C4D2AF20print(psutil.Process(pid10712)) # 根据PID获取一个进程对应的Process对象
# psutil.Process(pid10712, namepycharm64.exe, statusrunning, started08:56:02)p psutil.Process(pid10712) # 获取该Process对象print(p.name()) # 进程名称pycharm64.exeprint(p.exe()) # 进程的exe路径E:\PyCharm 2020.3.3\bin\pycharm64.exeprint(p.cwd()) # 进程的工作目录E:\PyCharm 2020.3.3\jbr\binprint(p.cmdline()) # 进程启动的命令行[E:\\PyCharm 2020.3.3\\bin\\pycharm64.exe]print(p.status()) # 进程状态runningprint(p.username()) # 进程用户名LAPTOP-23NEHV3U\AsanoSakiprint(p.create_time()) # 进程创建时间返回时间戳1694998562.3625667print(p.cpu_times()) # 进程使用的CPU时间
# pcputimes(user277.09375, system34.265625, children_user0.0, children_system0.0)print(p.memory_info()) # 进程所使用的内存
# pmem(rss1507303424, vms1790066688, num_page_faults1466884, peak_wset1536196608,
# wset1507303424, peak_paged_pool881616, paged_pool876144, peak_nonpaged_pool268096,
# nonpaged_pool154024, pagefile1790066688, peak_pagefile1798070272, private1790066688)print(p.num_threads()) # 进程内的线程数量即这个进程开启了多少个线程68现在我们使用 psutil 模块实现获取 ThreadPoolExecutor 线程任务运行的时间与内存占用信息
args2 [python, src/Python子进程测试程序2.py]proc subprocess.Popen(argsargs2,stdinsubprocess.PIPE,stdoutsubprocess.PIPE,stderrsubprocess.PIPE,encodingutf-8)def getProcessInfo(pid):p psutil.Process(pid) # 获取pid所代表的子进程start_time time.time()memory 0while(True):try:memory max(memory, p.memory_info().rss)except:breakruntime (time.time() - start_time) * 1000return runtime, memorythreadPool ThreadPoolExecutor()
task threadPool.submit(getProcessInfo, proc.pid)
stdout, stderr proc.communicate(inputAsanoSaki)
runtime, memory task.result()
print(runtime) # 510.7400417327881
print(memory) # 40865792threadPool.shutdown(waitTrue)
proc.kill()其中Python子进程测试程序2.py 内容如下
s input()
print(子进程输出: , s)