app开发者平台在数字化时代的重要性与发展趋势解析
824
2022-11-22
进阶Python之线程进程篇
线程与进程
线程是程序运行的基本执行单元,在系统里面至少会创建一个进程,而在一个进程里面也必须要创建一个线程。如果这样理解的话,进程包含线程,而一个进程中至少要有一个线程。
要注意的是进程之间是不可以共享内存的,比如我们电脑的系统,它不会共享的,但是线程之间是可以共享进程的CPU的,它们我们可以理解为是我们运行的程序,当我们运行了很多的程序,CPU不够,这个时候电脑就比较卡顿,那我们就可以很好地理解,这是一个线程之间共享进程的资源了。
Python中的线程处理
t1 = threading.Thread(target=你写的函数名,args=(传入变量(如果只有一个变量就必须在后加上逗号),),name=随便取一个线程名):把一个线程实例化给t1,这个线程负责执行target=你写的函数名t1.start():负责执行启动这个线程t1.join():必须要等待你的子线程执行完成后再执行主线程t1.setDeamon(True):当你的主线程执行完毕后,不管子线程有没有执行完成都退出主程序,注意不能和t1.join()一起使用。threading.current_thread().name:打印出线程名
_thread模块
这个模块是为了兼容Python3的一个合并模块,把之前的thread程序给命名为_thread。我们用的比较少,但是我们还是了解一下:
_thread.start__new__thread(function,age)function 是一个函数,我们自己封装好的age 是函数里面的参数,必须是元组类型
threading模块(常用)
上面的那个模块只是提供了比较低级的功能,我们常用的还是threading模块来完成一些高级的操作。
import threadingthreading.currentThread()#返回当前线程变量threading.enumerate()#返回一个正在运行的线程列表threading.activeCount()#返回正在运行的线程数量
除了上述的一些方法,我们还有一些其他的方法,比如run(),start(),jion(),isAlive(),getName(),setName()。
使用
def zhiyun(x,y): for i in range(x,y): print(str(i*i)+";")ta=threading.Thread(target=zhiyun,args=(1,6))tb=threading.Thread(target=zhiyun,args=(16,21))ta.start()tb.start()print(tb.getName())
class myThread(threading.Thread):#定义继承类threading.Thread的子类myThread def __init__(self,mynum): super().__init__()#处理父类与子类的关系 self.myunm=mynum def run(self): for i in range(self.myunm,self.myunm+5): print(str(i*i)+";")ma=myThread(1)mb=myThread(16)ma.start()mb.start()
线程等待
在Python程序中,我们需要等待某一个线程完成后才可以继续,这个时候我们用到join()实现线程等待
import timedef zhiyun(x,y,thr=None):#包含一个线程实例 if thr: thr.join() else: time.sleep(2) for i in range(x,y): print(str(i * i) + ";")tb=threading.Thread(target=zhiyun,args=(16,21))ta=threading.Thread(target=zhiyun,args=(1,6,tb))tb.start()ta.start()# 这里调用了join方法,等到tb执行完后才可以执行ta.
如果是多个线程对某一个数据进行修改的话,那么就会出现很多不可以预料的事情。所以我们要同步线程,在Python中我们使用Thread对象的属性lock和属性Rlock可以简单的实现线程同步的功能。
对于那些每次只允许一个线程操作的数据,我们可以将其操作放在acquire和release方法之间。多线程的优势就是,可以同时运行多个任务,但是我们在共享数据的时候,就可能存在数据不同步的问题。
比如有这样一个实例,一个列表里面的元素都是0,线程1从后面向前面把所有的数据都修改为1,而线程2负责打印,从前往后读取输出,那么现在就可能存在一个问题了,输出的时候有一半是0,一半是1,就造成了数据的不同步。
那么我们为了避免这种情况,我们就引入了“锁”的概念。锁有两种概念,分别是锁定和未锁定,当一个线程想要共享数据的时候,首先必须要获得锁定,如果已经有了另外一个线程提前获得了锁定,那么我们这个线程就只能等待了暂停,也就是“同步阻塞”等到之前那个线程访问完毕,释放锁之后,在让之前的运行。
比如我们之前的那个问题,我们就可以先让修改的那个线程获得锁定,让它执行完毕之后,释放锁定在让打印这个线程去执行,这样就实现了数据同步了。
import timeclass mt(threading.Thread):#继承类的子类 def run(self):#定义重载函数run global x lock.acquire()#在操作x变量之前锁定的资源,进行下面的那一个变量叠加,如果不去锁定资源,就会出现很多东西 for i in range(5):#遍历操作 x+=10 time.sleep(1)#休眠1秒 print(x) lock.release()#释放锁资源x=0lock=threading.RLock()#实例化可重入锁类def main(): thrs=[] for i in range(10): thrs.append(mt())#实例化线程类 for item in thrs: item.start()#启动线程if __name__=="__main__": main()
线程优先级队列模块queue
这个模块提供了同步的,线程安全的队列类
常用方法:
Queue.qsize() 返回队列的大小 Queue.empty() 如果队列为空,返回True,反之False Queue.full() 如果队列满了,返回True,反之False,Queue.full 与 maxsize 大小对应 Queue.get([block[, timeout]])获取队列,timeout等待时间 Queue.get_nowait() 相当于Queue.get(False),非阻塞方法 Queue.put(item) 写入队列,timeout等待时间 Queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号。每个get()调用得到一个任务,接下来task_done()调用告诉队列该任务已经处理完毕。 Queue.join() 实际上意味着等到队列为空,再执行别的操作
生产者消费者模式
生产者消费者模式并不是GOF提出的众多模式之一,但它依然是开发同学编程过程中最常用的一种模式
解耦:缓冲区的存在可以让生产者和消费者降低互相之间的依赖性,一个模块儿代码变化,不会直接影响另一个模块儿
并发:由于缓冲区,生产者和消费者不是直接调用,而是两个独立的并发主体,生产者产生数据之后把它放入缓冲区,就继续生产数据,不依赖消费者的处理速度
这个里面有一些常见的数据结构,以及用法 queueq = queue.Queue(3) # 调用构造函数,初始化一个大小为3的队列print(q.empty()) # 判断队列是否为空,也就是队列中是否有数据# 入队,在队列尾增加数据, block参数,可以是True和False 意思是如果队列已经满了则阻塞在这里,# timeout 参数 是指超时时间,如果被阻塞了那最多阻塞的时间,如果时间超过了则报错。q.put(13, block=True, timeout=5)print(q.full()) # 判断队列是否满了,这里我们队列初始化的大小为3print(q.qsize()) # 获取队列当前数据的个数# block参数的功能是 如果这个队列为空则阻塞,# timeout和上面一样,如果阻塞超过了这个时间就报错,如果想一只等待这就传递Noneprint(q.get(block=True, timeout=None))# queue模块还提供了两个二次封装了的函数,q.put_nowait(23) # 相当于q.put(23, block=False)q.get_nowait() # 相当于q.get(block=False)
task_done() 意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。
如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
join() 阻塞调用线程,直到队列中的所有任务被处理掉。
只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。
基本的FIFO队列
这个表示先进先出队列
import queueq=queue.Queue()#创建一个队列对象实例for i in range(8):#遍历操作 q.put(i)#调用队列对象的put方法在队尾插入一项while not q.empty():#如归队列不为空 print(q.get())#显示队列信息
import queueq=queue.LifoQueue()for i in range(10): q.put(i)while not q.empty(): print(q.get())
优先级队列
import queueimport randomq=queue.PriorityQueue()#级别越低越先出队列class Node: def __init__(self,x): self.x=x def __lt__(self, other):#内置函数 return other.x>self.x def __str__(self): return "{}".format(self.x)a=[Node(int(random.uniform(0,10))) for i in range(10)]#产生10个随机数for i in a: print(i,end=" ") q.put(i)print("****************")while not q.empty(): print(q.get(),end=" ")#按序转出列表中的数字
模板
# coding:utf-8 from concurrent.futures import ThreadPoolExecutor # 导入线程池模块 import threading # 导入线程模块,作用是获取当前线程的名称 import os,time def task(n): print('%s:%s is running' %(threading.currentThread().getName(),os.getpid())) # 打印当前线程名和运行的id号码 time.sleep(2) return n**2 # 返回传入参数的二次幂 if __name__ == '__main__': p=ThreadPoolExecutor() #实例化线程池,设置线程池的数量,不填则默认为cpu的个数*5 l=[] # 用来保存返回的数据,做计算总计 for i in range(10): obj=p.submit(task,i) # 这里的obj其实是futures的对象,使用obj.result()获取他的结果 # 传入的参数是要执行的函数和该函数接受的参数 # submit是非堵塞的 # 这里执行的方式是异步执行 # ----------------------------------- # # p.submit(task,i).result()即同步执行 # ----------------------------------- # 上面的方法使用range循环有个高级的写法,即map内置函数 # p = ThreadPoolExecutor() # obj=p.map(task,range(10)) # p.shutdown() # 这里的obj的值就是直接返回的所有计算结果,不属于futures对象 # ----------------------------------- l.append(obj) # 把返回的结果保存在空的列表中,做总计算 p.shutdown() # 所有计划运行完毕,关闭结束线程池 print('='*30) print([obj.result() for obj in l]) #上面方法也可写成下面的方法 # with ThreadPoolExecutor() as p: #类似打开文件,可省去.shutdown() # future_tasks = [p.submit(task, i) for i in range(10)] # print('=' * 30) # print([obj.result() for obj in future_tasks])
subprocess创建进程
模板
from concurrent.futures import ProcessPoolExecutorimport os,time,randomdef task(n): print('%s is running' %os.getpid()) time.sleep(2) return n**2if __name__ == '__main__': p=ProcessPoolExecutor() #不填则默认为cpu的个数 l=[] start=time.time() for i in range(10): obj=p.submit(task,i) #submit()方法返回的是一个future实例,要得到结果需要用obj.result() l.append(obj) p.shutdown() #类似用from multiprocessing import Pool实现进程池中的close及join一起的作用 print('='*30) # print([obj for obj in l]) print([obj.result() for obj in l]) print(time.time()-start) #上面方法也可写成下面的方法 # start = time.time() # with ProcessPoolExecutor() as p: #类似打开文件,可省去.shutdown() # future_tasks = [p.submit(task, i) for i in range(10)] # print('=' * 30) # print([obj.result() for obj in future_tasks]) # print(time.time() - start)
对于进程的研究我这里就不过多的说明了,主要是线程的运用
每文一语
我们不能改变过去的失意,但是我们可以改变现在的自己!
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~