深入理解线程通信

前言

开发中不免会遇到需要所有子线程执行完毕通知主线程处理某些逻辑的场景。
或者是线程 A 在执行到某个条件通知线程 B 执行某个操作。
可以通过以下几种方式实现:

等待通知机制

等待通知模式是 Java 中比较经典的线程通信方式。
两个线程通过对同一对象调用等待 wait() 和通知 notify() 方法来进行通讯。
如两个线程交替打印奇偶数:
  1. publicclassTwoThreadWaitNotify{
  2.    privateint start =1;
  3.    privateboolean flag =false;
  4.    publicstaticvoid main(String[] args){
  5.        TwoThreadWaitNotify twoThread =newTwoThreadWaitNotify();
  6.        Thread t1 =newThread(newOuNum(twoThread));
  7.        t1.setName("A");
  8.        Thread t2 =newThread(newJiNum(twoThread));
  9.        t2.setName("B");
  10.        t1.start();
  11.        t2.start();
  12.    }
  13.    /**
  14.     * 偶数线程
  15.     */
  16.    publicstaticclassOuNumimplementsRunnable{
  17.        privateTwoThreadWaitNotify number;
  18.        publicOuNum(TwoThreadWaitNotify number){
  19.            this.number = number;
  20.        }
  21.        @Override
  22.        publicvoid run(){
  23.            while(number.start <=100){
  24.                synchronized(TwoThreadWaitNotify.class){
  25.                    System.out.println("偶数线程抢到锁了");
  26.                    if(number.flag){
  27.                        System.out.println(Thread.currentThread().getName()+"+-+偶数"+ number.start);
  28.                        number.start++;
  29.                        number.flag =false;
  30.                        TwoThreadWaitNotify.class.notify();
  31.                    }else{
  32.                        try{
  33.                            TwoThreadWaitNotify.class.wait();
  34.                        }catch(InterruptedException e){
  35.                            e.printStackTrace();
  36.                        }
  37.                    }
  38.                }
  39.            }
  40.        }
  41.    }
  42.    /**
  43.     * 奇数线程
  44.     */
  45.    publicstaticclassJiNumimplementsRunnable{
  46.        privateTwoThreadWaitNotify number;
  47.        publicJiNum(TwoThreadWaitNotify number){
  48.            this.number = number;
  49.        }
  50.        @Override
  51.        publicvoid run(){
  52.            while(number.start <=100){
  53.                synchronized(TwoThreadWaitNotify.class){
  54.                    System.out.println("奇数线程抢到锁了");
  55.                    if(!number.flag){
  56.                        System.out.println(Thread.currentThread().getName()+"+-+奇数"+ number.start);
  57.                        number.start++;
  58.                        number.flag =true;
  59.                        TwoThreadWaitNotify.class.notify();
  60.                    }else{
  61.                        try{
  62.                            TwoThreadWaitNotify.class.wait();
  63.                        }catch(InterruptedException e){
  64.                            e.printStackTrace();
  65.                        }
  66.                    }
  67.                }
  68.            }
  69.        }
  70.    }
  71. }
输出结果:
  1. t2+-+奇数93
  2. t1+-+偶数94
  3. t2+-+奇数95
  4. t1+-+偶数96
  5. t2+-+奇数97
  6. t1+-+偶数98
  7. t2+-+奇数99
  8. t1+-+偶数100
这里的线程 A 和线程 B 都对同一个对象 TwoThreadWaitNotify.class 获取锁,A 线程调用了同步对象的 wait() 方法释放了锁并进入 WAITING 状态。
B 线程调用了 notify() 方法,这样 A 线程收到通知之后就可以从 wait() 方法中返回。
这里利用了 TwoThreadWaitNotify.class 对象完成了通信。
有一些需要注意:
  • wait() 、nofify() 、nofityAll() 调用的前提都是获得了对象的锁(也可称为对象监视器)。
  • 调用 wait() 方法后线程会释放锁,进入 WAITING 状态,该线程也会被移动到等待队列中。
  • 调用 notify() 方法会将等待队列中的线程移动到同步队列中,线程状态也会更新为 BLOCKED
  • 从 wait() 方法返回的前提是调用 notify() 方法的线程释放锁,wait() 方法的线程获得锁。
