• 极客专栏正式上线!欢迎访问 https://www.jikewenku.com/topic.html
  • 极客专栏正式上线!欢迎访问 https://www.jikewenku.com/topic.html

Java后端开发面试问题总结(下2)

技术杂谈 勤劳的小蚂蚁 3个月前 (01-08) 88次浏览 已收录 0个评论 扫描二维码

本文主要涉及下面30个【并发/多线程】问题,内容较多,大家慢慢消化:

  • 1、两个线程对可以同一个ArrayList进行add操作吗?会出现什么结果?
  • 2、volatile和synchronized讲一下?
  • 3、synchronized和重入锁的区别?
  • 4、synchronized作了哪些优化?
  • 5、Java中线程的创建方式有哪些?
  • 6、Java中线程池怎么实现的,核心参数讲一讲?
  • 7、BIO、NIO、AIO的区别?
  • 8、两个线程交替打印奇数和偶数?
  • 9、进程间通信的方式?线程间通信的方式?
  • 10、原子类比如AtomicInteger为什么能保证原子性?
  • 11、实现一个简单的线程池?
  • 12、实现生产者-消费者模型?
  • 13、介绍下J.U.C.下的类?
  • 14、读写锁用过没?
  • 15、自旋锁是什么,为什么要用自旋锁?自选锁的缺点?
  • 16、进程和线程的区别?
  • 17、线程的死锁指什么?如何检测死锁?如何解决死锁?
  • 18、CPU线程调度?
  • 19、HashMap在多线程下有可能出现什么问题?
  • 20、ConcurrentHashMap是如何保证线程安全的?
  • 21、ThreadLocal的作用和实现原理?
  • 22、ArrayBlockingQueue和LinkedBlockingQueue的区别?
  • 23、CountDownLatch和CyclicBarrier的区别?
  • 24、synchronized内部实现原理?
  • 25、什么叫做锁的可重入?
  • 26、Java线程生命周期的状态?
  • 27、被notify()唤醒的线程可以立即得到执行吗?
  • 28、ThreadLocal的应用场景?
  • 29、sleep、wait、yield的区别和联系?
  • 30、Thread类中的start和run方法区别?

并发/多线程

1、两个线程对可以同一个ArrayList进行add操作吗?会出现什么结果?

  1. import java.util.ArrayList;
  2. import java.util.List;
  3. publicclass A {
  4.     staticList<Integer> list =newArrayList<>();
  5.     staticclass BB implementsRunnable{
  6.        @Override
  7.        publicvoid run(){
  8.            for(int j =0; j <100; j++){
  9.                list.add(j);
  10.            }
  11.        }
  12.    }
  13.    publicstaticvoid main(String[] args)throwsInterruptedException{
  14.        BB b =new BB();
  15.        Thread t1 =newThread(b);
  16.        Thread t2 =newThread(b);
  17.        t1.start();
  18.        t2.start();
  19.        t1.join();
  20.        t2.join();
  21.        System.out.println(list.size());
  22.    }
  23. }
比如上面的例子,打印的结果不一定是200.
因为ArrayList不是线程安全的,问题出在add方法
  1. publicboolean add(E e){
  2.    ensureCapacityInternal(size +1);  // Increments modCount!!
  3.    elementData[size++]= e;
  4.    returntrue;
  5. }
上面的程序,可能有三种情况发生:
  • 数组下标越界。首先要检查容量,必要时进行扩容。每当在数组边界处,如果A线程和B线程同时进入并检查容量,也就是它们都执行完ensureCapacityInternal方法,因为还有一个空间,所以不进行扩容,此时如果A暂停下来,B成功自增;然后接着A从 elementData[size++]=e开始执行,由于A之前已经检查过没有扩容,而B成功自增使得现在没有空余空间了,此时A就会发生数组下标越界。
  • 小于200。size++可以看成是 size=size+1,这一行代码包括三个步骤,先读取size,然后将size加1,最后将这个新值写回到size。此时若A和B线程同时读取到size假设为10,B先自增成功size变11,然后回来A因为它读到的size也是10,所以自增后写入size被更新成11,也就是说两次自增,实际上size只增大了1。因此最后的size会小于200。
  • 200。运气很好,没有发生以上的情况。

2、volatile和synchronized讲一下?

