第三章 JAVA 并发编程(精通篇之设计模式)
1. 单例模式
饿汉模式:类加载的时候,就进行对象的创建,系统开销较大,但是不存在线程安全问题。
懒汉模式:在使用时才真正地创建单例对象(延迟加载),但是存在线程安全问题,因此实际中多数采用饿汉模式。
静态内部类单例:兼具懒汉模式和饿汉模式的优点。
- 饿汉示例
/**
 * Eagerly initialized singleton.
 *
 * <p>The single instance is created by a static initializer, so
 * {@link #getInstance()} needs no synchronization of its own.
 */
public class SingletonOne {
    /** The sole instance; assigned once by the static initializer and never replaced. */
    private static final SingletonOne singletonOne = new SingletonOne();

    /**
     * Private so that no other class can instantiate this type. The one-second
     * sleep stands in for an expensive construction.
     */
    private SingletonOne() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it, so callers
            // further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the singleton instance.
     *
     * @return the same instance on every call
     */
    public static SingletonOne getInstance() {
        return singletonOne;
    }

    /**
     * Returns the instance's hash code rendered as a decimal string.
     */
    @Override
    public String toString() {
        return "" + this.hashCode();
    }
}
- 懒汉示例(线程安全问题和解决方案)
- 懒汉示例(线程安全的性能优化)
/**
 * Lazily initialized singleton: the instance is created on the first call to
 * {@link #getInstance()}.
 *
 * <p>The null check is performed once without the lock (fast path once the
 * instance exists) and once more while holding the class lock (so only one
 * thread constructs the instance).
 */
public class SingletonTwo {
    /**
     * The lazily created instance. It must be {@code volatile}: without it a
     * thread taking the unlocked fast path may observe a non-null reference to
     * an object whose construction is not yet visible to it.
     */
    private static volatile SingletonTwo singletonTwo = null;

    /**
     * Private so that no other class can instantiate this type. The one-second
     * sleep stands in for an expensive construction.
     */
    private SingletonTwo() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it, so callers
            // further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the singleton instance, creating it on first use.
     *
     * <p>Thread-safe without synchronizing the whole method: the lock is only
     * taken while the instance has not been published yet.
     *
     * @return the same instance on every call
     */
    public static SingletonTwo getInstance() {
        // Read the volatile field once into a local for the fast path.
        SingletonTwo instance = singletonTwo;
        if (instance == null) {
            synchronized (SingletonTwo.class) {
                // Re-check under the lock: another thread may have won the race.
                instance = singletonTwo;
                if (instance == null) {
                    instance = new SingletonTwo();
                    singletonTwo = instance;
                }
            }
        }
        return instance;
    }

    /**
     * Returns the instance's hash code rendered as a decimal string.
     */
    @Override
    public String toString() {
        return "" + this.hashCode();
    }
}
- 静态内部类单例示例
/**
 * Singleton whose instance lives in a private static nested holder class.
 *
 * <p>The instance is created by the holder's static initializer, which is
 * reached through {@link #getInstance()}; no explicit locking is used.
 */
public class SingletonThree {
    /** Holder for the single instance. */
    private static class Singleton {
        // Named INSTANCE rather than after the enclosing class, so the field
        // name no longer obscures the type name; final because it is assigned once.
        private static final SingletonThree INSTANCE = new SingletonThree();
    }

    /**
     * Private so that no other class can instantiate this type. The one-second
     * sleep stands in for an expensive construction.
     */
    private SingletonThree() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it, so callers
            // further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the singleton instance held by the nested holder class.
     *
     * @return the same instance on every call
     */
    public static SingletonThree getInstance() {
        return Singleton.INSTANCE;
    }

    /**
     * Returns the instance's hash code rendered as a decimal string.
     */
    @Override
    public String toString() {
        return "" + this.hashCode();
    }
}
- 测试程序
/**
 * Test driver: asks for the singleton from ten pool tasks and prints what each
 * one received, so the printed values can be compared by eye.
 */
