HengYk Blog

个人站

具有浪漫主义情调的理想主义务实青年


Java Concurrent P3

第三章 JAVA 并发编程(精通篇之Concurrent同步工具类)

1. CountDownLatch类

       CountDownLatch是一个辅助工具类,它允许一个或多个线程等待一系列指定操作的完成。CountDownLatch以一个给定的数量初始化。countDown()每被调用一次,这一数量就减一。通过调用 await()方法之一,线程可以阻塞等待这一数量到达零。


public class CountDownLatchDemo {

    //注意:可以引入多个计数器CountDownLatch
    //private static final CountDownLatch countDownLatch = new CountDownLatch(3);

    public static void main(String[] args) {

        final CountDownLatch countDownLatch = new CountDownLatch(3);
        String name = Thread.currentThread().getName();

        new Thread(new Runnable() {

            @Override
            public synchronized void run() {
                try {
                    System.out.println(name);

                    System.out.println(Thread.currentThread().getName() + " await...");
                    countDownLatch.await();
                    System.out.println(Thread.currentThread().getName() + " stop!!!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {

            @Override
            public synchronized void run() {
                try {
                    System.out.println(name);

                    System.out.println(Thread.currentThread().getName() + " await...");
                    countDownLatch.await();
                    System.out.println(Thread.currentThread().getName() + " stop!!!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("..." + countDownLatch.getCount());
        countDownLatch.countDown();
        System.out.println("..." + countDownLatch.getCount());
        countDownLatch.countDown();
        System.out.println("..." + countDownLatch.getCount());
        countDownLatch.countDown();
        /*
          特别注意:此处所有的阻塞线程已经都被唤醒,下面的countDown方法执行时,主线程会和两个子线程争抢资源。
         */
        System.out.println("..." + countDownLatch.getCount());
        countDownLatch.countDown();
        System.out.println("..." + countDownLatch.getCount());
    }

    /*
    -------------结果可能是这样的-------------
    #main
    *main
    Thread-1 await...
    Thread-0 await...
    ...3
    ...2
    ...1
    Thread-1 stop!!!
    Thread-0 stop!!!
    ...0
    ...0

    -------------结果也可能是这样的-------------
    *main
    Thread-0 await...
    #main
    Thread-1 await...
    ...3
    ...2
    ...1
    ...0
    ...0
    Thread-0 stop!!!
    Thread-1 stop!!!
     */
}

2. CyclicBarrier类

       CyclicBarrier 是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环的barrier。


public class Runner implements Runnable {

    private CyclicBarrier cyclicBarrier;
    private String name;

    public Runner(CyclicBarrier cyclicBarrier, String name) {
        this.cyclicBarrier = cyclicBarrier;
        this.name = name;
    }

    @Override
    public void run() {

        try {
            Thread.sleep(1000 * (new Random()).nextInt(8));
            System.out.println(name + "准备好了");
            //cyclicBarrier.await();
            cyclicBarrier.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            System.out.println(name + "线程中断");
            return;
        } catch (BrokenBarrierException e) {
            System.out.println(name + "Barrier异常");
            return;
        } catch (TimeoutException e) {
            System.out.println(name + "超时异常");
        }

        System.out.println(name + "跑呀...");
    }
}

public class CyclicBarrierDemo {

    public static void main(String[] args) {

        CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(new Runner(cyclicBarrier, "t1"));
        executorService.submit(new Runner(cyclicBarrier, "t2"));
        executorService.submit(new Runner(cyclicBarrier, "t3"));

        executorService.shutdown();
    }

    /*
    -------------结果可能是这样的-------------
    t1::准备好了::1000
    t2::准备好了::4000
    t3::准备好了::4000
    t3::跑呀...
    t1::跑呀...
    t2::跑呀...

    -------------结果也可能是这样的-------------
    t3::准备好了::0
    t3::超时异常
    t3::跑呀...
    t1::准备好了::6000
    t1::Barrier异常
    t2::准备好了::7000
    t2::Barrier异常

    注意比较上述结果中时间的差值,如果最长准备时间比最短准备时间多3秒,则抛出异常TimeoutException和BrokenBarrierException。
     */
}

       需要所有的子任务都完成时,才执行主任务,这个时候就可以选择使用CyclicBarrier。


public class Worker implements Runnable {

    private String name;
    private CyclicBarrier cyclicBarrier;

    public Worker(String name, CyclicBarrier cyclicBarrier) {
        this.name = name;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {

        try {
            System.out.println(name + "运行完毕");
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

public class CyclicBarrierDemoTwo {

    public static void main(String[] args) {

        CyclicBarrier cyclicBarrier = new CyclicBarrier(100);

        for (int i = 0; i < 99; i ++) {
            new Thread(new Worker("worker" + i, cyclicBarrier)).start();
        }

        System.out.println("主线程开始等待..................");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }

        System.out.println("主线程开始工作..................");
    }

    /*
    worker0运行完毕
    worker1运行完毕
    主线程开始等待..................
    worker2运行完毕
    worker3运行完毕
    worker4运行完毕
    worker5运行完毕
    worker6运行完毕
    worker7运行完毕
    worker8运行完毕
    主线程开始工作..................
     */
}

3. Semaphore类

       Semaphore是一个计数信号量。信号量维护了一个许可集合; 通过acquire()acquire()来获取和释放访问许可证。只有通过acquire获取了许可证的线程才能执行,否则阻塞。通过release释放许可证其他线程才能进行获取。

       公平性:没有办法保证线程能够公平地可从信号量中获得许可。也就是说,无法担保掉第一个调用 acquire() 的线程会是第一个获得一个许可的线程。如果第一个线程在等待一个许可时发生阻塞,而第二个线程前来索要一个许可的时候刚好有一个许可被释放出来,那么它就可能会在第一个线程之前获得许可。如果你想要强制公平,Semaphore 类有一个具有一个布尔类型的参数的构造子,通过这个参数以告知 Semaphore 是否要强制公平。强制公平会影响到并发性能,所以除非你确实需要它否则不要启用它。


public class SemaphoreDemo {

    public static void main(String[] args) {

        ExecutorService executorService = Executors.newCachedThreadPool();

        //Semaphore semaphore = new Semaphore(2);
        Semaphore semaphore = new Semaphore(2, true);//强制公平,但会影响到并发性能

        for (int i = 0; i < 5; i ++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + "尝试获取许可证");
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName() + "获取许可证");
                        Thread.sleep(5000);
                        System.out.println(Thread.currentThread().getName() + "释放许可证");
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            executorService.execute(runnable);
        }

        executorService.shutdown();
    }

    /*
    -------------不计较公平-------------
    pool-1-thread-1尝试获取许可证
    pool-1-thread-2尝试获取许可证
    pool-1-thread-2获取许可证
    pool-1-thread-1获取许可证
    pool-1-thread-3尝试获取许可证
    pool-1-thread-4尝试获取许可证
    pool-1-thread-5尝试获取许可证
    pool-1-thread-2释放许可证
    pool-1-thread-1释放许可证
    pool-1-thread-3获取许可证
    pool-1-thread-4获取许可证
    pool-1-thread-3释放许可证
    pool-1-thread-4释放许可证
    pool-1-thread-5获取许可证
    pool-1-thread-5释放许可证

    注意:尽管pool-1-thread-1首先尝试获取许可证,但是pool-1-thread-2还是率先获得了许可证(无公平性)。

    -------------强制公平-------------
    pool-1-thread-1尝试获取许可证
    pool-1-thread-2尝试获取许可证
    pool-1-thread-1获取许可证
    pool-1-thread-2获取许可证
    pool-1-thread-3尝试获取许可证
    pool-1-thread-4尝试获取许可证
    pool-1-thread-5尝试获取许可证
    pool-1-thread-2释放许可证
    pool-1-thread-1释放许可证
    pool-1-thread-3获取许可证
    pool-1-thread-4获取许可证
    pool-1-thread-3释放许可证
    pool-1-thread-4释放许可证
    pool-1-thread-5获取许可证
    pool-1-thread-5释放许可证
     */
}

4. Exchanger类

       Exchanger类表示一种两个线程可以进行互相交换对象的会合点。只能用于两个线程之间,并且两个线程必须都到达会合点才会进行数据交换


public class ExchangerRunnable implements Runnable {

    private Exchanger<String> exchanger;
    private Object object;

    public ExchangerRunnable(Exchanger<String> exchanger, Object object) {
        this.exchanger = exchanger;
        this.object = object;
    }

    @Override
    public void run() {
        Object previous = this.object;
        System.out.println(Thread.currentThread().getName() + "交换前:" + previous);

        //个人观点:先处理完,先进行交换。
        try {
            if ("A".equals(this.object)) {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + "处理了1s");
            } else if ("B".equals(this.object)) {
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + "处理了2s");
            } else if ("C".equals(this.object)) {
                Thread.sleep(3000);
                System.out.println(Thread.currentThread().getName() + "处理了3s");
            } else if ("D".equals(this.object)) {
                Thread.sleep(4000);
                System.out.println(Thread.currentThread().getName() + "处理了4s");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            this.object = this.exchanger.exchange((String) this.object);
            System.out.println(Thread.currentThread().getName() + " exchange " + previous + " for " + this.object);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class ExchangerDemo {

    public static void main(String[] args) {

        Exchanger<String> exchanger = new Exchanger<>();

        ExchangerRunnable runnable = new ExchangerRunnable(exchanger, "A");
        ExchangerRunnable runnable1 = new ExchangerRunnable(exchanger, "B");

        new Thread(runnable).start();
        new Thread(runnable1).start();

        Exchanger<String> exchanger2 = new Exchanger<>();

        ExchangerRunnable runnable3 = new ExchangerRunnable(exchanger2, "C");
        ExchangerRunnable runnable4 = new ExchangerRunnable(exchanger2, "D");

        new Thread(runnable3).start();
        new Thread(runnable4).start();
    }

    /*
    Thread-0交换前:A
    Thread-1交换前:B
    Thread-2交换前:C
    Thread-3交换前:D
    Thread-0处理了1s
    Thread-1处理了2s
    Thread-1 exchange B for A
    Thread-0 exchange A for B
    Thread-2处理了3s
    Thread-3处理了4s
    Thread-3 exchange D for C
    Thread-2 exchange C for D
     */
}

5. ReentrantLock类

       ReentrantLock可以用来替代Synchronized,在需要同步的代码块加上锁,最后一定要释放锁,否则其他线程永远进不来。


public class ReentrantLockDemo {

    private static ReentrantLock lock = new ReentrantLock();

    public static void runOne() {

        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "runOne...");
            Thread.sleep(10000);
            System.out.println(Thread.currentThread().getName() + "runOne finished...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void runTwo() {

        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "runTwo...");
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {

        new Thread(new Runnable() {
            @Override
            public void run() {
                runOne();
            }
        }, "t1").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                runTwo();
            }
        }, "t2").start();
    }

    /*
    t1runOne...
    t1runOne finished...
    t2runTwo...
     */
}

       可以使用Condition来替换wait和notify来进行线程间的通讯,Condition只针对某一把锁。一个Lock可以创建多个Condition,更加灵活。

       ReentrantLock的构造函数可以传入一个boolean参数,用来指定公平/非公平模式,默认false是非公平的。非公平的效率更高。


public class ReentrantLockDemoTwo {

    private static ReentrantLock lock = new ReentrantLock(true);
    private static Condition c1 = lock.newCondition();
    private static Condition c2 = lock.newCondition();

    public static void runOne() {
        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "runOne...");
            c1.await();
            System.out.println(Thread.currentThread().getName() + "runOne continue...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

    }

    public static void runTwo() {

        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "runTwo...");
            c1.await();
            System.out.println(Thread.currentThread().getName() + "runTwo continue...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void runThree() {

        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "runThree...");
            c1.signalAll();
            System.out.println(Thread.currentThread().getName() + "runThree continue...");
        } finally {
            lock.unlock();
        }
    }

    public static void runFour() {

        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "runFour...");
            c2.await();
            System.out.println(Thread.currentThread().getName() + "runFour continue...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void runFive() {

        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "runFive...");
            c2.signal();
            System.out.println(Thread.currentThread().getName() + "runFive continue...");
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {

        new Thread(new Runnable() {
            @Override
            public void run() {
                runFour();
            }
        }, "t4").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                runTwo();
            }
        }, "t2").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                runOne();
            }
        }, "t1").start();

        try {
            Thread.sleep(5000);
            System.out.println("--------------------------------------");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                runThree();
            }
        }, "t3").start();

        try {
            Thread.sleep(5000);
            System.out.println("--------------------------------------");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                runFive();
            }
        }, "t5").start();
    }
}

       Lock的其他方法:

  • tryLock():尝试获得锁,返回true/false;
  • tryLock(timeout, unit):在给定的时间内尝试获得锁;
  • isFair():是否为公平锁;
  • isLocked():当前线程是否持有锁;
  • lock.getHoldCount():持有锁的数量,只能在当前调用线程内部使用,不能再其他线程中使用。


