up:: 线程池拒绝任务的两个时机和四种策略简介

说明:

(1) 本篇博客的主要内容,简单介绍ThreadPoolExecutor提供的钩子方法;ThreadPoolExecutor提供了三个钩子方法,本篇博客仅仅演示了beforeExecute()方法;

(2) 声明:

● 并发(包括线程)中的内容,还是比较多的;本专栏介绍多线程,只能算是一个入门;

● 所以,本篇博客,如果做到,知道ThreadPoolExecutor有钩子方法这个东西,就足够了;


一:演示;

1.自定义一个线程池类:PauseableThreadPoolTest;

 
     package threadPool;
 
     import java.util.concurrent.*;
     import java.util.concurrent.locks.Condition;
     import java.util.concurrent.locks.ReentrantLock;
 
     public class PauseableThreadPoolTest extends ThreadPoolExecutor {
 
         //为了保证,我们在修改isPaused的时候,
         private final ReentrantLock lock = new ReentrantLock();
         private boolean isPaused;//标记位,标记线程是否是处于暂停状态;
 
         private Condition unpausedCondition = lock.newCondition();
 
         public PauseableThreadPoolTest(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable>  workQueue) {
             super(corePoolSize, maximumPoolSize, keepAliveTime, unit,  workQueue);
         }
 
 
         public PauseableThreadPoolTest(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable>  workQueue, ThreadFactory threadFactory) {
             super(corePoolSize, maximumPoolSize, keepAliveTime, unit,  workQueue, threadFactory);
         }
 
 
         public PauseableThreadPoolTest(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable>  workQueue, RejectedExecutionHandler handler) {
             super(corePoolSize, maximumPoolSize, keepAliveTime, unit,  workQueue, handler);
         }
 
 
         public PauseableThreadPoolTest(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable>  workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
             super(corePoolSize, maximumPoolSize, keepAliveTime, unit,  workQueue, threadFactory, handler);
         }
 
 
 
         /**
          * 可以实现【暂停逻辑】的方法;
          */
         public void pause() {
             lock.lock();//加锁
             try{
                 isPaused = true;//把标记为设为true
             }finally {
                 lock.unlock();//解锁
             }
         }
 
         /**
          * 可以实现【恢复逻辑】的方法;
          */
         public void resume() {
             lock.lock();//加锁
             try {
                 isPaused = false;//把标记为设为false
                 unpausedCondition.signalAll();//使用signalAll()方法,去唤醒所有的线程;
             }finally {
                 lock.unlock();//解锁
             }
         }
 
         /**
          * 通过该类可以获得的线程池;然后,该线程池中的线程可以去执行任务;那么,在任务执行前,就会执行beforeExecute()方法;
          * @param t
          * @param r
          */
         @Override
         protected void beforeExecute(Thread t, Runnable r) {
             super.beforeExecute(t, r);
             lock.lock();//首先,加锁
             try {
                 //如果检测到,isPaused这个标记为为true,就表示我们想让线程暂停了;
                 while (isPaused) {
                     unpausedCondition.await();//那么,就利用await()方法,去休眠当前线程;
                 }
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }finally {
                 lock.unlock();//最后,释放锁
             }
         }
     }

说明:

(1) ThreadPoolExecutor类是普通的、创建线程池的类;我们自定义的这个线程池类,继承了ThreadPoolExecutor类;

(2) 内容说明;

(3) 这儿为了保证资源在多线程中的安全,使用到了ReentrantLock和Condition;对于这些锁相关的内容,可以暂时不深究;以后,自己系统学习并发的内容的时候,自然会了解;

(4) 其实,这儿的beforeExecute()方法,这就是ThreadPoolExecutor提供的三个钩子方法之一;这也是,本篇博客主要的目的;

● ThreadPoolExecutor提供了三个钩子方法,可以在任务执行前后或者终止时做一些额外的操作。

● 这三个钩子方法是:beforeExecute():线程池执行某个任务前会调用,afterExecute():任务结束后(任务异常退出)会执行,terminated():线程池执行结束后执行;

● 而,本篇博客演示的就是beforeExecute();

● 在实际开中,对于线程池,我们就可以利用钩子方法,在每个任务执行前或执行后,做一些事情;比如,日志、统计等;

● 其实,这很容易就能联想到过滤器、拦截器、AOP;虽然线程池的钩子方法和上面是三个不是一种东西,但是其中的思想能感受到有点相通;

2.创建PauseableThreadPoolTestTest类,创建线程池,去演示;

 
     package threadPool;
 
     import java.util.concurrent.LinkedBlockingQueue;
     import java.util.concurrent.TimeUnit;
 
     public class PauseableThreadPoolTestTest {
 
         public static void main(String[] args) throws InterruptedException {
             //创建一个自动定义的线程池
             PauseableThreadPoolTest pauseableThreadPoolTest = new PauseableThreadPoolTest(10,
                     20, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<());
 
						//利用匿名内部类的方式,创建一个实现了Runnable接口的任务对象;(PS:只有一个类实现了Runnable接口,该类才能够被线程管理起来)
             Runnable runnable = new Runnable() {
                 @Override
                 public void run() {
                     //这个任务对象:这儿仅出于演示目的,这儿打印了一点文字,然后让任务所在线程休眠一下;
                     System.out.println("我被执行了……");
                     try {
                         Thread.sleep(10);
                     } catch (InterruptedException e) {
                         e.printStackTrace();
                     }
                 }
             };
 
 
             //向线程池,放10000个任务;
             for (int i = 0; i <10000 ; i++) {
                 pauseableThreadPoolTest.execute(runnable);
             }
 
             //当前方法所在的线程(具体到这儿就是main的主线程)休眠1500毫秒
             Thread.sleep(1500);
             //然后,我们在当前main的主线程中,调用线程池中的定义的pause方法,把线程池中的isPause标记位设置为true;
             pauseableThreadPoolTest.pause();
             //那么,【一旦main的主线程中的这条语句运行了】→【线程池中的isPause变为了true】→
             //→【那么,线程池中的,后续的线程在执行任务前,执行beforeExecute()时,就会发现isPause变为了true】→【那么,根据beforeExecute()
             // 的逻辑,这些后续的线程就会休眠】
             System.out.println("线程池中的线程,被暂停了");
             //为了能让【上面打印的“线程池中的线程,被暂停了”多显示一会】让main方法的主线程休眠1500毫秒;
             Thread.sleep(1500);
             //调用线程池中的定义的resume方法,把线程池中的isPause标记位设置为false,并唤醒线程池中的所有线程
             pauseableThreadPoolTest.resume();
         }
     }

说明:

(1) 内容说明;

● 有匿名内部类,如有需要可以参考Java多态

(2) 看注释吧;

(3) 运行结果;