第三章 JAVA 并发编程(精通篇之Concurrent同步工具类)
1. CountDownLatch类
CountDownLatch是一个
辅助工具类
,它允许一个或多个线程等待一系列指定操作的完成。CountDownLatch以一个给定的数量初始化。countDown()
每被调用一次,这一数量就减一。通过调用await()
方法之一,线程可以阻塞等待这一数量到达零。
public class CountDownLatchDemo {
//注意:可以引入多个计数器CountDownLatch
//private static final CountDownLatch countDownLatch = new CountDownLatch(3);
public static void main(String[] args) {
final CountDownLatch countDownLatch = new CountDownLatch(3);
String name = Thread.currentThread().getName();
new Thread(new Runnable() {
@Override
public synchronized void run() {
try {
System.out.println(name);
System.out.println(Thread.currentThread().getName() + " await...");
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + " stop!!!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public synchronized void run() {
try {
System.out.println(name);
System.out.println(Thread.currentThread().getName() + " await...");
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + " stop!!!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("..." + countDownLatch.getCount());
countDownLatch.countDown();
System.out.println("..." + countDownLatch.getCount());
countDownLatch.countDown();
System.out.println("..." + countDownLatch.getCount());
countDownLatch.countDown();
/*
特别注意:此处所有的阻塞线程已经都被唤醒,下面的countDown方法执行时,主线程会和两个子线程争抢资源。
*/
System.out.println("..." + countDownLatch.getCount());
countDownLatch.countDown();
System.out.println("..." + countDownLatch.getCount());
}
/*
-------------结果可能是这样的-------------
#main
*main
Thread-1 await...
Thread-0 await...
...3
...2
...1
Thread-1 stop!!!
Thread-0 stop!!!
...0
...0
-------------结果也可能是这样的-------------
*main
Thread-0 await...
#main
Thread-1 await...
...3
...2
...1
...0
...0
Thread-0 stop!!!
Thread-1 stop!!!
*/
}
2. CyclicBarrier类
CyclicBarrier 是一个
同步辅助类
,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)
。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用
,所以称它为循环的barrier。
public class Runner implements Runnable {
private CyclicBarrier cyclicBarrier;
private String name;
public Runner(CyclicBarrier cyclicBarrier, String name) {
this.cyclicBarrier = cyclicBarrier;
this.name = name;
}
@Override
public void run() {
try {
Thread.sleep(1000 * (new Random()).nextInt(8));
System.out.println(name + "准备好了");
//cyclicBarrier.await();
cyclicBarrier.await(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
System.out.println(name + "线程中断");
return;
} catch (BrokenBarrierException e) {
System.out.println(name + "Barrier异常");
return;
} catch (TimeoutException e) {
System.out.println(name + "超时异常");
}
System.out.println(name + "跑呀...");
}
}
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
ExecutorService executorService = Executors.newFixedThreadPool(3);
executorService.submit(new Runner(cyclicBarrier, "t1"));
executorService.submit(new Runner(cyclicBarrier, "t2"));
executorService.submit(new Runner(cyclicBarrier, "t3"));
executorService.shutdown();
}
/*
-------------结果可能是这样的-------------
t1::准备好了::1000
t2::准备好了::4000
t3::准备好了::4000
t3::跑呀...
t1::跑呀...
t2::跑呀...
-------------结果也可能是这样的-------------
t3::准备好了::0
t3::超时异常
t3::跑呀...
t1::准备好了::6000
t1::Barrier异常
t2::准备好了::7000
t2::Barrier异常
注意比较上述结果中时间的差值,如果最长准备时间比最短准备时间多3秒,则抛出异常TimeoutException和BrokenBarrierException。
*/
}
需要所有的子任务都完成时,才执行主任务,这个时候就可以选择使用CyclicBarrier。
public class Worker implements Runnable {
private String name;
private CyclicBarrier cyclicBarrier;
public Worker(String name, CyclicBarrier cyclicBarrier) {
this.name = name;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
System.out.println(name + "运行完毕");
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public class CyclicBarrierDemoTwo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(100);
for (int i = 0; i < 99; i ++) {
new Thread(new Worker("worker" + i, cyclicBarrier)).start();
}
System.out.println("主线程开始等待..................");
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("主线程开始工作..................");
}
/*
worker0运行完毕
worker1运行完毕
主线程开始等待..................
worker2运行完毕
worker3运行完毕
worker4运行完毕
worker5运行完毕
worker6运行完毕
worker7运行完毕
worker8运行完毕
主线程开始工作..................
*/
}
3. Semaphore类
Semaphore是一个
计数信号量
。信号量维护了一个许可集合; 通过acquire()
和acquire()
来获取和释放访问许可证。只有通过acquire获取了许可证的线程才能执行,否则阻塞。通过release释放许可证其他线程才能进行获取。
公平性
:没有办法保证线程能够公平地可从信号量中获得许可。也就是说,无法担保掉第一个调用 acquire() 的线程会是第一个获得一个许可的线程。如果第一个线程在等待一个许可时发生阻塞,而第二个线程前来索要一个许可的时候刚好有一个许可被释放出来,那么它就可能会在第一个线程之前获得许可。如果你想要强制公平
,Semaphore 类有一个具有一个布尔类型的参数的构造子,通过这个参数以告知 Semaphore 是否要强制公平。强制公平会影响到并发性能,所以除非你确实需要它否则不要启用它。
public class SemaphoreDemo {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
//Semaphore semaphore = new Semaphore(2);
Semaphore semaphore = new Semaphore(2, true);//强制公平,但会影响到并发性能
for (int i = 0; i < 5; i ++) {
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "尝试获取许可证");
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "获取许可证");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "释放许可证");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
executorService.execute(runnable);
}
executorService.shutdown();
}
/*
-------------不计较公平-------------
pool-1-thread-1尝试获取许可证
pool-1-thread-2尝试获取许可证
pool-1-thread-2获取许可证
pool-1-thread-1获取许可证
pool-1-thread-3尝试获取许可证
pool-1-thread-4尝试获取许可证
pool-1-thread-5尝试获取许可证
pool-1-thread-2释放许可证
pool-1-thread-1释放许可证
pool-1-thread-3获取许可证
pool-1-thread-4获取许可证
pool-1-thread-3释放许可证
pool-1-thread-4释放许可证
pool-1-thread-5获取许可证
pool-1-thread-5释放许可证
注意:尽管pool-1-thread-1首先尝试获取许可证,但是pool-1-thread-2还是率先获得了许可证(无公平性)。
-------------强制公平-------------
pool-1-thread-1尝试获取许可证
pool-1-thread-2尝试获取许可证
pool-1-thread-1获取许可证
pool-1-thread-2获取许可证
pool-1-thread-3尝试获取许可证
pool-1-thread-4尝试获取许可证
pool-1-thread-5尝试获取许可证
pool-1-thread-2释放许可证
pool-1-thread-1释放许可证
pool-1-thread-3获取许可证
pool-1-thread-4获取许可证
pool-1-thread-3释放许可证
pool-1-thread-4释放许可证
pool-1-thread-5获取许可证
pool-1-thread-5释放许可证
*/
}
4. Exchanger类
Exchanger类表示一种两个线程可以进行互相交换对象的会合点。只能用于两个线程之间,并且两个线程必须都到达会合点才会进行
数据交换
。
public class ExchangerRunnable implements Runnable {
private Exchanger<String> exchanger;
private Object object;
public ExchangerRunnable(Exchanger<String> exchanger, Object object) {
this.exchanger = exchanger;
this.object = object;
}
@Override
public void run() {
Object previous = this.object;
System.out.println(Thread.currentThread().getName() + "交换前:" + previous);
//个人观点:先处理完,先进行交换。
try {
if ("A".equals(this.object)) {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "处理了1s");
} else if ("B".equals(this.object)) {
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + "处理了2s");
} else if ("C".equals(this.object)) {
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + "处理了3s");
} else if ("D".equals(this.object)) {
Thread.sleep(4000);
System.out.println(Thread.currentThread().getName() + "处理了4s");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
this.object = this.exchanger.exchange((String) this.object);
System.out.println(Thread.currentThread().getName() + " exchange " + previous + " for " + this.object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
ExchangerRunnable runnable = new ExchangerRunnable(exchanger, "A");
ExchangerRunnable runnable1 = new ExchangerRunnable(exchanger, "B");
new Thread(runnable).start();
new Thread(runnable1).start();
Exchanger<String> exchanger2 = new Exchanger<>();
ExchangerRunnable runnable3 = new ExchangerRunnable(exchanger2, "C");
ExchangerRunnable runnable4 = new ExchangerRunnable(exchanger2, "D");
new Thread(runnable3).start();
new Thread(runnable4).start();
}
/*
Thread-0交换前:A
Thread-1交换前:B
Thread-2交换前:C
Thread-3交换前:D
Thread-0处理了1s
Thread-1处理了2s
Thread-1 exchange B for A
Thread-0 exchange A for B
Thread-2处理了3s
Thread-3处理了4s
Thread-3 exchange D for C
Thread-2 exchange C for D
*/
}
5. ReentrantLock类
ReentrantLock可以用来替代Synchronized,在需要同步的代码块加上锁,最后一定要释放锁,否则其他线程永远进不来。
public class ReentrantLockDemo {
private static ReentrantLock lock = new ReentrantLock();
public static void runOne() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "runOne...");
Thread.sleep(10000);
System.out.println(Thread.currentThread().getName() + "runOne finished...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void runTwo() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "runTwo...");
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
runOne();
}
}, "t1").start();
new Thread(new Runnable() {
@Override
public void run() {
runTwo();
}
}, "t2").start();
}
/*
t1runOne...
t1runOne finished...
t2runTwo...
*/
}
可以使用Condition来替换wait和notify来进行
线程间的通讯
,Condition只针对某一把锁。一个Lock可以创建多个Condition,更加灵活。
ReentrantLock的构造函数可以传入一个boolean参数,用来指定
公平/非公平模式
,默认false是非公平的。非公平的效率更高。
public class ReentrantLockDemoTwo {
private static ReentrantLock lock = new ReentrantLock(true);
private static Condition c1 = lock.newCondition();
private static Condition c2 = lock.newCondition();
public static void runOne() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "runOne...");
c1.await();
System.out.println(Thread.currentThread().getName() + "runOne continue...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void runTwo() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "runTwo...");
c1.await();
System.out.println(Thread.currentThread().getName() + "runTwo continue...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void runThree() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "runThree...");
c1.signalAll();
System.out.println(Thread.currentThread().getName() + "runThree continue...");
} finally {
lock.unlock();
}
}
public static void runFour() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "runFour...");
c2.await();
System.out.println(Thread.currentThread().getName() + "runFour continue...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void runFive() {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "runFive...");
c2.signal();
System.out.println(Thread.currentThread().getName() + "runFive continue...");
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
runFour();
}
}, "t4").start();
new Thread(new Runnable() {
@Override
public void run() {
runTwo();
}
}, "t2").start();
new Thread(new Runnable() {
@Override
public void run() {
runOne();
}
}, "t1").start();
try {
Thread.sleep(5000);
System.out.println("--------------------------------------");
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
runThree();
}
}, "t3").start();
try {
Thread.sleep(5000);
System.out.println("--------------------------------------");
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(new Runnable() {
@Override
public void run() {
runFive();
}
}, "t5").start();
}
}
Lock的其他方法:
- tryLock():尝试获得锁,返回true/false;
- tryLock(timeout, unit):在给定的时间内尝试获得锁;
- isFair():是否为公平锁;
- isLocked():当前线程是否持有锁;
- lock.getHoldCount():持有锁的数量,只能在当前调用线程内部使用,不能再其他线程中使用。
public class ReentrantLockDemoThree {
private static ReentrantLock lock = new ReentrantLock();
public static void runOne () {
try {
lock.lock();
System.out.println(lock.getHoldCount());
runTwo();
} finally {
System.out.println("release lock 1");
lock.unlock();
}
}
public static void runTwo () {
try {
lock.lock();
System.out.println(lock.getHoldCount());
} finally {
System.out.println("release lock 2");
lock.unlock();
}
}
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
runOne();
}
}, "t1").start();
}
/*
1
2
release lock 2
release lock 1
*/
}
6. ReentrantReadWriteLock类
ReentrantReadWriteLock读写锁,采用
读写分离机制
,高并发下读多写少时性能优于ReentrantLock。
读读共享,写写互斥,读写互斥
(详见ReentrantReadWriteLockDemo)。
public class ReentrantReadWriteLockDemo {
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private ReadLock readLock = lock.readLock();
private WriteLock writeLock = lock.writeLock();
public void read() {
try {
readLock.lock();
System.out.println(Thread.currentThread().getName() + "read...");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "read exit...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
}
public void write() {
try {
writeLock.lock();
System.out.println(Thread.currentThread().getName() + "write...");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "write exit...");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}
public static void main(String[] args) {
ReentrantReadWriteLockDemo demo = new ReentrantReadWriteLockDemo();
// new Thread(new Runnable() {
// @Override
// public void run() {
// demo.read();
// }
// }, "t1").start();
//
// new Thread(new Runnable() {
// @Override
// public void run() {
// demo.read();
// }
// }, "t2").start();
new Thread(new Runnable() {
@Override
public void run() {
demo.write();
}
}, "t3").start();
new Thread(new Runnable() {
@Override
public void run() {
demo.write();
}
}, "t4").start();
}
}