app开发者平台在数字化时代的重要性与发展趋势解析
865
2022-11-28
经典进程同步和互斥问题
基本概念
进程同步:对多个进程在执行次序上进行协调,使并发执行的各进程间能按照一定的规则共享系统资源,以免进程无序争夺资源而导致系统混乱。进程互斥:某一时刻不允许多个进程同时访问临界资源,只能单个进程访问
一、生产者消费者问题
在此问题中,需要注意以下几点:
只有缓冲区没满时,生产者才能把产品放入缓冲区,否则必须等待只有缓冲区不空时,消费者才能从缓冲区取出产品,否则必须等待缓冲区属于临界资源,各进程必须互斥地进行访问
那么如何用P、V操作实现这些要求呢?我们以2个生产者、3个消费者,缓冲区大小为4举例
针对要求1、2:
设置同步信号量empty、full,初始值分别为4、0,表示刚开始时缓冲区中还没有数据生产者每次要消耗(P)一个空闲(empty)缓冲区,并生产(V)一个产品(full)消费者每次要消耗(P)一个产品(full),并释放(V)一个空闲(empty)缓冲区
针对要求3:
设置互斥信号量mutex,初始值为1,表示同一时刻只有一个进程能访问缓冲区
前驱图表示:
伪代码如下:
def producer(): while True: 生产一个产品 P(empty) # P操作,表示需要消耗一个非空缓冲区 P(mutex) # 为了互斥访问缓冲区 把产品放入缓冲区 # 用互斥锁实现此缓冲区的互斥访问 V(mutex) V(full) # 对信号量full加1,表示生产了一个数据def consumer(): while True: P(full) # P操作,表示消耗一个数据 P(mutex) 从缓冲区取走产品 V(mutex) V(empty) # V操作,表示释放一个空闲缓冲区
给出Python的实现代码
import threadingfrom multiprocessing import Semaphoreimport multiprocessingmutex = Semaphore(1) # 用于实现各个进程互斥访问缓冲区,任何时刻缓冲区empty = Semaphore(4) # 4个缓冲区full = Semaphore(0) # 表示当前缓冲区的资源数量in_index = 0out_index = 0buffer = [0, 0, 0, 0]n = len(buffer)def produce(id): global in_index data = 1 while True: empty.acquire() # P操作,表示需要消耗一个非空缓冲区 mutex.acquire() # 为了互斥访问缓冲区 buffer[in_index] = data print("%d号生产者进程放入数据%d,放在了%d位置" % (id, data, in_index)) in_index = (in_index + 1) % n data += 1 mutex.release() # 表示已经访问完缓冲区 full.release() # 对信号量full加1,表示使用了一个空闲缓冲区,生产了一个数据def consume(id): global out_index while True: full.acquire() # P操作,表示消耗一个数据 mutex.acquire() # 实现对缓冲区的互斥访问 data = buffer[out_index] buffer[out_index % n] = 0 # 取出数据 print("%d号消费者从%d位置取得数据%d" % (id, out_index, data)) out_index = (out_index + 1) % n mutex.release() empty.release() # V操作,表示释放一个空闲缓冲区def main(): consumer1 = threading.Thread(target=consume, args=(1,)) consumer2 = threading.Thread(target=consume, args=(2,)) consumer3 = threading.Thread(target=consume, args=(3,)) consumer1.start() consumer2.start() consumer3.start() producer1 = threading.Thread(target=produce, args=(1,)) producer2 = threading.Thread(target=produce, args=(2,)) producer1.start() producer2.start()if __name__ == '__main__': main()
注:写代码时需要注意,往缓冲区放入或是从缓冲区取走数据的索引都应该是全局的,若是每个进程(代码实现其实是线程)都有自己访问缓冲区的索引,一定会导致数据覆盖以及重复读取某个数据的情况
思考:能否改变实现同步和实现互斥的P操作
就像这样,把实现互斥的P操作放在实现同步的P操作之前
def producer(): while True: 生产一个产品 P(mutex) # 1 P(empty) # 2 把产品放入缓冲区 # 用互斥锁实现此缓冲区的互斥访问 V(mutex) V(full)def consumer(): while True: P(mutex) # 3 P(full) # 4 从缓冲区取走产品 V(mutex) V(empty)
如果执行顺序就像伪代码中的注释那样,且在某一时刻缓冲区满,即 empty = 0 此时生产者进程会顺利对互斥信号量进行减1操作,然后阻塞在标号为 2 的地方 此时生产者进程再执行,将会阻塞在标号为 3 的地方,即互斥信号量的地方 这就形成了 “死锁”
总结一下:实现互斥的P操作必须放在实现同步的P操作之后
小Tips:实际操作中在临界区的代码也应尽可能的少,否则会增大对临界区的上锁时间,即其他进程的等待时间,从而导致系统并发度降低
二、读者——写者问题
关于此问题需要注意的地方:
允许多个读进程同时访问共享文件。因为读操作不会改变共享文件的改变同一时刻只允许一个写进程访问共享文件。若多个写进程同时访问,则可能导致数据覆盖写进程进行写操作时,不允许读进程进行读操作。因为写进程改变共享文件,此时读进程读取的数据可能并不是自己之前想要读取的数据
伪代码如下:
read_mutex = Semaphore(1)write_mutex = Semaphore(1)read_count = 0 # 表示正在进行读操作的读进程数量def reader(): while True: P(read_mutex) # 上读锁,防止多个读进程同时访问下面的两段代码 if read_count == 0: P(write_mutex) # 第一个读进程上写锁,防止写进程与读进程同时访问文件 read_count += 1 V(read_mutex) 读进程读文件 P(read_mutex) read_count -= 1 if read_count == 0: V(write_mutex) # 最后一个读进程释放写锁,表示此时没有读操作,可以让写进程开始操作文件 V(read_mutex)def writer(): while True: P(write_mutex) 写进程写文件 V(write_mutex)
在上述伪代码中,需要注意的是下面的代码必须一气呵成
假如不用一对PV操作将这段代码设置为临界区,将可能会有两个读进程都进入if语句,其中一个对write_mutex进行P操作后,另一个读进程将会阻塞在对write_mutex的P操作,这将违背 条件1:允许多个读进程同时访问文件
而且,read_count的读取,赋值,写回操作也必须一气呵成,因为该值是读进程的计数,直接影响write_mutex的PV操作,也就影响到条件2、3
实现代码如下:
import threadingfrom multiprocessing import Semaphoreimport multiprocessingimport timeread_mutex = Semaphore(1)write_mutex = Semaphore(1)read_count = 0class Reader(threading.Thread): def __init__(self, id): super().__init__() self.id = id def run(self): while True: time.sleep(1) global read_count print("%d号读线程正在等待..." % self.id) read_mutex.acquire() # 读进程上读锁 if read_count == 0: write_mutex.acquire() # 第一个读进程上写锁,不允许写进程写文件 read_count += 1 # 访问的读进程 +1 read_mutex.release() # 释放读锁 # 由于这里的读锁完全释放,可以允许多个读进程访问 print("%d号读线程正在读文件..." % self.id) # 为了使得read_count不被多个读进程同时访问,上读锁 read_mutex.acquire() read_count -= 1 # 访问的读进程 -1 if read_count == 0: write_mutex.release() # 当没有读进程读文件时,释放写锁,使得写进程可以上锁 read_mutex.release() # 释放读锁 print("%d号读线程完成读取操作..." % self.id)class Writer(threading.Thread): def run(self): while True: time.sleep(1) global read_count print("---写线程正在等待...") write_mutex.acquire() print("---写线程正在写文件...") write_mutex.release() print("---写线程完成操作...")def main(): for i in range(5): read_thread = Reader(i) read_thread.start() write_thread = Writer() write_thread.start()if __name__ == '__main__': main()
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~