C# Barrier类

网友投稿 606 2022-08-31

C# Barrier类

C# Barrier类

对于同步,Barrier 类非常适用于其中工作有多个任务分支且以后又需要合并工作的情况。Barrier 类用于需要同步的参与者。激活一个任务时,就可以动态地添加其他参与者,例如,从父任务中创建子任务。参与者在继续之前,可以等待所有其他参与者完成其工作。

BarrierSample 有点复杂,但它展示了 Barrier 类型的功能。下面的应用程序使用一个包含 2 000 000 个随机字符串的集合。使用多个任务遍历该集合,并统计以 a、b、c 等开头的字符串个数。工作不仅分布在不同的任务之间,也放在一个任务中。毕竟所有的任务都迭代字符串的第一个集合,汇总结果,以后任务会继续处理下一个集合。

FillData() 方法创建一个集合,并用随机字符串填充它:

public static IEnumerable FillData(int size){ var r = new Random(); return Enumerable.Range(0, size).Select(x => GetString(r));}private static string GetString(Random r){ var sb = new StringBuilder(6); for (int i = o; i < 6; i++) { sb.Append((char)(r.Next(26) + 97)); } return sb.ToString();}

在 LogBarrierInformation 方法中定义一个辅助方法,来显示 Barrier 的信息:

private static void LogBarrierInformation(string info, Barrier barrier) { Console.WriteLine($"Task {Task.CurrentId): {info)."+ $"{barrier.ParticipantCount} current and " + $"{barrier.ParticipantsRemaining} remaining participants, " + $"phase {barrier.CurrentPhaseNumber}") ;}

CalculationInTask() 方法定义了任务执行的作业。通过参数,第 3 个参数引用 Barrier 实例。用于计算的数是数组 IList。最后一个参数是 int 锯齿数组,用于在任务执行过程中写出结果。

任务把处理放在一个循环中。每一次循环中,都处理 IList[] 的数组元素。每个循环完成后,任务通过调用 SignalAndWait 方法,发出做好了准备的信号,并等待,直到所有的其他任务也准备好处理为止。这个循环会继续执行,直到任务完全完成为止。接着,任务就会使用 RemoveParticipant() 方法从 Barrier 类中删除它自己:

private static void CalculationInTask(int jobNumber, int partitionSize, Barrier barrier, IList[] coll, int loops, int[][] results){ LogBarrierInformation("CalculationInTask started", barrier); for (int i = 0; i < loops; i++) { var data = new List(coll[i]); int start = jobNumber * partitionSize; int end = start + partitionSize; Console.WriteLine($"Task {Task.CurrentId) in loop {i}: partition " + $"from {start} to {end}"); for (int j = start; j < end; j++) { char c = data[j] [0]; results[i][c - 97]++; } Console.WriteLine($"Calculation completed from task {Task.CurrentId) " + $"in loop {i}. {results[i][0]} times a, {results[i][25]} times z"); LogBarrierInformation("sending signal and wait for all", barrier); barrier.SignalAndWait(); LogBarrierInformation("waiting completed", barrier); } barrier.RemoveParticipant(); LogBarrierInformation("finished task, removed participant", barrier);}

在 Main() 方法中创建一个 Barrier 实例。在构造函数中,可以指定参与者的数量。在该示例中,这个数量是 3(numberTasks + 1),因为该示例创建了两个任务,Main() 方法本身也是一个参与者。使用 Task.Run 创建两个任务,把遍历集合的任务分为两个部分。启动该任务后,使用 SignalAndWait() 方法,Main() 方法在完成时发出信号,并等待所有其他参与者或者发出完成的信号,或者从Barrier 类中删除它们。一旦所有的参与者都准备好,就提取任务的结果,并使用Zip() 扩展方法把它们合并起来。接着进行下一次迭代,等待任务的下一个结果:

static void Main(){ const int numberTasks = 2; const int partitionSize = 1000000; const int loops = 5; var taskResults = new Dictionary(); var data = new List[loops]; for (int i = o; i < loops; i++) { data[i] = new List(FillData(partitionSize * numberTasks); } var barrier = new Barrier(numberTasks + 1); LogBarrierInformation("initial participants in barrier", barrier); for (int i = 0; i < numberTasks; i++) { barrier.AddParticipant(); int jobNumber = i; taskResults.Add(i, new int[loops][]); for (int loop = 0; loop < loops; loop++) { taskResult[i, loop] = new int[26]; } Console.WriteLine("Main - starting task job {jobNumber}"); Task.Run(() => CalculationInTask(jobNumber, partitionSize, barrier, data, loops, taskResults[jobNumber])); } for (int loop = 0; loop < 5; loop++) { LogBarrierInformation("main task, start signaling and wait", barrier); barrier.SignalAndWait(); LogBarrierInformation("main task waiting completed", barrier); int[][] resultCollection1 = taskResults[0]; int[][] resultCollection2 = taskResults[1]; var resultCollection = resultCollection1[loop].Zip( resultCollection2[loop],(cl, c2) => cl + c2); char ch = 'a'; int sum = 0; foreach (var x in resultCollection) { Console.WriteLine($"{ch++}, count: {x}"); sum += x; } LogBarrierInformation($"main task finished loop {loop}, sum: {sum}", barrier); } Console.WriteLine("finished all iterations"); Console.ReadLine();}

运行应用程序,输出如下所示。在输出中可以看到,每个 AddParticipant 调用都会增加参与者的数量和剩下的参与者数量。只要一个参与者调用 SignalAndWait,剩下的参与者数就会递减。当剩下的参与者数量达到0时,所有参与者的等待就结束,开始下一个阶段:

Task : initial participants in barrier. 1 current and 1 remaining participants, phase 0.

Main - starting task job 0

Main - starting task job 1

Task : main task, starting signaling and wait. 3 current and

3 remaining participants, phase 0.

Task 4: CalculationInTask started. 3 current and 2 remaining participants, phase 0.

Task 5: CalculationInTask started. 3 current and 2 remaining participants, phase 0.

Task 4 in loop 0: partition from 0 to 1000000

Task 5 in loop 0: partition from 1000000 to 2000000

Calculation completed from task 4 in loop 0. 38272 times a, 38637 times z Task 4: sending signal and wait for all. 3 current and

2 remaining participants,  phase 0.

Calculation completed from task 5 in loop 0. 38486 times a, 38781 times z. Task 5: sending signal and wait for all. 3 current and

1 remaining participants,  phase 0.

Task 5: waiting completed. 3 current and 3 remaining participants, phase 1 Task 4:   waiting completed. 3 current and 3 remaining participants, phase 1

Task : main waiting completed. 3 current and 3 remaining participants, phase 1

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Go项目实战:用 Go 语言构建 SQL 解析器(go 项目实战)
下一篇:大数据ClickHouse(五):数据库引擎介绍与实例演示
相关文章

 发表评论

暂时没有评论,来抢沙发吧~