Yuchuan Weng.Miko Tech-Blog.

Java并发编程笔记(六)并行模式与算法

2017/09/07 Share

单例模式

除了懒汉式、饿汉式外、线程安全可以创建一个静态内部类来维护单例。

1
2
3
4
5
6
7
8
9
10
11
public class SingletonDemo{
private SingletonDemo(){}
//私有静态内部类来维护单例
private static class LoadInstance(){
private static SingletonDemo single = new SingletonDemo();
}
//公共方法供外部访问
public static SingletonDemo getInstance(){
return LoadInstance.single;
}
}

利用虚拟机类的初始化机制创建单例,只能对loadInstance进行初始化。

不变模式

Java中不变模式总是多线程友好的,它的核心思想是尽可能地去除这些同步操作,提高并行程序性能,可以使用一种不可改变的对象,依靠对象的不变性,确保其在没有同步操作的多线程环境下依然保持多线程状态的一致性和正确性。这就是不变模式。
一个对象一旦被创建,它的内部状态将永远不会改变,需要注意的是它比只读属性具有更强的一致性和不变性,只读属性自身对自身状态可以修改。因此不变模式主要满足以下两个条件:

  • 当对象创建后 ,其内部状态和数据不再发生任何变化
  • 对象需要被共享,被多线程频繁访问

在Java中需要满足:

  • 去除setter方法以及可以修改的方法
  • 将所有属性设置为final 类也是
  • 确保没有子类可以重载(final class)
  • 有一个可以创建完整对象的构造函数

在不变模式中,根据里氏代换原则,父类是不变的对象子类也必须是不变的对象,但是我们无法控制这一点因此设为final的。JDK中不变模式应用最广泛的是java.lang.String类以及基本类型的包装类,所有实例方法均不需要进行同步操作。不变模式通过回避问题来达到多线程 线程安全的一个操作。

生产者-消费者模式

