第三章 JAVA 并发编程(精通篇之线程池)
1. newCachedThreadPool
newCachedThreadPool是具有
缓存性质
的线程池,线程最大空闲时间60s,线程可重复利用(缓存特性),没有最大线程数限制。任务耗时端,数量大。
public class NewCachedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService cacheThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i ++) {
int index = i;
// try {
// Thread.sleep(index * 1000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println(new Date(System.currentTimeMillis()));
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "-->" + index);
System.out.println(new Date(System.currentTimeMillis()));
System.out.println("\n");
}
};
cacheThreadPool.execute(runnable);
}
cacheThreadPool.shutdown();
}
/*
Mon Nov 05 19:33:27 CST 2018
Mon Nov 05 19:33:27 CST 2018
Mon Nov 05 19:33:27 CST 2018
Mon Nov 05 19:33:27 CST 2018
Mon Nov 05 19:33:27 CST 2018
pool-1-thread-1-->0
Mon Nov 05 19:33:27 CST 2018
pool-1-thread-2-->1
Mon Nov 05 19:33:28 CST 2018
pool-1-thread-3-->2
Mon Nov 05 19:33:29 CST 2018
pool-1-thread-4-->3
Mon Nov 05 19:33:30 CST 2018
pool-1-thread-5-->4
Mon Nov 05 19:33:31 CST 2018
*/
}
2. newFixedThreadPool
newFixedThreadPool 具有
固定数量
的线程池,核心线程数等于最大线程数,线程最大空闲时间为0,执行完毕即销毁,超出最大线程数进行等待。高并发下控制性能。
public class NewFixedThreadPoolDemo {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(4);
System.out.println(Runtime.getRuntime().availableProcessors());//定长线程池的大小最好根据系统资源进行设置
for (int i = 0; i < 10; i ++) {
int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "-->" + index);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
fixedThreadPool.shutdown();
}
/*
4
pool-1-thread-1-->0
pool-1-thread-2-->1
pool-1-thread-3-->2
pool-1-thread-4-->3
pool-1-thread-1-->4
pool-1-thread-4-->5
pool-1-thread-3-->6
pool-1-thread-2-->7
pool-1-thread-1-->8
pool-1-thread-4-->9
*/
}
3. newScheduledThreadPool
newScheduledThreadPool 具有
时间调度特性
的线程池,必须初始化核心线程数,底层使用DelayedWorkQueue实现延迟特性。
public class NewScheduledThreadPoolDemo {
public static void main(String[] args) {
testOne();
//testTwo();
//testThree();
}
/**
* 延迟时间不包含执行时间
*/
public static void testThree() {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
ScheduledFuture<?> scheduledFuture = scheduledThreadPool.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
System.out.println(new Date(System.currentTimeMillis()));
try {
Thread.sleep(3000); //注意睡眠时间为1000s和3000s时的差异
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
System.out.println(new Date(System.currentTimeMillis()));
System.out.println("\n");
}
}, 0 , 2, TimeUnit.SECONDS);
/*
------------睡眠时间3000s------------
*Mon Nov 05 19:53:09 CST 2018
pool-1-thread-1
Mon Nov 05 19:53:13 CST 2018
*Mon Nov 05 19:53:15 CST 2018
pool-1-thread-1
Mon Nov 05 19:53:18 CST 2018
*Mon Nov 05 19:53:20 CST 2018
pool-1-thread-2
Mon Nov 05 19:53:23 CST 2018
*Mon Nov 05 19:53:25 CST 2018
pool-1-thread-2
Mon Nov 05 19:53:28 CST 2018
*Mon Nov 05 19:53:30 CST 2018
......
------------睡眠时间1000s------------
*Mon Nov 05 19:54:08 CST 2018
pool-1-thread-1
Mon Nov 05 19:54:10 CST 2018
*Mon Nov 05 19:54:12 CST 2018
pool-1-thread-1
Mon Nov 05 19:54:13 CST 2018
*Mon Nov 05 19:54:15 CST 2018
pool-1-thread-2
Mon Nov 05 19:54:16 CST 2018
*Mon Nov 05 19:54:18 CST 2018
pool-1-thread-2
Mon Nov 05 19:54:19 CST 2018
*Mon Nov 05 19:54:21 CST 2018
......
*/
}
/**
* 延迟时间包含执行时间
*/
public static void testTwo() {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
ScheduledFuture<?> scheduledFuture = scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(new Date(System.currentTimeMillis()));
try {
Thread.sleep(3000); //注意睡眠时间为1000s和3000s时的差异
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
System.out.println(new Date(System.currentTimeMillis()));
System.out.println("\n");
}
}, 0, 2, TimeUnit.SECONDS);
/*
------------睡眠时间3000s------------
#Mon Nov 05 19:47:37 CST 2018
pool-1-thread-1
Mon Nov 05 19:47:40 CST 2018
#Mon Nov 05 19:47:40 CST 2018
pool-1-thread-1
Mon Nov 05 19:47:43 CST 2018
#Mon Nov 05 19:47:43 CST 2018
pool-1-thread-2
Mon Nov 05 19:47:46 CST 2018
#Mon Nov 05 19:47:46 CST 2018
pool-1-thread-2
Mon Nov 05 19:47:49 CST 2018
#Mon Nov 05 19:47:49 CST 2018
......
------------睡眠时间1000s------------
#Mon Nov 05 19:49:27 CST 2018
pool-1-thread-1
Mon Nov 05 19:49:28 CST 2018
#Mon Nov 05 19:49:29 CST 2018
pool-1-thread-1
Mon Nov 05 19:49:30 CST 2018
#Mon Nov 05 19:49:31 CST 2018
pool-1-thread-2
Mon Nov 05 19:49:32 CST 2018
#Mon Nov 05 19:49:33 CST 2018
pool-1-thread-2
Mon Nov 05 19:49:34 CST 2018
#Mon Nov 05 19:49:35 CST 2018
......
*/
}
/**
* ScheduledExecutorService比Timer更安全,功能更强大
*/
public static void testOne() {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
for (int i = 0; i < 10; i ++) {
int finalI = i;
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println(new Date(System.currentTimeMillis()));
System.out.println(Thread.currentThread().getName() + "-->" + finalI);
System.out.println(new Date(System.currentTimeMillis()));
System.out.println("\n");
}
}, i, TimeUnit.SECONDS);
}
scheduledThreadPool.shutdown();
/*
Mon Nov 05 19:45:20 CST 2018
pool-1-thread-1-->0
Mon Nov 05 19:45:20 CST 2018
Mon Nov 05 19:45:21 CST 2018
pool-1-thread-2-->1
Mon Nov 05 19:45:21 CST 2018
Mon Nov 05 19:45:22 CST 2018
pool-1-thread-3-->2
Mon Nov 05 19:45:22 CST 2018
Mon Nov 05 19:45:23 CST 2018
pool-1-thread-1-->3
Mon Nov 05 19:45:23 CST 2018
Mon Nov 05 19:45:24 CST 2018
pool-1-thread-2-->4
Mon Nov 05 19:45:24 CST 2018
*/
}
}
4. newSingleThreadExecutor
newSingleThreadExecutor
核心线程数与最大线程数均为1
,用于不需要并发顺序执行的场景。
public class NewSingleThreadExecutorDemo {
public static void main(String[] args) {
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i ++ ) {
int index = i;
singleThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "-->" + index);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
singleThreadPool.shutdown();
}
/*
pool-1-thread-1-->0
pool-1-thread-1-->1
pool-1-thread-1-->2
pool-1-thread-1-->3
pool-1-thread-1-->4
*/
}
5. ThreadPoolExecutor
四种线程池都是通过Executors类创建的, 底层创建的都是ThreadPoolExecutor类, 可以构建自己需要的线程类(
自定义
)。
public class ThreadPoolExecutorDemo {
public static void main(String[] args) {
ThreadPoolExecutor executorService =
new ThreadPoolExecutor(6, 6, 60L, TimeUnit.SECONDS,
new LinkedBlockingDeque<>(1), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//自定义拒绝策略
System.out.println(r);
}
});
for (int i = 0; i < 10; i++) {
int index = i;
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "-->" + index);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
executorService.shutdown();
}
/*
-------------new LinkedBlockingDeque<>(1)------------
cn.edu.xidian.ictt.yk.proficient.ThreadPoolExecutorDemo$2@6d6f6e28
pool-1-thread-1-->0
cn.edu.xidian.ictt.yk.proficient.ThreadPoolExecutorDemo$2@135fbaa4
pool-1-thread-2-->1
pool-1-thread-1-->2
-------------new LinkedBlockingDeque<>(5)------------
pool-1-thread-1-->0
pool-1-thread-2-->1
pool-1-thread-2-->2
pool-1-thread-1-->3
pool-1-thread-1-->4
*/
}
程序说明:当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务
拒绝策略
。 线程池pool的“最大池大小”和“核心池大小”都为2(THREADS_SIZE),这意味着“线程池能同时运行的任务数量最大只能是2”。 线程池pool的阻塞队列是ArrayBlockingQueue,ArrayBlockingQueue是一个有界的阻塞队列,ArrayBlockingQueue的容量为1。这也意味着线程池的阻塞队列只能有一个线程池阻塞等待。
6. 拒绝策略
拒绝策略通常有
四种
策略。
// 辅助程序
public class MyRunnable implements Runnable {
private String name;
public MyRunnable(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println(this.name + " is running.");
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
}
- ThreadPoolExecutor.AbortPolicy:
丢弃任务并抛出RejectedExecution异常。根据分析execute()代码可知:线程池中共运行了2个任务。第1个任务直接放到池中,通过线程去执行;第2个任务放到阻塞队列中等待。其他的任务都被丢弃了!
public class AbortPolicyDemo {
private static final int THREADS_SIZE = 1;
private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,
0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY));
// 设置线程池的拒绝策略为"抛出异常"
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
try {
// 新建10个任务,并将它们添加到线程池中。
for (int i = 0; i < 10; i++) {
Runnable myrun = new MyRunnable("task-"+i);
pool.execute(myrun);
}
} catch (RejectedExecutionException e) {
e.printStackTrace();
// 关闭线程池
pool.shutdown();
}
}
/*
task-0 is running.
java.util.concurrent.RejectedExecutionException: Task cn.edu.xidian.ictt.yk.proficient.MyRunnable@7f31245a rejected from ...
task-1 is running.
*/
}
- ThreadPoolExecutor.DiscardPolicy:
直接丢弃任务,但是不抛出异常。
public class DiscardPolicyDemo {
private static final int THREADS_SIZE = 1;
private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,
0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY));
// 设置线程池的拒绝策略为"丢弃"
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// 新建10个任务,并将它们添加到线程池中。
for (int i = 0; i < 10; i++) {
Runnable myrun = new MyRunnable("task-" + i);
pool.execute(myrun);
}
// 关闭线程池
pool.shutdown();
}
/*
task-0 is running.
task-1 is running.
*/
}
- ThreadPoolExecutor.DiscardOldestPolicy:
丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。当有任务添加到线程池被拒绝时,线程池会丢弃阻塞队列中最前面的任务,然后将被拒绝的任务添加到末尾。
public class DiscardOldestPolicyDemo {
private static final int THREADS_SIZE = 1;
private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,
0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY));
// 设置线程池的拒绝策略为"DiscardOldestPolicy"
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// 新建10个任务,并将它们添加到线程池中。
for (int i = 0; i < 10; i++) {
Runnable myrun = new MyRunnable("task-"+i);
pool.execute(myrun);
}
// 关闭线程池
pool.shutdown();
}
/*
task-0 is running.
task-9 is running.
*/
}
- ThreadPoolExecutor.CallerRunsPolicy:
由调用线程处理该任务。当有任务添加到线程池被拒绝时,线程池会将被拒绝的任务添加到“线程池正在运行的线程”中去运行。
public class CallerRunsPolicyDemo {
private static final int THREADS_SIZE = 1;
private static final int CAPACITY = 1;
public static void main(String[] args) throws Exception {
// 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。
ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE,
0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY));
// 设置线程池的拒绝策略为"CallerRunsPolicy"
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 新建10个任务,并将它们添加到线程池中。
for (int i = 0; i < 10; i++) {
Runnable myrun = new MyRunnable("task-"+i);
pool.execute(myrun);
}
// 关闭线程池
pool.shutdown();
}
/*
task-2 is running.
task-0 is running.
task-3 is running.
task-1 is running.
task-5 is running.
task-4 is running.
task-6 is running.
task-8 is running.
task-7 is running.
task-9 is running.
*/
}