上一篇我们介绍了Java8函数式编程的特点和基本操作,以及增强型的Future,CompletableFuture,这一篇我们将继续介绍Java8为并发所做的努力。
组合多个CompletableFuture
CompletableFuture 还允许你将多个CompletableFuture 进行组合。一种方法是使用thenCompose() 。将一个CompletableFuture执行完成的结果传递给下一个CompletionStage进行处理(返回新的实例)。
下面演示一个一个CompletableFuture调用后的结果作为另一个的参数调用的函数式过程:1
2
3
4
5
6
7final CompletableFuture<Void> future5 = CompletableFuture.supplyAsync(()->call(20))
//调用4000^2
.thenCompose((x)->CompletableFuture.supplyAsync(()->call(x)))
.thenApply((str)->"Result ="+String.valueOf(str))
.thenAccept((x)->System.out.println(x));
//阻塞 等待异步Future执行完毕
future5.get();
另外一种组合多个CompletableFuture方法是thenCombine(future2,doAction):1
2
3
4
5//thenCombine 组合CompletableFuture future2=1600
final CompletableFuture<Void> future6 = CompletableFuture.supplyAsync(()->call(10))
.thenCombine(future2, (x,y)->(x+y))
.thenAccept((i) -> System.out.println("100+1600="+i));
future6.get();
这里需要注意的是thenCombine的这个future2如果要进行运算操作 必须要有返回值哦。
读写锁的改进 StampedLock
这类似于悲观策略的读写锁,但是它提供了乐观策略的锁机制,非常类似无锁操作,使得乐观读线程不会阻塞写线程。
以下为测试代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**
* Java8读写锁加强 StampedLock
* 带有乐观策略的读锁
* 通过CLH自旋锁实现类似CAS机制
* @author 圈圈
* @version 1.0
*/
public class StampedLockDemo implements Runnable{
private double x,y;
private String type;
private final StampedLock lock = new StampedLock();
public StampedLockDemo(double x, double y,String type) {
super();
this.x = x;
this.y = y;
this.type=type;
}
//写锁
public void setAddValue(double x,double y){
//上写锁 返回一个类似时间戳的
long stamp = lock.writeLock();
try{
this.x+=x;
this.y+=y;
}finally{
//根据Long的状态解锁
lock.unlockWrite(stamp);
}
}
//读方法 读锁
public double getSqrtData(){
// 乐观读锁tryOptimisticRead
long stamp = lock.tryOptimisticRead();
double currentX = this.x, currentY=this.y;
//判断当前线程是否被修改过
if(!lock.validate(stamp)){
try {
//如果验证不通过(数据不一致会出现脏读现象)就上读锁
stamp = lock.readLock();
currentX = this.x;
currentY = this.y;
} finally {
//不要忘记释放锁
lock.unlockRead(stamp);
}
}
//没被修改直接返回
return Math.sqrt(currentX*currentX+currentY*currentY);
}
public static void main(String[] args) throws Exception {
for(int i=0;i<3;i++){
//启动3个写线程
new Thread(new StampedLockDemo(i, i*10, "1")).start();
}
for(int i=0;i<10;i++){
//启动10个读线程 和一个写线程
new Thread(new StampedLockDemo(i, i*10, "2")).start();
if(i==5){
new Thread(new StampedLockDemo(i, i*10, "1")).start();
}
}
Thread.sleep(5000);
}
public void run() {
// TODO Auto-generated method stub
//1代表启动修改值(增加指定的值)
if("1".equals(type)){
System.out.println(Thread.currentThread().getName()+"begin setAddValue!");
setAddValue(x,y);
}else{
double sqrtData = getSqrtData();
System.out.println(Thread.currentThread().getName()+" sqrtData="+sqrtData);
}
}
}
可以看到,StampLock通过乐观读来提高并行能力。
注意StampLock的坑
StampLock内部实现机制类似于CAS机制,是基于CLH锁的,在它挂起线程时候使用的是Unsafe.park()函数,stampLock死循环过程中,该函数等待时候中断了是不会有中断处理的,会再次进入循环,当退出条件不满足时候,会吃尽系统的资源。它会一直存在知道自己抢占到了锁。
有关StampLock锁
StampLock都是基于CLH锁的,CLH为一种自旋锁,它保证没有饥饿发生,并且可以保证FIFO的服务顺序。
CLH锁的基本思想是:锁维护一个等待队列(等待链表队列),所有申请锁,但是没有成功的线程都记录在这个等待队列中,每一个节点代表一个线程都保存有一个标记位,用于判断当前线程是否已经释放锁。
当一个线程试图获取锁时,会从当前等待队列尾部节点作为其前序节点,并使用类似如下代码判断前序节点是否已经成功释放锁:while(pre.locked){}
只要前序节点没有释放锁,则表示当前线程还不能继续执行,自旋等待。释放锁时同理,将自己的自身节点locked标记为设置为false,这后续线程可以继续执行。
关于乐观锁 一次成功的乐观读必须保证当前锁没有被占用,通过一个state标记来判断。乐观读后如果一个线程申请写锁那么会改变这个state标记,那么在乐观锁确认时(validate)就好发现这个改动 而导致乐观锁失败。乐观锁失效后就提升锁级别,使用悲观读锁。悲观读锁会尝试设置state状态,如果失败则进入acquireRead()二次尝试锁获取。在acquireRead()中线程会在不同条件下进行若干次的自旋,试图通过CAS操作获得锁,如果自旋宣告失败则会启用CLH队列,再进行自旋,如果发现自己成功获得读锁会进一步激活cowait队列中的全部读线程,如果最终依然失败,那么会被unsafe.park()
挂起当前线程,acquireWrite流程也与之类似,都通过自选尝试、加入等待队列、直至Unsafe.park()
挂起的逻辑行为。
原子类的增强
更快的原子类 LongAdder
Java.lang.concurrent.aotmic包下的大量的原子操作类,它们均采用CAS机制去无限循环,低并发情况下竞争成功率可能大,高并发竞争下不断失败尝试很是浪费性能,因此可以仿照ConcurrentHashMap。将热点数据进行分离,而最终的技术结果就是数组的求和累加,这样热点就进行了有效的分离,LongAddr正是使用了这种思想。
实际操作中LongAddr不会一开始就动用数组去处理,而是将所有数据都先记录在做一个base变量中,如果没有冲突就不扩展,有冲突就扩展为cell数组,依次判断下去。下面阐述了一波锁、原子操作类、加强原子类的耗时测试:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145/**
* java8加强原子类 LongAddr
* 辅助类
* 测试 同步锁、无锁原子操作Atomic 以及加强原子操作LongAddr 性能测试
* @author 圈圈
* @version 1.0
*/
public class LongAddrDemo{
//定义线程数 任务数 以及目标总数常量
private static final int MAX_THREADS=3;
private static final int MAX_TASKS=3;
private static final int TARGET_COUNT=1000000;
//构建原子 对象
private static AtomicLong account = new AtomicLong(0L);
private static LongAdder addr = new LongAdder();
//要测试的数字
private long count=0;
//定义程序计数器3个
private static CountDownLatch synchrLatch = new CountDownLatch(MAX_TASKS);
private static CountDownLatch atomicLatch = new CountDownLatch(MAX_TASKS);
private static CountDownLatch longAddrLatch = new CountDownLatch(MAX_TASKS);
public synchronized long increCount(){
return ++count;
}
public synchronized long getCount(){
return count;
}
//定义锁操作内部类
public static class SynchroTest implements Runnable{
public String name;
public long startTime;
//构建要调用方法的对象
public LongAddrDemo demo = new LongAddrDemo();
public SynchroTest(String name, long startTime) {
super();
this.name = name;
this.startTime = startTime;
}
public void run() {
long count2 = demo.getCount();
//计算出加锁的时间
while(count2<TARGET_COUNT){
count2=demo.increCount();
}
long endTime = System.currentTimeMillis();
System.out.println(name+"方法自增结果为:"+count2+" 耗时:"+(endTime-startTime)+"ms");
//计数器减少一
synchrLatch.countDown();
}
//启动同步方法测试
public static void testSync() throws Exception{
ExecutorService pool = Executors.newFixedThreadPool(MAX_THREADS);
long startTime = System.currentTimeMillis();
SynchroTest sync = new SynchroTest("同步", startTime);
//提交任务数相同的线程 开始计算会打印耗时
for(int i=0;i<MAX_TASKS;i++){
pool.submit(sync);
}
synchrLatch.await();
pool.shutdown();
}
}
public static class AtomicTest implements Runnable{
private String name;
private long startTime;
public AtomicTest(String name, long startTime) {
super();
this.name = name;
this.startTime = startTime;
}
//测试原子CAS操作的时间
public void run() {
//原子类
long count2 = account.get() ;
while(count2<TARGET_COUNT){
count2=account.incrementAndGet();
}
long endTime = System.currentTimeMillis();
System.out.println(name+"执行耗时"+(endTime-startTime)+"ms");
atomicLatch.countDown();
}
//测试
public static void testAtomic() throws Exception{
ExecutorService pool = Executors.newFixedThreadPool(MAX_THREADS);
long startTime= System.currentTimeMillis();
AtomicTest ato = new AtomicTest("原子CAS", startTime);
for(int i=0;i<MAX_TASKS;i++){
pool.submit(ato);
}
atomicLatch.await();
pool.shutdown();
}
}
//测试LongAddr也是同理
public static class LongAddrThread implements Runnable{
private String name;
private long startTime;
public LongAddrThread(String name, long startTime) {
super();
this.name = name;
this.startTime = startTime;
}
public void run() {
//sum()方法用于获取LongAddr的值
long sum = addr.sum();
while(sum<TARGET_COUNT){
//由于下面方法是void类型 所以需要重新赋值
addr.increment();
sum=addr.sum();
}
long endTime=System.currentTimeMillis();
System.out.println(name+"耗时:"+(endTime-startTime)+"ms");
longAddrLatch.countDown();
}
public static void testLongAddr() throws Exception {
ExecutorService pool = Executors.newFixedThreadPool(MAX_THREADS);
long startTime = System.currentTimeMillis();
LongAddrThread t1 = new LongAddrThread("LongAddr线程", startTime);
for(int i=0;i<MAX_TASKS;i++){
pool.submit(t1);
}
longAddrLatch.await();
pool.shutdown();
}
}
//下面测试三者性能:
public static void main(String[] args) throws Exception {
SynchroTest.testSync();
AtomicTest.testAtomic();
LongAddrThread.testLongAddr();
}
}
另外一个优化是LongAddr避免了伪共享问题,它并不是使用padding这种看起来比较碍眼的做法,而是采用新的注解@sun.misc.Contended:
上述使用了`@sun.misc.Contended来使得Java虚拟机自动为Cell解决伪共享问题,但是需要使用额外的参数,
-XX:-RestrictContended`,否则这个注释将被忽略。
LongAdder 的功能增强版 :LongAccumulator
LongAccumulator 是LongAdder的亲兄弟,它们有公共的父类 Striped64。因此LongAccumulator内部的优化方式和LongAdder是一样的。它们都将一个long整数进行分割,存储在不同的变量中,以防止多线程的竞争。二者的主要逻辑是类似的,但是LongAccumulator是LongAdder的功能扩展,对于LongAdder而言它只是每次对整数进行加法操作,而LongAccumulator则可以每次实现任意函数操作。可以使用下面的构造函数public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity)
第一个参数是需要执行的二元函数(接受2个long返回long),第二个参数为初始值。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31/**
* LongAddr的加强
* Longaddr每次只能对整数执行一次加法
* 而LongAccumulator可以实现任意函数操作
* @author 圈圈
* @version 1.0
*/
public class LongAccumulatorDemo {
public static void main(String[] args) throws InterruptedException {
//初始化LongAccumulator 函数意思为取其最大值
LongAccumulator accumulator = new LongAccumulator(Long::max, Long.MIN_VALUE);
//启动一千个线程
Thread[] ti = new Thread[1000];
for(int i=0;i<1000;i++){
ti[i] = new Thread(
() ->{
Random r = new Random();
long ran = r.nextInt();
//调用方法 accumulate 该方法通过第一个参数指定 保存最大值到内部(可能是cell数组内,也可能是base)
accumulator.accumulate(ran);
});
ti[i].start();
}
for(int i=0;i<1000;i++){
ti[i].join();
}
//通过longValue()获取最大值
System.out.println("LongAccumulator的最大值为:"+accumulator.longValue());
}
}