• 新版网站前后台即将上线,2019年将致力于提高文章质量,加大原创力度,打造一个更加舒适的阅读体验!
  • 极客文库小编@勤劳的小蚂蚁,为您推荐每日资讯,欢迎关注!
  • 新版网站前后台即将上线,2019年将致力于提高文章质量,加大原创力度,打造一个更加舒适的阅读体验!
  • 如果有任何体验不佳的地方,欢迎向客服反馈!

Java线程池总结

前言

Java 中的线程池十分重要,无论是在实际应用中还是应对面试

一、线程池原理


1.1 使用线程池的好处


第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。

1.2 线程池的实现原理


当向线程池提交任务后,线程池会按下图所示流程去处理这个任务

线程池的主要处理流程

1)线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。

2)线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。

3)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

对应到代码层面就是ThreadPoolExecutor执行execute()方法。如下图所示:

ThreadPoolExecutor 执行示意图

1)如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

2)如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。

3)如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

4)如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。根据不同的拒绝策略去处理。

ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行 execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。

在 ThreadPoolExecutor 完成预热之后(当前运行的线程数大于等于 corePoolSize),几乎所有的 execute()方法调用都是执行步骤 2,而步骤 2 不需要获取全局锁。

二、线程池的使用


2.1 创建线程池


我们可以通过 ThreadPoolExecutor 构造方法来创建一个线程池。

publicThreadPoolExecutor(
   int corePoolSize,
   int maximumPoolSize,                              
      long keepAliveTime,
   TimeUnit unit,
   BlockingQueue<Runnable> workQueue,
   ThreadFactory threadFactory,
   RejectedExecutionHandler handler){
}

介绍一下这几个参数:

1)corePoolSize(线程池的基本大小):当提交一个任务到线程池时,如果当前 poolSize<corePoolSize 时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的 prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。

2)maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。

3)keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。

4)TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。

5)runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按 FIFO 排序元素,吞吐量通常要高于 ArrayBlockingQueue。静态工厂方法 Executors.newFixedThreadPool()使用了这个队列。

  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 Linked-BlockingQueue,静态工厂方法 Executors.newCachedThreadPool 使用了这个队列。

  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

6)ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架 guava 提供的 ThreadFactoryBuilder 可以快速给线程池里的线
程设置有意义的名字,代码如下。

newThreadFactoryBuilder().setNameFormat(“XX-task-%d”).build();

7)RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常。在 JDK 1.5 中 Java 线程池框架提供了以下 4 种策略。

  • AbortPolicy:直接抛出异常。

  • CallerRunsPolicy:只用调用者所在线程来运行任务。

  • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

  • DiscardPolicy:不处理,丢弃掉。

当然,也可以根据应用场景需要来实现 RejectedExecutionHandler 接口自定义策略。如记录日志或持久化存储不能处理的任务。

2.2 向线程池提交任务


可以使用两个方法向线程池提交任务,分别为 execute()和 submit()方法。这两个方法的区别就是,execute()用于提交不需要返回值的任务,submit()方法用于提交需要返回值的任务。

execute 方法:

execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。通过以下代码可知 execute()方法输入的任务是一个 Runnable 类的实例。

threadsPool.execute(new Runnable() {
   @Override
public void run() {
// TODO Auto-generated method stub
}
});

submit 方法:

线程池会返回一个 future 类型的对象,通过这个 future 对象可以判断任务是否执行成功,并且可以通过 future 的 get()方法来获取返回值,get()方
法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

Future<Object> future = executor.submit(haveReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法执行任务异常
} finally {
// 关闭线程池
executor.shutdown();
}

2.3 关闭线程池


可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。

但是它们存在一定的区别,shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。

只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。

至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调 shutdown 方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow 方法。

2.4 实例 Demo


(1)首先构造一个线程池,用 ArrayBlockingQueue 作为其等待队列,队列初始化容量为 1。该线程池核心容量为 10,最大容量为 20,线程存活时间为 1 分钟。

static BlockingQueue blockingQueue=new ArrayBlockingQueue<>(1);
static ThreadPoolExecutor threadPoolExecutor=new ThreadPoolExecutor(10, 20, 1, TimeUnit.MINUTES, blockingQueue);

(2)另外构造了一个实现 Runable 接口的类 TaskBusyWithoutResult 类,其模拟一个繁忙的任务:

