微前端架构如何改变企业的开发模式与效率提升
583
2022-10-08
PythonNote009---线程和进程
线程和进程
通俗意义上的线程和进程的解释,
进程(Process):对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程,打开一个记事本就启动了一个记事本进程,打开两个记事本就启动了两个记事本进程,打开一个Word就启动了一个Word进程线程(Thread):有些进程还不止同时干一件事,比如Word,它可以同时进行打字、拼写检查、打印等事情。在一个进程内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”称为线程(Thread)
那如果我们想并行的执行些任务,咋整? 如果在windows下,执行这些下面多进程的代码会报错,可以在cmd中python test.py执行,mac或者linux下应该可以顺利执行~
多线程应用
对于一般的循环操作,我们希望多线程执行完毕之后,会有一个汇总的功能。R语言中的foreach函数提供了这样的功能。python的多线程好像没有提供类似功能,一种替代方案是把结果输出到文件中,再重新读取,有点笨,凑合用。
多线程的速度未必比循环串行要快,Python的多线程只能用到一个CPU核心,看下廖雪峰的解释:因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。 因此,只有当线程中会出现阻塞时,多线程才有意义,比如线程中有数据-,在等待数据返回时线程阻塞了,此时CPU就可以来处理其它线程的运算。有个Case:通过python连接hive去查询sql,获取数据时,由于主要的计算工作是hive在做,python只是获取数据的过程,因此多线程可以比串行的速度更快。 如果不存在线程阻塞,多线程的速度可能不如单线程,因为线程切换本身也需要花费时间。
import pandas as pdimport threadingimport timeimport random#创建线程threads = []date_list = [x.strftime('%Y%m%d') for x in list(pd.date_range('2019/2/1','2019/2/20', freq='D'))]def testFun(x): print("第%s.次线程执行~"%x) time.sleep(2*x) return 2**50 for i in range(10): t = threading.Thread(target=testFun,args=(i,)) threads.append(t) #执行线程if __name__=='__main__': for t in threads: t.start() for t in threads: t.join()print ("退出线程")
第0.次线程执行~第1.次线程执行~第2.次线程执行~第3.次线程执行~第4.次线程执行~第5.次线程执行~第6.次线程执行~第7.次线程执行~第8.次线程执行~第9.次线程执行~退出线程
上面的代码暂时能用,可以满足一些简单的需求,更复杂和高级的用法,后面遇到再补充。
多进程应用
启动子进程
from multiprocessing import Processimport os# 子进程要执行的代码# 输出子进程iddef run_proc(name): print('Run child process %s (%s)...' % (name, os.getpid())) print("what is up")if __name__=='__main__': # 输出主进程id print('Parent process %s.' % os.getpid()) # 创建子进程实例 # 注意args,如果只有一个参数,也要写成(args1,)的形式 plist = [] p1 = Process(target=run_proc, args=('test1',)) p2 = Process(target=run_proc, args=('test2',)) plist.append(p1) plist.append(p2) print('Child process will start.') # 启动进程 for t in plist: t.start() for t in plist: t.join() print('Child process end.')
Parent process 28053.Child process will start.Run child process test1 (10070)...what is upRun child process test2 (10073)...what is upChild process end.
进程池
如果要启动大量的子进程,可以用进程池的方式批量创建子进程。
apply_async
对不返回具体结果的函数,比如输出结果到文件中,可以用apply_async 函数原型:apply_async(func[, args=()[, kwds={}[, callback=None]]]) 其作用是向进程池提交需要执行的函数及参数, 各个进程采用非阻塞(异步)的调用方式,即每个子进程只管运行自己的,不管其它进程是否已经完成。这是默认方式。
from multiprocessing import Pool, cpu_countimport osimport timedef testFun(index): time.sleep(5) print('子进程: {} - 任务{}'.format(os.getpid(), index))if __name__ == '__main__': print("CPU内核数:{}".format(cpu_count())) print('当前母进程: {}'.format(os.getpid())) start = time.time() p = Pool(cpu_count()-2) for i in list(range(10)): p.apply_async(testFun, args=(i,))# ret = p.map_async(testFun, list(range(10))) print('等待所有子进程完成。') p.close() p.join() end = time.time() print("总共用时{}秒".format((end - start)))
CPU内核数:32当前母进程: 28053等待所有子进程完成。子进程: 10127 - 任务1子进程: 10132 - 任务6子进程: 10129 - 任务3子进程: 10133 - 任务7子进程: 10130 - 任务4子进程: 10128 - 任务2子进程: 10126 - 任务0子进程: 10134 - 任务8子进程: 10131 - 任务5子进程: 10135 - 任务9总共用时5.2520129680633545秒
可以看到10个任务,开10个进程,一共用了5s。如果串行,则需要50s
map_async
对于return变量的函数,可以用map_async,也是非阻塞的。 函数原型:map_async(func, iterable[, chunksize[, callback]])
from multiprocessing import Pool, cpu_countimport pandas as pdimport osimport timedef testFun(index): time.sleep(4) print('子进程: {} - 任务{}'.format(os.getpid(), index)) start = time.time() time.sleep(random.random() * 3) end = time.time() print('任务 %s runs %0.2f seconds.' % (index, (end - start))) return(pd.DataFrame({"x1":[index]}))if __name__ == '__main__': print("CPU内核数:{}".format(cpu_count())) print('当前母进程: {}'.format(os.getpid())) start = time.time() # p = Pool(cpu_count()-2) p = Pool(4) res = p.map_async(testFun, list(range(5))) print('等待所有子进程完成。') # 关闭进程池(pool),使其不在接受新的任务 p.close() # 主进程阻塞等待子进程的退出, join方法要在close或terminate之后使用 p.join() end = time.time() print("总共用时{}秒".format((end - start)))
CPU内核数:32当前母进程: 28053等待所有子进程完成。子进程: 12104 - 任务3子进程: 12101 - 任务0子进程: 12102 - 任务1子进程: 12103 - 任务2任务 1 runs 0.28 seconds.任务 3 runs 0.42 seconds.任务 0 runs 0.84 seconds.任务 2 runs 2.03 seconds.子进程: 12102 - 任务4任务 4 runs 2.28 seconds.总共用时10.639443159103394秒
参考下廖雪峰的解释: 对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。 任务 0,1,2,3是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为Pool的我们设置大小是4,因此,最多同时执行4个进程。这是Pool有意设计的限制,并不是操作系统的限制。
# 返回一个list df = res.get()
pd.concat(df)
x1 | |
0 | 0 |
0 | 1 |
0 | 2 |
0 | 3 |
0 | 4 |
多进程虽然是并行执行,但是返回结果时会维持加入进程的顺序~
函数封装
封装时要用到signal包,信号signal是python进程间通信多种机制中的其中一种机制。可以对操作系统进程的控制,当进程中发生某种原因而中断时,可以异步处理这个异常。具体原理一时半会地我也搞不懂,还是从实际使用出发,给出demo~
import signalfrom multiprocessing import Pool, cpu_countdef init_worker(): signal.signal(signal.SIGINT, signal.SIG_IGN)def multi_process_map_async(func, items): pool = Pool(processes=cpu_count() - 2, initializer=init_worker, maxtasksperchild=400) ret = pool.map_async(func, items) pool.close() pool.join() return ret.get() if ret.successful() else []def multi_process_apply_async(func, items): pool = Pool(processes=cpu_count() - 1, initializer=init_worker, maxtasksperchild=400) for item in items: pool.apply_async(func, (item,)) pool.close() pool.join()
总之,其他复杂的用法,等以后实际用到,再补充吧~
Ref
[1] 阮一峰blog[2] 廖雪峰Python教程[3] 官方文档[4] 于南京市江宁区九龙湖
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~