第三章 JAVA 并发编程(精通篇之设计模式)
1. 单例模式
饿汉模式:类加载的时候,就进行对象的创建,系统开销较大,但是不存在线程安全问题。
懒汉模式:在使用时才真正地创建单例对象,但是存在线程安全问题(实际开发中多数采用饿汉模式)。
静态内部类单例:兼具懒汉模式和饿汉模式的优点。
- 饿汉示例
/**
 * Eagerly initialized singleton: the only instance is created while the class
 * is being initialized, so {@link #getInstance()} needs no locking.
 */
public class SingletonOne {

    /** The sole instance; {@code final} so it can never be reassigned after class initialization. */
    private static final SingletonOne INSTANCE = new SingletonOne();

    /** Private constructor; the one-second sleep simulates an expensive construction. */
    private SingletonOne() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Thread.sleep cleared the interrupt status when it threw; set it again
            // so that the code which triggered class initialization can still see it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the singleton instance.
     *
     * @return the single {@code SingletonOne} object
     */
    public static SingletonOne getInstance() {
        return INSTANCE;
    }

    /**
     * Returns the hash code of this object as a decimal string.
     *
     * @return {@code hashCode()} rendered as text
     */
    @Override
    public String toString() {
        return "" + this.hashCode();
    }
}
- 懒汉示例(线程安全问题和解决方案)
- 懒汉示例(线程安全的性能优化)
/**
 * Lazily initialized singleton using double-checked locking: the instance is
 * created on the first call to {@link #getInstance()} and the lock is only
 * taken while the instance has not been published yet.
 */
public class SingletonTwo {

    /**
     * The lazily created instance. It must be {@code volatile}: the first null
     * check in {@link #getInstance()} runs without holding the lock, and without
     * {@code volatile} another thread could observe a non-null reference to an
     * object whose constructor has not finished.
     */
    private static volatile SingletonTwo singletonTwo = null;

    /** Private constructor; the one-second sleep simulates an expensive construction. */
    private SingletonTwo() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Thread.sleep cleared the interrupt status when it threw; set it again
            // so the caller of getInstance() can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the singleton instance, creating it on first use.
     *
     * <p>Thread-safe without synchronizing the whole method: the lock on
     * {@code SingletonTwo.class} is only acquired while the instance is still null.
     *
     * @return the single {@code SingletonTwo} object
     */
    public static SingletonTwo getInstance() {
        // Read the volatile field once into a local to avoid repeated volatile reads.
        SingletonTwo instance = singletonTwo;
        if (instance == null) {
            synchronized (SingletonTwo.class) {
                // Re-check under the lock: another thread may have created it meanwhile.
                instance = singletonTwo;
                if (instance == null) {
                    instance = new SingletonTwo();
                    singletonTwo = instance;
                }
            }
        }
        return instance;
    }

    /**
     * Returns the hash code of this object as a decimal string.
     *
     * @return {@code hashCode()} rendered as text
     */
    @Override
    public String toString() {
        return "" + this.hashCode();
    }
}
- 静态内部类单例示例
/**
 * Singleton based on a static nested holder class: the instance is created when
 * the holder class is initialized, which happens on the first call to
 * {@link #getInstance()}, and JVM class initialization makes that creation
 * thread-safe without explicit locking.
 */
public class SingletonThree {

    /** Holder whose initialization creates the instance. */
    private static class Holder {
        /** {@code final} so the published instance can never be replaced. */
        private static final SingletonThree INSTANCE = new SingletonThree();
    }

    /** Private constructor; the one-second sleep simulates an expensive construction. */
    private SingletonThree() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Thread.sleep cleared the interrupt status when it threw; set it again
            // so the caller of getInstance() can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the singleton instance, creating it on first use.
     *
     * @return the single {@code SingletonThree} object
     */
    public static SingletonThree getInstance() {
        return Holder.INSTANCE;
    }