staticclassTaskBusyWithoutResultimplementsRunnable
{
   publicTaskBusyWithoutResult()
   {
   }
   @Override
   publicvoidrun()
   {
       System.out.println(“线程”+Thread.currentThread()+“开始运行”);
       int i=10000*10000;
       while(i>0)
       {
           i–;
       }
       System.out.println(“线程”+Thread.currentThread()+“运行结束”);
   }
}

(3)向线程池提交 20 个任务,执行任务

publicstaticvoidmain(String[] args){
    for (int i = 0; i < 20; i++) {
        Runnable runnable = new TaskBusyWithoutResult();
        threadPoolExecutor.submit(runnable);
    }
}

三、线程池源码分析


基于 jdk1.8

3.1 认识几个变量


privatefinal AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
   privatestaticfinalint COUNT_BITS = Integer.SIZE – 3;
   privatestaticfinalint CAPACITY   = (1 << COUNT_BITS) – 1;

   // runState is stored in the high-order bits
   privatestaticfinalint RUNNING    = –1 << COUNT_BITS;
   privatestaticfinalint SHUTDOWN   =  0 << COUNT_BITS;
   privatestaticfinalint STOP       =  1 << COUNT_BITS;
   privatestaticfinalint TIDYING    =  2 << COUNT_BITS;
   privatestaticfinalint TERMINATED =  3 << COUNT_BITS;

   // Packing and unpacking ctl
   privatestaticintrunStateOf(int c)     { return c & ~CAPACITY; }
   privatestaticintworkerCountOf(int c)  { return c & CAPACITY; }
   privatestaticintctlOf(int rs, int wc){ return rs | wc; }

后面 execute 方法会用到 ctl,这个变量是为了把工作线程数量和线程池状态放在一个 int 型变量存储而设置的一个原子类型的变量。 在 ctl 中,低位的 29 位表示工作线程的数量,高位用来表示 RUNNING、SHUTDOWN、STOP 等状态。上面定义的三个方法只是为了计算得到线程池的状态和工作线程的数量,以及得到 ctl。

3.2 execute()方法


publicvoidexecute(Runnable command) {
   //如果提交了空的任务 抛出异常
 if (command == null)
   thrownew NullPointerException();
 int c = ctl.get();//获取当前线程池的状态
   //检查当前工作线程数量是否小于核心线程数量
 if (workerCountOf(c) < corePoolSize) {
   //通过 addWorker 方法提交任务
   if (addWorker(command, true))
     return;
   c = ctl.get();//如果提交失败 需要二次检查状态
 }
 //向工作线程提交任务
 if (isRunning(c) && workQueue.offer(command)) {
   // 再次检查状态
      int recheck = ctl.get();
     
      if (! isRunning(recheck) && remove(command))
        reject(command);
      elseif (workerCountOf(recheck) == 0)
        addWorker(null, false);
 }
 //扩容失败 则拒绝任务
 elseif (!addWorker(command, false))
   reject(command);
}

这里面有些细节:

(1)if (! isRunning(recheck) && remove(command))这句。由于&&是短路与,意思就是前面如果非真,后面不会执行。所以如果线程池不是 Running 状态时,才把现成移出工作队列,再使用饱和策略。

(2)addWorker(command, false):我们看到有 addWorker(command, true)和 addWorker(command, false)。true 和 false 分别代表在检查工作线程数量的时候是应该与 corePoolSize 对比还是应该 maximumPoolSize 对比。只有第一个 if 时,也就是当前线程数小于 corePoolSize 时才用 addWorker(command, true)。

3.3  addWorker()方法


这个方法是任务提交的一个核心方法。在里面完成了状态检查、新建任务、执行任务等一系列动作。

