HengYk Blog

个人站

具有浪漫主义情调的理想主义务实青年


Java Concurrent P4

第三章 JAVA 并发编程(精通篇之线程池)

1. newCachedThreadPool

        newCachedThreadPool是具有缓存性质的线程池,线程最大空闲时间60s,线程可重复利用(缓存特性),没有最大线程数限制。任务耗时端,数量大。


public class NewCachedThreadPoolDemo {

    public static void main(String[] args) {

        ExecutorService cacheThreadPool = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i ++) {
            int index  = i;

//            try {
//                Thread.sleep(index * 1000);
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println(new Date(System.currentTimeMillis()));
                    try {
                        Thread.sleep(index * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "-->" + index);
                    System.out.println(new Date(System.currentTimeMillis()));
                    System.out.println("\n");
                }
            };

            cacheThreadPool.execute(runnable);
        }

        cacheThreadPool.shutdown();
    }

    /*
    Mon Nov 05 19:33:27 CST 2018
    Mon Nov 05 19:33:27 CST 2018
    Mon Nov 05 19:33:27 CST 2018
    Mon Nov 05 19:33:27 CST 2018
    Mon Nov 05 19:33:27 CST 2018
    pool-1-thread-1-->0
    Mon Nov 05 19:33:27 CST 2018
    pool-1-thread-2-->1
    Mon Nov 05 19:33:28 CST 2018
    pool-1-thread-3-->2
    Mon Nov 05 19:33:29 CST 2018
    pool-1-thread-4-->3
    Mon Nov 05 19:33:30 CST 2018
    pool-1-thread-5-->4
    Mon Nov 05 19:33:31 CST 2018
     */
}

2. newFixedThreadPool

       newFixedThreadPool 具有固定数量的线程池,核心线程数等于最大线程数,线程最大空闲时间为0,执行完毕即销毁,超出最大线程数进行等待。高并发下控制性能。


public class NewFixedThreadPoolDemo {

    public static void main(String[] args) {

        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
        System.out.println(Runtime.getRuntime().availableProcessors());//定长线程池的大小最好根据系统资源进行设置

        for (int i = 0; i < 10; i ++) {
            int index = i;

            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "-->" + index);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        fixedThreadPool.shutdown();
    }

    /*
    4
    pool-1-thread-1-->0
    pool-1-thread-2-->1
    pool-1-thread-3-->2
    pool-1-thread-4-->3
    pool-1-thread-1-->4
    pool-1-thread-4-->5
    pool-1-thread-3-->6
    pool-1-thread-2-->7
    pool-1-thread-1-->8
    pool-1-thread-4-->9
     */
}

3. newScheduledThreadPool

       newScheduledThreadPool 具有时间调度特性的线程池,必须初始化核心线程数,底层使用DelayedWorkQueue实现延迟特性。


public class NewScheduledThreadPoolDemo {

    public static void main(String[] args) {

        testOne();
        //testTwo();
        //testThree();
    }

    /**
     * 延迟时间不包含执行时间
     */
    public static void testThree() {

        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);