synchronized保证了当有多个线程同时操作共享数据时,任何时刻只有一个线程能进入临界区操作共享数据,其他线程必须等待。因此它可以保证操作的原子性。synchronized通过同步锁保证线程安全,进入临界区前必须获得对象的锁,其他没有获得锁的线程不可进入。当临界区中的线程操作完毕后,它会释放锁,此时其他线程可以竞争锁,得到锁的那个线程便可以进入临界区。
synchronized还可以保证可见性。因为对一个变量的unlock操作之前,必须先把次变量同步回主内存中。它还可以保证有序性,因为一个变量在任何时刻只能有一个线程对其进行lock操作(也就是任何时刻只有一个线程可以获得该锁对象),这决定了持有同一把锁的两个同步块只能串行进入。
volatile是一个关键字,用于修饰变量。被其修饰的变量具有可见性和有序性。
可见性,当一条线程修改了这个变量的值,新值能被其他线程立刻观察到。具体来说,volatile的作用是:在本CPU对变量的修改直接写入主内存中,同时这个写操作使得其他CPU中对应变量的缓存行无效,这样其他线程在读取这个变量时候必须从主内存中读取,所以读取到的是最新的,这就是上面说得能被立即“看到”。
有序性,volatile可以禁止指令重排。volatile在其汇编代码中有一个lock操作,这个操作相当于一个内存屏障,指令重排不能越过内存屏障。具体来说在执行到volatile变量时,内存屏障之前的语句一定被执行过了且结果对后面是已知的,而内存屏障后面的语句一定还没执行到;在volatile变量之前的语句不能被重排后其之后,相反其后的语句也不能被重排到之前。

3、synchronized和重入锁的区别?

synchronized是JVM的内置锁,而重入锁是Java代码实现的。重入锁是synchronized的扩展,可以完全代替后者。重入锁可以重入,允许同一个线程连续多次获得同一把锁。其次,重入锁独有的功能有:
  • 可以相应中断,synchronized要么获得锁执行,要么保持等待。而重入锁可以响应中断,使得线程在迟迟得不到锁的情况下,可以不再等待。主要由 lockInterruptibly()实现,这是一个可以对中断进行响应的锁申请动作,锁中断可以避免死锁。
  • 锁的申请可以有等待时限,用 tryLock()可以实现限时等待,如果超时还未获得锁会返回false,也防止了线程迟迟得不到锁时一直等待,可避免死锁。
  • 公平锁,即锁的获得按照线程先来后到的顺序依次获得,不会产生饥饿现象。synchronized的锁默认是不公平的,重入锁可通过传入构造方法的参数实现公平锁。
  • 重入锁可以绑定多个Condition条件,这些condition通过调用await/singal实现线程间通信。

4、synchronized作了哪些优化?

synchronized对内置锁引入了偏向锁、轻量级锁、自旋锁、锁消除等优化。使得性能和重入锁差不多了。
  • 偏向锁:偏向锁会偏向第一个获得它的线程,如果在接下来的执行过程中,该锁没有被其他线程获取,则持有偏向锁的线程永远也不需要再进行同步。偏向锁是在无竞争的情况下把整个同步都消除掉,CAS操作也没有了。适合于同一个线程请求同一个锁,不适用于不同线程请求同一个锁,此时会造成偏向锁失效。
  • 轻量级锁:如果偏向锁失效,虚拟机不会立即挂起线程,会使用一种称为轻量级锁的优化手段,轻量级锁的加锁和解锁都是通过CAS操作完成的。如果线程获得轻量级锁成功,则可以顺利进入临界区。如果轻量级锁加锁失败,表示其他线程抢先得到了锁,轻量级锁将膨胀为重量级锁。
  • 自旋锁:锁膨胀后,虚拟机为了避免线程真实地在操作系统层面挂起,虚拟机还会做最后的努力–自旋锁。如果共享数据的锁定状态只有很短的一段时间,为了这段时间去挂起和恢复线程(都需要转入内核态)并不值得,所以此时让后面请求锁的那个线程稍微等待以下,但不放弃处理器的执行时间。这里的等待其实就是执行了一个忙循环,这就是所谓的自旋。虚拟机会让当前线程做几个循环,若干次循环后如果得到了锁,就顺利进入临界区;如果还是没得到,这才将线程在操作系统层面挂起。
  • 锁消除:虚拟机即时编译时,对一些代码上要求同步,但被检测到不可能存在共享数据竞争的锁进行消除。锁消除的依据来源于“逃逸分析”技术。堆上的所有数据都不会逃逸出去被其他线程访问到,就可以把它们当栈上的数据对待,认为它们是线程私有的,同步加锁就是没有必要的。

