HengYk Blog

个人站

具有浪漫主义情调的理想主义务实青年


Java Concurrent P2

第二章 JAVA并发编程(进阶篇)

1. Volatile关键字

       作用:强制线程到共享内存中读取数据,而不从线程的工作内存中读取,从而使变量在多个线程间可见。


public class DemoThread13 {

    /**
     * 共享内存的可见性
     */
    private List<String> list = new ArrayList<>();
    private volatile boolean canGet = false;

    private void put() {

        for (int i = 0; i < 10; i++) {

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            list.add("A");
            System.out.println("线程" + Thread.currentThread().getName() + "添加第" + i + "个元素");

            if (i == 5) {
                canGet = true;
                System.out.println("线程" + Thread.currentThread().getName() + "发出通知");
            }
        }
    }

    private void get() {

        while (true) {
            if (canGet) {
                for (String s : list) {
                    System.out.println("线程" + Thread.currentThread().getName() + "获取元素:" + s);
                }
                break;
            }
        }
    }

    public static void main(String[] args) {

        DemoThread13 d13 = new DemoThread13();

        new Thread(new Runnable() {
            @Override
            public void run() {
                d13.put();
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                d13.get();
            }
        }).start();
    }

    /*
    -----------属性canGet添加volatile(主程序可终止)-----------
    线程Thread-0添加第0个元素
    线程Thread-0添加第1个元素
    线程Thread-0添加第2个元素
    线程Thread-0添加第3个元素
    线程Thread-0添加第4个元素
    线程Thread-0添加第5个元素
    线程Thread-0发出通知
    线程Thread-1获取元素:A
    线程Thread-1获取元素:A
    线程Thread-1获取元素:A
    线程Thread-1获取元素:A
    线程Thread-1获取元素:A
    线程Thread-1获取元素:A
    线程Thread-0添加第6个元素
    线程Thread-0添加第7个元素
    线程Thread-0添加第8个元素
    线程Thread-0添加第9个元素

    -----------属性canGet不添加volatile(Thread-1不终止)-----------
    线程Thread-0添加第0个元素
    线程Thread-0添加第1个元素
    线程Thread-0添加第2个元素
    线程Thread-0添加第3个元素
    线程Thread-0添加第4个元素
    线程Thread-0添加第5个元素
    线程Thread-0发出通知
    线程Thread-0添加第6个元素
    线程Thread-0添加第7个元素
    线程Thread-0添加第8个元素
    线程Thread-0添加第9个元素

    -----------结论-----------
    volatile可以保证共享内存的可见性。
     */
}

       不足:volatile无法保证原子性,volatile属于轻量级的同步,性能比synchronized强很多(不加锁),但是只保证线程间的可见性,并不能替代synchronized的同步功能,netty框架中大量使用了volatile。


public class DemoThread14 implements Runnable {

    /**
     * 数据操作的原子性
     */

    private static volatile int sum = 0;

    private static void add() {

        System.out.println(Thread.currentThread().getName() + "初始化sum = " + sum);
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            sum++;
        }
        System.out.println(Thread.currentThread().getName() + "计算后sum = " + sum);
    }

    @Override
    public void run() {
        add();
    }

    public static void main(String[] args) {

        ExecutorService es = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            es.submit(new DemoThread14());
        }

        // shutdown()被调用之后,先前提交的任务将被执行,但不会接收新的任务
        es.shutdown();

        //保证10个子线程执行完毕再输出
        while (true) {
            if (es.isTerminated()) {
                System.out.println("sum最终 = " + sum);
                if (sum == 100) {
                    System.out.println(sum + " = ok");
                } else {
                    System.out.println(sum + " = no");
                }
                break;
            }
        }
    }

    /*
      -----------sum++之前不填加添加延迟-----------
      pool-1-thread-1初始化sum = 0
      pool-1-thread-1计算后sum = 10
      pool-1-thread-2初始化sum = 10
      pool-1-thread-2计算后sum = 20
      pool-1-thread-3初始化sum = 20
      pool-1-thread-3计算后sum = 30
      pool-1-thread-4初始化sum = 30
      pool-1-thread-4计算后sum = 40
      pool-1-thread-6初始化sum = 30
      pool-1-thread-5初始化sum = 30
      pool-1-thread-6计算后sum = 50
      pool-1-thread-7初始化sum = 40
      pool-1-thread-5计算后sum = 60
      pool-1-thread-7计算后sum = 70
      pool-1-thread-8初始化sum = 70
      pool-1-thread-8计算后sum = 80
      pool-1-thread-9初始化sum = 80
      pool-1-thread-9计算后sum = 90
      pool-1-thread-10初始化sum = 90
      pool-1-thread-10计算后sum = 100
      sum最终 = 100
      100 = ok

      -----------sum++之前填加添加延迟-----------
      pool-1-thread-1初始化sum = 0
      pool-1-thread-2初始化sum = 0
      pool-1-thread-3初始化sum = 1
      pool-1-thread-4初始化sum = 1
      pool-1-thread-5初始化sum = 1
      pool-1-thread-6初始化sum = 1
      pool-1-thread-7初始化sum = 1
      pool-1-thread-8初始化sum = 1
      pool-1-thread-9初始化sum = 1
      pool-1-thread-10初始化sum = 1
      pool-1-thread-1计算后sum = 66
      pool-1-thread-2计算后sum = 71
      pool-1-thread-5计算后sum = 72
      pool-1-thread-3计算后sum = 72
      pool-1-thread-4计算后sum = 73
      pool-1-thread-8计算后sum = 74
      pool-1-thread-6计算后sum = 74
      pool-1-thread-7计算后sum = 74
      pool-1-thread-9计算后sum = 75
      pool-1-thread-10计算后sum = 76
      sum最终 = 76
      76 = no

      -----------结论-----------
      volatile不能保证数据操作的原子性,因此无法保证线程安全。
      */
}