        ScheduledFuture<?> scheduledFuture = scheduledThreadPool.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                System.out.println(new Date(System.currentTimeMillis()));
                try {
                    Thread.sleep(3000); //注意睡眠时间为1000s和3000s时的差异
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
                System.out.println(new Date(System.currentTimeMillis()));
                System.out.println("\n");
            }
        }, 0 , 2, TimeUnit.SECONDS);

        /*
        ------------睡眠时间3000s------------
        *Mon Nov 05 19:53:09 CST 2018
        pool-1-thread-1
        Mon Nov 05 19:53:13 CST 2018
        *Mon Nov 05 19:53:15 CST 2018
        pool-1-thread-1
        Mon Nov 05 19:53:18 CST 2018
        *Mon Nov 05 19:53:20 CST 2018
        pool-1-thread-2
        Mon Nov 05 19:53:23 CST 2018
        *Mon Nov 05 19:53:25 CST 2018
        pool-1-thread-2
        Mon Nov 05 19:53:28 CST 2018
        *Mon Nov 05 19:53:30 CST 2018
        ......

        ------------睡眠时间1000s------------
        *Mon Nov 05 19:54:08 CST 2018
        pool-1-thread-1
        Mon Nov 05 19:54:10 CST 2018
        *Mon Nov 05 19:54:12 CST 2018
        pool-1-thread-1
        Mon Nov 05 19:54:13 CST 2018
        *Mon Nov 05 19:54:15 CST 2018
        pool-1-thread-2
        Mon Nov 05 19:54:16 CST 2018
        *Mon Nov 05 19:54:18 CST 2018
        pool-1-thread-2
        Mon Nov 05 19:54:19 CST 2018
        *Mon Nov 05 19:54:21 CST 2018
        ......
         */
    }

    /**
     * 延迟时间包含执行时间
     */
    public static void testTwo() {

        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);

        ScheduledFuture<?> scheduledFuture = scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(new Date(System.currentTimeMillis()));
                try {
                    Thread.sleep(3000); //注意睡眠时间为1000s和3000s时的差异
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
                System.out.println(new Date(System.currentTimeMillis()));
                System.out.println("\n");
            }
        }, 0, 2, TimeUnit.SECONDS);

        /*
        ------------睡眠时间3000s------------
        #Mon Nov 05 19:47:37 CST 2018
        pool-1-thread-1
        Mon Nov 05 19:47:40 CST 2018
        #Mon Nov 05 19:47:40 CST 2018
        pool-1-thread-1
        Mon Nov 05 19:47:43 CST 2018
        #Mon Nov 05 19:47:43 CST 2018
        pool-1-thread-2
        Mon Nov 05 19:47:46 CST 2018
        #Mon Nov 05 19:47:46 CST 2018
        pool-1-thread-2
        Mon Nov 05 19:47:49 CST 2018
        #Mon Nov 05 19:47:49 CST 2018
        ......

        ------------睡眠时间1000s------------
        #Mon Nov 05 19:49:27 CST 2018
        pool-1-thread-1
        Mon Nov 05 19:49:28 CST 2018
        #Mon Nov 05 19:49:29 CST 2018
        pool-1-thread-1
        Mon Nov 05 19:49:30 CST 2018
        #Mon Nov 05 19:49:31 CST 2018
        pool-1-thread-2
        Mon Nov 05 19:49:32 CST 2018
        #Mon Nov 05 19:49:33 CST 2018
        pool-1-thread-2
        Mon Nov 05 19:49:34 CST 2018
        #Mon Nov 05 19:49:35 CST 2018
        ......
         */
    }

    /**
     *  ScheduledExecutorService比Timer更安全,功能更强大
     */
    public static void testOne() {

        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);

        for (int i = 0; i < 10; i ++) {
            int finalI = i;
            scheduledThreadPool.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println(new Date(System.currentTimeMillis()));
                    System.out.println(Thread.currentThread().getName() + "-->" + finalI);
                    System.out.println(new Date(System.currentTimeMillis()));
                    System.out.println("\n");
                }
            }, i, TimeUnit.SECONDS);
        }

        scheduledThreadPool.shutdown();

        /*
        Mon Nov 05 19:45:20 CST 2018
        pool-1-thread-1-->0
        Mon Nov 05 19:45:20 CST 2018
        Mon Nov 05 19:45:21 CST 2018
        pool-1-thread-2-->1
        Mon Nov 05 19:45:21 CST 2018
        Mon Nov 05 19:45:22 CST 2018
        pool-1-thread-3-->2
        Mon Nov 05 19:45:22 CST 2018
        Mon Nov 05 19:45:23 CST 2018
        pool-1-thread-1-->3
        Mon Nov 05 19:45:23 CST 2018
        Mon Nov 05 19:45:24 CST 2018
        pool-1-thread-2-->4
        Mon Nov 05 19:45:24 CST 2018
         */
    }
}

4. newSingleThreadExecutor

       newSingleThreadExecutor 核心线程数与最大线程数均为1,用于不需要并发顺序执行的场景。


public class NewSingleThreadExecutorDemo {

    public static void main(String[] args) {

        ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

        for (int i = 0; i < 10; i ++ ) {
            int index = i;

            singleThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "-->" + index);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        singleThreadPool.shutdown();
    }

    /*
     pool-1-thread-1-->0
     pool-1-thread-1-->1
     pool-1-thread-1-->2
     pool-1-thread-1-->3
     pool-1-thread-1-->4
     */
}

5. ThreadPoolExecutor

       四种线程池都是通过Executors类创建的, 底层创建的都是ThreadPoolExecutor类, 可以构建自己需要的线程类(自定义)。


public class ThreadPoolExecutorDemo {

    public static void main(String[] args) {

        ThreadPoolExecutor executorService =
                new ThreadPoolExecutor(6, 6, 60L, TimeUnit.SECONDS,
                        new LinkedBlockingDeque<>(1), new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        //自定义拒绝策略
                        System.out.println(r);
                    }
                });

        for (int i = 0; i < 10; i++) {
            int index = i;

            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "-->" + index);
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }

        executorService.shutdown();
    }

    /*
    -------------new LinkedBlockingDeque<>(1)------------
    cn.edu.xidian.ictt.yk.proficient.ThreadPoolExecutorDemo$2@6d6f6e28
    pool-1-thread-1-->0
    cn.edu.xidian.ictt.yk.proficient.ThreadPoolExecutorDemo$2@135fbaa4
    pool-1-thread-2-->1
    pool-1-thread-1-->2

    -------------new LinkedBlockingDeque<>(5)------------
    pool-1-thread-1-->0
    pool-1-thread-2-->1
    pool-1-thread-2-->2
    pool-1-thread-1-->3
    pool-1-thread-1-->4
     */
}

       程序说明:当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略。 线程池pool的“最大池大小”和“核心池大小”都为2(THREADS_SIZE),这意味着“线程池能同时运行的任务数量最大只能是2”。 线程池pool的阻塞队列是ArrayBlockingQueue,ArrayBlockingQueue是一个有界的阻塞队列,ArrayBlockingQueue的容量为1。这也意味着线程池的阻塞队列只能有一个线程池阻塞等待。