5、Java中线程的创建方式有哪些?

  • 继承Thread并重写run方法
  • 实现Runnable并重写run方法,然后作为参数传入Thread
  • 实现Callable,并重写call(),call方法有返回值。使用FutureTask包装Callable实现类,其中FutureTask实现了Runnable和Future接口,最后将FutureTask作为参数传入Thread中
  • 由线程池创建并管理线程。

6、Java中线程池怎么实现的,核心参数讲一讲?

Executors是线程池的工厂类,通过调用它的静态方法如
  1. Executors.newCachedThreadPool();
  2. Executors.newFixedThreadPool(n);
可返回一个线程池。这些静态方法统一返回一个 ThreadPoolExecutor,只是参数不同而已。
  1. publicThreadPoolExecutor(int corePoolSize,
  2.                          int maximumPoolSize,
  3.                          long keepAliveTime,
  4.                          TimeUnit unit,
  5.                          BlockingQueue<Runnable> workQueue,
  6.                          ThreadFactory threadFactory,
  7.                          RejectedExecutionHandler handler){}
包括以上几个参数,其中:
  • corePoolSize:指定了线程池中线程的数量;
  • maximumPoolSize:线程池中的最大线程数量;
  • keepAliveTime:当线程池中线程数量超过corePoolSize时,多余的空闲线程的存活时间;
  • unit:上一个参数keepAliveTime的单位
  • 任务队列,被提交但还未被执行额任务
  • threadFactory:线程工厂,用于创建线程,一般用默认工厂即可。
  • handler:拒绝策略。当任务太多来不及处理的时候,采用什么方法拒绝任务。
最重要的是任务队列和拒绝策略。
任务队列主要有ArrayBlockingQueue有界队列、LinkedBlockingQueue无界队列、SynchronousQueue直接提交队列。
使用ArrayBlockingQueue,当线程池中实际线程数小于核心线程数时,直接创建线程执行任务;当大于核心线程数而小于最大线程数时,提交到任务队列中;因为这个队列是有界的,当队列满时,在不大于最大线程的前提下,创建线程执行任务;若大于最大线程数,执行拒绝策略。
使用LinkedBlockingQueue时,当线程池中实际线程数小于核心线程数时,直接创建线程执行任务;当大于核心线程数而小于最大线程数时,提交到任务队列中;因为这个队列是有无界的,所以之后提交的任务都会进入任务队列中。newFixedThreadPool就采用了无界队列,同时指定核心线程和最大线程数一样。
使用SynchronousQueue时,该队列没有容量,对提交任务的不做保存,直接增加新线程来执行任务。newCachedThreadPool使用的是直接提交队列,核心线程数是0,最大线程数是整型的最大值,keepAliveTime是60s,因此当新任务提交时,若没有空闲线程都是新增线程来执行任务,不过由于核心线程数是0,当60s就会回收空闲线程。
当线程池中的线程达到最大线程数时,就要开始执行拒绝策略了。有如下几种
  • 直接抛出异常
  • 在调用者的线程中,运行当前任务
  • 丢弃最老的一个请求,也就是将队列头的任务poll出去
  • 默默丢弃无法处理的任务,不做任何处理

7、BIO、NIO、AIO的区别?

首先要搞明白在I/O中的同步、异步、阻塞、非阻塞是什么意思。
  • 同步I/O。由用户进程自己处理I/O的读写,处理过程中不能做其他事。需要主动去询问I/O状态。
  • 异步I/O。由系统内核完成I/O操作,完成后系统会通知用户进程。
  • 阻塞。I/O请求操作需要的条件不满足,请求操作一直等待,直到条件满足。
  • 非阻塞。 I/O请求操作需要的条件不满足,会立即返回一个标志,而不会一直等待。