2. Volatile与Static区别

       Static保证唯一性, 不保证一致性,多个实例共享一个静态变量。

       Volatile保证一致性,不保证唯一性,多个实例有多个volatile变量。

3. Atomic类的原子性

       AtomicInteger等原子类可以保证共享变量的原子性。


public class DemoThread15 implements Runnable{

    private static AtomicInteger sum = new AtomicInteger(0);

    private static void add() {

        System.out.println(Thread.currentThread().getName() + "初始化sum = " + sum);
        for (int i = 0; i < 10; i++) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            sum.addAndGet(1);
        }
        System.out.println(Thread.currentThread().getName() + "计算后sum = " + sum);

    }

    @Override
    public void run() {
        add();
    }

    public static void main(String[] args) {

        ExecutorService es = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            es.submit(new DemoThread15());
        }

        // shutdown()被调用之后,先前提交的任务将被执行,但不会接收新的任务
        es.shutdown();

        //保证10个子线程执行完毕再输出
        while (true) {
            if (es.isTerminated()) {
                System.out.println("sum最终 = " + sum);
                if (sum.get() == 100) {
                    System.out.println(sum + " = ok");
                } else {
                    System.out.println(sum + " = no");
                }
                break;
            }
        }
    }

    /*
    -----------去掉volatile关键字,采用java.util.concurrent.atomic.*-----------
    pool-1-thread-1初始化sum = 0
    pool-1-thread-2初始化sum = 0
    pool-1-thread-3初始化sum = 0
    pool-1-thread-4初始化sum = 0
    pool-1-thread-5初始化sum = 0
    pool-1-thread-6初始化sum = 0
    pool-1-thread-7初始化sum = 0
    pool-1-thread-8初始化sum = 1
    pool-1-thread-9初始化sum = 1
    pool-1-thread-10初始化sum = 1
    pool-1-thread-2计算后sum = 91
    pool-1-thread-3计算后sum = 92
    pool-1-thread-4计算后sum = 93
    pool-1-thread-6计算后sum = 95
    pool-1-thread-7计算后sum = 95
    pool-1-thread-5计算后sum = 96
    pool-1-thread-1计算后sum = 97
    pool-1-thread-10计算后sum = 100
    pool-1-thread-9计算后sum = 100
    pool-1-thread-8计算后sum = 100
    sum最终 = 100
    100 = ok

    -----------结论-----------
    对比DemoThread14,我们发现Atomic类可以保证共享变量的原子性。

    -----------特殊的-----------
    Atomic类采用了CAS(checkAndSet)这种非锁机制。
     */
}

       Atomic类不能保证成员方法的原子性。


public class DemoThread16 implements Runnable {

    private static AtomicInteger sum = new AtomicInteger(0);

    private synchronized static void add() {

        sum.addAndGet(1);

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        sum.addAndGet(9);

        System.out.println(Thread.currentThread().getName() + "计算后sum = " + sum);
    }

    @Override
    public void run() {
        add();
    }

