第二章 JAVA并发编程(进阶篇)
1. Volatile关键字
作用:强制线程到共享内存中读取数据,而不从线程的工作内存中读取,从而使变量在多个线程间可见。
public class DemoThread13 {
/**
* 共享内存的可见性
*/
private List<String> list = new ArrayList<>();
private volatile boolean canGet = false;
private void put() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
list.add("A");
System.out.println("线程" + Thread.currentThread().getName() + "添加第" + i + "个元素");
if (i == 5) {
canGet = true;
System.out.println("线程" + Thread.currentThread().getName() + "发出通知");
}
}
}
private void get() {
while (true) {
if (canGet) {
for (String s : list) {
System.out.println("线程" + Thread.currentThread().getName() + "获取元素:" + s);
}
break;
}
}
}
public static void main(String[] args) {
DemoThread13 d13 = new DemoThread13();
new Thread(new Runnable() {
@Override
public void run() {
d13.put();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
d13.get();
}
}).start();
}
/*
-----------属性canGet添加volatile(主程序可终止)-----------
线程Thread-0添加第0个元素
线程Thread-0添加第1个元素
线程Thread-0添加第2个元素
线程Thread-0添加第3个元素
线程Thread-0添加第4个元素
线程Thread-0添加第5个元素
线程Thread-0发出通知
线程Thread-1获取元素:A
线程Thread-1获取元素:A
线程Thread-1获取元素:A
线程Thread-1获取元素:A
线程Thread-1获取元素:A
线程Thread-1获取元素:A
线程Thread-0添加第6个元素
线程Thread-0添加第7个元素
线程Thread-0添加第8个元素
线程Thread-0添加第9个元素
-----------属性canGet不添加volatile(Thread-1不终止)-----------
线程Thread-0添加第0个元素
线程Thread-0添加第1个元素
线程Thread-0添加第2个元素
线程Thread-0添加第3个元素
线程Thread-0添加第4个元素
线程Thread-0添加第5个元素
线程Thread-0发出通知
线程Thread-0添加第6个元素
线程Thread-0添加第7个元素
线程Thread-0添加第8个元素
线程Thread-0添加第9个元素
-----------结论-----------
volatile可以保证共享内存的可见性。
*/
}
不足:volatile无法保证原子性,volatile属于轻量级的同步,性能比synchronized强很多(不加锁),但是只保证线程间的可见性,并不能替代synchronized的同步功能,netty框架中大量使用了volatile。
public class DemoThread14 implements Runnable {
/**
* 数据操作的原子性
*/
private static volatile int sum = 0;
private static void add() {
System.out.println(Thread.currentThread().getName() + "初始化sum = " + sum);
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
sum++;
}
System.out.println(Thread.currentThread().getName() + "计算后sum = " + sum);
}
@Override
public void run() {
add();
}
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
es.submit(new DemoThread14());
}
// shutdown()被调用之后,先前提交的任务将被执行,但不会接收新的任务
es.shutdown();
//保证10个子线程执行完毕再输出
while (true) {
if (es.isTerminated()) {
System.out.println("sum最终 = " + sum);
if (sum == 100) {
System.out.println(sum + " = ok");
} else {
System.out.println(sum + " = no");
}
break;
}
}
}
/*
-----------sum++之前不填加添加延迟-----------
pool-1-thread-1初始化sum = 0
pool-1-thread-1计算后sum = 10
pool-1-thread-2初始化sum = 10
pool-1-thread-2计算后sum = 20
pool-1-thread-3初始化sum = 20
pool-1-thread-3计算后sum = 30
pool-1-thread-4初始化sum = 30
pool-1-thread-4计算后sum = 40
pool-1-thread-6初始化sum = 30
pool-1-thread-5初始化sum = 30
pool-1-thread-6计算后sum = 50
pool-1-thread-7初始化sum = 40
pool-1-thread-5计算后sum = 60
pool-1-thread-7计算后sum = 70
pool-1-thread-8初始化sum = 70
pool-1-thread-8计算后sum = 80
pool-1-thread-9初始化sum = 80
pool-1-thread-9计算后sum = 90
pool-1-thread-10初始化sum = 90
pool-1-thread-10计算后sum = 100
sum最终 = 100
100 = ok
-----------sum++之前填加添加延迟-----------
pool-1-thread-1初始化sum = 0
pool-1-thread-2初始化sum = 0
pool-1-thread-3初始化sum = 1
pool-1-thread-4初始化sum = 1
pool-1-thread-5初始化sum = 1
pool-1-thread-6初始化sum = 1
pool-1-thread-7初始化sum = 1
pool-1-thread-8初始化sum = 1
pool-1-thread-9初始化sum = 1
pool-1-thread-10初始化sum = 1
pool-1-thread-1计算后sum = 66
pool-1-thread-2计算后sum = 71
pool-1-thread-5计算后sum = 72
pool-1-thread-3计算后sum = 72
pool-1-thread-4计算后sum = 73
pool-1-thread-8计算后sum = 74
pool-1-thread-6计算后sum = 74
pool-1-thread-7计算后sum = 74
pool-1-thread-9计算后sum = 75
pool-1-thread-10计算后sum = 76
sum最终 = 76
76 = no
-----------结论-----------
volatile不能保证数据操作的原子性,因此无法保证线程安全。
*/
}
2. Volatile与Static区别
Static保证唯一性, 不保证一致性,多个实例共享一个静态变量。
Volatile保证一致性,不保证唯一性,多个实例有多个volatile变量。
3. Atomic类的原子性
AtomicInteger等原子类可以保证共享变量的原子性。
public class DemoThread15 implements Runnable{
private static AtomicInteger sum = new AtomicInteger(0);
private static void add() {
System.out.println(Thread.currentThread().getName() + "初始化sum = " + sum);
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
sum.addAndGet(1);
}
System.out.println(Thread.currentThread().getName() + "计算后sum = " + sum);
}
@Override
public void run() {
add();
}
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
es.submit(new DemoThread15());
}
// shutdown()被调用之后,先前提交的任务将被执行,但不会接收新的任务
es.shutdown();
//保证10个子线程执行完毕再输出
while (true) {
if (es.isTerminated()) {
System.out.println("sum最终 = " + sum);
if (sum.get() == 100) {
System.out.println(sum + " = ok");
} else {
System.out.println(sum + " = no");
}
break;
}
}
}
/*
-----------去掉volatile关键字,采用java.util.concurrent.atomic.*-----------
pool-1-thread-1初始化sum = 0
pool-1-thread-2初始化sum = 0
pool-1-thread-3初始化sum = 0
pool-1-thread-4初始化sum = 0
pool-1-thread-5初始化sum = 0
pool-1-thread-6初始化sum = 0
pool-1-thread-7初始化sum = 0
pool-1-thread-8初始化sum = 1
pool-1-thread-9初始化sum = 1
pool-1-thread-10初始化sum = 1
pool-1-thread-2计算后sum = 91
pool-1-thread-3计算后sum = 92
pool-1-thread-4计算后sum = 93
pool-1-thread-6计算后sum = 95
pool-1-thread-7计算后sum = 95
pool-1-thread-5计算后sum = 96
pool-1-thread-1计算后sum = 97
pool-1-thread-10计算后sum = 100
pool-1-thread-9计算后sum = 100
pool-1-thread-8计算后sum = 100
sum最终 = 100
100 = ok
-----------结论-----------
对比DemoThread14,我们发现Atomic类可以保证共享变量的原子性。
-----------特殊的-----------
Atomic类采用了CAS(checkAndSet)这种非锁机制。
*/
}
Atomic类不能保证成员方法的原子性。
public class DemoThread16 implements Runnable {
private static AtomicInteger sum = new AtomicInteger(0);
private synchronized static void add() {
sum.addAndGet(1);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
sum.addAndGet(9);
System.out.println(Thread.currentThread().getName() + "计算后sum = " + sum);
}
@Override
public void run() {
add();
}
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i ++) {
es.submit(new DemoThread16());
}
es.shutdown();
}
/*
-----------不添加synchronized关键字-----------
pool-1-thread-1计算后sum = 28
pool-1-thread-2计算后sum = 28
pool-1-thread-3计算后sum = 37
pool-1-thread-4计算后sum = 46
pool-1-thread-5计算后sum = 55
pool-1-thread-6计算后sum = 64
pool-1-thread-8计算后sum = 82
pool-1-thread-7计算后sum = 82
pool-1-thread-10计算后sum = 100
pool-1-thread-9计算后sum = 100
-----------添加synchronized关键字-----------
pool-1-thread-1计算后sum = 10
pool-1-thread-10计算后sum = 20
pool-1-thread-9计算后sum = 30
pool-1-thread-8计算后sum = 40
pool-1-thread-7计算后sum = 50
pool-1-thread-6计算后sum = 60
pool-1-thread-5计算后sum = 70
pool-1-thread-4计算后sum = 80
pool-1-thread-3计算后sum = 90
pool-1-thread-2计算后sum = 100
-----------结论-----------
只Atomic类不能保证成员方法的原子性,但配合synchronized关键字可保证原子性操作。
*/
}
Atomic类采用了CAS(CheckAndSet)这种锁机制。
4. ThreadLocal
使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。
public class DemoThread21 {
/**
* System.out.println()语句之后的注释表示程序的执行顺序。
*
* @param args
*/
public static void main(String[] args) {
ThreadLocal<Integer> th = new ThreadLocal<>();
new Thread(new Runnable() {
@Override
public void run() {
th.set(10);
System.out.println(Thread.currentThread().getName() + ":" + th.get()); // 1
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + ":" + th.get());// 4
}
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
Integer v = th.get();
System.out.println(Thread.currentThread().getName() + ":" + v); // 2
th.set(100);
System.out.println(Thread.currentThread().getName() + ":" + th.get()); // 3
}
}).start();
}
/*
Thread-0:10
Thread-1:null
Thread-1:100
Thread-0:10
-----------结论-----------
ThreadLocal为每一个线程创建一个独立的副本,线程中的数据操作互不干扰。
}
5. 同步类容器
Vector、HashTable等古老的并发容器,都是使用Collections.synchronizedXXX等工厂方法创建的,并发状态下只能有一个线程访问容器对象,性能很低。
public class DemoThread26 implements Runnable{
// private static List<String> list = new ArrayList<>();
private static List<String> list = Collections.synchronizedList(new ArrayList<>());
private static void add() {
for (int i = 0; i < 10; i ++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
list.add("A");
}
}
@Override
public void run() {
add();
}
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i ++) {
es.submit(new DemoThread26());
}
es.shutdown();
while (true) {
if (es.isTerminated()) {
System.out.println("线程结束了");
System.out.println("list.size = " + list.size());
if (list.size() != 50) {
System.out.println("线程不安全");
} else {
System.out.println("线程安全");
}
break;
}
}
}
/*
-----------不添加Collections.synchronizedList()-----------
线程结束了
list.size = 39
线程不安全
-----------添加Collections.synchronizedList()-----------
线程结束了
list.size = 50
线程安全
-----------结论-----------
添加Collections.synchronizedXXX()可以快速将非线程安全的集合类改造成线程安全的,即并发状态下只能有一个线程访问容器对象,性能很低。
*/
}
6. 并发类容器
【JDK5.0】之后提供了多种并发类容器可以替代同步类容器,提升性能、吞吐量。ConcurrentHashMap替代HashMap、HashTable;ConcurrentSkipListMap替代TreeMap。
ConcurrentHashMap将hash表分为
16个segment
,每个segment单独进行锁控制,从而减小了锁的粒度,提升了性能。
public class DemoThread27 {
/**
* 通过并发下的运行时间对比ConcurrentHashMap和Hashtable的性能
*/
public static void testMapOne() {
// Hashtable<String, Integer> hash = new Hashtable<>();
ConcurrentHashMap<String, Integer> hash = new ConcurrentHashMap<>();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
for (int i = 0; i < 1000000; i++) {
hash.put("k" + i, i);
}
System.out.println(Thread.currentThread().getName() + ": " + (System.currentTimeMillis() - start));
}
}).start();
}
}
public static void testSkipListMapOne() {
// SortedMap<String, Integer> skipMap = new TreeMap<>();
// 性能低,线程安全
// SortedMap<String, Integer> skipMap = Collections.synchronizedSortedMap(new TreeMap<>());
// 性能高,线程安全
ConcurrentSkipListMap<String, Integer> skipMap = new ConcurrentSkipListMap<>();
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
try {
Thread.sleep(0);
skipMap.put("k" + i, i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + ": " + (System.currentTimeMillis() - start));
// System.out.println(skipMap);
}
}).start();
}
}
/**
* ConcurrentHashMap用法
*/
public static void testMapTwo() {
ConcurrentHashMap<String, Integer> hashMap = new ConcurrentHashMap<>();
hashMap.put("d", 2);
hashMap.put("c", 4);
hashMap.put("e", 2);
hashMap.put("e", 3);
System.out.println(hashMap); // {c=4, d=2, e=3}
hashMap.putIfAbsent("d1", 1);
System.out.println(hashMap); // {c=4, d=2, e=3, d1=1}
}
/**
* ConcurrentSkipListMap用法
*/
public static void testSkipListMapTwo() {
ConcurrentSkipListMap<String, Integer> skipListMap = new ConcurrentSkipListMap<>();
skipListMap.put("d", 2);
skipListMap.put("c", 4);
skipListMap.put("e", 2);
skipListMap.put("e", 3);
System.out.println(skipListMap); // {c=4, d=2, e=3}
skipListMap.putIfAbsent("d1", 1);
System.out.println(skipListMap); // {c=4, d=2, d1=1, e=3}
}
public static void main(String[] args) {
// testMapOne();
// testSkipListMapOne();
testMapTwo();
// testSkipListMapTwo();
}
/*
-----------古老的并发容器Hashtable的执行时间-----------
Thread-6: 4249
Thread-4: 4466
Thread-3: 4582
Thread-2: 4647
Thread-9: 4575
Thread-5: 4576
Thread-8: 4593
Thread-0: 4710
Thread-1: 4726
Thread-7: 4638
-----------改进的并发容器ConcurrentHashMap的执行时间-----------
Thread-0: 2582
Thread-7: 2703
Thread-9: 2763
Thread-4: 2833
Thread-3: 2900
Thread-5: 2883
Thread-1: 3010
Thread-2: 3032
Thread-8: 2901
Thread-6: 2928
Thread-0: 4313
Thread-2: 4312
Thread-3: 4270
Thread-1: 4448
Thread-9: 4259
Thread-5: 4225
Thread-7: 4357
Thread-6: 4358
Thread-8: 4394
Thread-4: 4366
-----------结论1-----------
执行结果表明,ConcurrentHashMap大多数情况下会比Hashtable优秀一些。
-----------使用SortedMap-----------
Exception in thread "Thread-1" java.lang.NullPointerException
Thread-6: 79
Thread-9: 79
Thread-7: 79
Thread-2: 81
Thread-8: 80
Thread-4: 81
Thread-5: 82
Thread-3: 82
Thread-0: 84
-----------使用Collections.synchronizedSortedMap(SortedMap)-----------
Thread-0: 166
Thread-8: 157
Thread-9: 157
Thread-1: 166
Thread-2: 165
Thread-3: 166
Thread-5: 166
Thread-6: 165
Thread-7: 165
Thread-4: 166
-----------使用ConcurrentSkipListMap-----------
Thread-3: 71
Thread-1: 74
Thread-7: 71
Thread-5: 72
Thread-2: 74
Thread-8: 72
Thread-9: 72
Thread-6: 73
Thread-0: 77
Thread-4: 92
-----------结论2-----------
直接使用SortedMap性能高但线程不安全;
添加锁Collections.synchronizedSortedMap(SortedMap)线程安全但性能降低;
使用使用ConcurrentSkipListMap既保证线程安全又提高了性能。
*/
}
Copy On Write容器,简称COW;写时复制容器,向容器中添加元素时,先将容器Copy出一个新容器,然后将元素添加到新容器中,再将原容器的引用指向新容器。并发读的时候不需要锁定容器,因为原容器没有变化,使用的是一种读写分离的思想。但是由于每次更新都会复制新容器,所以如果数据量较大,并且更新操作频繁则对内存消耗很高,建议在高并发读的场景下使用。
CopyOnWriteArraySet
是基于CopyOnWriteArrayList
实现的,其唯一的不同是在add时调用的是CopyOnWriteArrayList的addIfAbsent方法, adIfAbsent方法同样采用锁保护,并创建一个新的大小+1的Object数组。遍历当前Object数组,如Object数组中已有了当前元素,则直接返回,如果没有则放入Object数组的尾部,并返回。从以上分析可见,CopyOnWriteArraySet在add时每次都要进行数组的遍历,因此其性能会低于CopyOnWriteArrayList。
public class DemoThread28 {
public static void testOne() {
CopyOnWriteArrayList<Integer> al = new CopyOnWriteArrayList<>();
al.add(1);
al.add(4);
al.add(2);
al.addIfAbsent(3);
al.addIfAbsent(5);
System.out.println(al); // [1, 4, 2, 3, 5]
al.add(2);
System.out.println(al); // [1, 4, 2, 3, 5, 2]
}
public static void testTwo() {
CopyOnWriteArraySet<Integer> as = new CopyOnWriteArraySet<>();
as.add(1);
as.add(4);
as.add(2);
as.add(3);
System.out.println(as); // [1, 4, 2, 3]
as.add(2);
System.out.println(as); // [1, 4, 2, 3]
}
public static void main(String[] args) {
// testOne();
testTwo();
}
}
7. 并发-无阻塞队列
ConcurrentLinkedQueue并发无阻塞队列,BlockingQueue并发阻塞队列,均实现自Queue接口。
ConcurrentLinkedQueue无阻塞、无锁、高性能、无界、线程安全,性能优于BlockingQueue、不允许null值
。
public class DemoThread29 {
public static void main(String[] args) {
ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<>();
clq.add(1);
clq.add(2);
clq.offer(3);
clq.offer(4);
System.out.println(clq); // [1, 2, 3, 4]
System.out.println(clq.peek()); // 1
System.out.println(clq.size()); // 4
System.out.println(clq.poll()); // 1
System.out.println(clq.size()); // 3
System.out.println(clq.poll()); // 2
System.out.println(clq.poll()); // 3
System.out.println(clq.poll()); // 4
System.out.println(clq.size()); // 0
System.out.println(clq.peek()); // null
System.out.println(clq.poll()); // null
}
}
8. 并发-阻塞队列
- a) ArrayBlockingQueue
基于数组实现的阻塞有界队列、创建时可指定长度,内部实现维护了一个
定长数组
用于缓存数据,内部没有采用读写分离,写入和读取数据不能同时进行,不允许null值。
public class DemoThread30 {
public static void testAdd() {
ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(3);
abq.add(1);
abq.offer(2);
try {
abq.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// abq.add(8); // 如果队列满了,则抛出异常。
System.out.println(abq); // [1, 2, 3]
abq.offer(4); // 如果队列满了,不阻塞,不抛出异常。
System.out.println(abq); // [1, 2, 3]
try {
//可设置最大阻塞时间timeout。超时之后如果队列还是满的,不阻塞,不抛出异常。
abq.offer(5, 2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(abq); // [1, 2, 3]
try {
abq.put(6); // 如果队列是满的,则永远阻塞下去。
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(abq); // 无输出...
System.out.println("over"); // 无输出...
}
public static void testTakeOne() {
ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(2);
abq.add(2);
abq.add(4);
System.out.println(abq); // [2, 4]
System.out.println(abq.peek()); // 2 获取头部元素不移除。
System.out.println(abq); // [2, 4]
System.out.println(abq.poll()); // 2 获取头部元素并移除。
System.out.println(abq); // [4]
try {
System.out.println(abq.take()); // 4 获取头部元素并移除。
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(abq); // []
System.out.println(abq.poll()); // null 如果队列为空,不阻塞,不抛异常。
System.out.println(abq); // []
try {
// 可设置阻塞时间timeout。超时之后如果队列依然为空,不阻塞,不抛异常。
System.out.println(abq.poll(2, TimeUnit.SECONDS)); // null
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(abq); // []
try {
System.out.println(abq.take()); // 如果队列为空,则永远阻塞,不抛异常。
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(abq); // 无输出...
System.out.println("over"); // 无输出...
}
public static void testTakeTwo() {
ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(2);
abq.add(1);
abq.add(2);
ArrayList<Integer> al = new ArrayList<>();
abq.drainTo(al, 1); // 取abq中指定个数的元素到al中,并从abq中移除。
System.out.println(al); // [1]
System.out.println(abq); // [2]
}
public static void testTakeThree() {
ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(2);
abq.add(1);
abq.add(2);
ArrayList<Integer> al = new ArrayList<>();
abq.drainTo(al); // 取abq中全部元素到al中,并从abq中移除。
System.out.println(abq); // []
System.out.println(al); // [1, 2]
ArrayList<Integer> arrayList = new ArrayList<>();
abq.drainTo(arrayList); // 当队列为空时,不阻塞,不抛异常。
System.out.println(abq); // []
System.out.println(arrayList); // []
}
public static void main(String[] args) {
// testAdd();
// testTakeOne();
// testTakeTwo();
testTakeThree();
}
}
- b) LinkedBlockingQueue
基于
链表
的阻塞队列,内部维护一个链表存储缓存数据,支持写入和读取的并发操作,创建时可指定长度也可以不指定,不指定时代表无界队列,不允许null值。
public class DemoThread31 {
public static void testAdd() {
LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<>(3);
lbq.add(1);
lbq.offer(2);
try {
lbq.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// lbq.add(8); // 如果队列满了,则抛出异常。
System.out.println(lbq); // [1, 2, 3]
lbq.offer(4); // 如果队列满了,不阻塞,不抛出异常。
System.out.println(lbq); // [1, 2, 3]
try {
//可设置最大阻塞时间timeout。超时之后如果队列还是满的,不阻塞,不抛出异常。
lbq.offer(5, 2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(lbq); // [1, 2, 3]
try {
lbq.put(6); // 如果队列是满的,则永远阻塞下去。
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(lbq); // 无输出...
System.out.println("over"); // 无输出...
}
public static void testTakeOne() {
LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<>();
lbq.add(2);
lbq.add(4);
System.out.println(lbq); // [2, 4]
System.out.println(lbq.peek()); // 2 获取头部元素不移除。
System.out.println(lbq); // [2, 4]
System.out.println(lbq.poll()); // 2 获取头部元素并移除。
System.out.println(lbq); // [4]
try {
System.out.println(lbq.take()); // 4 获取头部元素并移除。
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(lbq); // []
System.out.println(lbq.poll()); // null 如果队列为空,不阻塞,不抛异常。
System.out.println(lbq); // []
try {
// 可设置阻塞时间timeout。超时之后如果队列依然为空,不阻塞,不抛异常。
System.out.println(lbq.poll(2, TimeUnit.SECONDS)); // null
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(lbq); // []
try {
System.out.println(lbq.take()); // 如果队列为空,则永远阻塞,不抛异常。
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(lbq); // 无输出...
System.out.println("over"); // 无输出...
}
public static void testTakeTwo() {
LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<>();
lbq.add(1);
lbq.add(2);
ArrayList<Integer> al = new ArrayList<>();
lbq.drainTo(al, 1); // 取abq中指定个数的元素到al中,并从abq中移除。
System.out.println(al); // [1]
System.out.println(lbq); // [2]
}
public static void testTakeThree() {
LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<>();
lbq.add(1);
lbq.add(2);
ArrayList<Integer> al = new ArrayList<>();
lbq.drainTo(al); // 取abq中全部元素到al中,并从abq中移除。
System.out.println(lbq); // []
System.out.println(al); // [1, 2]
ArrayList<Integer> arrayList = new ArrayList<>();
lbq.drainTo(arrayList); // 当队列为空时,不阻塞,不抛异常。
System.out.println(lbq); // []
System.out.println(arrayList); // []
}
public static void main(String[] args) {
// testAdd();
// testTakeOne();
// testTakeTwo();
testTakeThree();
}
}
- c) SychronousQueue
没有任何容量
,必须先有线程先从队列中take,才能向queue中add数据,否则会抛出队列已满的异常。不能使用peek方法取数据,此方法底层没有实现,会直接返回null。
public class DemoThread32 {
public static void testOne() {
SynchronousQueue<Integer> sq = new SynchronousQueue<>();
sq.add(1); // java.lang.IllegalStateException: Queue full
}
public static void testTwo() {
SynchronousQueue<Integer> sq = new SynchronousQueue<>();
try {
sq.put(1); // 阻塞...
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void testThree() {
SynchronousQueue<Integer> sq = new SynchronousQueue<>();
new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
System.out.println(sq.take());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
sq.put(1);
sq.put(2);
sq.put(3);
sq.put(4);
/*
sq.add(1);
Thread.sleep(1000);
sq.add(2);
Thread.sleep(1000);
sq.add(3);
*/
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
public static void testFour() {
SynchronousQueue<Integer> sq = new SynchronousQueue<>();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Integer v = sq.poll(2, TimeUnit.SECONDS);
if (v == null) {
System.out.println("null");
break;
} else {
System.out.println(v);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
sq.put(1);
sq.put(2);
sq.put(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
public static void main(String[] args) {
// testOne();
// testTwo();
testThree();
// testFour();
}
}
- d) PriorityBlockingQueue
一个无界阻塞队列,默认初始化长度11,也可以手动指定,但是队列会自动扩容。资源被耗尽时导致OutOfMemoryError。不允许使用null元素。不允许插入不可比较的对象(导致抛出ClassCastException),
加入的对象实现Comparable接口
。
class Demo implements Comparable<Demo> {
private int id;
private String name;
public Demo(int id, String name) {
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "Demo{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
@Override
public int compareTo(Demo o) {
if (this.id > o.id) {
return 1;
} else if (this.id < o.id) {
return -1;
} else {
return 0;
}
}
}
public class DemoThread33 {
/**
* 队列中的数据不是按照顺序排列的
*/
public static void testAdd() {
PriorityBlockingQueue<Demo> pq = new PriorityBlockingQueue<>(); // 默认容量11
pq.add(new Demo(3, "a"));
pq.offer(new Demo(1, "b"));
pq.put(new Demo(2, "c"));
pq.offer(new Demo(4, "d"), 2, TimeUnit.SECONDS);
System.out.println(pq); // [Demo{id=1, name='b'}, Demo{id=3, name='a'}, Demo{id=2, name='c'}, Demo{id=4, name='d'}]
}
/**
* 队列中的元素按照顺序被取走
*/
public static void testTake1() {
PriorityBlockingQueue<Demo> pq = new PriorityBlockingQueue<>();
pq.add(new Demo(3, "a"));
pq.offer(new Demo(1, "b"));
pq.put(new Demo(2, "c"));
pq.offer(new Demo(4, "d"), 2, TimeUnit.SECONDS);
try {
System.out.println(pq.poll()); // Demo{id=1, name='b'}
System.out.println(pq.poll(2, TimeUnit.SECONDS)); // Demo{id=2, name='c'}
System.out.println(pq.take()); // Demo{id=3, name='a'}
System.out.println(pq.take()); // Demo{id=4, name='d'}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void testTake2() {
PriorityBlockingQueue<Demo> pbq = new PriorityBlockingQueue<>();
pbq.add(new Demo(3, "a"));
pbq.add(new Demo(1, "b"));
System.out.println(pbq); // [Demo{id=1, name='b'}, Demo{id=3, name='a'}]
System.out.println(pbq.peek()); // Demo{id=1, name='b'}
System.out.println(pbq); // [Demo{id=1, name='b'}, Demo{id=3, name='a'}]
System.out.println(pbq.poll()); // Demo{id=1, name='b'}
System.out.println(pbq);// [Demo{id=3, name='a'}]
try {
System.out.println(pbq.take()); // Demo{id=3, name='a'}
System.out.println(pbq); // []
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(pbq.peek()); // null
System.out.println(pbq.poll()); // null
try {
System.out.println(pbq.poll(2, TimeUnit.SECONDS)); //null
System.out.println(pbq.take()); // 阻塞...
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("over"); // 无输出...
}
public static void testTake3() {
PriorityBlockingQueue<Integer> pbq = new PriorityBlockingQueue<>();
pbq.add(3);
pbq.add(2);
pbq.add(3);
pbq.add(1);
pbq.add(5);
ArrayList<Integer> arrayList = new ArrayList<>();
pbq.drainTo(arrayList, 2);
System.out.println(arrayList); // [1, 2]
System.out.println(pbq); // [3, 5, 3]
Object[] list = arrayList.toArray();
Arrays.sort(list); // [1, 2]
System.out.println(Arrays.toString(list));
}
public static void testTake4() {
PriorityBlockingQueue<Integer> pbq = new PriorityBlockingQueue<>();
pbq.add(3);
pbq.add(2);
pbq.add(3);
pbq.add(1);
pbq.add(5);
ArrayList<Integer> arrayList = new ArrayList<>();
pbq.drainTo(arrayList);
ArrayList<Integer> list = new ArrayList<>();
pbq.drainTo(list);
System.out.println(list); // []
System.out.println(pbq); // []
}
public static void main(String[] args) {
// testAdd();
testTake1();
// testTake2();
// testTake3();
// testTake4();
}
}
- e) DelayQueue
DelayQueue:Delayed元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的getDelay(TimeUnit.NANOSECONDS)方法返回一个小于等于0的值时,将发生到期。即使无法使用take或poll移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size方法同时返回到期和未到期元素的计数。此队列不允许使用null元素。
内部元素需实现Delayed接口
。
class User implements Delayed {
private int id;
private String name;
private long endTime;
public User(int id, String name, long endTime) {
this.id = id;
this.name = name;
this.endTime = endTime;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public long getEndTime() {
return endTime;
}
public void setEndTime(long endTime) {
this.endTime = endTime;
}
@Override
public String toString() {
return name;
}
@Override
public int compareTo(Delayed o) {
User user = (User) o;
if (this.getEndTime() > user.getEndTime()) {
return 1;
} else if (this.getEndTime() < user.getEndTime()) {
return -1;
} else {
return 0;
}
}
@Override
public long getDelay(TimeUnit unit) {
return this.endTime - System.currentTimeMillis();
}
}
/**
* 到时间后强制用户下线功能模拟
*/
public class DemoThread34 {
DelayQueue<User> dq = new DelayQueue<>();
public void login(User user) {
dq.add(user);
System.out.println("login: " + user.getId() + "-" + user.getName() + "-" + user.getEndTime());
}
public void logout() {
try {
User user = dq.take(); // 此处不能使用dq.poll()[非阻塞式方法]
System.out.println("logout: " + user.getId() + "-" + user.getName() + "-" + user.getEndTime());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public int onlineSize() {
return dq.size();
}
public DelayQueue<User> getUsers() {
return dq;
}
public static void main(String[] args) {
DemoThread34 demoThread34 = new DemoThread34();
demoThread34.login(new User(1, "a", 3000 + System.currentTimeMillis()));
demoThread34.login(new User(2, "b", 1000 + System.currentTimeMillis()));
demoThread34.login(new User(3, "c", 2000 + System.currentTimeMillis()));
System.out.println(demoThread34.getUsers());
while (true) {
demoThread34.logout();
System.out.println(demoThread34.getUsers());
if (demoThread34.onlineSize() == 0) {
break;
}
}
}
}