并发编程之Master-Worker模式(master worker模式)

网友投稿 866 2022-09-14

并发编程之Master-Worker模式(master worker模式)

并发编程之Master-Worker模式(master worker模式)

我们知道,单个线程计算是串行的,只有等上一个任务结束之后,才能执行下一个任务,所以执行效率是比较低的。

那么,如果用多线程执行任务,就可以在单位时间内执行更多的任务,而Master-Worker就是多线程并行计算的一种实现方式。

它的思想是,启动两个进程协同工作:Master和Worker进程。

Master负责任务的接收和分配,Worker负责具体的子任务执行。每个Worker执行完任务之后把结果返回给Master,最后由Master汇总结果。(其实也是一种分而治之的思想,和forkjoin计算框架有相似之处,参看:并行任务计算框架forkjoin)

Master-Worker工作示意图如下:

下面用Master-Worker实现计算1-100的平方和,思路如下:

定义一个Task类用于存储每个任务的数据

Master生产固定个数的Worker,把所有worker存放在workers变量(map)中,Master需要存储所有任务的队列workqueue(ConcurrentLinkedQueue)和所有子任务返回的结果集resultMap(ConcurrentHashMap)。

每个Worker执行自己的子任务,然后把结果存放在resultMap中。

Master汇总resultMap中的数据,然后返回给Client客户端。

为了扩展Worker的功能,用一个MyWorker继承Worker重写任务处理的具体方法。

Task类:

package com.thread.masterworker;

public class Task {

private int id;

private String name;

private int num;

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public int getNum() {

return num;

}

public void setNum(int num) {

this.num = num;

}

}

Master实现:

package com.thread.masterworker;

import java.util.HashMap;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {

//所有任务的队列

private ConcurrentLinkedQueue workerQueue = new ConcurrentLinkedQueue();

//所有worker

private HashMap workers = new HashMap();

//共享变量,worker返回的结果

private ConcurrentHashMap resultMap = new ConcurrentHashMap();

//构造方法,初始化所有worker

public Master(Worker worker,int workerCount){

worker.setWorkerQueue(this.workerQueue);

worker.setResultMap(this.resultMap);

for (int i = 0; i < workerCount; i++) {

Thread t = new Thread(worker);

this.workers.put("worker-"+i,t);

}

}

//任务的提交

public void submit(Task task){

this.workerQueue.add(task);

}

//执行任务

public int execute(){

for (Map.Entry entry : workers.entrySet()) {

entry.getValue().start();

}

//一直循环,直到结果返回

while (true){

if(isComplete()){

return getResult();

}

}

}

//判断是否所有线程都已经执行完毕

public boolean isComplete(){

for (Map.Entry entry : workers.entrySet()) {

//只要有任意一个线程没有结束,就返回false

if(entry.getValue().getState() != Thread.State.TERMINATED){

return false;

}

}

return true;

}

//处理结果集返回最终结果

public int getResult(){

int res = 0;

for (Map.Entry entry : resultMap.entrySet()) {

res += (Integer) entry.getValue();

}

return res;

}

}

父类Worker:

package com.thread.masterworker;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.ConcurrentLinkedQueue;

public class Worker implements Runnable {

private ConcurrentLinkedQueue workerQueue;

private ConcurrentHashMap resultMap;

public void setWorkerQueue(ConcurrentLinkedQueue workerQueue) {

this.workerQueue = workerQueue;

}

public void setResultMap(ConcurrentHashMap resultMap) {

this.resultMap = resultMap;

}

@Override

public void run() {

while(true){

//从任务队列中取出一个任务

Task task = workerQueue.poll();

if(task == null) break;

//处理具体的任务

Object res = doTask(task);

//把每次处理的结果放到结果集里面,此处直接把num值作为结果

resultMap.put(String.valueOf(task.getId()),res);

}

}

public Object doTask(Task task) {

return null;

}

}

子类MyWorker继承父类Worker,重写doTask方法实现具体的逻辑:

package com.thread.masterworker;

public class MyWorker extends Worker {

@Override

public Object doTask(Task task) {

//暂停0.5秒,模拟任务处理

try {

Thread.sleep(500);

} catch (InterruptedException e) {

e.printStackTrace();

}

//计算数字的平方

int num = task.getNum();

return num * num;

}

}

客户端Client:

package com.thread.masterworker;

import java.util.Random;

public class Client {

public static void main(String[] args) {

Master master = new Master(new MyWorker(), 10);

//提交n个任务到任务队列里

for (int i = 0; i < 100; i++) {

Task task = new Task();

task.setId(i);

task.setName("任务"+i);

task.setNum(i+1);

master.submit(task);

}

//执行任务

long start = System.currentTimeMillis();

int res = master.execute();

long time = System.currentTimeMillis() - start;

System.out.println("结果:"+res+",耗时:"+time);

}

}

以上,我们用10个线程去执行子任务,最终由Master做计算求和(1-100的平方和)。每个线程暂停500ms,计算数字的平方值。

总共100个任务,分10个线程并行计算,相当于每个线程均分10个任务,一个任务的时间大概为500ms,故10个任务为5000ms,再加上计算平方值的时间,故稍大于5000ms。结果如下,

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:保定小程序开发(微信小程序开发定位)
下一篇:软件定义的“可靠性”到底可不可靠?信服云的ECC机制了解一下
相关文章

 发表评论

暂时没有评论,来抢沙发吧~