privatebooleanaddWorker(Runnable firstTask, boolean core){
       retry:
   //死循环更新状态
       for (;;) {
           int c = ctl.get();
           int rs = runStateOf(c);//获取运行状态

       //检查线程池是否处于关闭状态
           if (rs >= SHUTDOWN &&
               ! (rs == SHUTDOWN &&
                  firstTask == null &&
                  ! workQueue.isEmpty()))
               returnfalse;
   
           for (;;) {
         //获取当前工作线程数量
               int wc = workerCountOf(c);
       //如果已经超过 corePoolSize 获取 maximumPoolSize 返回 false
               if (wc >= CAPACITY ||
                   wc >= (core ? corePoolSize : maximumPoolSize))
                   returnfalse;
       //CAS 增加一个工作线程
               if (compareAndIncrementWorkerCount(c))
                break retry;
       //再次获取状态
               c = ctl.get();  // Re-read ctl
       //如果状态更新失败 则循环更新
               if (runStateOf(c) != rs)
                   continue retry;
               // else CAS failed due to workerCount change; retry inner loop
           }
       }

       boolean workerStarted = false;
       boolean workerAdded = false;
       Worker w = null;
       try {
           w = new Worker(firstTask);//初始化一个工作线程
           final Thread t = w.thread;
           if (t != null) {
        //获得锁
               final ReentrantLock mainLock = this.mainLock;
               mainLock.lock();
               try {
                   // Recheck while holding lock.
                   // Back out on ThreadFactory failure or if
                   // shut down before lock acquired.
                   int rs = runStateOf(ctl.get());

                   if (rs < SHUTDOWN ||
                       (rs == SHUTDOWN && firstTask == null)) {
                       if (t.isAlive()) // precheck that t is startable
                           thrownew IllegalThreadStateException();
           //添加工作这到 hashset 中保存
                       workers.add(w);
                       int s = workers.size();
                       if (s > largestPoolSize)
                           largestPoolSize = s;
                       workerAdded = true;
                   }
               } finally {
                   mainLock.unlock();
               }
               if (workerAdded) {
       //工作线程启动 执行第一个任务 就是新提交的任务
                   t.start();
                   workerStarted = true;
               }
           }
       } finally {
           if (! workerStarted)
               addWorkerFailed(w);
       }
       return workerStarted;
   }

这个方法可以分为两个阶段来看,第一个阶段是判断是否有必要新增一个工作线程,如果有则利用 CAS 更新工作线程的数量;第二部分是将提交的任务封装成一个工作线程 Worker 然后加入到线程池的容器中,开始执行新提交的任务。这个 Worker 在执行完任务后,还会循环地获取工作队列里的任务来执行。

Worker 的构造方法如下:

/**
 * Creates with given first task and thread from ThreadFactory.
 * @param firstTask the first task (null if none)
 */
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

3.4 runWorker()方法


在 addWorker 方法快要结束的地方,调用了 t.start()方法,我们知道它实际执行的就是 Worker 对象的 run()方法,而 worker 的 run()方法是这样定义的:

/** Delegates main run loop to outer runWorker  */
publicvoidrun(){
    runWorker(this);
}

它实际上是将自己委托给线程池的 runWorker 方法

final voidrunWorker(Worker w) {
       
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
       try {
       //不断地从 blockingQueue 获取任务
           while (task != null || (task = getTask()) != null) {
               w.lock();
               // If pool is stopping, ensure thread is interrupted;
               // if not, ensure thread is not interrupted.  This
               // requires a recheck in second case to deal with
               // shutdownNow race while clearing interrupt
               if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                     runStateAtLeast(ctl.get(), STOP))) &&
                   !wt.isInterrupted())
                   wt.interrupt();
               try {
           //执行 beforeExecute 方法
                   beforeExecute(wt, task);
                   Throwable thrown = null;
                   try {
           //调用 Runable 的 run 方法
                       task.run();
                   } catch (RuntimeException x) {
                       thrown = x; throw x;
                   } catch (Error x) {
                       thrown = x; throw x;
                   } catch (Throwable x) {
                       thrown = x; thrownew Error(x);
                   } finally {
           // 执行 aferExecute 方法
                       afterExecute(task, thrown);
                   }
               } finally {
                   task = null;
                   w.completedTasks++;
                   w.unlock();
               }
           }
           completedAbruptly = false;
       } finally {
           processWorkerExit(w, completedAbruptly);
       }
}

它在不断执行我们提交的任务的 run 方法。而这个任务可能是我们新提交的,也有可能是从等待队列中获取的。这样就实现了线程池的完成逻辑。


丨极客文库, 版权所有丨如未注明 , 均为原创丨
本网站采用知识共享署名-非商业性使用-相同方式共享 3.0 中国大陆许可协议进行授权
转载请注明原文链接:Java 线程池总结
喜欢 (0)
[247507792@qq.com]
分享 (0)
勤劳的小蚂蚁
关于作者:
温馨提示:本文来源于网络,转载文章皆标明了出处,如果您发现侵权文章,请及时向站长反馈删除。

欢迎 注册账号 登录 发表评论!

  • 精品技术教程
  • 编程资源分享
  • 问答交流社区
  • 极客文库知识库

客服QQ


QQ:2248886839


工作时间:09:00-23:00