HengYk Blog

个人站

具有浪漫主义情调的理想主义务实青年


Java Concurrent P5

第三章 JAVA 并发编程(精通篇之设计模式)

1. 单例模式

       饿汉模式:类加载的时候,就进行对象的创建,系统开销较大,但是不存在线程安全问题。
       懒汉模式:多数采用饿汉模式,在使用时才真正的创建单例对象,但是存在线程安全问题。
       静态内部类单例:兼具懒汉模式和饿汉模式的优点。


  • 饿汉示例
public class SingletonOne {

    private static SingletonOne singletonOne = new SingletonOne();

    private SingletonOne() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static SingletonOne getInstance() {
        return singletonOne;
    }

    @Override
    public String toString() {
        return "" + this.hashCode();
    }
}
  • 懒汉示例(线程安全问题和解决方案)
  • 懒汉示例(线程安全的性能优化)
public class SingletonTwo {

    private static SingletonTwo singletonTwo = null;

    private SingletonTwo() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    //线程安全 + 性能优化
    public /*synchronized*/ static SingletonTwo getInstance() {
        if (singletonTwo == null) {
            synchronized (SingletonTwo.class) {
                if (singletonTwo == null) {
                    singletonTwo = new SingletonTwo();
                }
            }
        }
        return singletonTwo;
    }

    @Override
    public String toString() {
        return "" + this.hashCode();
    }
}
  • 静态内部类单例示例
public class SingletonThree {

    private static class Singleton{
        private static SingletonThree SingletonThree = new SingletonThree();
    }

    private SingletonThree() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static SingletonThree getInstance() {
        return Singleton.SingletonThree;
    }

    @Override
    public String toString() {
        return "" + this.hashCode();
    }
}
  • 测试程序
public class SingletonTest {

    public static void main(String[] args) {

        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i ++) {
            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    SingletonThree singletonThree = SingletonThree.getInstance();
                    System.out.println(singletonThree);
                }
            });
        }

        fixedThreadPool.shutdown();
    }
}

2 Future模式

       简单来说,客户端请求之后,先返回一个应答结果,然后异步的去准备数据,客户端可以先去处理其他事情,当需要最终结果的时候再来获取, 如果此时数据已经准备好,则将真实数据返回;如果此时数据还没有准备好,则阻塞等待。


public interface Data {
    String getResult();
}
public class RealData implements Data {

    private String data;

    public RealData(String data) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.data = data;
    }

    @Override
    public String getResult() {
        return data;
    }
}
public class FutureData implements Data {

    private RealData realData = null;
    private boolean isReady = false;

    public synchronized void setRealData(RealData realData) {
        if (isReady) return;

        this.realData = realData;
        isReady = true;
        notifyAll();
    }

    @Override
    public synchronized String getResult() {
        if (!isReady) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return realData.getResult();
    }
}
public class Client {

    public FutureData request(String string) {

        FutureData futureData = new FutureData();

        new Thread(new Runnable() {
            @Override
            public void run() {
                RealData realData = new RealData(string);
                futureData.setRealData(realData);
            }
        }).start();

        return futureData;
    }
}
public class FutureTest {

    public static void main(String[] args) {

        Client client = new Client();
        Data data = client.request("!!!");
        System.out.println("1");

        String result = data.getResult();
        System.out.println("2");
        System.out.println(result);
    }
}

       JDK的Concurrent包提供了Futrue模式的实现,可以直接使用。使用Futrue模式需要实现Callable接口,并使用FutureTask进行封装,使用线程池进行提交。


public class FutureTestTwo {

    public static void main(String[] args) {

        RealDataTwo realDataTwo = new RealDataTwo("yk");
        FutureTask<String> futureTask = new FutureTask<String>(realDataTwo);
        FutureTask<String> futureTask2 = new FutureTask<String>(realDataTwo);

        ExecutorService executorService = Executors.newFixedThreadPool(2);////注意线程数为1和为2时的区别(newFixedThreadPool特性)

        executorService.submit(futureTask);
        executorService.submit(futureTask2);

        System.out.println("执行其他业务逻辑");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            System.out.println("请求数据" + futureTask.get());
            System.out.println("请求数据" + futureTask2.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        executorService.shutdown();
    }
}
public class RealDataTwo implements Callable<String>{

    private String data;

    public RealDataTwo(String data) {
        this.data = data;
    }

    @Override
    public String call() throws Exception {
        System.out.println("后台业务逻辑--耗时操作");
        Thread.sleep(5000);
        return data;
    }
}

3. Producer-Consumer模式

       Producer-Consumer称为生产者消费者模式,是消息队列中间件的核心实现模式,ActiveMQ、RocketMQ、Kafka、RabbitMQ。


public class Data {

    private String id;
    private String name;

