解决线程池中ThreadGroup的坑

网友投稿 1141 2022-12-02

解决线程池中ThreadGroup的坑

解决线程池中ThreadGroup的坑

目录线程池中ThreadGroup的坑ThreadGroup是否可行Executors内部类DefaultThreadFactoryThreadGroup的使用及手写线程池监听线程异常关闭如何拿到Thread线程中异常ThreadGroup线程池使用

线程池中ThreadGroup的坑

java中每一个线程都归属于某个线程组管理的一员,例如在主函数main()主工作流程中产生一个线程,则产生的线程属于main这个线程组管理的一员。简单地说,线程组(ThreadGroup)就是由线程组成的管理线程的类,这个类是java.lang.ThreadGroup类。

定义一个线程组,通过以下代码可以实现。

ThreadGroup group=new ThreadGroup(“groupName”);

Thread thread=new Thread(group,”the first thread of group”);

ThreadGroup类中的某些方法,可以对线程组中的线程产生作用。例如,setMaxPriority()方法可以设定线程组中的所有线程拥有最大的优先权。

所有线程都隶属于一个线程组。那可以是一个默认线程组(不指定group),亦可是一个创建线程时明确指定的组。在创建之初,线程被限制到一个组里,而且不能改变到一个不同的组。每个应用都至少有一个线程从属于系统线程组。若创建多个线程而不指定一个组,它们就会自动归属于系统线程组。

线程组也必须从属于其他线程组。必须在构建器里指定新线程组从属于哪个线程组。若在创建一个线程组的时候没有指定它的归属,则同样会自动成为系统线程组的一名属下。因此,一个应用程序中的所有线程组最终都会将系统线程组作为自己的“父”。

那么假如我们需要在线程池中实现一个带自定义ThreadGroup的线程分组,该怎么实现呢?

我们在给线程池(ThreadPoolExecutor)提交任务的时候可以通过execute(Runnable command)来将一个线程任务加入到该线程池,那么我们是否可以通过new一个指定了ThreadGroup的Thread实例来加入线程池来达到前面说到的目的呢?

ThreadGroup是否可行

通过new Thread(threadGroup,runnable)实现线程池中任务分组

public static void main(String[] args) {

ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();

final ThreadGroup group = new ThreadGroup("Main_Test_Group");

for (int i = 0; i < 5; i++) {

Thread thread = new Threhttp://ad(group, new Runnable() {

@Override

public void run() {

int sleep = (int)(Math.random() * 10);

try {

Thread.sleep(1000 * 3);

System.out.println(Thread.currentThread().getName()+"执行完毕");

System.out.println("当前线程组中的运行线程数"+group.activeCount());

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}, group.getName()+" #"+i+"");

pool.execute(thread);

}

}

运行结果

pool-1-thread-3执行完毕

pool-1-thread-1执行完毕

当前线程组中的运行线程数0

pool-1-thread-2执行完毕

当前线程组中的运行线程数0

当前线程组中的运行线程数0

pool-1-thread-4执行完毕

pool-1-thread-5执行完毕

当前线程组中的运行线程数0

当前线程组中的运行线程数0

运行结果中可以看到group中的线程并没有因为线程池启动了这个线程任务而运行起来.因此通过线程组来对线程池中的线层任务分组不可行.

从java.util.concurrent.ThreadPoolExecutor源码中可以看到如下构造函数:

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue workQueue) {

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

Executors.defaultThreadFactory(), defaultHandler);

}

如果我们在实例化ThreadPoolExecutor时不指定ThreadFactory,那么将以默认的ThreadFactory来创建Thread.

Executors内部类DefaultThreadFactory

下面的源码即是默认的Thread工厂

static class DefaultThreadFactory implements ThreadFactory {

private static final AtomicInteger poolNumber = new AtomicInteger(1);

private final ThreadGroup group;

private final AtomicInteger threadNumber = new AtomicInteger(1);

private final String namePrefix;

DefaultThreadFactory() {

SecurityManager s = System.getSecurityManager();

group = (s != null) ? s.getThreadGroup() :

Thread.currentThread().getThreadGroup();

namePrefix = "pool-" +

poolNumber.getAndIncrement() +

"-thread-";

}

public Thread newThread(Runnable r) {

Thread t = new Thread(group, r,

namePrefix + threadNumber.getAndIncrement(),

0);

if (t.isDaemon())

t.setDaemon(false);

if (t.getPriority() != Thread.NORM_PRIORITY)

t.setPriority(Thread.NORM_PRIORITY);

return t;

}

}