public class ReentrantLockDemoThree {

    private static ReentrantLock lock = new ReentrantLock();

    public static void runOne () {

        try {
            lock.lock();
            System.out.println(lock.getHoldCount());
            runTwo();
        } finally {
            System.out.println("release lock 1");
            lock.unlock();
        }
    }

    public static void runTwo () {

        try {
            lock.lock();
            System.out.println(lock.getHoldCount());
        } finally {
            System.out.println("release lock 2");
            lock.unlock();
        }
    }

    public static void main(String[] args) {

        new Thread(new Runnable() {
            @Override
            public void run() {
                runOne();
            }
        }, "t1").start();
    }

    /*
    1
    2
    release lock 2
    release lock 1
     */
}

6. ReentrantReadWriteLock类

       ReentrantReadWriteLock读写锁,采用读写分离机制,高并发下读多写少时性能优于ReentrantLock。

        读读共享,写写互斥,读写互斥(详见ReentrantReadWriteLockDemo)。


public class ReentrantReadWriteLockDemo {

    private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private ReadLock readLock = lock.readLock();
    private WriteLock writeLock = lock.writeLock();

    public void read() {

        try {
            readLock.lock();
            System.out.println(Thread.currentThread().getName() + "read...");
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + "read exit...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            readLock.unlock();
        }
    }

    public void write() {

        try {
            writeLock.lock();
            System.out.println(Thread.currentThread().getName() + "write...");
            Thread.sleep(5000);
            System.out.println(Thread.currentThread().getName() + "write exit...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            writeLock.unlock();
        }
    }

    public static void main(String[] args) {

        ReentrantReadWriteLockDemo demo = new ReentrantReadWriteLockDemo();

//        new Thread(new Runnable() {
//            @Override
//            public void run() {
//                demo.read();
//            }
//        }, "t1").start();
//
//        new Thread(new Runnable() {
//            @Override
//            public void run() {
//                demo.read();
//            }
//        }, "t2").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                demo.write();
            }
        }, "t3").start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                demo.write();
            }
        }, "t4").start();
    }
}
代码下载参见https://github.com/HengYk/ConcurrentDemo