    public Data(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
public class Data {

    private String id;
    private String name;

    public Data(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
public class Consumer implements Runnable{

    private String name;
    private BlockingQueue<Data> queue;

    public Consumer(String name, BlockingQueue<Data> queue) {
        this.name = name;
        this.queue = queue;
    }

    private static Random r = new Random();

    @Override
    public void run() {
         while (true) {
             try {
                 Data data = this.queue.poll(5, TimeUnit.SECONDS);
                 if (data == null) {
                     System.out.println("当前消费:" + name + ", 超过5s无法获取数据,退出监听");
                     return;
                 }

                 Thread.sleep(r.nextInt(1000));

                 System.out.println("当前消费者:" + name + ", 消费成功,消费数据为id:" + data.getId());
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }
    }
}
public class Main {

    public static void main(String[] args) throws Exception{

        BlockingQueue<Data> queue = new LinkedBlockingQueue<>(1);

        Producer p1 = new Producer("p1", queue);
        Producer p2 = new Producer("p2", queue);
        Producer p3 = new Producer("p3", queue);

        Consumer c1 = new Consumer("c1", queue);
//        Consumer c2 = new Consumer("c2", queue);
//        Consumer c3 = new Consumer("c3", queue);
//        Consumer c4 = new Consumer("c4", queue);
//        Consumer c5 = new Consumer("c5", queue);

        ExecutorService executorService = Executors.newFixedThreadPool(8);

        executorService.execute(p1);
        executorService.execute(p2);
        executorService.execute(p3);

        executorService.execute(c1);
//        executorService.execute(c2);
//        executorService.execute(c3);
//        executorService.execute(c4);
//        executorService.execute(c5);

        Thread.sleep(3000);

        p1.stop();
        p2.stop();
        p3.stop();

        executorService.shutdown();
    }
}

4. Master-Worker模式

       Master-Worker模式是一种将串行任务并行化的方案,被分解的子任务在系统中可以被并行处理,同时,如果有需要,Master进程不需要等待所有子任务都完成计算,就可以根据已有的部分结果集计算最终结果集。 客户端将所有任务提交给Master,Master分配Worker去并发处理任务,并将每一个任务的处理结果返回给Master,所有的任务处理完毕后,由Master进行结果汇总再返回给Client。


public class Task {

    private int id;
    private int price;

    public Task() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }
}
public class Master {

    //任务列表
    private ConcurrentLinkedDeque<Task> taskQueue = new ConcurrentLinkedDeque<>();

    //存放worker的Map
    private HashMap<String, Thread> workers = new HashMap<>();

    //存放每一个worker的处理结果
    private ConcurrentHashMap<String, Object> taskResultMap = new ConcurrentHashMap<>();

    /**
     * 构造函数
     *
     * @param worker      worker处理类
     * @param workerCount worker的数量
     */
    public Master(Worker worker, int workerCount) {

        worker.setWorkQueue(this.taskQueue);
        worker.setTaskResultMap(this.taskResultMap);

        for (int i = 0; i < workerCount; i++) {
            this.workers.put(Integer.toString(i), new Thread(worker));
        }
    }

    public void submit(Task task) {
        this.taskQueue.add(task);
    }

    public void execute() {
        for (Map.Entry<String, Thread> me: workers.entrySet()) {
            me.getValue().start();
        }
    }

    public boolean isComplete() {
        for (Map.Entry<String, Thread> me: workers.entrySet()) {
            if (me.getValue().getState() != Thread.State.TERMINATED) {
                return false;
            }
        }
        return true;
    }

    public int getResult() {
        int priceResult = 0;
        for (Map.Entry<String, Object> me: taskResultMap.entrySet()) {
            priceResult += (int) me.getValue();
        }
        return priceResult;
    }
}
public class Worker implements Runnable{

    private ConcurrentLinkedDeque<Task> workQueue;

    private ConcurrentHashMap<String, Object> taskResultMap;

    public Worker() {

    }

    public void setWorkQueue(ConcurrentLinkedDeque<Task> workQueue) {
        this.workQueue = workQueue;
    }

    public void setTaskResultMap(ConcurrentHashMap<String, Object> taskResultMap) {
        this.taskResultMap = taskResultMap;
    }

    @Override
    public void run() {

        while (true) {
            Task task = this.workQueue.poll();
            if (task == null) {
                break;
            }
            System.out.println("Worker-" + Thread.currentThread().getName() + "-执行任务" + task.getId());

            //处理任务
            Object output = handle(task);
            this.taskResultMap.put(Integer.toString(task.getId()), output);
        }
    }

    private Object handle(Task input) {
        Object output = null;
        try {
            Thread.sleep(3000);
            output = input.getPrice() + 1;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return output;
    }
}
public class Main {

    public static void main(String[] args) {

        //Master master = new Master(new Worker(), 8);

        System.out.println(Runtime.getRuntime().availableProcessors());
        Master master = new Master(new Worker(), Runtime.getRuntime().availableProcessors());

        Random random = new Random();
        for (int i = 0; i <= 20; i ++) {
            Task task = new Task();
            task.setId(i);
            task.setPrice(random.nextInt(1000));
            master.submit(task);
        }

        master.execute();

        long start = System.currentTimeMillis();

        //循环检查并等待所有worker执行完毕
        while (true) {
            if (master.isComplete()) {
                long end = System.currentTimeMillis() - start;
                int priceResult = master.getResult();
                System.out.println("最终结果:" + priceResult + ",执行时间:" + end);
                break;
            }
        }
    }
}
代码下载参见https://github.com/HengYk/ConcurrentDemo