    /**
     * Returns the hash code of this object as a decimal string.
     *
     * @return {@code hashCode()} rendered as text
     */
    @Override
    public String toString() {
        return "" + this.hashCode();
    }
}
- 测试程序
public class SingletonTest {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i ++) {
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
SingletonThree singletonThree = SingletonThree.getInstance();
System.out.println(singletonThree);
}
});
}
fixedThreadPool.shutdown();
}
}
2. Future模式
简单来说,客户端请求之后,先返回一个应答结果,然后异步的去准备数据,客户端可以先去处理其他事情,当需要最终结果的时候再来获取, 如果此时数据已经准备好,则将真实数据返回;如果此时数据还没有准备好,则阻塞等待。
/**
 * Common view of a result in the Future-pattern example: callers only ask for
 * the result string and do not need to know which implementation supplies it.
 */
public interface Data {

    /**
     * Returns the result string held by this object.
     *
     * @return the result
     */
    String getResult();
}
/**
 * The real, slow-to-build result of the Future-pattern example. Construction
 * sleeps for one second to simulate an expensive preparation step.
 */
public class RealData implements Data {

    /** The payload; assigned once in the constructor and never changed. */
    private final String data;

    /**
     * Builds the data after a simulated one-second delay.
     *
     * @param data the payload later returned by {@link #getResult()}
     */
    public RealData(String data) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Thread.sleep cleared the interrupt status when it threw; set it again
            // so the thread constructing this object can still observe it.
            Thread.currentThread().interrupt();
        }
        this.data = data;
    }

    /**
     * Returns the payload given at construction time.
     *
     * @return the payload
     */
    @Override
    public String getResult() {
        return data;
    }
}
/**
 * Placeholder returned to the caller before the real data exists.
 * {@link #getResult()} blocks until {@link #setRealData(RealData)} has supplied
 * the real data. Both methods synchronize on this object.
 */
public class FutureData implements Data {

    /** The real data; null until {@link #setRealData(RealData)} is called. */
    private RealData realData = null;

    /** True once the real data has been supplied. Guarded by this object's monitor. */
    private boolean isReady = false;

    /**
     * Supplies the real data and wakes up every thread waiting in
     * {@link #getResult()}. Only the first call has an effect.
     *
     * @param realData the completed data
     */
    public synchronized void setRealData(RealData realData) {
        if (isReady) {
            return;
        }
        this.realData = realData;
        isReady = true;
        notifyAll();
    }