等待通知有着一个经典范式:
线程 A 作为消费者:
  1. 获取对象的锁。
  2. 进入 while(判断条件),并调用 wait() 方法。
  3. 当条件满足跳出循环执行具体处理逻辑。
线程 B 作为生产者:
  1. 获取对象锁。
  2. 更改与线程 A 共用的判断条件。
  3. 调用 notify() 方法。
伪代码如下:
  1. //Thread A
  2. synchronized(Object){
  3.    while(条件){
  4.        Object.wait();
  5.    }
  6.    //do something
  7. }
  8. //Thread B
  9. synchronized(Object){
  10.    条件=false;//改变条件
  11.    Object.notify();
  12. }

join() 方法

  1.    privatestaticvoid join()throwsInterruptedException{
  2.        Thread t1 =newThread(newRunnable(){
  3.            @Override
  4.            publicvoid run(){
  5.                LOGGER.info("running");
  6.                try{
  7.                    Thread.sleep(3000);
  8.                }catch(InterruptedException e){
  9.                    e.printStackTrace();
  10.                }
  11.            }
  12.        });
  13.        Thread t2 =newThread(newRunnable(){
  14.            @Override
  15.            publicvoid run(){
  16.                LOGGER.info("running2");
  17.                try{
  18.                    Thread.sleep(4000);
  19.                }catch(InterruptedException e){
  20.                    e.printStackTrace();
  21.                }
  22.            }
  23.        });
  24.        t1.start();
  25.        t2.start();
  26.        //等待线程1终止
  27.        t1.join();
  28.        //等待线程2终止
  29.        t2.join();
  30.        LOGGER.info("main over");
  31.    }
输出结果:
  1. 2018-03-1620:21:30.967[Thread-1] INFO  c.c.actual.ThreadCommunication- running2
  2. 2018-03-1620:21:30.967[Thread-0] INFO  c.c.actual.ThreadCommunication- running
  3. 2018-03-1620:21:34.972[main] INFO  c.c.actual.ThreadCommunication- main over
t1.join() 时会一直阻塞到 t1 执行完毕,所以最终主线程会等待 t1 和 t2 线程执行完毕。
其实从源码可以看出,join() 也是利用的等待通知机制:
核心逻辑:
  1.    while(isAlive()){
  2.        wait(0);
  3.    }
在 join 线程完成后会调用 notifyAll() 方法,是在 JVM 实现中调用,所以这里看不出来。

volatile 共享内存

因为 Java 是采用共享内存的方式进行线程通信的,所以可以采用以下方式用主线程关闭 A 线程:
  1. publicclassVolatileimplementsRunnable{
  2.    privatestaticvolatileboolean flag =true;
  3.    @Override
  4.    publicvoid run(){
  5.        while(flag){
  6.            System.out.println(Thread.currentThread().getName()+"正在运行。。。");
  7.        }
  8.        System.out.println(Thread.currentThread().getName()+"执行完毕");
  9.    }
  10.    publicstaticvoid main(String[] args)throwsInterruptedException{
  11.        Volatile aVolatile =newVolatile();
  12.        newThread(aVolatile,"thread A").start();
  13.        System.out.println("main 线程正在运行");
  14.        TimeUnit.MILLISECONDS.sleep(100);
  15.        aVolatile.stopThread();
  16.    }
  17.    privatevoid stopThread(){
  18.        flag =false;
  19.    }
  20. }
输出结果:
  1. thread A正在运行。。。
  2. thread A正在运行。。。
  3. thread A正在运行。。。
  4. thread A正在运行。。。
  5. thread A执行完毕
这里的 flag 存放于主内存中,所以主线程和线程 A 都可以看到。
flag 采用 volatile 修饰主要是为了内存可见性,更多内容可以查看这里

CountDownLatch 并发工具

