最新公告
  • 欢迎您光临极客文库,本站秉承服务宗旨 履行“站长”责任,销售只是起点 服务永无止境!立即加入我们
  • FutureTask

    在介绍 Callable 时我们知道它可以有返回值,返回值通过 Future 进行封装。FutureTask 实现了 RunnableFuture 接口,该接口继承自 Runnable 和 Future 接口,这使得 FutureTask 既可以当做一个任务执行,也可以有返回值。

    public class FutureTask implements RunnableFuture
    
    public interface RunnableFuture extends Runnable, Future
    

    当一个计算任务需要执行很长时间,那么就可以用 FutureTask 来封装这个任务,用一个线程去执行该任务,然后其它线程继续执行其它任务。当需要该任务的计算结果时,再通过 FutureTask 的 get() 方法获取。

    public class FutureTaskExample {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            FutureTask futureTask = new FutureTask(new Callable() {
                @Override
                public Integer call() throws Exception {
                    int result = 0;
                    for (int i = 0; i < 100; i++) {
                        Thread.sleep(10);
                        result += i;
                    }
                    return result;
                }
            });
    
            Thread computeThread = new Thread(futureTask);
            computeThread.start();
    
            Thread otherThread = new Thread(() -> {
                System.out.println("other task is running...");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            otherThread.start();
            System.out.println(futureTask.get());
        }
    }
    
    other task is running...
    4950
    

    BlockingQueue

    java.util.concurrent.BlockingQueue 接口有以下阻塞队列的实现:

    FIFO 队列 :LinkedBlockingQueue、ArrayListBlockingQueue(固定长度)
    优先级队列 :PriorityBlockingQueue

    提供了阻塞的 take() 和 put() 方法:如果队列为空 take() 将阻塞,直到队列中有内容;如果队列为满 put() 将阻塞,指到队列有空闲位置。

    使用 BlockingQueue 实现生产者消费者问题

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    
    public class ProducerConsumer {
    
        private static BlockingQueue queue = new ArrayBlockingQueue<>(5);
    
        private static class Producer extends Thread {
            @Override
            public void run() {
                try {
                    queue.put("product");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.print("produce..");
            }
        }
    
        private static class Consumer extends Thread {
    
            @Override
            public void run() {
                try {
                    String product = queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.print("consume..");
            }
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < 2; i++) {
                Producer producer = new Producer();
                producer.start();
            }
            for (int i = 0; i < 5; i++) {
                Consumer consumer = new Consumer();
                consumer.start();
            }
            for (int i = 0; i < 3; i++) {
                Producer producer = new Producer();
                producer.start();
            }
        }
    }
    
    produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..
    

    ForkJoin

    主要用于并行计算中,和 MapReduce 原理类似,都是把大的计算任务拆分成多个小任务并行计算。

    public class ForkJoinExample extends RecursiveTask {
        private final int threhold = 5;
        private int first;
        private int last;
    
        public ForkJoinExample(int first, int last) {
            this.first = first;
            this.last = last;
        }
    
        @Override
        protected Integer compute() {
            int result = 0;
            if (last - first <= threhold) {
                // 任务足够小则直接计算
                for (int i = first; i <= last; i++) {
                    result += i;
                }
            } else {
                // 拆分成小任务
                int middle = first + (last - first) / 2;
                ForkJoinExample leftTask = new ForkJoinExample(first, middle);
                ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
                leftTask.fork();
                rightTask.fork();
                result = leftTask.join() + rightTask.join();
            }
            return result;
        }
    }
    
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinExample example = new ForkJoinExample(1, 10000);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Future result = forkJoinPool.submit(example);
        System.out.println(result.get());
    }
    

    ForkJoin 使用 ForkJoinPool 来启动,它是一个特殊的线程池,线程数量取决于 CPU 核数。

    public class ForkJoinPool extends AbstractExecutorService
    

    ForkJoinPool 实现了工作窃取算法来提高 CPU 的利用率。每个线程都维护了一个双端队列,用来存储需要执行的任务。工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行。窃取的任务必须是最晚的任务,避免和队列所属线程发生竞争。例如下图中,Thread2 从 Thread1 的队列中拿出最晚的 Task1 任务,Thread1 会拿出 Task2 来执行,这样就避免发生竞争。但是如果队列中只有一个任务时还是会发生竞争。

    本站所有文章均由网友分享,仅用于参考学习用,请勿直接转载,如有侵权,请联系网站客服删除相关文章。若由于商用引起版权纠纷,一切责任均由使用者承担
    极客文库 » Java并发(八)J.U.C – 其它组件

    常见问题FAQ

    如果资源链接失效了怎么办?
    本站用户分享的所有资源都有自动备份机制,如果资源链接失效,请联系本站客服QQ:2580505920更新资源地址。
    如果用户分享的资源与描述不符怎么办?
    可以联系客服QQ:2580505920,如果要求合理可以安排退款或者退赞助积分。
    如何分享个人资源获取赞助积分或其他奖励?
    本站用户可以分享自己的资源,但是必须保证资源没有侵权行为。点击个人中心,根据操作填写并上传即可。资源所获收益完全归属上传者,每周可申请提现一次。
    如果您发现了本资源有侵权行为怎么办?
    及时联系客服QQ:2580505920,核实予以删除。

    Leave a Reply

    Hi, 如果你对这款资源有疑问,可以跟我联系哦!

    联系发布者

    Leave a Reply

    Hi, 如果你对这款资源有疑问,可以跟我联系哦!

    联系发布者
    • 102会员总数(位)
    • 3674资源总数(个)
    • 2本周发布(个)
    • 0 今日发布(个)
    • 136稳定运行(天)

    欢迎加入「极客文库」,成为原创作者从这里开始!

    立即加入 了解更多
    成为赞助用户享有更多特权立即升级