佛山外贸网站,北京设计企业网站,网站如何做sem推广,济南网站建设多少钱这里所谓的低级#xff0c;高级是指封装抽象的程度。低级指os.fork()高级是指 multiprocessing包一般根据业务需求#xff0c;一个主进程负责维护接收#xff0c; 不同的子进程处理不同的需求。根据各同需求组合多进程多线程多进程#xff0b;多线程协程也可基于uvloop事件…这里所谓的低级高级是指封装抽象的程度。低级指os.fork()高级是指 multiprocessing包一般根据业务需求一个主进程负责维护接收 不同的子进程处理不同的需求。根据各同需求组合多进程多线程多进程多线程协程也可基于uvloop事件启动方式低级版def main_process():r os.fork()if r 0 :print(sub_process_buiness)else:print(main process buiness)高级版def sub_process_write_data():passdef sub_process_read_date():passdef main_handler():from multiprocessing import Processw1Process(targetsub_process_write_data)w1.start()w1.join()r1Process(targetsub_process_read_data)r1.start()r1.join()if __name__ __main__:main_handler()以下部分转自作者晓可加油链接https://www.jianshu.com/p/414e89248285來源简书著作权归作者所有。商业转载请联系作者获得授权非商业转载请注明出处。Num01--进程的创建-forkPython的os模块封装了常见的系统调用其中就包括fork可以在Python程序中轻松创建子进程。import os# 注意fork函数只在Unix/Linux/Mac上运行windows不可以pid os.fork()if pid 0:print(我是子进程)else:print(我是父进程)以上代码加以说明如下程序执行到os.fork()时操作系统会创建一个新的进程(子进程)然后复制父进程的所有信息到子进程中。然后父进程和子进程都会从fork()函数中得到一个返回值在子进程中这个值一定是0而父进程中是子进程的 id号。在Unix/Linux操作系统中提供了一个fork()系统函数它非常特殊。普通的函数调用调用一次返回一次但是fork()调用一次返回两次因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程)然后分别在父进程和子进程内返回。子进程永远返回0而父进程返回子进程的ID。这样做的理由是一个父进程可以fork出很多子进程所以父进程要记下每个子进程的ID而子进程只需要调用getppid()就可以拿到父进程的ID。Num02--多进程修改全局变量codingutf-8import osimport timenum 0注意fork函数只在Unix/Linux/Mac上运行windows不可以pid os.fork()if pid 0:num1print(我是子进程---num%d%num)else:time.sleep(1)num1print(我是父进程---num%d%num)在多进程中个每个进程中所有数据(包括全局变量)都各拥有一份互不影响。Num03--多次fork问题Test01--fork两次产生四个进程codingutf-8import osimport time注意fork函数只在Unix/Linux/Mac上运行windows不可以pid os.fork()if pid 0:print(我是第一次fork中的子进程)else:print(我是第一次fork中的父进程)pid os.fork()if pid 0:print(我是第二次fork中的子进程)else:print(我是第二次fork中的父进程)time.sleep(1)Test02--fork两次产生三个进程! /usr/bin/env python3-- coding:utf-8 --import osimport timedef sing():print(--我是第一次fork的子进程--)time.sleep(1)def dance():ppid os.fork()if ppid 0:print(--我是第二次fork的父进程--)time.sleep(1)elif ppid 0:print(--我是第二次fork的子进程--)time.sleep(1)def main():pid os.fork()if pid 0:dance()elif pid 0:sing()if name main:main()Num04--进程的第一种创建方式-multiprocessingmultiprocessing模块提供了一个Process类来代表一个进程对象。codingutf-8from multiprocessing import Processimport os子进程要执行的代码def fun_proc(name):print(子进程运行中name %s ,pid%d % (name, os.getpid()))if namemain:print(父进程 %d % os.getpid())p Process(targetfun_proc, args(我是子进程,))print(子进程将要执行)p.start()p.join()print(子进程已结束)对以上代码加以说明1用Process类创建子进程时只需要传入一个执行函数和函数的参数(是一个元组)。2调用start()方式启动子进程。3join()方法可以等待子进程结束后再继续往下运行通常用于进程间的同步。Test01--Process的语法如下Process([group [, target [, name [, args [, kwargs]]]]])target表示这个进程实例所调用对象args表示调用对象的位置参数元组kwargs表示调用对象的关键字参数字典name为当前进程实例的别名group大多数情况下用不到表示在哪个组Process类常用方法is_alive()判断进程实例是否还在执行join([timeout])是否等待进程实例执行结束或等待多少秒start()启动进程实例(创建子进程)run()如果没有给定target参数对这个对象调用start()方法时就将执行对象中的run()方法terminate()不管任务是否完成立即终止Process类常用属性name当前进程实例别名默认为Process-NN为从1开始递增的整数pid当前进程实例的PID值Test02--创建一个进程对象#!/usr/bin/env python# -*- coding: utf-8 -*-# Date : 2017-04-25 16:36:47# Author : xiaokefrom multiprocessing import Processimport osfrom time import sleep# 子进程要执行的代码def fun_proc(name, age, **kwargs):for i in range(5):print(子进程运行中name %s,age%d ,pid%d... % (name, age,os.getpid()))print(kwargs)sleep(1)if __name____main__:print(父进程 %d % os.getpid())p Process(targetfun_proc, args(我是子进程,66), kwargs{得分:666})print(子进程将要执行)p.start()sleep(1)# p.terminate()# 提前结束子进程不管子进程的任务是否完成p.join()print(子进程已结束)# 结果如下# 父进程 7744# 子进程将要执行# 子进程运行中name 我是子进程,age66 ,pid8064...# {得分: 666}# 子进程运行中name 我是子进程,age66 ,pid8064...# {得分: 666}# 子进程运行中name 我是子进程,age66 ,pid8064...# {得分: 666}# 子进程运行中name 我是子进程,age66 ,pid8064...# {得分: 666}# 子进程运行中name 我是子进程,age66 ,pid8064...# {得分: 666}# 子进程已结束Test03--创建两个进程对象#!/usr/bin/env python# -*- coding: utf-8 -*-# Date : 2017-04-25 16:36:47# Author : xiaoke#codingutf-8from multiprocessing import Processimport timeimport osdef worker_1(interval):print(worker_1,父进程(%s),当前进程(%s)%(os.getppid(),os.getpid()))t_start time.time()time.sleep(interval) #程序将会被挂起interval秒t_end time.time()print(worker_1,执行时间为%0.2f秒%(t_end - t_start))def worker_2(interval):print(worker_2,父进程(%s),当前进程(%s)%(os.getppid(),os.getpid()))t_start time.time()time.sleep(interval)t_end time.time()print(worker_2,执行时间为%0.2f秒%(t_end - t_start))def main():#输出当前程序的IDprint(进程ID%s%os.getpid())#创建两个进程对象target指向这个进程对象要执行的对象名称#如果不指定name参数默认的进程对象名称为Process-NN为一个递增的整数p1Process(targetworker_1,args(2,))p2Process(targetworker_2,namexiaoke,args(1,))#使用进程对象名称.start()来创建并执行一个子进程#这两个进程对象在start后就会分别去执行worker_1和worker_2方法中的内容p1.start()p2.start()#同时父进程仍然往下执行如果p2进程还在执行将会返回Trueprint(p2.is_alive%s%p2.is_alive())print(p1.is_alive%s%p1.is_alive())#输出p1和p2进程的别名和pidprint(--p1进程的别名和pid--)print(p1.name%s%p1.name)print(p1.pid%s%p1.pid)print(--p2进程的别名和pid--)print(p2.name%s%p2.name)print(p2.pid%s%p2.pid)#join括号中不携带参数表示父进程在这个位置要等待p1进程执行完成后再继续执行下面的语句一般用于进程间的数据同步# 如果不写这一句下面的is_alive判断将会是True#可以尝试着将下面的这条语句改成p1.join(1)#因为p2需要2秒以上才可能执行完成父进程等待1秒很可能不能让p1完全执行完成#所以下面的print会输出True即p1仍然在执行print(--p1进程是否执行完毕--)p1.join()print(p1.is_alive%s%p1.is_alive())p2.join()print(p2.is_alive%s%p2.is_alive())if __name__ __main__:main()# 结果如下# 进程ID4004# p2.is_aliveTrue# p1.is_aliveTrue# --p1进程的别名和pid--# p1.nameProcess-1# p1.pid3352# --p2进程的别名和pid--# p2.namexiaoke# p2.pid6092# --p1进程是否执行完毕--# worker_2,父进程(4004),当前进程(6092)# worker_2,执行时间为1.00秒# worker_1,父进程(4004),当前进程(3352)# worker_1,执行时间为2.00秒# p1.is_aliveFalse# p2.is_aliveFalseNum05--进程的第二种创建方式--自己创建一个类继承Process类定义创建新的进程还可以使用类的方式。可以自定义一个类继承Process类。每次实例化这个类的时候就等同于实例化这个进程对象。#!/usr/bin/env python# -*- coding: utf-8 -*-# Date : 2017-04-25 16:36:47# Author : xiaokefrom multiprocessing import Processimport timeimport os#继承Process类class Process_Class(Process):#因为Process类本身也有__init__方法这个子类相当于重写了这个方法#但这样就会带来一个问题我们并没有完全的初始化一个Process类# 所以就不能使用从这个类继承的一些方法和属性#最好的方法就是将继承类本身传递给Process.__init__方法完成这些初始化操作def __init__(self,interval):Process.__init__(self)# 传递进来的属性self.interval interval#重写了Process类的run()方法def run(self):print(子进程(%s) 开始执行父进程为(%s)%(os.getpid(),os.getppid()))t_start time.time()time.sleep(self.interval)t_stop time.time()print(子进程(%s)执行结束耗时%0.2f秒%(os.getpid(),t_stop-t_start))if __name____main__:t_start time.time()print(当前程序进程(%s)%os.getpid())p1 Process_Class(2)#对一个不包含target属性的Process类执行start()方法就会运行这个类中的run()方法所以这里会执行p1.run()p1.start()p1.join()t_stop time.time()print(父进程(%s)执行结束耗时%0.2f秒%(os.getpid(),t_stop-t_start))# 结果为# 当前程序进程(14736)# 子进程(4292) 开始执行父进程为(14736)# 子进程(4292)执行结束耗时2.00秒# 父进程(14736)执行结束耗时2.11秒Num06--进程池--Pool当需要创建的子进程数量不多时可以直接利用multiprocessing中的Process动态生成多个进程。但如果是上百甚至上千个目标手动的去创建进程的工作量巨大此时就可以用到multiprocessing模块提供的Pool方法。初始化Pool时可以指定一个最大进程数当有新的请求提交到Pool中时如果池还没有满那么就会创建一个新的进程用来执行该请求但如果池中的进程数已经达到指定的最大值那么该请求就会等待直到池中有进程结束才会创建新的进程来执行。#采用非阻塞的方式#!/usr/bin/env python# -*- coding: utf-8 -*-# Time : 2017/4/27 8:35# Author : xiaoke# Site :# File : Test33.py# Software: PyCharm Community Editionfrom multiprocessing import Poolimport os, time, randomdef worker(num):t_start time.time()print(子进程-%s开始执行,进程号为%d % (num, os.getpid()))# random.random()随机生成0~1之间的浮点数time.sleep(random.random() * 2)t_stop time.time()print(子进程-%s执行完毕耗时%0.2f % (num, t_stop - t_start))def main():# 定义一个进程池最大进程数5p Pool(3)for i in range(10):# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))# 异步的方式每次循环将会用空闲出来的子进程去调用目标p.apply_async(worker, (i,))m_start time.time()print(----start----)p.close() # 关闭进程池关闭后p不再接收新的请求p.join() # 等待p中所有子进程执行完成必须放在close语句之后print(-----end-----)m_stop time.time()print(父进程执行完毕耗时%0.2f % (m_stop - m_start))if __name__ __main__:main()# 结果如下# ----start----# 子进程-0开始执行,进程号为3360# 子进程-1开始执行,进程号为16084# 子进程-2开始执行,进程号为12580# 子进程-1执行完毕耗时0.05# 子进程-3开始执行,进程号为16084# 子进程-3执行完毕耗时0.80# 子进程-4开始执行,进程号为16084# 子进程-2执行完毕耗时1.63# 子进程-5开始执行,进程号为12580# 子进程-0执行完毕耗时1.80# 子进程-6开始执行,进程号为3360# 子进程-4执行完毕耗时1.55# 子进程-7开始执行,进程号为16084# 子进程-7执行完毕耗时0.07# 子进程-8开始执行,进程号为16084# 子进程-8执行完毕耗时0.05# 子进程-9开始执行,进程号为16084# 子进程-5执行完毕耗时1.01# 子进程-6执行完毕耗时1.10# 子进程-9执行完毕耗时1.57# -----end-----# 父进程执行完毕耗时4.26multiprocessing.Pool常用函数解析apply_async(func[, args[, kwds]]) 使用非阻塞方式调用func(并行执行堵塞方式必须等待上一个进程退出才能执行下一个进程)args为传递给func的参数列表kwds为传递给func的关键字参数列表apply(func[, args[, kwds]])使用阻塞方式调用funcclose()关闭Pool使其不再接受新的任务terminate()不管任务是否完成立即终止join()主进程阻塞等待子进程的退出 必须在close或terminate之后使用采用apply阻塞的方式#!/usr/bin/env python# -*- coding: utf-8 -*-# Time : 2017/4/27 8:35# Author : xiaoke# Site :# File : Test33.py# Software: PyCharm Community Editionfrom multiprocessing import Poolimport os, time, randomdef worker(num):t_start time.time()print(子进程-%s开始执行,进程号为%d % (num, os.getpid()))# random.random()随机生成0~1之间的浮点数time.sleep(random.random() * 2)t_stop time.time()print(子进程-%s执行完毕耗时%0.2f % (num, t_stop - t_start))def main():# 定义一个进程池最大进程数5p Pool(3)for i in range(10):# Pool.apply_async(要调用的目标,(传递给目标的参数元祖,))# 异步的方式每次循环将会用空闲出来的子进程去调用目标p.apply(worker, (i,))m_start time.time()print(----start----)p.close() # 关闭进程池关闭后p不再接收新的请求p.join() # 等待p中所有子进程执行完成必须放在close语句之后print(-----end-----)m_stop time.time()print(父进程执行完毕耗时%0.2f % (m_stop - m_start))if __name__ __main__:main()# 结果如下# 子进程-0开始执行,进程号为4464# 子进程-0执行完毕耗时1.75# 子进程-1开始执行,进程号为11640# 子进程-1执行完毕耗时1.33# 子进程-2开始执行,进程号为8756# 子进程-2执行完毕耗时1.86# 子进程-3开始执行,进程号为4464# 子进程-3执行完毕耗时0.70# 子进程-4开始执行,进程号为11640# 子进程-4执行完毕耗时1.29# 子进程-5开始执行,进程号为8756# 子进程-5执行完毕耗时0.69# 子进程-6开始执行,进程号为4464# 子进程-6执行完毕耗时0.33# 子进程-7开始执行,进程号为11640# 子进程-7执行完毕耗时1.83# 子进程-8开始执行,进程号为8756# 子进程-8执行完毕耗时1.58# 子进程-9开始执行,进程号为4464# 子进程-9执行完毕耗时1.37# ----start----# -----end-----# 父进程执行完毕耗时0.08Num07--进程间的通信--Queue进程(Process)之间有时间需要通信操作系统提供了很多机制来实现进程间的通信。如Queue、Pipes等。Queue本身是一个消息队列。Test01-- 先看一个简单的案例#!/usr/bin/env python# -*- coding: utf-8 -*-# Time : 2017/4/27 8:35# Author : xiaoke# Site :# File : Test33.py# Software: PyCharm Community Editionfrom multiprocessing import Queueq Queue(3) # 初始化一个Queue对象最多可接收三条put消息q.put(消息1)q.put(消息2)print(q.full()) # Falseq.put(消息3)print(q.full()) # True# 因为消息列队已满下面的try都会抛出异常第一个try会等待3秒后再抛出异常第二个Try会立刻抛出异常try:q.put(消息4, True, 3)except:print(消息列队已满现有消息数量:%s % q.qsize())try:q.put_nowait(消息4)except:print(消息列队已满现有消息数量:%s % q.qsize())# 推荐的方式先判断消息列队是否已满再写入if not q.full():q.put_nowait(消息4)# 读取消息时先判断消息列队是否为空再读取if not q.empty():for i in range(q.qsize()):# print(取出消息%s % q.get())print(取出消息%s % q.get_nowait())# 结果是# False# True# 消息列队已满现有消息数量:3# 消息列队已满现有消息数量:3# 取出消息消息1# 取出消息消息2# 取出消息消息3以上代码加以说明初始化Queue()对象时(例如qQueue())若括号中没有指定最大可接收的消息数量或数量为负值那么就代表可接受的消息数量没有上限(直到内存的尽头)Queue.qsize()返回当前队列包含的消息数量Queue.empty()如果队列为空返回True反之False Queue.full()如果队列满了返回True,反之FalseQueue.get([block[, timeout]])获取队列中的一条消息然后将其从列队中移除block默认值为True1)如果block使用默认值且没有设置timeout(单位秒)消息列队如果为空此时程序将被阻塞(停在读取状态)直到从消息列队读到消息为止如果设置了timeout则会等待timeout秒若还没读取到任何消息则抛出Queue.Empty异常2)如果block值为False消息列队如果为空则会立刻抛出Queue.Empty异常Queue.get_nowait()相当Queue.get(False)Queue.put(item,[block[, timeout]])将item消息写入队列block默认值为True1)如果block使用默认值且没有设置timeout(单位秒)消息列队如果已经没有空间可写入此时程序将被阻塞(停在写入状态)直到从消息列队腾出空间为止如果设置了timeout则会等待timeout秒若还没空间则抛出Queue.Full异常2)如果block值为False消息列队如果没有空间可写入则会立刻抛出Queue.Full异常Queue.put_nowait(item)相当Queue.put(item, False)Test02--在父进程创建两个子进程一个往Queue里面写数据一个从Queue里面读数据。#!/usr/bin/env python# -*- coding: utf-8 -*-# Time : 2017/4/27 8:35# Author : xiaoke# Site :# File : Test33.py# Software: PyCharm Community Editionfrom multiprocessing import Process, Queueimport os, time, random# 写数据进程def write(q):for value in [A, B, C, quit]:print(Put %s to queue... % value)q.put(value)time.sleep(random.random())# 读数据进程def read(q):while True:if not q.empty():value q.get(True)print(Get %s from queue. % value)# 因为读进程是一个死循环所以要设置一个标记用于退出if value quit:breaktime.sleep(random.random())if __name__ __main__:# 父进程创建Queue并传给各个子进程q Queue()pw Process(targetwrite, args(q,))pr Process(targetread, args(q,))# 启动子进程pw写入:pw.start()# 等待pw结束:pw.join()# 启动子进程pr读取:pr.start()pr.join()print(所有数据都写入并且读完)print(pw is alive:%s % pw.is_alive())print(pr is alive:%s % pr.is_alive())# 结果如下# Put A to queue...# Put B to queue...# Put C to queue...# Put quit to queue...# Get A from queue.# Get B from queue.# Get C from queue.# Get quit from queue.# 所有数据都写入并且读完# pw is alive:False# pr is alive:FalseTest03--进程池Pool中的Queue来进行进程间的通信#!/usr/bin/env python# -*- coding: utf-8 -*-# Author : xiaoke# Site :# File : Test33.py# Software: PyCharm Community Editionfrom multiprocessing import Pool, Queue, Managerimport osimport timeimport randomdef task_write(q):for s in (hello, python, world, quit):q.put(s) # 向队列中添加消息print(%s 进程向队列中添加消息:%s % (os.getpid(), s))time.sleep(random.random() * 2)print(%s 进程要结束了 % os.getpid())def task_read(q):while True:msg q.get() # 阻塞式从队列中收消息print(%s 进程从队列中取出消息:%s % (os.getpid(), msg))if msg quit:breaktime.sleep(random.random() * 2)print(%s 进程要结束了 % os.getpid())def main():# 1.创建消息队列对象# q Queue() #只能用于父子进程# Manger().Queue() 消息队列可用于进程池q Manager().Queue()# 2.创建进程池里面放两个进程my_pool Pool(2)# 3.添加任务# 采用阻塞的方式my_pool.apply(task_write, args(q,))my_pool.apply(task_read, args(q,))# 采用非阻塞的方式# my_pool.apply_async(task_write, args(q,))# my_pool.apply_async(task_read, args(q,))# 4.关闭进程池my_pool.close()# 5.等待所有进程结束my_pool.join()if __name__ __main__:main()# 采用非阻塞的方式结果# 7380 进程向队列中添加消息:hello# 11256 进程从队列中取出消息:hello# 7380 进程向队列中添加消息:python# 11256 进程从队列中取出消息:python# 7380 进程向队列中添加消息:world# 11256 进程从队列中取出消息:world# 7380 进程向队列中添加消息:quit# 11256 进程从队列中取出消息:quit# 11256 进程要结束了# 7380 进程要结束了# 采用阻塞的方式结果# 96 进程向队列中添加消息:hello# 96 进程向队列中添加消息:python# 96 进程向队列中添加消息:world# 96 进程向队列中添加消息:quit# 96 进程要结束了# 12412 进程从队列中取出消息:hello# 12412 进程从队列中取出消息:python# 12412 进程从队列中取出消息:world# 12412 进程从队列中取出消息:quit# 12412 进程要结束了