public class SingletonTest {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        int submitted = 0;
        while (submitted < 10) {
            // Each task fetches the singleton and prints it (via its toString()).
            pool.execute(() -> {
                SingletonThree instance = SingletonThree.getInstance();
                System.out.println(instance);
            });
            submitted++;
        }
        // Stop accepting new tasks; the ones already submitted still run.
        pool.shutdown();
    }
}
2. Future模式
简单来说,客户端请求之后,先返回一个应答结果,然后异步地去准备数据,客户端可以先去处理其他事情,当需要最终结果的时候再来获取:如果此时数据已经准备好,则将真实数据返回;如果此时数据还没有准备好,则阻塞等待。
/**
 * A value holder that exposes its content through a single string accessor.
 */
public interface Data {
    /**
     * Returns the result as a string.
     *
     * @return the result
     */
    String getResult();
}
/**
 * The actual data. Construction takes about one second, simulating a slow
 * data-preparation step.
 */
public class RealData implements Data {
    /** The prepared value; set once in the constructor. */
    private final String data;

    /**
     * Creates the data after a simulated one-second preparation delay.
     *
     * @param data the value later returned by {@link #getResult()}
     */
    public RealData(String data) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it, so the
            // constructing thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        this.data = data;
    }

    /**
     * Returns the value passed to the constructor.
     */
    @Override
    public String getResult() {
        return data;
    }
}
/**
 * Placeholder handed out before the real data exists.
 *
 * <p>{@link #getResult()} blocks until another thread supplies the real data
 * through {@link #setRealData(RealData)}. Both methods synchronize on this
 * object.
 */
public class FutureData implements Data {
    private RealData realData = null;
    private boolean isReady = false;

    /**
     * Supplies the real data and wakes every thread waiting in
     * {@link #getResult()}. Only the first call has an effect; later calls are
     * ignored.
     *
     * @param realData the prepared data
     */
    public synchronized void setRealData(RealData realData) {
        if (isReady) {
            return;
        }
        this.realData = realData;
        isReady = true;
        notifyAll();
    }

    /**
     * Returns the real data's result, blocking until it has been supplied.
     *
     * <p>The wait is not abandoned on interruption (doing so would dereference
     * real data that has not arrived yet); instead the interrupt is remembered
     * and the thread's interrupt flag is restored before returning.
     *
     * @return the result of the real data
     */
    @Override
    public synchronized String getResult() {
        boolean interrupted = false;
        // Loop rather than a single check: wait() may return without the
        // condition being true (spurious wakeup or an interrupt).
        while (!isReady) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return realData.getResult();
    }
}
/**
 * Entry point of the hand-written future example: answers a request at once
 * with a placeholder and fills it in from a separate thread.
 */
public class Client {
    /**
     * Starts preparing the data on a new thread and immediately returns the
     * placeholder that will receive it.
     *
     * @param string the value the real data is built from
     * @return the placeholder that is completed by the background thread
     */
    public FutureData request(String string) {
        FutureData pending = new FutureData();
        // Building RealData is the slow step, so it happens off the caller's thread.
        Runnable loader = () -> pending.setRealData(new RealData(string));
        new Thread(loader).start();
        return pending;
    }
}
/**
 * Demo of the hand-written future: prints "1" right after the request is
 * issued, then "2" and the result once {@code getResult()} has returned.
 */
public class FutureTest {
    public static void main(String[] args) {
        Data pending = new Client().request("!!!");
        System.out.println("1");
        // Returns only once the result is available.
        String value = pending.getResult();
        System.out.println("2");
        System.out.println(value);
    }
}
JDK的Concurrent包提供了Future模式的实现,可以直接使用。使用Future模式需要实现Callable接口,并使用FutureTask进行封装,使用线程池进行提交。
/**
 * Demo of the JDK's future support: wraps one callable in two
 * {@code FutureTask}s, submits both to a thread pool, does other work, and
 * then collects both results.
 */