    public static void main(String[] args) {

        ExecutorService es = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i ++) {
            es.submit(new DemoThread16());
        }

        es.shutdown();
    }

    /*
    -----------不添加synchronized关键字-----------
    pool-1-thread-1计算后sum = 28
    pool-1-thread-2计算后sum = 28
    pool-1-thread-3计算后sum = 37
    pool-1-thread-4计算后sum = 46
    pool-1-thread-5计算后sum = 55
    pool-1-thread-6计算后sum = 64
    pool-1-thread-8计算后sum = 82
    pool-1-thread-7计算后sum = 82
    pool-1-thread-10计算后sum = 100
    pool-1-thread-9计算后sum = 100

    -----------添加synchronized关键字-----------
    pool-1-thread-1计算后sum = 10
    pool-1-thread-10计算后sum = 20
    pool-1-thread-9计算后sum = 30
    pool-1-thread-8计算后sum = 40
    pool-1-thread-7计算后sum = 50
    pool-1-thread-6计算后sum = 60
    pool-1-thread-5计算后sum = 70
    pool-1-thread-4计算后sum = 80
    pool-1-thread-3计算后sum = 90
    pool-1-thread-2计算后sum = 100

    -----------结论-----------
    只Atomic类不能保证成员方法的原子性,但配合synchronized关键字可保证原子性操作。
     */
}

       Atomic类采用了CAS(CheckAndSet)这种锁机制。

4. ThreadLocal

       使用ThreadLocal维护变量时,ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。


public class DemoThread21 {

    /**
     * System.out.println()语句之后的注释表示程序的执行顺序。
     *
     * @param args
     */
    public static void main(String[] args) {

        ThreadLocal<Integer> th = new ThreadLocal<>();

        new Thread(new Runnable() {
            @Override
            public void run() {
                th.set(10);
                System.out.println(Thread.currentThread().getName() + ":" + th.get());  // 1
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + ":" + th.get());// 4
            }
        }).start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                Integer v = th.get();
                System.out.println(Thread.currentThread().getName() + ":" + v); // 2
                th.set(100);
                System.out.println(Thread.currentThread().getName() + ":" + th.get()); // 3
            }
        }).start();
    }

    /*
    Thread-0:10
    Thread-1:null
    Thread-1:100
    Thread-0:10

    -----------结论-----------
    ThreadLocal为每一个线程创建一个独立的副本,线程中的数据操作互不干扰。
}

5. 同步类容器

       Vector、HashTable等古老的并发容器,都是使用Collections.synchronizedXXX等工厂方法创建的,并发状态下只能有一个线程访问容器对象,性能很低。


public class DemoThread26 implements Runnable{

    // private static List<String> list = new ArrayList<>();

    private static List<String> list = Collections.synchronizedList(new ArrayList<>());

    private static void add() {
        for (int i = 0; i < 10; i ++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            list.add("A");
        }
    }

    @Override
    public void run() {
        add();
    }

    public static void main(String[] args) {

        ExecutorService es = Executors.newFixedThreadPool(5);

        for (int i = 0; i < 5; i ++) {
            es.submit(new DemoThread26());
        }

        es.shutdown();

        while (true) {
            if (es.isTerminated()) {
                System.out.println("线程结束了");
                System.out.println("list.size = " + list.size());
                if (list.size() != 50) {
                    System.out.println("线程不安全");
                } else {
                    System.out.println("线程安全");
                }
                break;
            }
        }
    }

    /*
    -----------不添加Collections.synchronizedList()-----------
    线程结束了
    list.size = 39
    线程不安全

    -----------添加Collections.synchronizedList()-----------
    线程结束了
    list.size = 50
    线程安全

    -----------结论-----------
    添加Collections.synchronizedXXX()可以快速将非线程安全的集合类改造成线程安全的,即并发状态下只能有一个线程访问容器对象,性能很低。
     */
}

6. 并发类容器

       【JDK5.0】之后提供了多种并发类容器可以替代同步类容器,提升性能、吞吐量。ConcurrentHashMap替代HashMap、HashTable;ConcurrentSkipListMap替代TreeMap。

       ConcurrentHashMap将hash表分为16个segment,每个segment单独进行锁控制,从而减小了锁的粒度,提升了性能。


public class DemoThread27 {

