我们采用并行算法的目的是为了提升效率,但事实上使用多线程的方式会额外地增加系统开销,如果在单核CPU上并行CPU的效率一般低于串行的,因此合理地使用并发才能将CPU性能发挥到极致。
有助于提高锁性能的几点建议
减少锁持有时间
如果每个程序都占用锁时间很长。那么将是是一笔非常大的开销,只在必要时候进行同步就能明显减少开销,提高系统吞吐量。(减少锁的占有时间也有效地降低锁冲突的可能性 进而提升系统的并发能力)
减小锁粒度
减小锁的粒度也是一种削弱多线程锁竞争 的有效手段。前面所介绍的HashMap,如果你直接在类上加synchronized锁 那么无疑是线程安全的,但是这也锁粒度太大,所以性能很低,这时候的ConcurrentHashMap引入了分段锁(这也就使得性能很高)。
ConcurrentHashMap被进一步细分为16个小段,根据hashcode值得到该表项应该放入哪个小段中,然后对该段进行加锁,完成put操作(如果幸运的话可以并行16个线程)。在多线程环境中如果多个线程同时put操作,只要被加入的表项不是存放同一个段中,都可以真正并行。减少锁粒度会面临一个新的问题,但是在访问全局的信息里,ConcurrentHashMap就需要获取每一段的锁然后进行处理了,例如获取size()方法 会先进行无锁的方式求和如果失败就进行有锁方式的累加,最后返回结果,这样在高并发场合ConcurrentHashMap的效率低于同步的HashMap;因此 如果在使用全局信息不频繁的情况下 使用ConcurrentHashMap这类减小所粒度的类并发效率才会好。
读写分离来替换独占锁
理论上读是不修改数据的,所以读多写少并发场合可以使用读写锁来进行控制,有效提升系统的并发能力。
锁分离
如果将读写锁的思想进一步的衍生,那么就有锁分离思想。利用类似的分离思想也可以对独占锁进行分离。LinkedBlockingQueue take()方法和Put()方法都是操作修改但是由于是基于链表的,所以分别作用于队列的底端和顶端 理论上来说二者不冲突。
在JDK中并未用这两种方式,取而代之的是使用两把不同的重入锁(削弱了锁竞争的可能性)。
1 |
|
put()方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) { //如果当前没有可用数据就等待
notFull.await(); //等待有数据了被唤醒
}
enqueue(node); //插入数据
c = count.getAndIncrement(); //更新总数 c是count+1前的值
if (c + 1 < capacity) //有足够的空间通知其他线程
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
take()方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) { //如果当前没可用的等待put方法唤醒
notEmpty.await();
}
x = dequeue(); //取得第一个数据 然后数量-1,原子操作因为会和put同时访问count
c = count.getAndDecrement();
if (c > 1) //通知其他take操作
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull(); //通知Put操作 已有足够空间
return x;
}
以上通过两把锁takeLock和putLock实现了真正的锁分离,从而实现了并发操作。
锁粗化
这一点和之前的减少锁的持有时间是相互违背的,因为如果同时大量反复地请求和释放同一把锁时,这是不利于性能的,虚拟机会对锁进行粗化,就是把所有简短且频繁的锁操作整合成一个大锁操作。1
2
3
4
5
6
7
8
9
10
11
12synchronized(obj){
//doSth ..
}
//不需要并发代码
synchronized(obj){
//doSth
}
以上代码被锁粗化成:
sychronized(obj){
//doSth
//不需要并发代码
}
虚拟机对锁优化所做的努力
这里介绍JDK 虚拟机为提高并发性能而绞尽脑汁的几种“锁”优化策略:
锁偏向
如果不是特别高并发场合,每次请求都是同一个线程,那么锁就可以进入偏向模式,该模式下当一个线程再次请求锁可以直接获得无须阻塞同步,这样就节省了大量关锁和申请的时耗,对于锁竞争激烈的场合这还是不适用的,这个通过适用虚拟机参数-XX:+UseBiasedLocking
可以开启偏向锁。
轻量级锁
如果偏向锁失败,虚拟机并不会立即挂起线程,虚拟机还会使用此种优化手段,就是利用对象头部作为指针指向一个持有锁的线程堆栈内部,判断一个线程是否持有对象锁,如果线程获取轻量级锁成功可以顺利进入临界区。如果轻量级加锁失败,则表示被其他线程先争取到了锁,那么当前线程锁请求就会膨胀为重量级锁。
自旋锁
在上述的锁膨胀后,虚拟机不会立刻在系统层面挂起该线程。还会进行最后的尝试。如果一个线程暂时无法获得锁且在短时间内可以获得 那么挂起是一种得不偿失的手段了。因此 虚拟机会让线程做几个空循环(自旋的含义) 若干次循环后如果可以获得锁那么就顺利地进入临界区。如果还不能获得锁 才会真实地将线程在操作系统层面挂起。
锁消除
锁消除是一种更彻底地锁优化,Java虚拟机在JIT编译时,通过对上下文的扫描,去除不可能存在共享资源竞争,通过锁消除可以去除毫无意义的锁竞争。
例如在方法中定义局部变量Vector,由于局部变量是在线程栈上分配的,属于线程私有数据。因此不可能被其他线程访问,这种情况下内部所有锁是无必要的,虚拟机会去除。
锁消除涉及的一项关键技术就是逃逸分析 。 所谓逃逸分析就是观察某一变量是否会逃出特定的作用域,就像上述如果是定义一个方法的Vector v,最后返回的不是v 而是其他对象, 那么说明这个对象没有逃离出方法的这个作用域,那么就可以被锁消除优化了,如果是直接返回的return v的话 那么就代表v逃逸出当前函数 有可能呢被其它线程访问,虚拟机就不会进行锁消除操作了。
逃逸分析必须在-server模式下运行,
可以使用-XX:+DoEscapeAnalysis
参数打开逃逸分析
使用-XX:+EliminateLocks
参数可以打开锁消除。
人手一支笔ThreadLocal
ThreadLocal相当于100个人同时写文档,Threadlocal就充当100支笔,使得变量在多线程下可以同时操作。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44/**
* ThreadLocal 多线程提供每个线程一个操作副本
* @author 圈圈
* @version 1.0
*/
public class ThreaadLocalDemo {
public static class TLSDemo1 implements Runnable{
//保证多线程环境下格式化日期不会有错误
public static final ThreadLocal<SimpleDateFormat> tls = new ThreadLocal<>();
private String name;
public TLSDemo1(String name){
this.name=name;
}
public void run() {
// TODO Auto-generated method stub
//如果该线程的key为null 就设置一个 如果已经持有就使用
//需要注意的是为每个线程分配一个对象工作不是由ThreadLocal来保证的,是由我们在应用层面来保证的
if(tls.get()==null){
tls.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
}
try {
//如果该线程有实例就直接使用
Date date = tls.get().parse("2017-07-30 09:23:22");
System.out.println(name+" 转换的时间为:"+date);
} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) {
//线程池构建100个线程
ExecutorService pool = Executors.newCachedThreadPool();
for(int i=0;i<1000;i++){
//使用TLS机制为每个变量创建一个副本 不会冲突
pool.submit(new TLSDemo1("Thread"+i));
}
pool.shutdown();
}
}
现在来分析一波他的set方法源码:1
2
3
4
5
6
7
8public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
可以看出 ,它通过当前线程来去获取一个ThreadLocalMap,并把值设入ThreadLocalMap当中,而ThreadLocal你可以理解为一个hashmap,一个内部成员,写入ThreadLocalMap的key正是当前的ThreadLocal对象,value就是我们需要的值。
get()方法自然而然也就是把值都取出来:1
2
3
4
5
6
7
8
9
10
11
12
13public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
"unchecked") (
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
首先get方法也是通过当前线程去获取一个ThreadLocalMap对象,然后通过该key去获取value。
由此我们可以引出一个问题,这些变量都是维护在Thread类内部的(ThreadLocalMap)定义所在类,这也就意味着只要线程不退出,对象的引用就一直存在。当线程退出时,Thread类会进行一些清理工作,其中就包括 清理ThreadLocalMap:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 /**
当线程退出时 由系统进行回调 资源清理
*/
private void exit() {
if (group != null) {
group.threadTerminated(this);
group = null;
}
/*加速资源清理 */
target = null;
/* Speed the release of some of these resources */
threadLocals = null;
inheritableThreadLocals = null;
inheritedAccessControlContext = null;
blocker = null;
uncaughtExceptionHandler = null;
}
因此 如果我们使用线程池(例如固定大小的线程池) 线程未必会退出,如果这样你将一些很大的对象放入ThreadLocal内(实际放入ThreadLocalMap内),可能会使系统出现内存泄漏的可能。(意思就是如果设置了对象不使用它 该对象也无法被回收)。此时你就要使用ThreadLocal.remove()方法将这个变量移除,就像关闭流和数据库连接一样,如果你不需要对象了就应该告诉虚拟机请回收防止内存泄漏。
1 | public void remove() { |
threadLocals = null;会加速垃圾回收(把对象设置为null即为弱引用,如果被垃圾回收器发现会立即回收),ThreadLocalMap是一个类似Hashmap。更精确地说 它是类似WeakHashMap。ThreadLocalMap实现使用了弱引用 WeakReference
static class Entry extends WeakReference<ThreadLocal<?>> {
/* The value associated with this ThreadLocal. /
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
这里的参数k就是Map的key,v就是Map的Value,k也相当于是ThreadLocal的实例,实际上它并不是真的持有ThreadLocal的实例(而是弱引用ThreadLocal),当ThreadLocal的强引用失去时候,这个key他就会变为null被回收。
ThreadLoca对性能有何帮助
如果共享对象对于竞争的处理容易引起性能损失我们还是应该考虑使用ThreadLocal为每个线程分配单独的对象,例如多线程下产生随机数。
无锁
锁是一种悲观策略,执行的悲观算法,它总是假设每次访问临界区资源都会冲突,因此每次操作都小心翼翼。而无锁就像是乐观算法,通过CAS机制来假设每次都不会冲突,如果检测到冲突就重试当前操作直到没有冲突为止。
与众不同的并发策略:比较交换(CAS)
这种无锁无阻塞的方式天生没有死锁现象,也不会有争夺锁资源的高开销,也没有线程间频繁调度带来的开销,使用比较交换由于其非阻塞带来如上的优点,使其要比基于锁的方式拥有更优越的性能。但是程序CAS看起来更复杂一点。
CAS算法为CAS(E,V,P) E代表要更新的变量,V代表期望值,P代表要更新成这个值,当且仅当E的值和V期望值相等 才将E设置为P,否则就代表其他线程修改过了,重新尝试或者放弃操作。这也是采用乐观算法来假设都不冲突 不阻塞地进行CAS操作,当然并发下修改同一个值的话也只有一个会操作成功 其余线程均会失败。硬件层面,大多数的现代处理器都已经支持原子化的CAS指令。JDK5.0以后这种操作在虚拟机可以说是无处不在。
无锁的线程安全整数:AtomicInteger
为了让Java程序员能够受益于CAS等CPU指令,JDK并发包中的atomic包包含了直接使用CAS操作的线程安全类型;
AtomicInteger一些方法:public final int get() //获取当前值
public final void set(int newValue) //设置当前值
public final int getAndSet(int newValue) //设置新值并返回旧值
public final boolean compareAndSet(int expect,int u) //如果当前值为期望值expect则设置为u
public final int getAndIncrement/Decrement() //当前值加减一 并返回旧值
public final int increament/decrement AndGet() //当前值加减一 并返回新值
public final int getAndAdd(int delta) //返回旧值 新加指定delta数值
public final int addAndGet(int delta) //返回新值,添加指定delta数值
1 | //就内部实现来说 AtomicInteger 中保存一个核心字段: |
main方法:1
2
3
4
5
6
7//创建十个线程 操作原子变量i
ExecutorService pool2 = Executors.newFixedThreadPool(10);
for(int i=0;i<10;i++){
pool2.submit(new AtomicDemo());
}
pool2.shutdown();
System.out.println("Atomic操作后的结果为:"+AtomicDemo.i); //输出100 无冲突
这种自增方法内部都是无限循环加上CAS操作的,此外AtomicReference表示对象引用,等待还有类似的类。
Java中的指针:Unsafe类
sun/misc/Unsafe包下的类,它是一个特殊的类,从名字来看它和C/C++的指针一样,是不安全的,所以Java移除指针的原因在此,如果指针指错了位置,你很有可能会覆盖别人的内存,导致系统崩盘。这个类里封装了一些native的 CAS 方法,以前的的ConcurrentLinkedQueue 中Node的一些CAS操作也都是使用Unsafe类来实现的。
Unsafe类它是JDK一个内部专属类,不允许我们使用,其帮助Java实现了许多底层原理。
注意:根据Java类加载器的原理,应用程序的类由 App Loader加载
系统核心类 如rt.jar 由 BootStrap类加载器加载(该加载器没有java对象,因此当一个类加载器返回为Null时,这说明它是由BootStrap加载的,这个类也极有可能是rt.jar中的类了)。
无锁的对象引用 AtomicReference
与AtomicInteger不同之处在于 它是操作对象引用的,这里会引出一个原子操作的逻辑问题。
CAS操作是保证我的值和期望值相等就进行修改,但是如果该值被修改了一次以后又修改回原值,那么就无法捕捉到了,如果需求是要和变化过程有关那么AtomicReference就无能为力了。有一种应用场景是:如果一张蛋糕卡卡内金额小于20元 那么就充值10元,每个用户只能充值一次,如果是高并发应用场合的话一个用户恰巧19元充值了10元 然后此刻它又消费了10元小于20元,那么这时候系统又会再次充值10元。即存在漏洞,
那么就要引出AtomicStampedReference 来很好地解决这个问题了。
带时间戳的对象引用AtomicStampedReference
AtomicStampedReference与AtomicReference不同的是这类内部不仅维护了对象值,还维护了一个时间戳(可以表示一个状态),该对象数值更新以后 对应的时间戳状态也进行更新。因此使用此类时可以判断 只有当指定值和时间戳都满足期望值,写入才会成功。因此对象被反复读写 写回原值,只要时间戳发生变化 就能防止不恰当的写入操作。AtomicStampedReference(V initialRef, int initialStamp)
创建一个新的 AtomicStampedReference与给定的初始值。compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)
以上参数依次为: 期望值,写入新值,期望时间戳,新时间戳getReference()
返回引用的当前值。getStamp()
返回当前时间戳。set(V newReference, int newStamp)
设置引用和时间戳的值。
下面展示一个 顾客消费 店家充值的例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65/**
* 原子性CAS 带有时间戳的引用类测试例子
* 模拟顾客卡内20元以下 商家充值20元 一个顾客只能充值一次
* @author 圈圈
* @version 1.0
*/
public class AtomicStampReferenceDemo {
//构建待有时间戳的原子引用对象 arg1 引用类型 arg2时间戳
public static AtomicStampedReference<Integer> ato = new AtomicStampedReference<Integer>(19, 0);
public static void main(String[] args) {
//启动5个线程 模拟同步更新后台数据库 为用户充值
for(int i=0;i<5;i++){
//商家充值线程
new Thread(){
public void run() {
//获取时间戳和余额
final int time = ato.getStamp(); //等价于获取value
//循环判断
while(true){
while(true){
int money = ato.getReference();
if(money<20){
//如果赠与成功 修改时间戳 避免二次赠与
if(ato.compareAndSet(money, money+20, time, time+1)){
System.out.println("余额小于20, 充值成功,现有余额:"+ato.getReference());
break;
};
}else{
System.out.println("余额大于20 不充值!");
break;
}
}
}
}
}.start();
//顾客消费模型线程
new Thread(){
public void run() {
for(int i=0;i<100;i++){
while(true){
int time = ato.getStamp();
int money = ato.getReference();
if(money >10){
System.out.println("大于10元");
//如果消费成功修改时间戳 使其不会重复
if(ato.compareAndSet(money, money-10, time, time+1)){
System.out.println("消费10元,余额"+ato.getReference());
break;
}
}else{
System.out.println("余额不足10元无法消费");
break;
}
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}}.start();
}
}
}
数组也能无锁:AtomicIntegerArray
数组是一种特殊的数据结构,使用Unsafe类 通过CAS方式控制数组多线程下的安全性,这里以AtomicIntegerArray为例。compareAndSet(int i, int expect, int update)
如果当前值 ==为预期值,则将位置 i处的元素设置为给定的更新值。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36/**
* 无锁原子数组操作类
* CAS操作
* @author 圈圈
* @version 1.0
*/
public class AtomicArrayDemo {
public static class AtomicIntegerDemo1 implements Runnable{
public static AtomicIntegerArray arr1 = new AtomicIntegerArray(10);
public void run() {
// TODO Auto-generated method stub
for(int j=0;j<10000;j++){
//将第i个元素下标+1 对10个数组元素进行累加操作 每个元素各加1000次
arr1.getAndIncrement(j%arr1.length());
}
}
public static void main(String[] args) throws Exception {
// ExecutorService pool = Executors.newCachedThreadPool();
Thread[] tls = new Thread[10];
for(int i=0;i<10;i++){
//启动十个线程]
tls[i]=new Thread(new AtomicIntegerDemo1());
tls[i].start();
}
// pool.shutdown();
for(int i=0;i<10;i++){
tls[i].join();
}
for(int i=0;i<10;i++){
//获取多线程执行后的数组10个元素 全部为100000 这代表线程是安全的
System.out.println("数组的值arr["+i+"]="+AtomicIntegerDemo1.arr1.get(i));
}
}
}
}