JUC(10)线程池

网友投稿 604 2022-10-31

JUC(10)线程池

JUC(10)线程池

文章目录

​​一、三大方法​​​​二、七大参数​​​​三、四种拒绝策略​​

​​3.1 new ThreadPoolExecutor.AbortPolicy()​​​​3.2 new ThreadPoolExecutor.CallerRunsPolicy()​​​​3.3 new ThreadPoolExecutor.DiscardPolicy()​​​​3.4 new ThreadPoolExecutor.DiscardOldestPolicy():​​

​​四、手动创建线程池​​​​五、小结和拓展(CPU密集型和IO密集型!)​​​​六、Fork/Join框架​​

​​6.1如何使用ForkJoin?​​

此部分可以参考

线程池:三大方法、7大参数、4种拒绝策略池化技术

程序的运行,本质:占用系统的资源!我们需要去优化资源的使用 ===> 池化技术线程池、JDBC的连接池、内存池、对象池 等等。。。。资源的创建、销毁十分消耗资源池化技术:事先准备好一些资源,如果有人要用,就来我这里拿,用完之后还给我,以此来提高效率。

线程池的好处:

1、降低资源的消耗;2、提高响应的速度;3、方便管理;

线程池的作用:线程复用、可以控制最大并发数、管理线程;

一、三大方法

ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程ExecutorService threadPool2 = Executors.newFixedThreadPool(5); //创建一个固定的线程池的大小ExecutorService threadPool3 = Executors.newCachedThreadPool(); //可伸缩的

package com.wlw.pool;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class Demo01 { public static void main(String[] args) { //ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程 //ExecutorService threadPool = Executors.newFixedThreadPool(5);//创建一个固定大小的线程池 ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸缩的 try { for (int i = 0; i < 20; i++) { //使用线程池来创建线程 threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"ok"); } }); } } catch (Exception e) { e.printStackTrace(); } finally { //线程池用完必须要关闭线程池 threadPool.shutdown(); } }}

二、七大参数

源码分析

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));}

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());}

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, //21亿 60L, TimeUnit.SECONDS, new SynchronousQueue());}

本质:三种方法都是开启的ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize, //核心线程池大小 int maximumPoolSize, //最大的线程池大小 long keepAliveTime, //超时了没有人调用就会释放 TimeUnit unit, //超时单位 BlockingQueue workQueue, //阻塞队列 ThreadFactory threadFactory, //线程工厂 创建线程的 一般不用动 RejectedExecutionHandler handler //拒绝策略 ) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}

阿里巴巴的Java操作手册中明确说明:对于Integer.MAX_VALUE初始值较大,所以一般情况我们要使用底层的ThreadPoolExecutor来创建线程池。

七个参数的说明

要说明的是当银行人满了的时候,就会执行拒绝策略,不再让顾客入内非核心(Core)线程,如果超过设定的时间还没有被使用,就会关闭

三、四种拒绝策略

这是七大参数中最后一个参数:拒绝策略的类型

3.1 new ThreadPoolExecutor.AbortPolicy()

该拒绝策略为:整个线程池满了,还有线程进来,不处理这个想进来的线程,并抛出异常而判断线程池满? 线程池的最大承载量为 :阻塞队列+最大的线程池大小 ,超过报java.util.concurrent.RejectedExecutionException

3.2 new ThreadPoolExecutor.CallerRunsPolicy()

该拒绝策略为:哪来的去哪里 ,让 main线程去进行处理多余的线程

3.3 new ThreadPoolExecutor.DiscardPolicy()

该拒绝策略为:队列满了,把多余的线程(任务)丢掉,不会抛出异常。

3.4 new ThreadPoolExecutor.DiscardOldestPolicy():

该拒绝策略为:队列满了,尝试去和最早的进程竞争,不会抛出异常

四、手动创建线程池