    /**
     * 通过并发下的运行时间对比ConcurrentHashMap和Hashtable的性能
     */
    public static void testMapOne() {

        // Hashtable<String, Integer> hash = new Hashtable<>();

        ConcurrentHashMap<String, Integer> hash = new ConcurrentHashMap<>();

        for (int i = 0; i < 10; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    long start = System.currentTimeMillis();
                    for (int i = 0; i < 1000000; i++) {
                        hash.put("k" + i, i);
                    }
                    System.out.println(Thread.currentThread().getName() + ": " + (System.currentTimeMillis() - start));
                }
            }).start();
        }
    }

    public static void testSkipListMapOne() {

        // SortedMap<String, Integer> skipMap = new TreeMap<>();

        // 性能低,线程安全
        // SortedMap<String, Integer> skipMap = Collections.synchronizedSortedMap(new TreeMap<>());

        // 性能高,线程安全
        ConcurrentSkipListMap<String, Integer> skipMap = new ConcurrentSkipListMap<>();

        for (int i = 0; i < 10; i++) {

            new Thread(new Runnable() {
                @Override
                public void run() {
                    long start = System.currentTimeMillis();
                    for (int i = 0; i < 1000; i++) {
                        try {
                            Thread.sleep(0);
                            skipMap.put("k" + i, i);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println(Thread.currentThread().getName() + ": " + (System.currentTimeMillis() - start));
                    // System.out.println(skipMap);
                }
            }).start();
        }
    }

    /**
     * ConcurrentHashMap用法
     */
    public static void testMapTwo() {

        ConcurrentHashMap<String, Integer> hashMap = new ConcurrentHashMap<>();

        hashMap.put("d", 2);
        hashMap.put("c", 4);
        hashMap.put("e", 2);

        hashMap.put("e", 3);
        System.out.println(hashMap); // {c=4, d=2, e=3}

        hashMap.putIfAbsent("d1", 1);
        System.out.println(hashMap); // {c=4, d=2, e=3, d1=1}
    }

    /**
     * ConcurrentSkipListMap用法
     */
    public static void testSkipListMapTwo() {

        ConcurrentSkipListMap<String, Integer> skipListMap = new ConcurrentSkipListMap<>();

        skipListMap.put("d", 2);
        skipListMap.put("c", 4);
        skipListMap.put("e", 2);

        skipListMap.put("e", 3);
        System.out.println(skipListMap); // {c=4, d=2, e=3}

        skipListMap.putIfAbsent("d1", 1);
        System.out.println(skipListMap); // {c=4, d=2, d1=1, e=3}
    }

    public static void main(String[] args) {
        // testMapOne();
        // testSkipListMapOne();
        testMapTwo();
        // testSkipListMapTwo();
    }

    /*
    -----------古老的并发容器Hashtable的执行时间-----------
    Thread-6: 4249
    Thread-4: 4466
    Thread-3: 4582
    Thread-2: 4647
    Thread-9: 4575
    Thread-5: 4576
    Thread-8: 4593
    Thread-0: 4710
    Thread-1: 4726
    Thread-7: 4638

    -----------改进的并发容器ConcurrentHashMap的执行时间-----------
    Thread-0: 2582
    Thread-7: 2703
    Thread-9: 2763
    Thread-4: 2833
    Thread-3: 2900
    Thread-5: 2883
    Thread-1: 3010
    Thread-2: 3032
    Thread-8: 2901
    Thread-6: 2928

    Thread-0: 4313
    Thread-2: 4312
    Thread-3: 4270
    Thread-1: 4448
    Thread-9: 4259
    Thread-5: 4225
    Thread-7: 4357
    Thread-6: 4358
    Thread-8: 4394
    Thread-4: 4366

    -----------结论1-----------
    执行结果表明,ConcurrentHashMap大多数情况下会比Hashtable优秀一些。

    -----------使用SortedMap-----------
    Exception in thread "Thread-1" java.lang.NullPointerException
    Thread-6: 79
    Thread-9: 79
    Thread-7: 79
    Thread-2: 81
    Thread-8: 80
    Thread-4: 81
    Thread-5: 82
    Thread-3: 82
    Thread-0: 84

    -----------使用Collections.synchronizedSortedMap(SortedMap)-----------
    Thread-0: 166
    Thread-8: 157
    Thread-9: 157
    Thread-1: 166
    Thread-2: 165
    Thread-3: 166
    Thread-5: 166
    Thread-6: 165
    Thread-7: 165
    Thread-4: 166

    -----------使用ConcurrentSkipListMap-----------
    Thread-3: 71
    Thread-1: 74
    Thread-7: 71
    Thread-5: 72
    Thread-2: 74
    Thread-8: 72
    Thread-9: 72
    Thread-6: 73
    Thread-0: 77
    Thread-4: 92

    -----------结论2-----------
    直接使用SortedMap性能高但线程不安全;
    添加锁Collections.synchronizedSortedMap(SortedMap)线程安全但性能降低;
    使用使用ConcurrentSkipListMap既保证线程安全又提高了性能。
     */
}

       Copy On Write容器,简称COW;写时复制容器,向容器中添加元素时,先将容器Copy出一个新容器,然后将元素添加到新容器中,再将原容器的引用指向新容器。并发读的时候不需要锁定容器,因为原容器没有变化,使用的是一种读写分离的思想。但是由于每次更新都会复制新容器,所以如果数据量较大,并且更新操作频繁则对内存消耗很高,建议在高并发读的场景下使用。CopyOnWriteArraySet是基于CopyOnWriteArrayList实现的,其唯一的不同是在add时调用的是CopyOnWriteArrayList的addIfAbsent方法, adIfAbsent方法同样采用锁保护,并创建一个新的大小+1的Object数组。遍历当前Object数组,如Object数组中已有了当前元素,则直接返回,如果没有则放入Object数组的尾部,并返回。从以上分析可见,CopyOnWriteArraySet在add时每次都要进行数组的遍历,因此其性能会低于CopyOnWriteArrayList。


public class DemoThread28 {

    public static void testOne() {

        CopyOnWriteArrayList<Integer> al = new CopyOnWriteArrayList<>();

        al.add(1);
        al.add(4);
        al.add(2);
        al.addIfAbsent(3);
        al.addIfAbsent(5);

        System.out.println(al); // [1, 4, 2, 3, 5]

        al.add(2);

        System.out.println(al); // [1, 4, 2, 3, 5, 2]
    }

    public static void testTwo() {

        CopyOnWriteArraySet<Integer> as = new CopyOnWriteArraySet<>();

        as.add(1);
        as.add(4);
        as.add(2);
        as.add(3);

        System.out.println(as); // [1, 4, 2, 3]

        as.add(2);
        System.out.println(as); // [1, 4, 2, 3]
    }

    public static void main(String[] args) {
        // testOne();
        testTwo();
    }
}

7. 并发-无阻塞队列

       ConcurrentLinkedQueue并发无阻塞队列,BlockingQueue并发阻塞队列,均实现自Queue接口。

       ConcurrentLinkedQueue无阻塞、无锁、高性能、无界、线程安全,性能优于BlockingQueue、不允许null值


public class DemoThread29 {

    public static void main(String[] args) {

        ConcurrentLinkedQueue<Integer> clq = new ConcurrentLinkedQueue<>();

        clq.add(1);
        clq.add(2);

        clq.offer(3);
        clq.offer(4);

        System.out.println(clq); // [1, 2, 3, 4]

        System.out.println(clq.peek()); // 1
        System.out.println(clq.size()); // 4

        System.out.println(clq.poll()); // 1
        System.out.println(clq.size()); // 3

        System.out.println(clq.poll()); // 2
        System.out.println(clq.poll()); // 3
        System.out.println(clq.poll()); // 4
        System.out.println(clq.size()); // 0

        System.out.println(clq.peek()); // null
        System.out.println(clq.poll()); // null
    }
}

8. 并发-阻塞队列

  • a) ArrayBlockingQueue

       基于数组实现的阻塞有界队列、创建时可指定长度,内部实现维护了一个定长数组用于缓存数据,内部没有采用读写分离,写入和读取数据不能同时进行,不允许null值。