现在来看BIO、NIO、AIO的区别。
BIO:同步并阻塞。用户进程在发起一个I/O请求后,必须等待I/O准备就绪,I/O操作也由自己来处理,在IO操作未完成之前,用户进程必须等待。
NIO:同步非阻塞。用户进程发起一个I/O请求后可立即返回去做其他任务,当I/O准备就绪时它会收到通知。接着由这个线程自行进行I/O操作,I/O操作本身还是同步的。
AIO:异步非阻塞。用户进程发起一个I/O操作以后可立即返回去做其他任务,真正的I/O操作由内核完成后通知用户进程。
NIO和AIO的不同:NIO是操作系统通知用户进程I/O已经准备就绪,由用户进程自行完成I/O操作;AIO是操作系统完成I/O后通知用户进程。
BIO是为每一个客户端连接开启一个线程,简单说就是一个连接一个线程。
NIO主要组件有Seletor、Channel、Buffer,数据需要通过BUffer包装后才能使用Channel进行读取和写入。一个Selector可以由一个线程管理,每一个Channel可看作一个客户端连接。一个Selector可以监听多个Channel,即使用一个或极少数的线程来管理大量的客户端连接。当与客户端连接的数据没有准备好时,Selector处于等待状态,一旦某个Channel的准备好了数据,Selector就能立即得到通知。

8、两个线程交替打印奇数和偶数?

先使用synchronized实现。PrintOdd用于打印奇数;PrintEven用于打印偶数。核心就是判断当前count如果是奇数,就让PrintEven阻塞,PrintOdd打印后唤醒在lock对象上等待的PrintEven并且释放锁。此时PrintEven获得锁打印偶数再唤醒PrintOdd,两个线程如此交替唤醒对方就实现了交替打印奇偶数。
  1. publicclassPrintOddEven{
  2.    privatestaticfinalObject lock =newObject();
  3.    privatestaticint count =1;
  4.    staticclassPrintOddimplementsRunnable{
  5.        @Override
  6.        publicvoid run(){
  7.            for(int i =0; i <10; i++){
  8.                synchronized(lock){
  9.                    try{
  10.                        while((count &1)!=1){
  11.                            lock.wait();
  12.                        }
  13.                        System.out.println(Thread.currentThread().getName()+" "+count);
  14.                        count++;
  15.                        lock.notify();
  16.                    }catch(InterruptedException e){
  17.                        e.printStackTrace();
  18.                    }
  19.                }
  20.            }
  21.        }
  22.    }
  23.    staticclassPrintEvenimplementsRunnable{
  24.        @Override
  25.        publicvoid run(){
  26.            for(int i =0; i <10; i++){
  27.                synchronized(lock){
  28.                    try{
  29.                        while((count &1)!=0){
  30.                            lock.wait();
  31.                        }
  32.                        System.out.println(Thread.currentThread().getName()+" "+count);
  33.                        count++;
  34.                        lock.notify();
  35.                    }catch(InterruptedException e){
  36.                        e.printStackTrace();
  37.                    }
  38.                }
  39.            }
  40.        }
  41.    }
  42.    publicstaticvoid main(String[] args){
  43.        newThread(newPrintOdd()).start();
  44.        newThread(newPrintEven()).start();
  45.    }
  46. }