package com.wlw.pool;import java.util.concurrent.*;public class Demo01 { public static void main(String[] args) { //ExecutorService threadPool = Executors.newSingleThreadExecutor();//单个线程 //ExecutorService threadPool = Executors.newFixedThreadPool(5);//创建一个固定大小的线程池 //ExecutorService threadPool = Executors.newCachedThreadPool(); //可伸缩的 ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); //该拒绝策略为:银行满了,还有人进来,不处理这个人的,并抛出异常 //该线程池最大承载量为: maximumPoolSize + BlockingQueue = 5+3 = 8 try { for (int i = 0; i < 8; i++) { //使用线程池来创建线程 threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"==>ok"); } }); } } catch (Exception e) { e.printStackTrace(); } finally { //线程池用完必须要关闭线程池 threadPool.shutdown(); } }}

五、小结和拓展(CPU密集型和IO密集型!)

如何去设置线程池的最大大小( maximumPoolSize)如何去设置?利用CPU密集型和IO密集型!:

CPU密集型:电脑的核数是几核就选择几;这个大小就是maximunPoolSize的大小

Runtime.getRuntime().availableProcessors() 获取cpu的核数

I/O密集型:在程序中有15个大型任务,io十分占用资源;I/O密集型就是判断我们程序中十分耗I/O的线程数量,然后将maximunPoolSize设置成大约是最大I/O数的一倍到两倍之间。

package com.wlw.pool;import java.util.concurrent.*;public class Demo01 { public static void main(String[] args) { ExecutorService threadPool = new ThreadPoolExecutor( 2, Runtime.getRuntime().availableProcessors(), 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); //该拒绝策略为:银行满了,还有人进来,不处理这个人的,并抛出异常 //该线程池最大承载量为: maximumPoolSize + BlockingQueue = 5+3 = 8 try { for (int i = 0; i < 8; i++) { //使用线程池来创建线程 threadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()+"==>ok"); } }); } } catch (Exception e) { e.printStackTrace(); } finally { //线程池用完必须要关闭线程池 threadPool.shutdown(); } }}

六、Fork/Join框架

什么是ForkJoin? (分支合并)

ForkJoin 在JDK1.7,并行执行任务!提高效率~。在大数据量速率会更快!大数据中:MapReduce 核心思想->把大任务拆分为小任务!

ForkJoin 的特点:工作窃取!

6.1如何使用ForkJoin?

package com.wlw.forkjoin;import java.util.concurrent.RecursiveTask;public class ForkJoinDemo extends RecursiveTask { private Long start; private Long end; //临界值 private Long temp = 10000L; public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } //计算方法 @Override protected Long compute() { if((end - start) < temp){ Long sum = 0L; for (Long i = start;i <= end ; i++){ sum = sum + i; } return sum; }else { //使用forkjoin 分而治之 计算 Long middle = (start+end)/2; //拆分任务,把线程任务压入线程队列 ForkJoinDemo forkJoinDemoTask1 = new ForkJoinDemo(start, middle); forkJoinDemoTask1.fork(); //拆分任务,把线程任务压入线程队列 ForkJoinDemo forkJoinDemoTask2 = new ForkJoinDemo(middle+1, end); forkJoinDemoTask2.fork(); //获取结果 Long sum = forkJoinDemoTask1.join() + forkJoinDemoTask2.join(); return sum; } }}

package com.wlw.forkjoin;import java.util.OptionalLong;import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;import java.util.stream.LongStream;public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { //test1(); //sum=500000000500000000,时间:8598 //test2(); //sum=500000000500000000,时间:4835 test3(); //sum=500000000500000000时间:512 } public static void test1(){ Long start = System.currentTimeMillis(); Long sum = 0L; for (Long i = 0L;i <= 10_0000_0000L; i++){ sum = sum + i; } Long end = System.currentTimeMillis(); System.out.println("sum="+sum+",时间:"+(end-start)); } public static void test2() throws ExecutionException, InterruptedException { Long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinDemo forkJoinDemoTask = new ForkJoinDemo(0L,10_0000_0000L); ForkJoinTask submit = forkJoinPool.submit(forkJoinDemoTask); Long aLong = submit.get(); Long end = System.currentTimeMillis(); System.out.println("sum="+aLong+",时间:"+(end-start)); } public static void test3(){ Long start = System.currentTimeMillis(); //使用Stream Long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0,(x,y)->{return x+y;}); Long end = System.currentTimeMillis(); System.out.println("sum="+sum+"时间:"+(end-start)); }}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:ResNeXt: Facebook发布用于图像分类和对象检测框架
下一篇:kitabu:一个框架使用Ruby来从Markdown创建电子书籍
相关文章

 发表评论

暂时没有评论,来抢沙发吧~