从唯一的构造函数可以看到DefaultThreadFactory以SecurityManager 实例中的ThreadGroup来指定线程的group,如果SecurityManager 获取到的ThreadGroup为null才默认以当前线程的group来指定.public Thread newThread(Runnable r) 则以group来new 一个Thead.这样我们可以在实例化ThreadPoolExecutor对象的时候在其构造函数内传入自定义的ThreadFactory实例即可达到目的.

public class MyTheadFactory implements ThreadFactory {

private static final AtomicInteger poolNumber = new AtomicInteger(1);

private final AtomicInteger threadNumber = new AtomicInteger(1);

private final String namePrefix;

private ThreadGroup defaultGroup;

public MyTheadFactory() {

SecurityManager s = System.getSecurityManager();

defaultGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();

namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";

}

public MyTheadFactory(ThreadGroup group) {

this.defaultGroup = group;

namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";

}

public Thread newThread(Runnable r) {

Thread t = new Thread(defaultGroup, null, namePrefix + threadNumber.getAndIncrement(), 0);

if (t.isDaemon())

t.setDaemon(false);

if (t.getPriority() != Thread.NORM_PRIORITY)

t.setPriority(Thread.NORM_PRIORITY);

return t;

}

}

ThreadGroup的使用及手写线程池

监听线程异常关闭

以下代码在window下不方便测试,需在linux 上 测试

// 以下线程如果强制关闭的话,是无法打印`线程被杀掉了`

// 模拟关闭 kill PID