public class DemoThread30 {

    public static void testAdd() {

        ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(3);

        abq.add(1);
        abq.offer(2);

        try {
            abq.put(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // abq.add(8); // 如果队列满了,则抛出异常。
        System.out.println(abq); // [1, 2, 3]

        abq.offer(4); // 如果队列满了,不阻塞,不抛出异常。
        System.out.println(abq); // [1, 2, 3]

        try {
            //可设置最大阻塞时间timeout。超时之后如果队列还是满的,不阻塞,不抛出异常。
            abq.offer(5, 2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(abq); // [1, 2, 3]

        try {
            abq.put(6); // 如果队列是满的,则永远阻塞下去。
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(abq); // 无输出...

        System.out.println("over"); // 无输出...
    }

    public static void testTakeOne() {

        ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(2);

        abq.add(2);
        abq.add(4);

        System.out.println(abq); // [2, 4]

        System.out.println(abq.peek()); // 2 获取头部元素不移除。
        System.out.println(abq); // [2, 4]

        System.out.println(abq.poll()); // 2 获取头部元素并移除。
        System.out.println(abq); // [4]

        try {
            System.out.println(abq.take()); // 4 获取头部元素并移除。
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(abq); // []

        System.out.println(abq.poll()); // null 如果队列为空,不阻塞,不抛异常。
        System.out.println(abq); // []

        try {
            // 可设置阻塞时间timeout。超时之后如果队列依然为空,不阻塞,不抛异常。
            System.out.println(abq.poll(2, TimeUnit.SECONDS)); // null
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(abq); // []

        try {
            System.out.println(abq.take()); // 如果队列为空,则永远阻塞,不抛异常。
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(abq); // 无输出...

        System.out.println("over"); // 无输出...
    }

    public static void testTakeTwo() {

        ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(2);

        abq.add(1);
        abq.add(2);

        ArrayList<Integer> al = new ArrayList<>();
        abq.drainTo(al, 1); // 取abq中指定个数的元素到al中,并从abq中移除。

        System.out.println(al); // [1]
        System.out.println(abq); // [2]
    }

    public static void testTakeThree() {

        ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(2);

        abq.add(1);
        abq.add(2);

        ArrayList<Integer> al = new ArrayList<>();
        abq.drainTo(al); // 取abq中全部元素到al中,并从abq中移除。
        System.out.println(abq); // []
        System.out.println(al); // [1, 2]

        ArrayList<Integer> arrayList = new ArrayList<>();
        abq.drainTo(arrayList); // 当队列为空时,不阻塞,不抛异常。
        System.out.println(abq); // []
        System.out.println(arrayList); // []
    }

    public static void main(String[] args) {
        // testAdd();
        // testTakeOne();
        // testTakeTwo();
        testTakeThree();
    }
}
  • b) LinkedBlockingQueue

       基于链表的阻塞队列,内部维护一个链表存储缓存数据,支持写入和读取的并发操作,创建时可指定长度也可以不指定,不指定时代表无界队列,不允许null值。


public class DemoThread31 {

    public static void testAdd() {

        LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<>(3);

        lbq.add(1);
        lbq.offer(2);

        try {
            lbq.put(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // lbq.add(8); // 如果队列满了,则抛出异常。
        System.out.println(lbq); // [1, 2, 3]

        lbq.offer(4); // 如果队列满了,不阻塞,不抛出异常。
        System.out.println(lbq); // [1, 2, 3]

        try {
            //可设置最大阻塞时间timeout。超时之后如果队列还是满的,不阻塞,不抛出异常。
            lbq.offer(5, 2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(lbq); // [1, 2, 3]

        try {
            lbq.put(6); // 如果队列是满的,则永远阻塞下去。
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(lbq); // 无输出...

        System.out.println("over"); // 无输出...
    }

    public static void testTakeOne() {

        LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<>();

        lbq.add(2);
        lbq.add(4);

        System.out.println(lbq); // [2, 4]

        System.out.println(lbq.peek()); // 2 获取头部元素不移除。
        System.out.println(lbq); // [2, 4]

        System.out.println(lbq.poll()); // 2 获取头部元素并移除。
        System.out.println(lbq); // [4]

        try {
            System.out.println(lbq.take()); // 4 获取头部元素并移除。
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(lbq); // []

        System.out.println(lbq.poll()); // null 如果队列为空,不阻塞,不抛异常。
        System.out.println(lbq); // []

        try {
            // 可设置阻塞时间timeout。超时之后如果队列依然为空,不阻塞,不抛异常。
            System.out.println(lbq.poll(2, TimeUnit.SECONDS)); // null
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(lbq); // []

        try {
            System.out.println(lbq.take()); // 如果队列为空,则永远阻塞,不抛异常。
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(lbq); // 无输出...

        System.out.println("over"); // 无输出...
    }

    public static void testTakeTwo() {

        LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<>();

        lbq.add(1);
        lbq.add(2);

        ArrayList<Integer> al = new ArrayList<>();
        lbq.drainTo(al, 1); // 取abq中指定个数的元素到al中,并从abq中移除。

        System.out.println(al); // [1]
        System.out.println(lbq); // [2]
    }

    public static void testTakeThree() {

        LinkedBlockingQueue<Integer> lbq = new LinkedBlockingQueue<>();

        lbq.add(1);
        lbq.add(2);

        ArrayList<Integer> al = new ArrayList<>();
        lbq.drainTo(al); // 取abq中全部元素到al中,并从abq中移除。
        System.out.println(lbq); // []
        System.out.println(al); // [1, 2]

        ArrayList<Integer> arrayList = new ArrayList<>();
        lbq.drainTo(arrayList); // 当队列为空时,不阻塞,不抛异常。
        System.out.println(lbq); // []
        System.out.println(arrayList); // []
    }

    public static void main(String[] args) {
        // testAdd();
        // testTakeOne();
        // testTakeTwo();
        testTakeThree();
    }
}
  • c) SychronousQueue

       没有任何容量,必须先有线程先从队列中take,才能向queue中add数据,否则会抛出队列已满的异常。不能使用peek方法取数据,此方法底层没有实现,会直接返回null。


public class DemoThread32 {

    public static void testOne() {

        SynchronousQueue<Integer> sq = new SynchronousQueue<>();

        sq.add(1); // java.lang.IllegalStateException: Queue full
    }

    public static void testTwo() {

        SynchronousQueue<Integer> sq = new SynchronousQueue<>();

        try {
            sq.put(1); // 阻塞...
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void testThree() {

        SynchronousQueue<Integer> sq = new SynchronousQueue<>();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        System.out.println(sq.take());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    sq.put(1);
                    sq.put(2);
                    sq.put(3);
                    sq.put(4);

                    /*
                    sq.add(1);
                    Thread.sleep(1000);
                    sq.add(2);
                    Thread.sleep(1000);
                    sq.add(3);
                     */
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    public static void testFour() {

        SynchronousQueue<Integer> sq = new SynchronousQueue<>();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Integer v = sq.poll(2, TimeUnit.SECONDS);
                        if (v == null) {
                            System.out.println("null");
                            break;
                        } else {
                            System.out.println(v);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    sq.put(1);
                    sq.put(2);
                    sq.put(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

    public static void main(String[] args) {

        // testOne();
        // testTwo();
        testThree();
        // testFour();
    }
}
  • d) PriorityBlockingQueue

       一个无界阻塞队列,默认初始化长度11,也可以手动指定,但是队列会自动扩容。资源被耗尽时导致OutOfMemoryError。不允许使用null元素。不允许插入不可比较的对象(导致抛出ClassCastException), 加入的对象实现Comparable接口


class Demo implements Comparable<Demo> {

    private int id;
    private String name;

    public Demo(int id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public String toString() {
        return "Demo{" +
                "id=" + id +
                ", name='" + name + '\'' +
                '}';
    }

    @Override
    public int compareTo(Demo o) {
        if (this.id > o.id) {
            return 1;
        } else if (this.id < o.id) {
            return -1;
        } else {
            return 0;
        }
    }
}

public class DemoThread33 {

    /**
     * 队列中的数据不是按照顺序排列的
     */
    public static void testAdd() {

        PriorityBlockingQueue<Demo> pq = new PriorityBlockingQueue<>(); // 默认容量11

        pq.add(new Demo(3, "a"));
        pq.offer(new Demo(1, "b"));
        pq.put(new Demo(2, "c"));
        pq.offer(new Demo(4, "d"), 2, TimeUnit.SECONDS);

        System.out.println(pq); // [Demo{id=1, name='b'}, Demo{id=3, name='a'}, Demo{id=2, name='c'}, Demo{id=4, name='d'}]
    }

    /**
     * 队列中的元素按照顺序被取走
     */
    public static void testTake1() {

        PriorityBlockingQueue<Demo> pq = new PriorityBlockingQueue<>();

        pq.add(new Demo(3, "a"));
        pq.offer(new Demo(1, "b"));
        pq.put(new Demo(2, "c"));
        pq.offer(new Demo(4, "d"), 2, TimeUnit.SECONDS);

        try {
            System.out.println(pq.poll()); // Demo{id=1, name='b'}
            System.out.println(pq.poll(2, TimeUnit.SECONDS)); // Demo{id=2, name='c'}
            System.out.println(pq.take()); // Demo{id=3, name='a'}
            System.out.println(pq.take()); // Demo{id=4, name='d'}
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void testTake2() {

        PriorityBlockingQueue<Demo> pbq = new PriorityBlockingQueue<>();

        pbq.add(new Demo(3, "a"));
        pbq.add(new Demo(1, "b"));

        System.out.println(pbq); // [Demo{id=1, name='b'}, Demo{id=3, name='a'}]

        System.out.println(pbq.peek()); // Demo{id=1, name='b'}
        System.out.println(pbq); // [Demo{id=1, name='b'}, Demo{id=3, name='a'}]

        System.out.println(pbq.poll()); // Demo{id=1, name='b'}
        System.out.println(pbq);// [Demo{id=3, name='a'}]

        try {
            System.out.println(pbq.take()); // Demo{id=3, name='a'}
            System.out.println(pbq); // []
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(pbq.peek()); // null
        System.out.println(pbq.poll()); // null
        try {
            System.out.println(pbq.poll(2, TimeUnit.SECONDS)); //null
            System.out.println(pbq.take()); // 阻塞...
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("over"); // 无输出...
    }

    public static void testTake3() {

        PriorityBlockingQueue<Integer> pbq = new PriorityBlockingQueue<>();

        pbq.add(3);
        pbq.add(2);
        pbq.add(3);
        pbq.add(1);
        pbq.add(5);

        ArrayList<Integer> arrayList = new ArrayList<>();
        pbq.drainTo(arrayList, 2);
        System.out.println(arrayList); // [1, 2]
        System.out.println(pbq); // [3, 5, 3]

        Object[] list = arrayList.toArray();
        Arrays.sort(list); // [1, 2]
        System.out.println(Arrays.toString(list));
    }

    public static void testTake4() {

        PriorityBlockingQueue<Integer> pbq = new PriorityBlockingQueue<>();

        pbq.add(3);
        pbq.add(2);
        pbq.add(3);
        pbq.add(1);
        pbq.add(5);

        ArrayList<Integer> arrayList = new ArrayList<>();
        pbq.drainTo(arrayList);

        ArrayList<Integer> list = new ArrayList<>();
        pbq.drainTo(list);

        System.out.println(list); // []
        System.out.println(pbq); // []
    }

    public static void main(String[] args) {
        // testAdd();
        testTake1();
        // testTake2();
        // testTake3();
        // testTake4();
    }
}
  • e) DelayQueue

       DelayQueue:Delayed元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的getDelay(TimeUnit.NANOSECONDS)方法返回一个小于等于0的值时,将发生到期。即使无法使用take或poll移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size方法同时返回到期和未到期元素的计数。此队列不允许使用null元素。内部元素需实现Delayed接口


class User implements Delayed {

    private int id;
    private String name;
    private long endTime;

    public User(int id, String name, long endTime) {
        this.id = id;
        this.name = name;
        this.endTime = endTime;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getEndTime() {
        return endTime;
    }

    public void setEndTime(long endTime) {
        this.endTime = endTime;
    }

    @Override
    public String toString() {
        return name;
    }

    @Override
    public int compareTo(Delayed o) {

        User user = (User) o;

        if (this.getEndTime() > user.getEndTime()) {
            return 1;
        } else if (this.getEndTime() < user.getEndTime()) {
            return -1;
        } else {
            return 0;
        }
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return this.endTime - System.currentTimeMillis();
    }
}

/**
 * 到时间后强制用户下线功能模拟
 */
public class DemoThread34 {

    DelayQueue<User> dq = new DelayQueue<>();

    public void login(User user) {
        dq.add(user);
        System.out.println("login: " + user.getId() + "-" + user.getName() + "-" + user.getEndTime());
    }

    public void logout() {
        try {
            User user = dq.take(); // 此处不能使用dq.poll()[非阻塞式方法]
            System.out.println("logout: " + user.getId() + "-" + user.getName() + "-" + user.getEndTime());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int onlineSize() {
        return dq.size();
    }

    public DelayQueue<User> getUsers() {
        return dq;
    }

    public static void main(String[] args) {

        DemoThread34 demoThread34 = new DemoThread34();

        demoThread34.login(new User(1, "a", 3000 + System.currentTimeMillis()));
        demoThread34.login(new User(2, "b", 1000 + System.currentTimeMillis()));
        demoThread34.login(new User(3, "c", 2000 + System.currentTimeMillis()));

        System.out.println(demoThread34.getUsers());

        while (true) {
            demoThread34.logout();
            System.out.println(demoThread34.getUsers());
            if (demoThread34.onlineSize() == 0) {
                break;
            }
        }
    }
}
代码下载参见https://github.com/HengYk/ConcurrentDemo