该模式是一种多线程经典的设计模式,通常有若干个生产者线程来负责提交用户请求,若干个消费者线程来处理用户请求,生产者和消费者之间通过共享缓冲区进行通信,(旨在避免生产者消费者直接通信 解耦二者关系),由于缓冲区的存在,就允许生产者和消费者在某一局部内速度高于消费者,允许生产者和消费者在执行速度上存在时间差。
内存缓冲区类似阻塞队列BlockingQueue,无锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* 多线程
* 生产者-生产者模型
* 用于处理用户提交的请求
* @author 圈圈
* @version 1.0
*/
public class Producer implements Runnable {
//设置一致性标记
private volatile boolean isRunning = true;
//定义缓冲区资源
private BlockingQueue<PCData> queue ;
//定义原子操作数
private static AtomicInteger count = new AtomicInteger();

//构造函数传入阻塞队列
public Producer(BlockingQueue<PCData> queue){
this.queue=queue;
}


@Override
public void run() {
// TODO Auto-generated method stub
PCData pc = null;
Random r = new Random();
System.out.println("Start producer id="+Thread.currentThread().getId());
while(isRunning){
try {
Thread.sleep(2000);
pc = new PCData(count.incrementAndGet()+10); //构造任务数据
System.out.println("data"+pc+"is into the queue");
//如果队列放不下的话 (会执行入队操作)
if(!queue.offer(pc, 2, TimeUnit.SECONDS)){
System.out.println("failed to put data :"+pc);
}
} catch (InterruptedException e) {
e.printStackTrace();
//设置中断标记
Thread.currentThread().interrupt();
}
}
}

public void stopThread(){
isRunning=false;
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* 多线程
* 生产者-消费者模型
* @author 圈圈
* @version 1.0
*/
public class Consumer implements Runnable {
private BlockingQueue<PCData> queue ;


public Consumer(BlockingQueue<PCData> queue) {
this.queue = queue;
}


@Override
public void run() {
// TODO Auto-generated method stub
System.out.println(Thread.currentThread().getId()+"Begin to Consumer!!");
Random r = new Random();
while(true){
try {
PCData take = queue.take();
if(take!=null){
double result = take.getData()/10+r.nextDouble();
System.out.println("消费者获得数据后执行除法结果为:"+result);
Thread.sleep(2000);
}else{
System.out.println("阻塞队列中暂无数据");
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
}

数据共享对象:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 数据模型
* 为了应证不变模式 因此使用 不可变对象 PCData
* @author 圈圈
* @version 1.0
*/
public final class PCData {
private final int data;

public PCData(int d) {
super();
data = d;
}

public PCData(String data) {
this.data = Integer.valueOf(data);
}

public int getData() {
return data;
}

@Override
public String toString() {
return "PCData [data=" + data + "]";
}

生产者消费者执行类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 生产者消费者模式
* 执行类
* @author 圈圈
* @version 1.0
*/
public class App {

public static void main(String[] args) throws InterruptedException {
BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>();
//启动线程池
ExecutorService pool = Executors.newCachedThreadPool();
//构建生产者模型与消费者模型
Producer pro1 = new Producer(queue);
Producer pro2= new Producer(queue);

Consumer con1 = new Consumer(queue);
Consumer con2= new Consumer(queue);
Consumer con3 = new Consumer(queue);
pool.execute(pro1);
pool.execute(pro2);
pool.execute(con1);
pool.execute(con2);
pool.execute(con3);
Thread.sleep(2000);
System.out.println("现在停止生产者");
pro1.stopThread();
pro2.stopThread();
Thread.sleep(5000);
pool.shutdown();
}
}

生产者消费者模型很好地对生产者和消费者线程进行解耦,通过一个共享的缓冲区进行数据交换,允许生产者和消费者线程在性能上的执行差异,从一定程度上缓解了性能瓶颈对系统的影响。

高性能的生产者-消费者:无锁的实现

BlockingQueue不适合高并发场合,因为其内部通过锁实现的阻塞队列,所以我们可以将其更改为无锁的阻塞队列 内部都是通过CAS操作完成的ConcurrentLinkedQueue,但是使用CAS编程是比较困难的,但有一个好消息是目前有一个现成的Disruptor框架,它已经帮助我们实现了这一功能。

无锁的缓存框架 Disruptor

Disruptor是由LMAX公司开发的一款高效的无锁队列。它使用无锁的方式实现了一个环形队列,非常适用于实现生产者-消费者模式。比如事件和消息的发布,在Disruptor中别出心裁地使用了环形队列来代替普通线形队列,这个环形队列内部实现为一个普通的数组。环形队列避免了头部指针和尾部指针的操作,只要有一个cursor指针即可控制入队出队操作,但是要指定队列大小,要求我们大小必须为2的整数次方,这样通过sequence & (sequence -1) 与操作的二进制数是一个全1的数字,不会有任何一位是浪费的。这种固定大小的环形队列的另外一个好处就是可以做到完全的内存复用。在系统运行过程中不会有心的空间需要分配或者老的空间回收,极大减少分配空间和回收空间带来的额外开销。
1、导入Maven Disruptor 3.x版本
2、直接使用 按照如下步骤
消费者类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Disruptor 无锁缓存框架
* 消费者类
* @author 圈圈
* @version 1.0
*/
public class Consumer implements WorkHandler<PCData> {
/**
* 消费者直接实现WorkHandler接口
* 数据的读取已由框架进行封装
*/
@Override
public void onEvent(PCData event) throws Exception {
System.out.println("Consumer :"+Thread.currentThread().getId()+" 取到数据平方值是:"+event.getData()*event.getData());
}

实例工厂类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* PCData工厂类 用于协助Disruptor框架系统初始化时 构造所有的缓冲区中的对象实例
* 需要实现Disruptor工厂接口
* @author 圈圈
* @version 1.0
*/
public class PCDataFactory implements EventFactory<PCData>{

@Override
public PCData newInstance() {
// TODO Auto-generated method stub
return new PCData();
}
}

共享实体类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Disruptor共享数据模型
* PCData
* @author 圈圈
* @version 1.0
*/
public final class PCData {
private long data;
public PCData() {
super();
}
public void setData(long data) {
this.data = data;
}


public long getData() {
return data;
}
@Override
public String toString() {
return "PCData [data=" + data + "]";
}
}

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Disruptor 无锁缓存框架
* 生产者类
* @author 圈圈
* @version 1.0
*/
public class Producer {
//定义环形缓冲区
private final RingBuffer<PCData> ringBuff;

public Producer(RingBuffer<PCData> buff) {
super();
this.ringBuff = buff;
}

//数据入缓冲区 bytebuffer 可以用来包装任意类型
public void pushData(ByteBuffer buff){
//通过next方法获取下一个可用的序列号
long sequence = ringBuff.next();
try{
//取得一个可用的空闲 PCData
PCData pcData = ringBuff.get(sequence);
//获取buff中数据并设置进空闲对象
pcData.setData(buff.getLong(0));
}finally{
//发布到缓冲区 只有发布才能在消费者处看见
ringBuff.publish(sequence);
}
};
}

执行类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Disruptor 构造类执行
* @author 圈圈
* @version 1.0
*/
public class App {

public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newCachedThreadPool();
PCDataFactory fac = new PCDataFactory();
int ringBufferSize = 1024;
//构建Disruptor对象 指明创建实例的工厂 以及固定环形链表的大小 线程池 以及生产者类型和阻塞策略
Disruptor<PCData> disruptor = new Disruptor<PCData>(fac, ringBufferSize, pool,ProducerType.MULTI,new BlockingWaitStrategy());
//构建4个消费者线程 并启动
disruptor.handleEventsWithWorkerPool(
new Consumer(),
new Consumer(),
new Consumer()
,new Consumer());
disruptor.start();
//构建环形数据缓冲区
RingBuffer<PCData> ringBuffer = disruptor.getRingBuffer();
//构建可以存入任意类型的指定大小的byte buffer
ByteBuffer buf = ByteBuffer.allocate(8);
//构建生产者对象
Producer pro = new Producer(ringBuffer);
for(long l=0;l<=10;l++){
System.out.println("放入的数据为:"+l);
//循环往bytebuff中写入
buf.putLong(0, l);
//放入生产者方法 4个消费者循环去读(CAS环形链表)
pro.pushData(buf);
Thread.sleep(4000);
}
}
}

提高消费者响应时间:选择合适的策略

当新数据在Disruptor的环形缓冲区产生时,消费者如何知道这些新数据呢?通过几种策略对waitStrategy接口进行封装。

  • BlockingWaitStrategy:这是默认的策略,和阻塞队列非常的类似,它们使用锁和条件(Condition)进行数据的监控和线程的唤醒。因为涉及到线程的切换,BlockingWaitStrategy策略是最节省CPU的,高并发下性能是糟糕的。
  • SleepingWaitStrategy:这个策略也是对CPU使用率非常保守的,它会在循环中不断等待数据,它会先自旋等待如果不成功则使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1)进行休眠,这样降低对CPU的占用,适用于数据处理可能会产生比较高的平均延时,好处是对生产者影响最小,典型应用为异步日志。
  • YieldingWaitStrategy:这个策略适用于低延时的场合,消费者线程会不断地循环监控缓冲区变化,在循环内部通过Thread.yield()方法谦让出CPU给别的线程执行,(相当于死循环式地执行),但是这里启动四个消费者线程,你的CPU必须要双核四线程了,否则整个应用都会受影响。
  • BusySpinWaitStrategy:这个是最疯狂的等待策略了。它就是一个死循环,消费者线程会尽最大努力疯狂监控缓冲区的变化。因此它会吃掉你所有CPU资源。你只有在对延时非常苛刻的场景中使用它,你开启了一个死循环监控,因此你的物理CPU数量必须要大于消费者线程数。

CPU Cache的优化:解决伪共享问题

为了提高CPU的速度,CPU有一个高速缓存Cache 在高速缓存中,读写数据最小单位为缓存行,它是从主存复制到缓存的最小单位,一般为32字节到128字节。两个不同变量放入同一个缓存行可能会导致后者使得前者缓存行失效 导致Cache无法命中,所以在X变量的前后插入都先占据一定的位置(padding)这样当内存被读入缓存 只有x一个变量是实际有效的,避免多线程同时修改不同变量导致全体变量的缓存失效的情况。
this.entries = new Object[sequencer.getBufferSize() + 2* BUFFER_PAD ]

Future模式

Future模式是多线程开发中一种常见的设计模式,它的核心思想就是异步调用。当我们需要调用一个函数的方法时,其执行的很慢 我们可以让它在后台慢慢执行,我们先去处理其他事在真正需要数据的场合再进行获取。对于Future模式会返回给你一个契约,最后凭借契约去获取你需要的值。JDK内部已经为我们提供了一套Future模式的完整实现FutureTask。Callable接口有一个方法call(),它会返回给需要构造的实际数据,这个接口的泛型通常指定自己需要构建的业务对象,下面展示JDK Future模式下的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Future模式
* 异步调用构造对象
* 使用JDK的Future模式 本类要实现Callable接口
* @author 圈圈
* @version 1.0
*/
public class RealData implements Callable<String>{
private String data;
public RealData(String data) {
super();
this.data = data;
}
//这里模拟对象的构造过程
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
StringBuffer sb = new StringBuffer();
for(int i=0;i<=100;i++){
Thread.sleep(100);
sb.append(data+" 编号:"+i);
}
return sb.toString();
}

}

FutureMain执行类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* Future模式执行类
* @author 圈圈
* @version 1.0
*/
public class FutureMain {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//构建FutureTask类 表示这个任务是有返回值的
FutureTask<String> task = new FutureTask<>(new RealData("RealData"));
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(task);
System.out.println("请求获取数据");
Thread.sleep(2000);
/**
* 关于Future 模式的 FutureTask类的几个方法如下:
* boolean cancel(flag)是否取消任务
* boolean isCancelled() 是否已取消
* boolean isDone() 是否已完成
* get() 获取对象
* get(long timeOut,TimeUnit) 等待指定的时间获取对象
*/
if(task.isDone()) {System.out.println("数据成功获取到:");}else{
System.out.println("数据还在构造Ing");
}
//会一直阻塞直至任务完成
System.out.println("RealData="+task.get());
}
}

并行流水线

一些涉及数据计算的流程是无法完美并行的,例如(A+B)*B/2 如果不计算出(A+B)是无法得到后续操作的。所以我们可以借鉴流水线的思想。上述过程可以转变为

P1:C=A+B
P2:D=C*B
P3:D=D/2

上述的P1、P2、P3均在单独线程中计算,并且每个线程只负责自己的 工作。P3就是最终的结果。为了实现我们需要定义一个信息交换的载体。
公共数据类(几个未知变量定义几个):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* PipelineMsg 并行流水线
* 实现 (B+C)*B/2 的分工合作的结果
* 为了实现这个功能 我们需要定义一个在线程间
* 携带结果的共享信息类
* @author 圈圈
* @version 1.0
*/
public class PipelineMsg {
public double i;
public double j;
//用于描述两个变量的信息
public String orgStr = null;
}

第一条流水线为加法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 并行流水线 加法操作
* @author 圈圈
* @version 1.0
*/
public class Plus implements Runnable{
public static BlockingQueue<PipelineMsg> bq = new LinkedBlockingQueue<PipelineMsg>();
@Override
public void run() {
while(true){
PipelineMsg take;
try {
take = bq.take();
take.j=take.i+take.j; //模拟B=(A+B)
Mutilply.bq.add(take); //流水线添加到下一个执行流程队列中
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

第二条流水线为乘法 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Mutilply implements Runnable{
public static BlockingQueue<PipelineMsg> bq = new LinkedBlockingQueue<PipelineMsg>();
@Override
public void run() {
while(true){
try {
PipelineMsg take = bq.take();
take.i=take.j*take.i; //模拟 (a+b)*a
Divide.bq.add(take);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

第三条流水线为除法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 最终流水线-除法
* @author 圈圈
* @version 1.0
*/
public class Divide implements Runnable{
public static BlockingQueue<PipelineMsg> bq = new LinkedBlockingQueue<PipelineMsg>();
public void run() {
while(true){
try {
PipelineMsg take = bq.take();
take.i=take.i/2; //模拟 /2
System.out.println("The pipeline final result is "+take.orgStr+"="+ take.i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

测试应用类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 并行流水线执行类
* 模拟(A+B)*A/2
* 分为 B=A+B
* C=B*A
* D=C/2
* @author 圈圈
* @version 1.0
*/
public class App {
public static void main(String[] args) {
//启动流水线线程
new Thread(new Plus()).start();
new Thread(new Mutilply()).start();
new Thread(new Divide()).start();
//启动10000个线程执行流水线
for(int i=1;i<=100;i++){
for(int j=1;j<=100;j++){
PipelineMsg msg = new PipelineMsg();
msg.i=i;
msg.j=j;
msg.orgStr="("+i+"+"+j+")*"+i+"/2";
//放入第一条流水线
Plus.bq.add(msg);
}
}
}
}

在多核或者分布式的场合这种设计思路可以有效地将依赖关系的操作分配在不同线程中进行计算,尽可能地利用多核优势。

并行搜索

搜索几乎是每一个软件都不可缺少的部分,对于有序数据通常采用二分法查找,对无序数据只能挨个查找。多线程一种简单策略就是对数组按照线程数对数组进行分割,每个线程各自搜索,当其中有一个搜索到数据后,立即返回结果即可。

并行排序

串行排序改为并行排序算法的话并非易事,我们这里介绍几种简单的 也足以让人脑洞大开的平行排序算法。

分离数据相关性:奇偶交换排序

在介绍奇偶排序前 先介绍一下冒泡排序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//先介绍一下冒泡排序 
public static void bubbleSort(int arr[]){
for(int i=arr.length-1;i>0;i--){
for(int j=0;j<i;j++){
if(arr[j]>arr[j+1]){
int temp = arr[j];
arr[j]=arr[j+1];
arr[j+1]=temp;
}
}
}
System.out.print("BubbleSort Result is : ");
for(int a :arr){
System.out.print(a+"、");
}
}

奇偶交换排序它将排序过程分为两个阶段,奇交换和偶交换。 奇交换它总是比较奇数索引已及其相邻的后续元素,而偶交换则是比较偶数索引以及其相邻的后续元素,并且奇交换和偶交换会成对出现,这样才能保证比较和交换涉及到数组的每一个元素。由此分离的思想 ,所有比较和交换是没有数据相关性的,这种就可以并行化了。

下面是奇偶交换排序的串行实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void oddEvenSort(int[] arr){
int exeFlag=1,start=0;
while(exeFlag==1||start==1){
//执行标记设为0 start标记用于表示奇交换还是偶交换
//如果上一次比较发生了数据交换 或者当前进行的是奇交换 那么就继续循环
//直到程序不再交换 并且发生了偶交换为止
exeFlag=0;
for(int i=start;i<arr.length-1;i+=2){
if(arr[i]>arr[i+1]){
int temp =arr[i];
arr[i]=arr[i+1];
arr[i+1]=temp;
//执行标记设为1
exeFlag=1;
}
}
//如果偶排序进行完了 那么进行奇排序
if(start==0)
start=1;
else
//奇排序进行完了 就GG
start=0;
}
System.out.print("\n奇偶排序: ");
for(int a :arr){
System.out.print(a+"、");
}
}

上述代码可以使用CountDownLatch方法进行计数器减少操作。等待一个线程执行完任务计数器减少1。

并行模式的奇偶交换排序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//无锁实现的integer 保证多线程操作下变量的原子性
static AtomicInteger exechFlag =new AtomicInteger(1);
public static class ConcurrentOddEventSort implements Runnable{
//定义角标和批倒计时器命令
int i;
CountDownLatch latch;
static int arr[];
public ConcurrentOddEventSort(int i, CountDownLatch latch) {
super();
this.i = i;
this.latch = latch;
}
public void run() {
// TODO Auto-generated method stub
if(arr[i]>arr[i+1]){
int temp = arr[i];
arr[i]=arr[i+1];
arr[i+1]=temp;
exechFlag.set(1);
}
//一个线程执行完计数器减少1
latch.countDown();
}

}

Main方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//奇偶交换排序 并行:
int start = 0;
ConcurrentOddEventSort.arr=arr1;
ExecutorService pool = Executors.newCachedThreadPool();
while(exechFlag.get()==1||start==1){
exechFlag.set(0);
//偶数的数组长度 当start为1 时 只有 len/2-1个线程
CountDownLatch latch = new CountDownLatch(ConcurrentOddEventSort.arr.length/2-(ConcurrentOddEventSort.arr.length%2==0?start:0));
for(int i=start;i<ConcurrentOddEventSort.arr.length;i++){
//提交线程
pool.submit(new ConcurrentOddEventSort(i, latch));
}
//等待所有线程执行完
latch.await();
if(start==0)start=1;
else start=0;
}
System.out.print("\n并发下的排序数组:");
for(int a :ConcurrentOddEventSort.arr){
System.out.print(a+"、");
}
CATALOG
  1. 1. 单例模式
  2. 2. 不变模式
  3. 3. 生产者-消费者模式
  4. 4. 高性能的生产者-消费者:无锁的实现
  5. 5. 无锁的缓存框架 Disruptor
    1. 5.1. 提高消费者响应时间:选择合适的策略
  6. 6. CPU Cache的优化:解决伪共享问题
  7. 7. Future模式
  8. 8. 并行流水线
  9. 9. 并行搜索
  10. 10. 并行排序
    1. 10.1. 分离数据相关性:奇偶交换排序