public class FutureTestTwo {
    public static void main(String[] args) {
        RealDataTwo callable = new RealDataTwo("yk");
        FutureTask<String> firstTask = new FutureTask<>(callable);
        FutureTask<String> secondTask = new FutureTask<>(callable);

        // Note the difference between a pool size of 1 and a pool size of 2
        // (a characteristic of newFixedThreadPool).
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(firstTask);
        pool.submit(secondTask);

        System.out.println("执行其他业务逻辑");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // get() blocks until the corresponding task has finished.
        try {
            String firstResult = firstTask.get();
            System.out.println("请求数据" + firstResult);
            String secondResult = secondTask.get();
            System.out.println("请求数据" + secondResult);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        pool.shutdown();
    }
}
/**
 * Callable used by the JDK future example: announces itself, sleeps five
 * seconds to simulate slow work, and then yields the stored value.
 */
public class RealDataTwo implements Callable<String> {
    private String data;

    /**
     * @param data the value returned by {@link #call()}
     */
    public RealDataTwo(String data) {
        this.data = data;
    }

    /**
     * Prints a progress line, sleeps for five seconds, and returns the value
     * given to the constructor.
     *
     * @return the stored value
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public String call() throws Exception {
        System.out.println("后台业务逻辑--耗时操作");
        Thread.sleep(5_000L);
        return this.data;
    }
}
3. Producer-Consumer模式
Producer-Consumer称为生产者消费者模式,是消息队列中间件的核心实现模式,例如ActiveMQ、RocketMQ、Kafka、RabbitMQ。
/**
 * Message passed through the queue in the producer/consumer example: a
 * mutable pair of id and name.
 */
public class Data {
    /** Identifier of the message. */
    private String id;
    /** Name carried by the message. */
    private String name;

    /**
     * @param id   initial identifier
     * @param name initial name
     */
    public Data(String id, String name) {
        this.id = id;
        this.name = name;
    }

    /** Returns the current identifier. */
    public String getId() {
        return this.id;
    }

    /** Returns the current name. */
    public String getName() {
        return this.name;
    }

    /** Replaces the identifier. */
    public void setId(String id) {
        this.id = id;
    }

    /** Replaces the name. */
    public void setName(String name) {
        this.name = name;
    }
}
/**
 * Mutable pair of id and name.
 *
 * <p>NOTE(review): an identical {@code Data} class appears immediately before
 * this one in the file, while the {@code Producer} class used by the
 * producer/consumer {@code Main} is not shown anywhere; this listing may have
 * been pasted in place of {@code Producer} - confirm against the original
 * sources.
 */
public class Data {
    private String id;
    private String name;

    /**
     * @param id   initial identifier
     * @param name initial name
     */
    public Data(String id, String name) {
        this.id = id;
        this.name = name;
    }

    /** Returns the current identifier. */
    public String getId() {
        return this.id;
    }

    /** Returns the current name. */
    public String getName() {
        return this.name;
    }

    /** Replaces the identifier. */
    public void setId(String id) {
        this.id = id;
    }

    /** Replaces the name. */
    public void setName(String name) {
        this.name = name;
    }
}
/**
 * Consumer side of the producer/consumer example: repeatedly takes items from
 * a shared blocking queue and "processes" each one with a random short delay.
 */
public class Consumer implements Runnable {
    /** Source of the random processing delay; shared by all consumers. */
    private static final Random r = new Random();

    /** Label used in this consumer's log lines. */
    private final String name;
    /** Queue the items are taken from. */
    private final BlockingQueue<Data> queue;

    /**
     * @param name  label used in log lines
     * @param queue queue to consume from
     */
    public Consumer(String name, BlockingQueue<Data> queue) {
        this.name = name;
        this.queue = queue;
    }