6. 拒绝策略

       拒绝策略通常有四种策略。


// 辅助程序
public class MyRunnable implements Runnable {

    private String name;
    public MyRunnable(String name) {
        this.name = name;
    }
    @Override
    public void run() {
        try {
            System.out.println(this.name + " is running.");
            Thread.sleep(100);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • ThreadPoolExecutor.AbortPolicy:

       丢弃任务并抛出RejectedExecution异常。根据分析execute()代码可知:线程池中共运行了2个任务。第1个任务直接放到池中,通过线程去执行;第2个任务放到阻塞队列中等待。其他的任务都被丢弃了!


public class AbortPolicyDemo {

    private static final int THREADS_SIZE = 1;
    private static final int CAPACITY = 1;

    public static void main(String[] args) throws Exception {

        // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,
                0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY));

        // 设置线程池的拒绝策略为"抛出异常"
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

        try {
            // 新建10个任务,并将它们添加到线程池中。
            for (int i = 0; i < 10; i++) {
                Runnable myrun = new MyRunnable("task-"+i);
                pool.execute(myrun);
            }
        } catch (RejectedExecutionException e) {
            e.printStackTrace();
            // 关闭线程池
            pool.shutdown();
        }
    }

    /*
    task-0 is running.
    java.util.concurrent.RejectedExecutionException: Task cn.edu.xidian.ictt.yk.proficient.MyRunnable@7f31245a rejected from ...
    task-1 is running.
     */
}
  • ThreadPoolExecutor.DiscardPolicy:

       直接丢弃任务,但是不抛出异常。


public class DiscardPolicyDemo {

    private static final int THREADS_SIZE = 1;
    private static final int CAPACITY = 1;

    public static void main(String[] args) throws Exception {

        // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,
                0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY));

        // 设置线程池的拒绝策略为"丢弃"
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

        // 新建10个任务,并将它们添加到线程池中。
        for (int i = 0; i < 10; i++) {
            Runnable myrun = new MyRunnable("task-" + i);
            pool.execute(myrun);
        }

        // 关闭线程池
        pool.shutdown();
    }

    /*
    task-0 is running.
    task-1 is running.
     */
}
  • ThreadPoolExecutor.DiscardOldestPolicy:

       丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。当有任务添加到线程池被拒绝时,线程池会丢弃阻塞队列中最前面的任务,然后将被拒绝的任务添加到末尾。


public class DiscardOldestPolicyDemo {

    private static final int THREADS_SIZE = 1;
    private static final int CAPACITY = 1;

    public static void main(String[] args) throws Exception {

        // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,
                0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY));

        // 设置线程池的拒绝策略为"DiscardOldestPolicy"
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

        // 新建10个任务,并将它们添加到线程池中。
        for (int i = 0; i < 10; i++) {
            Runnable myrun = new MyRunnable("task-"+i);
            pool.execute(myrun);
        }

        // 关闭线程池
        pool.shutdown();
    }

    /*
    task-0 is running.
    task-9 is running.
     */
}
  • ThreadPoolExecutor.CallerRunsPolicy:

       由调用线程处理该任务。当有任务添加到线程池被拒绝时,线程池会将被拒绝的任务添加到“线程池正在运行的线程”中去运行。


public class CallerRunsPolicyDemo {

    private static final int THREADS_SIZE = 1;
    private static final int CAPACITY = 1;

    public static void main(String[] args) throws Exception {

        // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,
                0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY));
        // 设置线程池的拒绝策略为"CallerRunsPolicy"
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // 新建10个任务,并将它们添加到线程池中。
        for (int i = 0; i < 10; i++) {
            Runnable myrun = new MyRunnable("task-"+i);
            pool.execute(myrun);
        }

        // 关闭线程池
        pool.shutdown();
    }

    /*
    task-2 is running.
    task-0 is running.
    task-3 is running.
    task-1 is running.
    task-5 is running.
    task-4 is running.
    task-6 is running.
    task-8 is running.
    task-7 is running.
    task-9 is running.
     */
}
代码下载参见https://github.com/HengYk/ConcurrentDemo