如果要实现3个线程交替打印ABC呢?这次打算使用重入锁,和上面没差多少,但是由于现在有三个线程了,在打印完后需要唤醒其他线程,注意不可使用 sigal(),因为唤醒的线程是随机的,不能保证打印顺序不说,还会造成死循环。一定要使用 sigalAll()唤醒所有线程。
  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.ReentrantLock;
  3. publicclassThreeThreadPrintABC{
  4.    privatestaticReentrantLock lock =newReentrantLock();
  5.    privatestaticCondition wait = lock.newCondition();
  6.    // 用来控制该打印的线程
  7.    privatestaticint count =0;
  8.    publicstaticvoid main(String[] args){
  9.        Thread printA =newThread(newPrintA());
  10.        Thread printB =newThread(newPrintB());
  11.        Thread printC =newThread(newPrintC());
  12.        printA.start();
  13.        printB.start();
  14.        printC.start();
  15.    }
  16.    staticclassPrintAimplementsRunnable{
  17.        @Override
  18.        publicvoid run(){
  19.            for(int i =0; i <10; i++){
  20.                lock.lock();
  21.                try{
  22.                    while((count %3)!=0){
  23.                        wait.await();
  24.                    }
  25.                    System.out.println(Thread.currentThread().getName()+" A");
  26.                    count++;
  27.                    wait.signalAll();
  28.                }catch(InterruptedException e){
  29.                    e.printStackTrace();
  30.                }finally{
  31.                    lock.unlock();
  32.                }
  33.            }
  34.        }
  35.    }
  36.    staticclassPrintBimplementsRunnable{
  37.        @Override
  38.        publicvoid run(){
  39.            for(int i =0; i <10; i++){
  40.                lock.lock();
  41.                try{
  42.                    while((count %3)!=1){
  43.                        wait.await();
  44.                    }
  45.                    System.out.println(Thread.currentThread().getName()+" B");
  46.                    count++;
  47.                    wait.signalAll();
  48.                }catch(InterruptedException e){
  49.                    e.printStackTrace();
  50.                }finally{
  51.                    lock.unlock();
  52.                }
  53.            }
  54.        }
  55.    }
  56.    staticclassPrintCimplementsRunnable{
  57.        @Override
  58.        publicvoid run(){
  59.            for(int i =0; i <10; i++){
  60.                lock.lock();
  61.                try{
  62.                    while((count %3)!=2){
  63.                        wait.await();
  64.                    }
  65.                    System.out.println(Thread.currentThread().getName()+" C");
  66.                    count++;
  67.                    wait.signalAll();
  68.                }catch(InterruptedException e){
  69.                    e.printStackTrace();
  70.                }finally{
  71.                    lock.unlock();
  72.                }
  73.            }
  74.        }
  75.    }
  76. }
如果觉得不好理解,重入锁是可以绑定多个条件的。创建3个Condition分别让三个打印线程在上面等待。A打印完了,唤醒等待在waitB对象上的PrintB;B打印完了唤醒在waitC对象上的PrintC;C打印完了,唤醒在waitA对象上等待的PrintA,如此循环地唤醒对方即可。
  1. import java.util.concurrent.locks.Condition;
  2. import java.util.concurrent.locks.ReentrantLock;
  3. publicclassThreeThreadPrintABC{
  4.    privatestaticReentrantLock lock =newReentrantLock();
  5.    privatestaticCondition waitA = lock.newCondition();
  6.    privatestaticCondition waitB = lock.newCondition();
  7.    privatestaticCondition waitC = lock.newCondition();
  8.    // 用来控制该打印的线程
  9.    privatestaticint count =0;
  10.    publicstaticvoid main(String[] args){
  11.        Thread printA =newThread(newPrintA());
  12.        Thread printB =newThread(newPrintB());
  13.        Thread printC =newThread(newPrintC());
  14.        printA.start();
  15.        printB.start();
  16.        printC.start();
  17.    }
  18.    staticclassPrintAimplementsRunnable{
  19.        @Override
  20.        publicvoid run(){
  21.            for(int i =0; i <10; i++){
  22.                lock.lock();
  23.                try{
  24.                    while((count %3)!=0){
  25.                        waitA.await();
  26.                    }
  27.                    System.out.println(Thread.currentThread().getName()+" A");
  28.                    count++;
  29.                    waitB.signal();
  30.                }catch(InterruptedException e){
  31.                    e.printStackTrace();
  32.                }finally{
  33.                    lock.unlock();
  34.                }
  35.            }
  36.        }
  37.    }
  38.    staticclassPrintBimplementsRunnable{
  39.        @Override
  40.        publicvoid run(){
  41.            for(int i =0; i <10; i++){
  42.                lock.lock();
  43.                try{
  44.                    while((count %3)!=1){
  45.                        waitB.await();
  46.                    }
  47.                    System.out.println(Thread.currentThread().getName()+" B");
  48.                    count++;
  49.                    waitC.signal();
  50.                }catch(InterruptedException e){
  51.                    e.printStackTrace();
  52.                }finally{
  53.                    lock.unlock();
  54.                }
  55.            }
  56.        }
  57.    }
  58.    staticclassPrintCimplementsRunnable{
  59.        @Override
  60.        publicvoid run(){
  61.            for(int i =0; i <10; i++){
  62.                lock.lock();
  63.                try{
  64.                    while((count %3)!=2){
  65.                        waitC.await();
  66.                    }
  67.                    System.out.println(Thread.currentThread().getName()+" C");
  68.                    count++;
  69.                    waitA.signal();
  70.                }catch(InterruptedException e){
  71.                    e.printStackTrace();
  72.                }finally{
  73.                    lock.unlock();
  74.                }
  75.            }
  76.        }
  77.    }
  78. }