CountDownLatch 可以实现 join 相同的功能,但是更加的灵活。
  1.    privatestaticvoid countDownLatch()throwsException{
  2.        int thread =3;
  3.        long start =System.currentTimeMillis();
  4.        finalCountDownLatch countDown =newCountDownLatch(thread);
  5.        for(int i=0;i<thread ; i++){
  6.            newThread(newRunnable(){
  7.                @Override
  8.                publicvoid run(){
  9.                    LOGGER.info("thread run");
  10.                    try{
  11.                        Thread.sleep(2000);
  12.                        countDown.countDown();
  13.                        LOGGER.info("thread end");
  14.                    }catch(InterruptedException e){
  15.                        e.printStackTrace();
  16.                    }
  17.                }
  18.            }).start();
  19.        }
  20.        countDown.await();
  21.        long stop =System.currentTimeMillis();
  22.        LOGGER.info("main over total time={}",stop-start);
  23.    }
输出结果:
  1. 2018-03-1620:19:44.126[Thread-0] INFO  c.c.actual.ThreadCommunication- thread run
  2. 2018-03-1620:19:44.126[Thread-2] INFO  c.c.actual.ThreadCommunication- thread run
  3. 2018-03-1620:19:44.126[Thread-1] INFO  c.c.actual.ThreadCommunication- thread run
  4. 2018-03-1620:19:46.136[Thread-2] INFO  c.c.actual.ThreadCommunication- thread end
  5. 2018-03-1620:19:46.136[Thread-1] INFO  c.c.actual.ThreadCommunication- thread end
  6. 2018-03-1620:19:46.136[Thread-0] INFO  c.c.actual.ThreadCommunication- thread end
  7. 2018-03-1620:19:46.136[main] INFO  c.c.actual.ThreadCommunication- main over total time=2012
CountDownLatch 也是基于 AQS(AbstractQueuedSynchronizer) 实现的,更多实现参考 ReentrantLock 实现原理
  • 初始化一个 CountDownLatch 时告诉并发的线程,然后在每个线程处理完毕之后调用 countDown() 方法。
  • 该方法会将 AQS 内置的一个 state 状态 -1 。
  • 最终在主线程调用 await() 方法,它会阻塞直到 state==0 的时候返回。

线程响应中断

  1. publicclassStopThreadimplementsRunnable{
  2.    @Override
  3.    publicvoid run(){
  4.        while(!Thread.currentThread().isInterrupted()){
  5.            // 线程执行具体逻辑
  6.            System.out.println(Thread.currentThread().getName()+"运行中。。");
  7.        }
  8.        System.out.println(Thread.currentThread().getName()+"退出。。");
  9.    }
  10.    publicstaticvoid main(String[] args)throwsInterruptedException{
  11.        Thread thread =newThread(newStopThread(),"thread A");
  12.        thread.start();
  13.        System.out.println("main 线程正在运行");
  14.        TimeUnit.MILLISECONDS.sleep(10);
  15.        thread.interrupt();
  16.    }
  17. }
输出结果:
  1. thread A运行中。。
  2. thread A运行中。。
  3. thread A退出。。
可以采用中断线程的方式来通信,调用了 thread.interrupt() 方法其实就是将 thread 中的一个标志属性置为了 true。
并不是说调用了该方法就可以中断线程,如果不对这个标志进行响应其实是没有什么作用(这里对这个标志进行了判断)。
但是如果抛出了 InterruptedException 异常,该标志就会被 JVM 重置为 false。

线程池 awaitTermination() 方法

如果是用线程池来管理线程,可以使用以下方式来让主线程等待线程池中所有任务执行完毕:
  1.    privatestaticvoid executorService()throwsException{
  2.        BlockingQueue<Runnable> queue =newLinkedBlockingQueue<>(10);
  3.        ThreadPoolExecutor poolExecutor =newThreadPoolExecutor(5,5,1,TimeUnit.MILLISECONDS,queue);
  4.        poolExecutor.execute(newRunnable(){
  5.            @Override
  6.            publicvoid run(){
  7.                LOGGER.info("running");
  8.                try{
  9.                    Thread.sleep(3000);
  10.                }catch(InterruptedException e){
  11.                    e.printStackTrace();
  12.                }
  13.            }
  14.        });
  15.        poolExecutor.execute(newRunnable(){
  16.            @Override
  17.            publicvoid run(){
  18.                LOGGER.info("running2");
  19.                try{
  20.                    Thread.sleep(2000);
  21.                }catch(InterruptedException e){
  22.                    e.printStackTrace();
  23.                }
  24.            }
  25.        });
  26.        poolExecutor.shutdown();
  27.        while(!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){
  28.            LOGGER.info("线程还在执行。。。");
  29.        }
  30.        LOGGER.info("main over");
  31.    }