    /**
     * Consumes until the queue stays empty for five seconds or the thread is
     * interrupted.
     */
    @Override
    public void run() {
        try {
            while (true) {
                // Wait at most five seconds for the next item.
                Data data = this.queue.poll(5, TimeUnit.SECONDS);
                if (data == null) {
                    System.out.println("当前消费:" + name + ", 超过5s无法获取数据,退出监听");
                    return;
                }
                // Simulate processing time of up to one second.
                Thread.sleep(r.nextInt(1000));
                System.out.println("当前消费者:" + name + ", 消费成功,消费数据为id:" + data.getId());
            }
        } catch (InterruptedException e) {
            // An interrupt is a request to stop: restore the flag and leave the
            // loop instead of printing the exception and continuing to consume.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Driver of the producer/consumer example: three producers and one consumer
 * share a queue of capacity one; the producers are stopped after three seconds.
 */
public class Main {
    public static void main(String[] args) throws Exception{
        // Capacity 1: at most one item can sit in the queue at any time.
        BlockingQueue<Data> channel = new LinkedBlockingQueue<>(1);

        Producer p1 = new Producer("p1", channel);
        Producer p2 = new Producer("p2", channel);
        Producer p3 = new Producer("p3", channel);
        Consumer c1 = new Consumer("c1", channel);
        // More consumers ("c2" .. "c5") can be created here and handed to the
        // pool below in the same way as c1.

        ExecutorService pool = Executors.newFixedThreadPool(8);
        pool.execute(p1);
        pool.execute(p2);
        pool.execute(p3);
        pool.execute(c1);

        // Let everything run for three seconds, then stop the producers.
        Thread.sleep(3000);
        p1.stop();
        p2.stop();
        p3.stop();

        // No new tasks are accepted; tasks already running are left to finish.
        pool.shutdown();
    }
}
4. Master-Worker模式
Master-Worker模式是一种将串行任务并行化的方案,被分解的子任务在系统中可以被并行处理,同时,如果有需要,Master进程不需要等待所有子任务都完成计算,就可以根据已有的部分结果集计算最终结果集。 客户端将所有任务提交给Master,Master分配Worker去并发处理任务,并将每一个任务的处理结果返回给Master,所有的任务处理完毕后,由Master进行结果汇总再返回给Client。
/**
 * Unit of work in the master/worker example: a mutable pair of id and price.
 */
public class Task {
    /** Identifier of the task. */
    private int id;
    /** Price attached to the task. */
    private int price;

    /** Creates a task whose id and price are both zero. */
    public Task() {
    }

    /** Returns the task id. */
    public int getId() {
        return this.id;
    }

    /** Returns the task price. */
    public int getPrice() {
        return this.price;
    }

    /** Replaces the task id. */
    public void setId(int id) {
        this.id = id;
    }

    /** Replaces the task price. */
    public void setPrice(int price) {
        this.price = price;
    }
}
/**
 * Coordinator of the master/worker example: owns the task queue, the worker
 * threads and the result map, and sums up the results.
 */
public class Master {
    // Tasks waiting to be processed.
    private ConcurrentLinkedDeque<Task> taskQueue = new ConcurrentLinkedDeque<>();
    // Worker threads, keyed by their index rendered as a string.
    private HashMap<String, Thread> workers = new HashMap<>();
    // Results stored by the workers.
    private ConcurrentHashMap<String, Object> taskResultMap = new ConcurrentHashMap<>();

    /**
     * Wires the worker to this master's queue and result map, and creates the
     * worker threads (without starting them).
     *
     * @param worker      the worker logic; the same instance backs every thread
     * @param workerCount number of worker threads to create
     */
    public Master(Worker worker, int workerCount) {
        worker.setWorkQueue(taskQueue);
        worker.setTaskResultMap(taskResultMap);
        int index = 0;
        while (index < workerCount) {
            workers.put(String.valueOf(index), new Thread(worker));
            index++;
        }
    }

    /**
     * Adds a task to the queue.
     *
     * @param task the task to enqueue
     */
    public void submit(Task task) {
        taskQueue.add(task);
    }

    /** Starts every worker thread. */
    public void execute() {
        for (Thread thread : workers.values()) {
            thread.start();
        }
    }

    /**
     * Reports whether all worker threads have terminated.
     *
     * @return {@code true} only if every worker thread is in state TERMINATED
     */
    public boolean isComplete() {
        for (Thread thread : workers.values()) {
            boolean finished = thread.getState() == Thread.State.TERMINATED;
            if (!finished) {
                return false;
            }
        }
        return true;
    }

    /**
     * Sums all results currently in the result map.
     *
     * @return the sum of the stored results, each cast to {@code int}
     */
    public int getResult() {
        int total = 0;
        for (Object value : taskResultMap.values()) {
            total += (int) value;
        }
        return total;
    }
}
/**
 * Worker of the master/worker example: drains tasks from the shared queue and
 * stores one result per task in the shared result map.
 */
public class Worker implements Runnable {
    /** Queue the tasks are taken from; injected through the setter. */
    private ConcurrentLinkedDeque<Task> workQueue;
    /** Map the results are written to, keyed by task id; injected through the setter. */
    private ConcurrentHashMap<String, Object> taskResultMap;

    public Worker() {
    }

    /**
     * @param workQueue queue to take tasks from
     */
    public void setWorkQueue(ConcurrentLinkedDeque<Task> workQueue) {
        this.workQueue = workQueue;
    }

    /**
     * @param taskResultMap map to store results in
     */
    public void setTaskResultMap(ConcurrentHashMap<String, Object> taskResultMap) {
        this.taskResultMap = taskResultMap;
    }

    /**
     * Processes tasks until the queue is empty or the thread is interrupted.
     *
     * <p>If the thread is interrupted while handling a task, no result is
     * recorded for that task, the interrupt flag is restored, and the worker
     * stops. (Storing a null result is not an option: the result map rejects
     * null values.)
     */
    @Override
    public void run() {
        Task task;
        while ((task = this.workQueue.poll()) != null) {
            System.out.println("Worker-" + Thread.currentThread().getName() + "-执行任务" + task.getId());
            try {
                // Process the task and publish its result under the task id.
                Object output = handle(task);
                this.taskResultMap.put(Integer.toString(task.getId()), output);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Simulates three seconds of work and computes the task's result.
     *
     * @param input the task to process
     * @return the task's price plus one
     * @throws InterruptedException if the thread is interrupted during the simulated work
     */
    private Object handle(Task input) throws InterruptedException {
        Thread.sleep(3000);
        return input.getPrice() + 1;
    }
}
/**
 * Driver of the master/worker example: submits 21 tasks with random prices,
 * runs one worker thread per available processor, and prints the summed
 * result together with the elapsed time.
 */
public class Main {
    public static void main(String[] args) {
        // One worker per available processor (a fixed count such as 8 works too).
        int workerCount = Runtime.getRuntime().availableProcessors();
        System.out.println(workerCount);
        Master master = new Master(new Worker(), workerCount);

        Random random = new Random();
        for (int i = 0; i <= 20; i ++) {
            Task task = new Task();
            task.setId(i);
            task.setPrice(random.nextInt(1000));
            master.submit(task);
        }

        // Start the clock before launching the workers so the measurement
        // covers the whole run.
        long start = System.currentTimeMillis();
        master.execute();

        // Poll until every worker has finished, sleeping briefly between
        // checks so this thread does not spin on a CPU the workers could use.
        while (!master.isComplete()) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                // Asked to stop waiting: restore the flag and give up.
                Thread.currentThread().interrupt();
                return;
            }
        }

        long end = System.currentTimeMillis() - start;
        int priceResult = master.getResult();
        System.out.println("最终结果:" + priceResult + ",执行时间:" + end);
    }
}

 
      
      