    /**
     * Returns the result of the real data, waiting until it is available.
     *
     * <p>If the waiting thread is interrupted, the wait continues and the
     * interrupt status is set again before this method returns, because this
     * method cannot throw {@link InterruptedException}.
     *
     * @return the result of the real data
     */
    @Override
    public synchronized String getResult() {
        boolean interrupted = false;
        try {
            // Loop, not "if": wait() may return without a notification (spurious
            // wakeup) or because of an interrupt, so the condition must be re-checked
            // before realData is dereferenced.
            while (!isReady) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            return realData.getResult();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
/**
 * Entry point of the Future-pattern example: hands back a placeholder at once
 * and builds the real data on a separate thread.
 */
public class Client {

    /**
     * Starts building the real data for the given request on a new thread and
     * immediately returns the placeholder that will receive it.
     *
     * @param string the request payload passed to {@code RealData}
     * @return the placeholder that is completed by the background thread
     */
    public FutureData request(String string) {
        FutureData placeholder = new FutureData();
        Thread builder = new Thread(() -> placeholder.setRealData(new RealData(string)));
        builder.start();
        return placeholder;
    }
}
/**
 * Demo driver for the hand-written Future pattern: issues a request, prints a
 * marker, fetches the result, then prints a second marker and the result.
 */
public class FutureTest {

    public static void main(String[] args) {
        Data pending = new Client().request("!!!");

        // Printed before the result is fetched.
        System.out.println("1");

        String payload = pending.getResult();

        // Printed only after getResult() has returned.
        System.out.println("2");
        System.out.println(payload);
    }
}
JDK的java.util.concurrent包提供了Future模式的实现,可以直接使用。使用Future模式需要实现Callable接口,并使用FutureTask进行封装,使用线程池进行提交。
/**
 * Demo driver for the JDK Future support: wraps one {@code RealDataTwo} callable
 * in two {@link FutureTask}s, submits both to a thread pool, does other work,
 * and then prints both results.
 */
public class FutureTestTwo {

    public static void main(String[] args) {
        RealDataTwo sharedCallable = new RealDataTwo("yk");
        FutureTask<String> firstTask = new FutureTask<>(sharedCallable);
        FutureTask<String> secondTask = new FutureTask<>(sharedCallable);

        // Note the difference between a pool size of 1 and 2 (a property of
        // newFixedThreadPool): with a single thread the two tasks cannot run
        // at the same time.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(firstTask);
        pool.submit(secondTask);

        System.out.println("执行其他业务逻辑");
        try {
            // Stand-in for the caller's own work while the tasks run.
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            // get() blocks until the corresponding task has finished.
            String firstResult = firstTask.get();
            System.out.println("请求数据" + firstResult);
            String secondResult = secondTask.get();
            System.out.println("请求数据" + secondResult);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        pool.shutdown();
    }
}
/**
 * Callable used by the JDK Future example: announces itself, sleeps for five
 * seconds to simulate slow background work, and then returns its payload.
 */
public class RealDataTwo implements Callable<String> {

    /** The payload returned by {@link #call()}. */
    private String data;

    /**
     * Creates the task.
     *
     * @param data the payload to return once the simulated work is done
     */
    public RealDataTwo(String data) {
        this.data = data;
    }

    /**
     * Prints a progress message, sleeps for five seconds and returns the payload.
     *
     * @return the payload given at construction time
     * @throws Exception if the sleep is interrupted
     */
    @Override
    public String call() throws Exception {
        System.out.println("后台业务逻辑--耗时操作");
        final long simulatedWorkMillis = 5000;
        Thread.sleep(simulatedWorkMillis);
        return this.data;
    }
}
3. Producer-Consumer模式
Producer-Consumer称为生产者消费者模式,是消息队列中间件(例如ActiveMQ、RocketMQ、Kafka、RabbitMQ)的核心实现模式。
/**
 * Message passed through the queue in the producer-consumer example: a mutable
 * pair of id and name.
 */
public class Data {

    private String id;
    private String name;

    /**
     * Creates a message.
     *
     * @param id   the message id
     * @param name the message name
     */
    public Data(String id, String name) {
        this.name = name;
        this.id = id;
    }

    // ---- getters ----

    /** @return the message id */
    public String getId() {
        return this.id;
    }

    /** @return the message name */
    public String getName() {
        return this.name;
    }

    // ---- setters ----

    /** @param id the new message id */
    public void setId(String id) {
        this.id = id;
    }

    /** @param name the new message name */
    public void setName(String name) {
        this.name = name;
    }
}
/**
 * Queue message of the producer-consumer example, holding an id and a name.
 *
 * <p>NOTE(review): this listing repeats the {@code Data} class shown directly
 * before it, while the {@code Producer} class used by the example's
 * {@code Main} is not shown in this part of the notes. Confirm whether this
 * listing was meant to be {@code Producer}.
 */
public class Data {

    /** Identifier of the message. */
    private String id;

    /** Display name of the message. */
    private String name;

    /**
     * Creates a message with the given id and name.
     *
     * @param id   identifier of the message
     * @param name display name of the message
     */
    public Data(String id, String name) {
        this.id = id;
        this.name = name;
    }

    /** @param id the identifier to store */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the stored identifier */
    public String getId() {
        return id;
    }

    /** @param name the display name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the stored display name */
    public String getName() {
        return name;
    }
}
/**
 * Consumer of the producer-consumer example: repeatedly takes {@code Data}
 * items from a shared queue and "processes" each one by sleeping for a random
 * time of less than one second.
 */
public class Consumer implements Runnable {

    /** Source of the random processing delay, shared by all consumers. */
    private static final Random r = new Random();

    /** Name used in the log output. */
    private final String name;

    /** Queue the items are taken from. */
    private final BlockingQueue<Data> queue;

    /**
     * Creates a consumer.
     *
     * @param name  name used in the log output
     * @param queue queue to take items from
     */
    public Consumer(String name, BlockingQueue<Data> queue) {
        this.name = name;
        this.queue = queue;
    }

    /**
     * Consumes items until no item arrives within five seconds or until the
     * thread is interrupted.
     *
     * <p>An interrupt ends the loop and the interrupt status is set again, so
     * that cancelling the task (for example through the executor running it)
     * actually stops the consumer instead of being ignored.
     */
    @Override
    public void run() {
        try {
            while (true) {
                Data data = this.queue.poll(5, TimeUnit.SECONDS);
                if (data == null) {
                    System.out.println("当前消费:" + name + ", 超过5s无法获取数据,退出监听");
                    return;
                }
                // Simulated processing time.
                Thread.sleep(r.nextInt(1000));
                System.out.println("当前消费者:" + name + ", 消费成功,消费数据为id:" + data.getId());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // poll/sleep cleared the interrupt status when they threw; set it again
            // for whoever runs this task.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Demo driver for the producer-consumer example: three producers and one
 * consumer share a queue of capacity one; after three seconds the producers
 * are told to stop and the pool is shut down.
 *
 * <p>NOTE(review): {@code Producer} is not defined in the visible part of the
 * notes; it is used here as something the pool can execute and that offers a
 * {@code stop()} method.
 */
public class Main {

    public static void main(String[] args) throws Exception {
        BlockingQueue<Data> queue = new LinkedBlockingQueue<>(1);

        Producer[] producers = {
            new Producer("p1", queue),
            new Producer("p2", queue),
            new Producer("p3", queue),
        };
        // Only one consumer is active; further consumers (c2..c5) could be
        // created and executed in the same way to compare throughput.
        Consumer consumer = new Consumer("c1", queue);

        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (Producer producer : producers) {
            pool.execute(producer);
        }
        pool.execute(consumer);

        // Let the example run for three seconds before stopping the producers.
        Thread.sleep(3000);
        for (Producer producer : producers) {
            producer.stop();
        }

        pool.shutdown();
    }
}
4. Master-Worker模式
Master-Worker模式是一种将串行任务并行化的方案,被分解的子任务在系统中可以被并行处理,同时,如果有需要,Master进程不需要等待所有子任务都完成计算,就可以根据已有的部分结果集计算最终结果集。 客户端将所有任务提交给Master,Master分配Worker去并发处理任务,并将每一个任务的处理结果返回给Master,所有的任务处理完毕后,由Master进行结果汇总再返回给Client。
/**
 * Unit of work in the master-worker example: an id plus a price value.
 * Both fields default to {@code 0} until set.
 */
public class Task {

    /** Identifier of the task. */
    private int id;

    /** Price value carried by the task. */
    private int price;

    /** Creates a task with id and price both {@code 0}. */
    public Task() {
    }

    // ---- getters ----

    /** @return the task id */
    public int getId() {
        return this.id;
    }

    /** @return the price value */
    public int getPrice() {
        return this.price;
    }

    // ---- setters ----

    /** @param id the task id to store */
    public void setId(int id) {
        this.id = id;
    }

    /** @param price the price value to store */
    public void setPrice(int price) {
        this.price = price;
    }
}
/**
 * Coordinator of the master-worker example: collects tasks, runs a number of
 * worker threads over them and sums up the per-task results.
 */
public class Master {

    /** Tasks waiting to be processed; shared with the worker. */
    private ConcurrentLinkedDeque<Task> taskQueue = new ConcurrentLinkedDeque<>();

    /** Worker threads, keyed by their index rendered as a string. */
    private HashMap<String, Thread> workers = new HashMap<>();

    /** Result of each processed task; shared with the worker. */
    private ConcurrentHashMap<String, Object> taskResultMap = new ConcurrentHashMap<>();

    /**
     * Wires the worker to the shared task queue and result map and creates the
     * worker threads (without starting them). Every thread runs the same
     * {@code worker} object.
     *
     * @param worker      the worker logic to run
     * @param workerCount the number of worker threads to create
     */
    public Master(Worker worker, int workerCount) {
        worker.setWorkQueue(this.taskQueue);
        worker.setTaskResultMap(this.taskResultMap);
        int index = 0;
        while (index < workerCount) {
            Thread thread = new Thread(worker);
            this.workers.put(Integer.toString(index), thread);
            index++;
        }
    }

    /**
     * Adds a task to the queue.
     *
     * @param task the task to process
     */
    public void submit(Task task) {
        this.taskQueue.add(task);
    }

    /** Starts every worker thread. */
    public void execute() {
        for (Thread thread : workers.values()) {
            thread.start();
        }
    }

    /**
     * Reports whether all worker threads have finished.
     *
     * @return {@code true} if every worker thread is in state {@code TERMINATED}
     */
    public boolean isComplete() {
        for (Thread thread : workers.values()) {
            boolean finished = thread.getState() == Thread.State.TERMINATED;
            if (!finished) {
                return false;
            }
        }
        return true;
    }

    /**
     * Sums the results stored so far.
     *
     * @return the sum of all values currently in the result map
     */
    public int getResult() {
        int total = 0;
        for (Object value : taskResultMap.values()) {
            total += (int) value;
        }
        return total;
    }
}
/**
 * Worker of the master-worker example: drains tasks from a shared queue,
 * processes each one and stores the result under the task id.
 */
public class Worker implements Runnable {

    /** Queue the tasks are taken from; set by the master. */
    private ConcurrentLinkedDeque<Task> workQueue;

    /** Map the results are written to, keyed by task id; set by the master. */
    private ConcurrentHashMap<String, Object> taskResultMap;

    public Worker() {
    }

    /** @param workQueue the queue to take tasks from */
    public void setWorkQueue(ConcurrentLinkedDeque<Task> workQueue) {
        this.workQueue = workQueue;
    }

    /** @param taskResultMap the map to store results in */
    public void setTaskResultMap(ConcurrentHashMap<String, Object> taskResultMap) {
        this.taskResultMap = taskResultMap;
    }

    /**
     * Processes tasks until the queue is empty or the thread is interrupted.
     * A task whose processing was interrupted produces no result entry.
     */
    @Override
    public void run() {
        Task task;
        while ((task = this.workQueue.poll()) != null) {
            System.out.println("Worker-" + Thread.currentThread().getName() + "-执行任务" + task.getId());
            Object output = handle(task);
            if (output == null) {
                // handle() was interrupted. ConcurrentHashMap rejects null values
                // with a NullPointerException, so store nothing and stop working.
                break;
            }
            this.taskResultMap.put(Integer.toString(task.getId()), output);
        }
    }

    /**
     * Processes one task: sleeps three seconds to simulate work, then returns
     * the task's price plus one.
     *
     * @param input the task to process
     * @return the computed value, or {@code null} if the thread was interrupted
     *         (the interrupt status is set again in that case)
     */
    private Object handle(Task input) {
        try {
            Thread.sleep(3000);
            return input.getPrice() + 1;
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
/**
 * Demo driver for the master-worker example: submits tasks with random prices,
 * runs one worker thread per available processor and prints the summed result
 * together with the elapsed time.
 */
public class Main {

    /** Pause between two completion checks, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 10;

    public static void main(String[] args) {
        // A fixed worker count (for example 8) can be used instead of the processor count.
        System.out.println(Runtime.getRuntime().availableProcessors());
        Master master = new Master(new Worker(), Runtime.getRuntime().availableProcessors());

        Random random = new Random();
        // Ids run from 0 to 20 inclusive, i.e. 21 tasks are submitted.
        for (int i = 0; i <= 20; i++) {
            Task task = new Task();
            task.setId(i);
            task.setPrice(random.nextInt(1000));
            master.submit(task);
        }

        master.execute();
        long start = System.currentTimeMillis();

        // Wait until every worker has finished. Sleeping between checks keeps
        // this thread from spinning on a core that the workers could use.
        while (!master.isComplete()) {
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Stop waiting, but keep the interrupt status visible.
                Thread.currentThread().interrupt();
                return;
            }
        }

        long end = System.currentTimeMillis() - start;
        int priceResult = master.getResult();
        System.out.println("最终结果:" + priceResult + ",执行时间:" + end);
    }
}