9、进程间通信的方式?线程间通信的方式?

进程间通信的方式推荐阅读这篇博客
  • 管道。分为几种管道。普通管道PIPE:单工,单向传输,只能在父子或者兄弟进程间使用;流管道,半双工,可双向传输,只能在父子或兄弟进程间使用;命名管道:可以在许多并不相关的进程之间进行通讯。
  • 消息队列。消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。
  • 信号。用于通知接收进程某个事件已经发生
  • 信号量。信号量是一个计数器,可以用来控制多个进程对共享资源的访问。它常作为一种锁机制,防止某进程正在访问共享资源时,其他进程也访问该资源。因此,主要作为进程间以及同一进程内不同线程之间的同步手段。
  • 共享内存。共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC(进程间通信) 方式,它往往与其他通信机制如信号量配合使用,来实现进程间的同步和通信。
  • 套接字。可用于不同机器间的进程通信。
线程间通信的方式:
推荐阅读这篇博客
  • 锁机制。包括互斥锁、条件变量、读写锁。互斥锁以排他方式方式数据被并发修改;读写锁允许多个线程同时读取,对写操作互斥;条件变量可以以原子的方式阻塞进程,直到某个特定条件为真为止。对条件的测试是在互斥锁的保护下进行的。条件变量始终与互斥锁一起使用。
  • 信号量(Semaphore) 机制。包括无名线程信号量和命名线程信号量
  • 信号(Signal)机制。类似进程间的信号处理

10、原子类比如AtomicInteger为什么能保证原子性?

JDK并发包下有一个atomic包,里面实现了一些直接使用CAS操作的线程安全的类型。AtomicInteger就是其中之一。得益于CAS操作,因此保证了原子性。CAS操作具体说一说是什么?
CAS(Compare And Swap),即“比较并交换”。CAS基于乐观的态度,是无锁操作,它操作包含三个参数,当前要更新的变量、期望值、新值,仅当:当前值和预期值一样时,才会将当前值设置为新值;如果当前值和预期值不一样,说明这个变量已经被其他线程修改过了。如果有多个线程同时使用CAS操作一个变量时,只有一个会胜出,并成功更新。其他线程允许放弃操作,也允许再次尝试,直到修改成功为止。CAS操作是由硬件支持的,现在的处理器基本支持原子化的CAS指令。
CAS由什么缺点?如何解决?
可能引发”ABA”问题,即一个变量原来是A,先被修改成B后又修改回了A,由于CAS操作只是比较当前值和预期值是否一样(只比较结果,不在乎过程中状态的变化),在其他线程来看,该变量就好像没有发生过变化。
可以为数据添加时间戳,每次成功修改数据时,不仅更新数据的值,同时要更新时间戳的值。CAS操作时,不仅要比较当前值和预期值,还要比较当前时间戳和预期时间戳。两者都必须满足预期值才能修改成功。

11、实现一个简单的线程池?

