一 . 管道 (了解)
from multiprocessing import Process, Pipedef f1(conn): # 管道的recv 里面不用写数字 from_main_process = conn.recv() print('我是子程序') print(from_main_process)if __name__ == '__main__': # 创建一个管道,返回管道的两端conn1 和 conn2 但是只能在一边发消息,另一端接消息,自己这一段是不能接的 conn1, conn2 = Pipe() p1 = Process(target=f1,args=(conn2,)) p1.start() # 管道的发送里面也不用发字节 conn1.send('oh baby') print('我是爸爸')# 数据接收一次就没有了.也就是说,往管道里面传一次消息,即使有多个进程都来接收,但是只能有一个接收成功
二 . 事件(了解)
import timefrom multiprocessing import Process,Eventdef f1(e): time.sleep(2) n = 100 print('子进程计算结果为',n) # 将初识对象改为True e.set() # 查看现在的状态 print('现在的状态是->',e.is_set())if __name__ == '__main__': # 创建事件对象,初识状态是False e = Event() p = Process(target=f1,args=(e,)) p.start() print('主进程等待...') # e.clear() # clear 是将状态改为False # 这个对象的状态为False的时候,就在wait的地方等待 e.wait() print('结果已经写入文件了,可以拿到这值')
三 . 信号量(了解)
import timeimport randomfrom multiprocessing import Process,Semaphoredef f1(i, s): s.acquire() # 加锁 print('男嘉宾%s号到了' % i) time.sleep(random.randint(1,3)) s.release() # 解锁 每有一个解开就会有一个进去if __name__ == '__main__': s = Semaphore(3) # 计数器 一起能去3个进程 for i in range(10): p = Process(target=f1,args=(i, s)) p.start()
四. 进程池(重点)
进程的创建和销毁是很浪费时间的,影响代码执行效率. 所以说进程池比多进程同时执行的时候会省很多时间,因为进程池没有创建和销毁这一过程.
import timefrom multiprocessing import Process,Pooldef f1(n): passif __name__ == '__main__': #统计进程池执行100个任务的时间 s_time = time.time() # 里面这个参数是指定进程池中有多少个进程用的,4表示4个进程,如果不传参数,默认开启的进程数一般是cpu的个数 pool = Pool(4) pool.map(f1,range(100)) #参数数据必须是可迭代的,异步提交任务,自带close和join功能 e_time = time.time() dif_time = e_time - s_time #统计100个进程,来执行100个任务的执行时间 p_s_t = time.time() #多进程起始时间 p_list = [] for i in range(100): p = Process(target=f1,args=(i,)) p.start() # 要加入列表里面之后把所有的都加上join p_list.append(p) [pp.join() for pp in p_list] p_e_t = time.time() p_dif_t = p_e_t - p_s_t print('进程池的时间:',dif_time) print('多进程的执行时间:',p_dif_t) # 进程池的时间: 0.17912554740905762 # 多进程的执行时间: 4.200979232788086
同步方法
import timefrom multiprocessing import Process,Pooldef f1(n): time.sleep(1) return n*nif __name__ == '__main__': pool = Pool(4) for i in range(10): # 进程池的同步方法,将任务变成了串行 res = pool.apply(f1,args=(i,)) print(res)
异步方法
import timefrom multiprocessing import Process,Pooldef f1(n): time.sleep(2) return n*nif __name__ == '__main__': pool = Pool() res_list = [] for i in range(5): #异步给进程池提交任务 res = pool.apply_async(f1,args=(i,)) # print(res) # 得到的是pool对象res_list.append(res) print('等待所有任务执行完') # pool.close() #锁住进程池,意思就是不让其他的程序再往这个进程池里面提交任务了,工作中一般不会锁 # pool.join() #打印结果,如果异步提交之后的结果对象 for i in res_list: # get()方法就是有就拿,没有就等着 print(i.get()) # 拿到的是返回结果 0,1,4,9,16
五. 回调函数
from multiprocessing import Pool,Processdef f1(n): print('>>>>',n) return n*ndef call_back_func(n): print('回调函数中的结果:',n)if __name__ == '__main__': pool = Pool(4) # callback就是把f1 的返回值当参数传入函数 res = pool.apply_async(f1,args=(5,),callback=call_back_func) pool.close() pool.join()