安宁网站建设与制作,网站管理后台密码忘记了,企石网站建设,php是做网站美工的吗1. 并发 vs 并行
线程是程序执行的最小单位#xff0c;一个进程可以由一个或多个线程组成#xff0c;各个线程之间也是交叉执行。 并发#xff0c;相当于单核CPU#xff0c;宏观同时执行#xff0c;微观高速切换 交替执行。多线程、高并发这些词语更多地出现在服务端程序…1. 并发 vs 并行
线程是程序执行的最小单位一个进程可以由一个或多个线程组成各个线程之间也是交叉执行。 并发相当于单核CPU宏观同时执行微观高速切换 交替执行。多线程、高并发这些词语更多地出现在服务端程序里。 并行相当于多核CPU微观同时执行更强调提升性能上限。多进程更多地与高性能计算、分布式计算联系在一起。 多进程同时运行多个独立的进程每个进程有自己独立的内存空间和执行上下文彼此之间相互独立。 多线程同一进程中同时运行多个线程线程是进程的一部分共享同一个进程的内存空间和执行上下文。
多进/线程 与 顺序串行执行的区别 多进程和多线程都可以实现并行和并发的效果但也可以减少任务间的等待时间提高效率但相较于顺序串行执行进程/线程的切换也会引起时间损耗。
多进程 与 多线程的区别
切换速度多个线程共享同一个进程的内存空间和执行上下文线程之间的切换比进程切换更快速开销相对较小。安全性由于进程之间相互独立因此多进程编程更安全而多线程编程需要更加仔细地管理线程之间的同步和互斥小心地处理共享数据防止出现竞态条件等并发问题。
目前电脑主流配置都是四核-八线程的而实际工作的任务数大都大于四个所以也是需要交替来并发执行具体任务的。
话不多说下面举一个例子(同时计算3个不同数字序列的欧拉数)分别用顺序串行执行、多线程调用、多进程调用计算3个task进行计时测速
import threading as th
import multiprocessing as mp
import time
from functools import wrapsdef timer(func): # 函数的通用计时器使用时在函数前面声明timer即可wraps(func)def inner_func():t time.time()rts func()print(ftimer: using {time.time() - t :.5f} s)return rtsreturn inner_funcdef euler_func(n: int) - int:res ni 2while i n // i:if n % i 0:res res // i * (i - 1)while (n % i 0): n n // ii 1if n 1:res res // n * (n - 1)return restask1 list(range(2, 50000, 3)) # 2, 5, ...
task2 list(range(3, 50000, 3)) # 3, 6, ...
task3 list(range(4, 50000, 3)) # 4, 7, ...def job(task: list):for t in task:euler_func(t)timer
def normal(): # 顺序串行执行job(task1)job(task2)job(task3)timer # timer 是上面写的修饰器
def mutlthread(): # 多线程调用th1 th.Thread(targetjob, args(task1, ))th2 th.Thread(targetjob, args(task2, ))th3 th.Thread(targetjob, args(task3, ))# start() 告诉线程/进程你可以开始干活了,程序主逻辑还得继续往下运行th1.start()th2.start()th3.start()# 到 join() 这里咱们是指让线程/进程阻塞住咱的主逻辑比如p1.join()是指p1不干完活我主逻辑不往下进行属于是「阻塞」th1.join()th2.join()th3.join()timer
def multcore(): # 多进程调用p1 mp.Process(targetjob, args(task1, ))p2 mp.Process(targetjob, args(task2, ))p3 mp.Process(targetjob, args(task3, ))# start() 告诉线程/进程你可以开始干活了,程序主逻辑还得继续往下运行p1.start()p2.start()p3.start()# 到 join() 这里咱们是指让线程/进程阻塞住咱的主逻辑比如p1.join()是指p1不干完活我主逻辑不往下进行属于是「阻塞」p1.join()p2.join()p3.join()if __name__ __main__:print(同步串行); normal()print(多线程并发); mutlthread()print(多进程并行); multcore()结果分析多线程并发的方式具有较好的性能表现。
多线程并发可以在同一时间内执行多个子任务利用了计算机多核心的优势可以更高效地完成任务。而同步串行的方式需要等待每个子任务完成后才能进行下一个任务造成了一定的时间延迟。多进程并行的方式虽然也能同时执行多个子任务但由于进程之间的切换和数据通信开销较大导致执行时间略长于多线程并发。
需要注意的是不同的任务和计算环境可能会导致不同的结果。
同步串行
timer: using 0.44006 s
多线程并发
timer: using 0.30993 s
多进程并行
timer: using 0.41000 s2. 多任务模式
实现多任务的方式主要有以下2种: 1、多进程模式 multiprocessing 2、多线程模式 threading
同时执行多个任务通常各个任务之间并不是没有关联的而是需要相互通信和协调有时任务1必须暂停等待任务2完成后才能继续执行有时任务3和任务4又不能同时执行所以多进程和多线程的程序的复杂度要远远高于我们前面写的单进程单线程的程序。
比如算法C/S架构部署时服务端拉rtmp视频流进行视频帧解析、算法推理、结果推流多个相互依赖的任务时可以使用多线程、多进程的方式。
2.1 多进程模式 multiprocessing
进程是操作系统中的一个执行实体每个进程都有自己的内存空间彼此互不影响。一般进程数默认是电脑CPU核数当你的电脑是四核的时候你的电脑进程默认就是4个。
在Python中我们借助多进程包multiprocessing来进行多进程任务处理方式 multiprocessing模块提供了一个Process类来代表一个进程对象
# Process参数解析
multiprocessing.Process(groupNone, targetNone, nameNone, args(), kwargs{}, *, daemonNone)
#group分组
#target表示调用对象即此进程调用的函数名
#name表示进程的别名
#args表示调用对象的位置参数元组即函数的参数
#kwargs表示调用对象的字典# Process类的常用方法
close() 关闭进程
is_alive() 进程是否在运行
join() 等待join语句之前的所有程序执行完毕以后再继续往下运行通常用于进程间的同步
start() 进程准备就绪等待CPU调度
run() strat()调用run方法如果实例化进程时没有传入target参数这star执行默认run()方法# Process类的常用属性
pid 进程ID
name 进程名字下面的例子演示了启动一个子进程(即单进程)并等待其结束一个子进程其实就和我们平常调用单一函数是一样的。
from multiprocessing import Process
import os# 子进程要执行的代码
def run_proc(name):print(Run child process %s (%s)... % (name, os.getpid()))if __name____main__:print(Parent process %s. % os.getpid())#用来获取 主进程 的进程IDp Process(targetrun_proc, args(test,))#实例化进程p调用run_proc函数传入参数对象argsprint(Child process will start.)p.start()#p进程准备就绪p.join()#调用p进程主进程等待p进程执行print(Child process end.)建立多个子进程(即多进程)其实就是多个函数随机同步运行,建立多进程有两种方法一种是直接实例化多个进程对象另一种是进程池Pool
直接利用multiprocessing.Process()来实例化多个子进程调用不同的函数即可。但当任务数需要调用的函数较多时就需要实例化多个进程对象并且写多行p.start()来就绪比较麻烦如下:
from multiprocessing import Process
import random,timedef do_task(task):print(我正在做{}.format(task))time.sleep(random.randint(1,3))def write_task(task):print(我正在写{}.format(task))time.sleep(random.randint(1,3))if __name__ __main__:p1 Process(targetdo_task,args(PPT,))p2 Process(targetwrite_task,args(Sql,))p1.start()p2.start()使用multiprocessing.Pool(processing_number)实例化一个包含processing_number个进程的进程池使用apply_async()方法来异步地提交任务给进程池进行处理。
multiprocessing.Pool Pool(processes_number: int)#process为进程数把上面的例子用进程池Pool表示以后的结果如下
import multiprocessing
import random
import time
from multiprocessing import Process, Queuedef do_task(q, task):print(我正在做{}.format(task))q.put(task) # 将消息入队尾time.sleep(random.randint(1, 3))def listen_task(q, xxx):if (not q.empty()):print(我收到了你做完了{}.format(q.get())) # 从队头取出消息else:print(queue empty)time.sleep(random.randint(1, 3))if __name__ __main__:q Queue() # 注意进程通信要用multiprocessing.Queuep1 Process(targetdo_task,args(q, [PPT,]))p2 Process(targetlisten_task,args(q, [Sql,]))p1.start()p2.start()p1.join()p2.join()print(All subprocesses done.)输出结果如下
Waiting for all subprocesses done...
我正在做PPT
我正在写Sql
All subprocesses done.进程通信由于进程不共享内存因此进程间通信IPC需要使用特定的机制如管道Pipe、队列Queue等。标准库的Queue只能实现线程间的通信其get,put方法是阻塞的Queue.Queue是进程内非阻塞队列multiprocess.Queue是跨进程通信队列。多进程前者是各自私有后者是各子进程共有。
from multiprocessing import Process, Queuedef worker(q):q.put(Hello from process) # 字符串消息入队q.put()if __name__ __main__:q Queue() # 实例化队列qprocess Process(targetworker, args(q,))process.start()process.join()print(q.get()) # 从队头中取出消息q.get()特别说明MultiProcessing.Process创建的进程是有共同父进程的而MultiProcess.Pool创建的进程则不是。
MultiProcessing.Process创建的进程能够使用MultiProcessing的Queue通信但是如果使用的是进程池创建的进程那么就得使用Manager类封装的数据结构了。
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空返回True,反之False
Queue.full() 如果队列满了返回True,反之False
Queue.get([block[, timeout]]) 获取队列timeout等待时间
Queue.get_nowait() 相当Queue.get(False)
非阻塞 Queue.put(item) 写入队列timeout等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)2.2 多线程模式 threading
多线程模式就是一次只启动一个进程但是在这个进程里启动多个线程这样多个线程就可以一起执行多个任务在Python中我们要启动多线程借助于threading模块中的Thread类构建时使用的参数和方法与Process基本一致。
# Thread参数解析
Thread(groupNone, targetNone, nameNone, args(), kwargs{})
#方法
isAlive()
get/setName(name) 获取/设置线程名
start()
join() 创建一个线程就是调用一个函数
import time, threadingdef do_chioce(task):print(我正在{}.format(task))time.sleep(random.randint(1,3))if __name__ __main__:t threading.Thread(targetdo_chioce,args(选PPT模板,)) # 实例化线程tt.start() # 调用t线程创建多个线程就是调用多个函数do_chioce()和do_content()函数都在各自的线程t1, t2中执行彼此互不干扰
import time, threadingdef do_chioce(task):print(我正在{}.format(task))time.sleep(random.randint(1,3))def do_content(task):print(我正在{}.format(task))time.sleep(random.randint(1,3))if __name__ __main__:t1 threading.Thread(targetdo_chioce,args(选PPT模板,))t2 threading.Thread(targetdo_content,args(列PPT大纲,))t1.start()t2.start()线程通信由于线程共享内存因此线程间的数据是可以互相访问的。但是当多个线程同时修改数据时就会出现问题。为了解决这个问题我们需要使用线程同步工具如锁Lock和条件Condition等。
import threadingclass BankAccount:def __init__(self):self.balance 100 # 共享数据self.lock threading.Lock()def deposit(self, amount):with self.lock: # 使用锁进行线程同步balance self.balancebalance amountself.balance balancedef withdraw(self, amount):with self.lock: # 使用锁进行线程同步balance self.balancebalance - amountself.balance balancedef deposit_money(account, amount):for _ in range(100000):account.deposit(amount)def withdraw_money(account, amount):for _ in range(100000):account.withdraw(amount)account BankAccount()# 创建两个线程一个存款一个取款
deposit_thread threading.Thread(targetdeposit_money, args(account, 10))
withdraw_thread threading.Thread(targetwithdraw_money, args(account, 5))# 启动线程
deposit_thread.start()
withdraw_thread.start()# 等待线程结束
deposit_thread.join()
withdraw_thread.join()print(f最终余额: {account.balance})在这个例子中我们创建了一个银行账户对象account初始余额为100。然后我们创建了两个线程一个用于存款一个用于取款。每个线程都会对账户进行一定次数的操作存款或取款。通过使用线程锁在进行存款或取款操作时我们保证了对balance变量的访问是同步的避免了数据竞争和不一致性。
特别说明Python的线程虽然受到全局解释器锁GIL的限制但是对于IO密集型任务如网络IO或者磁盘IO使用多线程可以显著提高程序的执行效率。
3. OpenCV 视频流的多线程方法
线程是进程中的一个执行单元。多线程是指通过在线程之间快速切换对 CPU 的控制称为上下文切换来并发执行多个线程。在我们的示例中我们将看到多线程通过提高 FPS每秒帧数实现更快的实时视频处理。 原因多线程有助于更快的处理
视频处理代码分为两部分从摄像头读取下一个可用帧并对帧进行视频处理例如运行深度学习模型进行人脸识别等。
读取下一帧并在没有多线程的程序中按顺序进行处理。程序等待下一帧可用然后再对其进行必要的处理。读取帧所需的时间主要与请求、等待和将下一个视频帧从相机传输到内存所需的时间有关。对视频帧进行计算所花费的时间无论是在 CPU 还是 GPU 上占据了视频处理所花费的大部分时间。
在具有多线程的程序中读取下一帧并处理它不需要是顺序的。当一个线程执行读取下一帧的任务时主线程可以使用 CPU 或 GPU 来处理最后读取的帧。这样通过重叠两个任务可以减少读取和处理帧的总时间。
没有多线程的代码
# importing required libraries
import cv2
import time# opening video capture stream
vcap cv2.VideoCapture(0)
if vcap.isOpened() is False :print([Exiting]: Error accessing webcam stream.)exit(0)
fps_input_stream int(vcap.get(5))
print(FPS of webcam hardware/input stream: {}.format(fps_input_stream))
grabbed, frame vcap.read() # reading single frame for initialization/ hardware warm-up# processing frames in input stream
num_frames_processed 0
start time.time()
while True :grabbed, frame vcap.read()if grabbed is False :print([Exiting] No more frames to read)break# adding a delay for simulating time taken for processing a frame delay 0.03 # delay value in seconds. so, delay1 is equivalent to 1 second time.sleep(delay) num_frames_processed 1cv2.imshow(frame , frame)key cv2.waitKey(1)if key ord(q):break
end time.time()# printing time elapsed and fps
elapsed end-start
fps num_frames_processed/elapsed
print(FPS: {} , Elapsed Time: {} , Frames Processed: {}.format(fps, elapsed, num_frames_processed))# releasing input stream , closing all windows
vcap.release()
cv2.destroyAllWindows()多线程代码
# importing required libraries
import cv2
import time
from threading import Thread # library for implementing multi-threaded processing# defining a helper class for implementing multi-threaded processing
class WebcamStream:def __init__(self, stream_id0):self.stream_id stream_id # default is 0 for primary camera# opening video capture streamself.vcap cv2.VideoCapture(self.stream_id)if self.vcap.isOpened() is False:print([Exiting]: Error accessing webcam stream.)exit(0)fps_input_stream int(self.vcap.get(5))print(FPS of webcam hardware/input stream: {}.format(fps_input_stream))# reading a single frame from vcap stream for initializingself.grabbed, self.frame self.vcap.read()if self.grabbed is False:print([Exiting] No more frames to read)exit(0)# self.stopped is set to False when frames are being read from self.vcap streamself.stopped True# reference to the thread for reading next available frame from input streamself.t Thread(targetself.update, args())self.t.daemon True # daemon threads keep running in the background while the program is executing# method for starting the thread for grabbing next available frame in input streamdef start(self):self.stopped Falseself.t.start()# method for reading next framedef update(self):while True:if self.stopped is True:breakself.grabbed, self.frame self.vcap.read()if self.grabbed is False:print([Exiting] No more frames to read)self.stopped Truebreakself.vcap.release()# method for returning latest read framedef read(self):return self.frame# method called to stop reading framesdef stop(self):self.stopped True# initializing and starting multi-threaded webcam capture input stream
webcam_stream WebcamStream(stream_id0) # stream_id 0 is for primary camera
webcam_stream.start()
frame []
# processing frames in input stream
num_frames_processed 0
start time.time()
while True:if webcam_stream.stopped is True:breakelse:frame webcam_stream.read()# adding a delay for simulating time taken for processing a framedelay 0.03 # delay value in seconds. so, delay1 is equivalent to 1 secondtime.sleep(delay)num_frames_processed 1 # count the number of framescv2.imshow(frame, frame)key cv2.waitKey(1)if key ord(q):breakend time.time()
webcam_stream.stop() # stop the webcam stream
# printing time elapsed and fps
elapsed end - start
fps num_frames_processed / elapsed
print(FPS: {} , Elapsed Time: {} , Frames Processed: {}.format(fps, elapsed, num_frames_processed))
# closing all windows
cv2.destroyAllWindows()同理对于Flask等架构实现rtsp/rtmp推流拉流时也可以使用多线程完成拉流线程(读取帧)、处理线程(跑算法)、推流线程(发送帧)