public static void main(String[] args) {

Runtime.getRuntime().addShutdownHook(new Thread( () -> {

System.out.println("线程被杀掉了");

}));

while(true){

System.out.println("i am working ...");

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

如何拿到Thread线程中异常

public static void main(String[] args) {

Thread thread = new Thread(() -> {

try {

Thread.sleep(1000);

int i = 10/0;

} catch (InterruptedException e) {

e.printStackTrace();

}

});

thread.setUncaughtExceptionHandler((t,e)->{

System.out.println("线程的名字"+ t.getName());

System.out.println(e);

}); // 通过注入接口的方式

thread.start();

}

ThreadGroup

注意: threadGroup 设置为isDaemon 后,会随最后一个线程结束而销毁,如果没有设置isDaemon ,则需要手动调用 destory()

线程池使用

自己搭建的简单线程池实现

其中ThreadGroup 的应用没有写,但是我们可以观察线程关闭后,检查ThreadGroup 中是否还有活跃的线程等,具体参考ThreadGroup API

import java.util.ArrayList;

import java.util.Iterator;

import java.util.LinkedList;

import java.util.List;

import java.util.stream.IntStream;

/**

* @Author: shengjm

* @Date: 2020/2/10 9:52

* @Description:

*/

public class SimpleThreadPool extends Thread{

/**

* 线程数量

*/

private int size;

private final int queueSize;

/**

* 默认线程队列数量

*/

private final static int DEFAULR_TASK_QUEUE_SIZE = 2000;

private static volatile int seq = 0;

private final static String THREAD_PREFIX = "SIMPLE_THREAD_POLL_";

private final static ThreadGroup GROUP = new ThreadGroup("Pool_Group");

private final static LinkedList TASK_QUEUE = new LinkedList<>();

private final static List THREAD_QUEUE = new ArrayList<>();

private final DiscardPolicy discardPolicy;

private volatile boolean destory = false;

private int min;

private int max;

private int active;

/**

* 定义异常策略的实现

*/

private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {

throw new DiscardException("线程池已经被撑爆了,后继多余的人将丢失");

};

/**

*

*/

public SimpleThreadPool(){

this(4,8,12,DEFAULR_TASK_QUEUE_SIZE,DEFAULT_DISCARD_POLICY);

}

/**

*

*/

public Simphttp://leThreadPool(int min , int active , int max , int queueSize,DiscardPolicy discardPolicy) {

this.min = min;

this.active = active;

this.max = max;

this.queueSize = queueSize;

this.discardPolicy = discardPolicy;

init();

}

/**

* 初始化

*/

private void init() {

for(int i = 0; i < min; i++){

createWorkTask();

}

this.size = min;

this.start();

}

private void createWorkTask(){

WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX+(seq++));

task.start();

THREAD_QUEUE.add(task);

}

/**

* 线程池自动扩充

*/

@Override

public void run() {

while(!destory){

System.out.println(this.min +" --- "+this.active+" --- "+this.max + " --- "+ this.size + " --- "+ TASK_QUEUE.size());

try {

Thread.sleephttp://(1000);

if(TASK_QUEUE.size() > active && size < active){

for (int i = size; i < active;i++){

createWorkTask();

}

size = active;

}else if(TASK_QUEUE.size() > max && size < max){

for (int i = size; i < max;i++){

createWorkTask();

}

size = max;

}

synchronized (THREAD_QUEUE){

if(TASK_QUEUE.isEmpty() && size > active){

int release = size - active;

for (Iterator it = THREAD_QUEUE.iterator();it.hasNext();){

if(release <=0){

break;

}

WorkerTask task = it.next();

task.close();

task.interrupt();

it.remove();

release--;

}

size = active;

}

}

} catch (InterruptedException e) {

break;

}

}

}

public void submit(Runnable runnable){

synchronized (TASK_QUEUE){

if(destory){

throw new DiscardException("线程池已经被摧毁了...");

}

if(TASK_QUEUE.size() > queueSize){

discardPolicy.discard();

}

TASK_QUEUE.addLast(runnable);

TASK_QUEUE.notifyAll();

}

}

/**

* 关闭

*/

public void shutdown(){

while(!TASK_QUEUE.isEmpty()){

try {

Thread.sleep(10);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

synchronized (THREAD_QUEUE) {

int initVal = THREAD_QUEUE.size();

while (initVal > 0) {

for (WorkerTask workerTask : THREAD_QUEUE) {

if (workerTask.getTaskState() == TaskState.BLOCKED) {

workerTask.interrupt();

workerTask.close();

initVal--;

} else {

try {

Thread.sleep(10);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

this.destory = true;

}

}

public int getSize() {

return size;

}

public int getMin() {

return min;

}

public int getMax() {

return max;

}

public int getActive() {

return active;

}

/**

* 线程状态

*/

private enum TaskState{

FREE , RUNNING , BLOCKED , DEAD

}

/**

* 自定义异常类

*/

public static class DiscardException extends RuntimeException{

public DiscardException(String message){

super(message);

}

}

/**

* 定义异常策略

*/

@FunctionalInterface

public interface DiscardPolicy{

void discard() throws DiscardException;

}

private static class WorkerTask extends Thread{

private volatile TaskState taskState = TaskState.FREE;

public TaskState getTaskState(){

return this.taskState;

}

public WorkerTask(ThreadGroup group , String name){

super(group , name);

}

@Override

public void run(){

OUTER:

while(this.taskState != TaskState.DEAD){

Runnable runnable;

synchronized (TASK_QUEUE){

while(TASK_QUEUE.isEmpty()){

try {

taskState = TaskState.BLOCKED;

TASK_QUEUE.wait();

} catch (InterruptedException e) {

break OUTER;

}

}

runnable = TASK_QUEUE.removeFirst();

}

if(runnable != null){

taskState = TaskState.RUNNING;

runnable.run();

taskState = TaskState.FREE;

}

}

}

public void close(){

this.taskState = TaskState.DEAD;

}

}

/**

* 测试

* @param args

*/

public static void main(String[] args) {

SimpleThreadPool simpleThreadPool = new SimpleThreadPool();

// SimpleThreadPool simpleThreadPool = new SimpleThreadPool(6,15,SimpleThreadPool.DEFAULT_DISCARD_POLICY);

IntStream.rangeClosed(0,40).forEach(i -> {

simpleThreadPool.submit(() -> {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println("the runnable " + i + "be servered by " + Thread.currentThread());

});

});

// try {

// Thread.sleep(15000);

// } catch (InterruptedException e) {

// e.printStackTrace();

// }

simpleThreadPool.shutdown();

}

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:文本数据库--.Net界未来的一朵奇葩
下一篇:学习笔记—— 一些UPDATE语句
相关文章

 发表评论

暂时没有评论,来抢沙发吧~