实现一个类似于 Executors.newFixedThreadPool(n)的固定大小线程池,当小于corePoolSize时候,优先创建线程去执行该任务;当超过该值时,将任务提交到任务队列中,然后各个线程从任务队列中取任务来执行。
  1. import java.util.HashSet;
  2. import java.util.Set;
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.Executors;
  5. import java.util.concurrent.LinkedBlockingQueue;
  6. publicclassMyThreadPool{
  7.    privateint workerCount;
  8.    privateint corePoolSize;
  9.    privateBlockingQueue<Runnable> workQueue;
  10.    privateSet<Worker> workers;
  11.    privatevolatileboolean RUNNING =true;
  12.    publicMyThreadPool(int corePoolSize){
  13.        this.corePoolSize = corePoolSize;
  14.        workQueue =newLinkedBlockingQueue<>();
  15.        workers =newHashSet<>();
  16.    }
  17.    publicvoid execute(Runnable r){
  18.        if(workerCount < corePoolSize){
  19.            addWorker(r);
  20.        }else{
  21.            try{
  22.                workQueue.put(r);
  23.            }catch(InterruptedException e){
  24.                e.printStackTrace();
  25.            }
  26.        }
  27.    }
  28.    privatevoid addWorker(Runnable r){
  29.        workerCount++;
  30.        Worker worker =newWorker(r);
  31.        Thread t = worker.thread;
  32.        workers.add(worker);
  33.        t.start();
  34.    }
  35.    classWorkerimplementsRunnable{
  36.        Runnable task;
  37.        Thread thread;
  38.        publicWorker(Runnable task){
  39.            this.task = task;
  40.            this.thread =newThread(this);
  41.        }
  42.        @Override
  43.        publicvoid run(){
  44.            while(RUNNING){
  45.                Runnable task =this.task;
  46.                // 执行当前的任务,所以把这个任务置空,以免造成死循环
  47.                this.task =null;
  48.                if(task !=null||(task = getTask())!=null){
  49.                    task.run();
  50.                }
  51.            }
  52.        }
  53.    }
  54.    privateRunnable getTask(){
  55.        Runnable r =null;
  56.        try{
  57.            r = workQueue.take();
  58.        }catch(InterruptedException e){
  59.            e.printStackTrace();
  60.        }
  61.        return r;
  62.    }
  63.    publicstaticvoid main(String[] args){
  64.        MyThreadPool threadPool =newMyThreadPool(5);
  65.        Runnable r =newWriter();
  66.        for(int i =0; i <10; i++){
  67.            threadPool.execute(r);
  68.        }
  69.    }
  70. }
  71. classWriterimplementsRunnable{
  72.    @Override
  73.    publicvoid run(){
  74.        System.out.println(Thread.currentThread().getName()+" ");
  75.    }
  76. }
Worker实现了Runnale,是真正执行任务的类。当线程池中工作线程小于核心线程时候,调用addWorker直接start线程执行它的第一个任务。否则,将任务放入任务队列中,等线程来执行它们。Worker中的run方法是一个死循环,执行第一个任务(addWorker时调用start方法执行的那个任务),或者通过getTask方法不断从任务队列中取得任务来执行。正是getTask方法实现了线程的复用,即一个线程虽然只能调用一次start方法,但是后续的任务可以在Worker的run方法里直接调用任务的run方法得以执行。简单来说就是在Worker的run里调用任务的run方法。
任务全部执行完毕后,线程池需要被关闭,否则程序一直死循环。上述代码中并没有实现 shutdown()方法。

12、实现生产者-消费者模型?

可以有几种方式实现生产者-消费者模型:
  • wait()/notify()
  • await()/signal()
  • BlockingQueue
生产者-消费者问题的关键在于:
  • 没有“产品”时,消费者不能消费
  • “产品”线满时,生产者不能生产
如果用队列来存放“产品”:
  • 队列为空时,消费者需要一直等待,不为空时消费者才能取走。
  • 队列为满时,生产者需要一直等待,不为满时生产者才能进行生产。
等待和唤醒可以使用wait()/notify()实现。Java中的阻塞队列BlockingQueue,其 take()和和 put()方法就是阻塞的,内部其实就是await()/signal()方法的配合使用,非常适合作为数据传输的通道。
以ArrayBlockingQueue来说,同步是重入锁保证的。和该lock绑定了两个Condition,一个是notEmpty一个是notFull。简单说明下take和put方法。注意这并不是源码,只是方便理解把核心部分抽取出来。
  1. public E take(){
  2.    lock.lock();
  3.    try{
  4.        // 当队列为空,不能取,必须等待
  5.        while(count==0){
  6.            notEmpty.await();
  7.        }
  8.        // 不再阻塞说明队列有元素了,直接删除并返回
  9.        return dequeue();
  10.    }finally
  11.        lock.unlock();
  12.  }
  13. }
  14. privatevoid enqueue(E x){
  15.    // ...insert element
  16.    // 因为插入了元素,说明队列不为空,唤醒在notEmpty上等待的线程
  17.    notEmpty.signal();
  18. }
  19. publicvoid put(E e){
  20.    lock.lock();
  21.    try{
  22.        // 队列满了,不能放入,必须等待
  23.        while(count == items.length){
  24.            notFull.await();