输出结果:
  1. 2018-03-1620:18:01.273[pool-1-thread-2] INFO  c.c.actual.ThreadCommunication- running2
  2. 2018-03-1620:18:01.273[pool-1-thread-1] INFO  c.c.actual.ThreadCommunication- running
  3. 2018-03-1620:18:02.273[main] INFO  c.c.actual.ThreadCommunication-线程还在执行。。。
  4. 2018-03-1620:18:03.278[main] INFO  c.c.actual.ThreadCommunication-线程还在执行。。。
  5. 2018-03-1620:18:04.278[main] INFO  c.c.actual.ThreadCommunication- main over
使用这个 awaitTermination() 方法的前提需要关闭线程池,如调用了 shutdown() 方法。
调用了 shutdown() 之后线程池会停止接受新任务,并且会平滑的关闭线程池中现有的任务。

管道通信

  1.    publicstaticvoid piped()throwsIOException{
  2.        //面向于字符 PipedInputStream 面向于字节
  3.        PipedWriter writer =newPipedWriter();
  4.        PipedReader reader =newPipedReader();
  5.        //输入输出流建立连接
  6.        writer.connect(reader);
  7.        Thread t1 =newThread(newRunnable(){
  8.            @Override
  9.            publicvoid run(){
  10.                LOGGER.info("running");
  11.                try{
  12.                    for(int i =0; i <10; i++){
  13.                        writer.write(i+"");
  14.                        Thread.sleep(10);
  15.                    }
  16.                }catch(Exception e){
  17.                }finally{
  18.                    try{
  19.                        writer.close();
  20.                    }catch(IOException e){
  21.                        e.printStackTrace();
  22.                    }
  23.                }
  24.            }
  25.        });
  26.        Thread t2 =newThread(newRunnable(){
  27.            @Override
  28.            publicvoid run(){
  29.                LOGGER.info("running2");
  30.                int msg =0;
  31.                try{
  32.                    while((msg = reader.read())!=-1){
  33.                        LOGGER.info("msg={}",(char) msg);
  34.                    }
  35.                }catch(Exception e){
  36.                }
  37.            }
  38.        });
  39.        t1.start();
  40.        t2.start();
  41.    }
输出结果:
  1. 2018-03-1619:56:43.014[Thread-0] INFO  c.c.actual.ThreadCommunication- running
  2. 2018-03-1619:56:43.014[Thread-1] INFO  c.c.actual.ThreadCommunication- running2
  3. 2018-03-1619:56:43.130[Thread-1] INFO  c.c.actual.ThreadCommunication- msg=0
  4. 2018-03-1619:56:43.132[Thread-1] INFO  c.c.actual.ThreadCommunication- msg=1
  5. 2018-03-1619:56:43.132[Thread-1] INFO  c.c.actual.ThreadCommunication- msg=2
  6. 2018-03-1619:56:43.133[Thread-1] INFO  c.c.actual.ThreadCommunication- msg=3
  7. 2018-03-1619:56:43.133[Thread-1] INFO  c.c.actual.ThreadCommunication- msg=4
  8. 2018-03-1619:56:43.133[Thread-1] INFO  c.c.actual.ThreadCommunication- msg=5
  9. 2018-03-1619:56:43.133[Thread-1] INFO  c.c.actual.ThreadCommunication- msg=6
  10. 2018-03-1619:56:43.134[Thread-1] INFO  c.c.actual.ThreadCommunication- msg=7
  11. 2018-03-1619:56:43.134[Thread-1] INFO  c.c.actual.ThreadCommunication- msg=8
  12. 2018-03-1619:56:43.134[Thread-1] INFO  c.c.actual.ThreadCommunication- msg=9
Java 虽说是基于内存通信的,但也可以使用管道通信。
需要注意的是,输入流和输出流需要首先建立连接。这样线程 B 就可以收到线程 A 发出的消息了。
实际开发中可以灵活根据需求选择最适合的线程通信方式。

本站所有文章均来自互联网,如有侵权,请联系站长删除。极客文库 » 深入理解线程通信
分享到:
赞(0)

评论抢沙发

评论前必须登录!