synchronized的功能扩展:重入锁
重入锁完全可以替代Synchronized关键字,在java5以前重入锁的性能远远好于synchronized关键字,但从jdk6开始,synchronized上做足了优化,使得两者性能差距并不大。
重入锁使用java.util.concurrent.locks.ReentrantLock类来实现。重入锁的基本示例如下:
1 | package thread.test.concurrent; |
以上代码可以看出,与synchronized具有更高的灵活性以及显式地操作临界区资源过程,开发人员手动指定何处加减锁。为什么角重入锁呢?因为一个资源区可以多次加锁,同时也要解除等量的锁:1
2
3
4
5
6
7
8
9
10 //锁住临界区资源
lock.lock();
//锁两次
lock.lock();
//。。。。
finally{
lock.unlock();
//上面锁住了两次 这里也要释放2次
lock.unlock();
}
一个线程两次获得同一把锁这是允许的,释放也记得释放相同次数的锁。
除了灵活性之外,重入锁还具有更高级的功能:
重入锁的中断响应
重入锁具有非常有必要的中断功能,例如一个线程正在等待锁,你可以根据需要取消对锁的请求。代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50public static class ReentrantLockDemo2 implements Runnable{
//两个线程分别占用一个去请求另外一个
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
//设置参数判断用于造成死锁
int lock;
public ReentrantLockDemo2(int lock) {
// TODO Auto-generated constructor stub
this.lock=lock;
}
public void run() {
// TODO Auto-generated method stub
try {
if(lock==1){
//线程一占领锁一 并打断后申请锁2
//lockInterruptibly方法 这是一个锁申请动作,申请过程中可被响应打断
lock1.lockInterruptibly();
try {
Thread.sleep(500);
} catch (Exception e) {
//如果1被打断就申请二
lock2.lockInterruptibly();
}
}else{
//线程二申请锁二 打断后申请锁1
//lockInterruptibly 这是一个锁申请动作,申请过程中可被响应打断
lock2.lockInterruptibly();
try {
Thread.sleep(500);
} catch (Exception e) {
//如果2被打断就申请一
lock1.lockInterruptibly();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
//finally语句块isHeldByCurrentThread方法用于判断锁是否被当前线程持有 是的话就解除锁资源退出
if(lock1.isHeldByCurrentThread()) lock1.unlock();
if(lock2.isHeldByCurrentThread()) lock2.unlock();
System.out.println(Thread.currentThread().getName()+"线程退出");
}
}
}
主方法main method:1
2
3
4
5
6
7
8
9
10
11
12//构造两个线程造成死锁效应
Thread t3 = new Thread(new ReentrantLockDemo2(1),"t1线程");
Thread t4 = new Thread(new ReentrantLockDemo2(2),"t2线程");
t3.start();t4.start();
//以上两个线程以及死锁 主线程休息5s后执行操作解除死锁
System.out.println("主线程休眠5s");
Thread.sleep(5000);
/**
* 中断t4 这样就放弃了对lock1的申请 同时释放已经获得的lock2
* 这个操作可以让t3线程顺利地获得lock2而继续执行下去
*/
t4.interrupt();
锁申请等待限时
为了避免死锁或者饥饿现象,可以规定一个线程请求一个锁资源的等待时间,使用 tryLock(时间值,时间单位)
方法进行一次限时的等待。tryLock()
方法可以不带时间参数运行,这种情况下不会线程等待时长直接尝试获取,如果获取到就返回true否则false;1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33/**
* 重入锁ReentrantLock:
* 限时等待tryLock(time,timeUnit)
* 或者无等待申请 tryLock()
* @author 圈圈
* @version 1.0
*/
public static class TimeLock implements Runnable{
public static ReentrantLock timeLock = new ReentrantLock();
public void run() {
// TODO Auto-generated method stub
try {
//如果5s内获取到锁的话
if(timeLock.tryLock(5,TimeUnit.SECONDS)){
//占用8s 大于申请的 5s
System.out.println(Thread.currentThread().getName()+"霸占锁8秒");
Thread.sleep(8000);
}else{
System.out.println(Thread.currentThread().getName()+"5s内锁申请不到,失败!");
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
//释放资源
if(timeLock.isHeldByCurrentThread()) timeLock.unlock();
}
}
}
main方法:1
2
3
4//tryLock有时间的尝试请求资源
Thread timeT1 = new Thread(new TimeLock(),"time线程1");
Thread timeT2 = new Thread(new TimeLock(),"time线程2");
timeT1.start();timeT2.start();
tryLock()方法不会等待,使用while(true)可以不停地尝试获取线程加锁
公平锁与非公平锁
ReentrantLock lock = new ReentrantLock(boolean fair);
如上的代码我们可知,重入锁是提供公平锁的操作模式的,当构造函数参数为true时,锁是公平的。而传统的synchronized生成的锁都是非公平的,公平锁的一大特点就是:不会产生饥饿现象,按照申请时间的先后顺序分配。由于公平锁维护的是一个有序的队列,所以维护成本高,性能相对低下,因此锁是非公平的,如无特殊需求不需要使用公平锁。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21/**
* 重入锁之
* --公平锁
* @author 圈圈
* @version 1.0
*/
public static class FairLock implements Runnable{
public static ReentrantLock lock = new ReentrantLock(true);
public void run() {
// TODO Auto-generated method stub
while(true){
lock.lock();
try{
System.out.println("当前线程"+Thread.currentThread().getName()+"获取了公平锁");
}finally{
lock.unlock();
}
}
}
}
main方法:1
2
3
4
5
6
//公平锁 非公平锁测试
Thread fair = new Thread(new FairLock(),"fair1");
Thread fair2 = new Thread(new FairLock(),"fair2");
fair.start();
fair2.start();
输出结果(基本都是交替输出,这就是公平锁的输出结果):
ReentrantLock的几个重要方法如下:
1、lock() 获得锁 如果锁被占有 则等待
2、tryLock() 获得锁不等待直接返回 tryLock(time,timeUnit) 等待指定的时间
3、lockInterruptibly() 获得锁,但优先响应中断。
4、unLock() 释放锁
5、isHeldByCurrentThread() 判断该锁是否被当前线程持有
就重入锁的实现来看,主要集中在Java层面,包含3个要素:
1、原子性。原子状态使用CAS操作来存储当前线程的状态,判断锁是否已经被别的线程持有。
2、等待队列。所有没请求到锁的线程会进入等待队列进行等待,待有线程释放锁后就从等待队列唤起一个线程。
3、阻塞源于park()和unpark(),用来挂起和恢复线程 详见线程阻塞工具类LockSupport。
重入锁的搭档Condition
我们知道synchornized关键字和object的wait()
,notify()
,notifyAll()
是搭档,那么重入锁和Condition也是很好的搭档。通过Lock接口调用lock.newCondition()
构造Condition的时候就可以绑定重入锁对象了,利用Condition对象我们就可以让线程在合适的时间内等待或者被唤醒。
重入锁的搭档Condition 常见方法如下 :
void await()
当前等待并释放锁void awaitUninterrupptible()
与await()相同,但是不会在等待过程中响应中断。long awaitNanos()
boolean await(long time,TimeUnit)
等待指定时间boolean awaitUntil(Date deadline)
等待直到某一时间点void signal()
唤醒一个等待中的线程void signalAll()
唤醒所有等待线程
实例代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43/**
* Condition测试
* @author 圈圈
* @version 1.0
*/
public class ConditionDemo {
//Condition是与ReentrantLock好搭档 用于线程间的通信
public static class ConditionTest implements Runnable {
//构建非公平锁
public static ReentrantLock lock = new ReentrantLock();
//构建Condition实例
public static Condition condition = lock.newCondition();
//测试condition
public void run() {
// TODO Auto-generated method stub
lock.lock();
try{
System.out.println(Thread.currentThread().getName()+"阻塞等待");
//类似于objec.wait() 此操作也会释放锁
condition.await();
System.out.println(Thread.currentThread().getName()+"恢复执行");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
if(lock.isHeldByCurrentThread()) lock.unlock();
}
}
}
public static void main(String[] args) throws Exception {
Thread t1 = new Thread(new ConditionTest(),"ConditionThread");
t1.start();
Thread.sleep(5000);
//让主线程获得锁并唤醒
ConditionTest.lock.lock();
//唤醒等待队列的线程
ConditionTest.condition.signal();
//主线程释放锁资源还给t1线程继续执行
ConditionTest.lock.unlock();
}
}
在JDK内部,重入锁和Condition被广泛使用,例如ArrayBlockingQueue 它的take()方法和put()方法都使用了。
允许多个线程同时访问(控制线程访问个数)信号量Semaphore
信号量Semaphore为多线程协同操作提供了更强大的控制方法。广义而言信号量是对锁的扩展,无论是内部锁synchronized或者重入锁ReentrantLock。
信号量主要提供了以下两个构造函数public Semaphore(int permits)
允许访问的线程个数public Semaphore(int permits,boolean fair)
arg2指定是否公平
其主要方法为:public void acquire() //请求获得一个许可 如无则等待
public void acquireUninterruptibly() //类似 但是不相应中断
public boolean tryAcquire() //同理不等待 直接返回true false
public boolean tryAcquire(long time,TimeUnit) //等待指定时间
public void release()//释放一个许可
代码实例如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25/**
* Semaphore信号量控制
* @author 圈圈
* @version 1.0
*/
public static class SemaphoreDemo implements Runnable{
//构造五个信号量用于接受5个线程
final Semaphore SEM = new Semaphore(5);
public void run() {
// TODO Auto-generated method stub
try {
//申请一个许可 获取到就执行 没获取到就等待
SEM.acquire();
//每一秒执行5个线程 5个一组地进来
Thread.sleep(1000);
System.out.println(Thread.currentThread().getId()+"get there~!");
//释放一个许可
SEM.release();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
main方法:1
2
3
4
5
6
7
8//信号量测试
//创建一个固定大小为20的线程池
ExecutorService pool = Executors.newFixedThreadPool(20);
SemaphoreDemo semaphoreDemo = new SemaphoreDemo();
for(int i=0;i<20;i++){
//线程池执行二十个线程
pool.submit(semaphoreDemo);
}
ReadWriteLock读写锁
由于重入锁或者 内部锁读与写之间都是串行执行的,这会造成非常多的不必要的开销,因为读线程是不改变变量值的,当B1线程在读 B2线程想要读就得等待,这种开销是完全没必要的。 所以JDK5提供了读写分离的读写锁,实现了真正的读与读之间并发执行。
特点:
- 读与读不互斥:互不影响
- 读与写之间互斥:读阻塞写 ,写阻塞读
- 写与写互斥:相互阻塞
读写锁适用于系统中读多写少的并发场景。
读写锁实例代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82/**
* 读写锁测试
* @author 圈圈
* @version 1.0
*/
public class ReadWriteLockDemo {
//普通重入锁
private static Lock lock = new ReentrantLock();
//读写锁
private static ReentrantReadWriteLock reentrantReadWriteLock= new ReentrantReadWriteLock();
//读锁
private static Lock readLock =reentrantReadWriteLock.readLock();
//写锁
private static Lock writeLock = reentrantReadWriteLock.writeLock();
//操作数i
private int i;
//读操作
public Object handleRead(Lock lock) throws InterruptedException{
try{
lock.lock();
//读操作耗时1s
Thread.sleep(1000);
return i;
}finally{
lock.unlock();
}
}
//写操作
public void handleWrite(Lock lock,int value) throws InterruptedException{
try{
lock.lock();
//写操作耗时1s
Thread.sleep(1000);
i=value;
}finally{
lock.unlock();
}
}
public static void main(String[] args) {
//模拟读写操作实例
final ReadWriteLockDemo demo = new ReadWriteLockDemo();
//构建一个读线程方法
Runnable readRun = new Runnable() {
public void run() {
// TODO Auto-generated method stub
try {
demo.handleRead(readLock); //本实例为读并行的读锁
// demo.handleRead(lock); // 传入 读操作串行的重入锁 效率低
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
Runnable writeRun = new Runnable() {
public void run() {
// TODO Auto-generated method stub
try {
//写操作均阻塞
demo.handleWrite(writeLock, 4);
// demo.handleWrite(lock, 222);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
//启动18个读线程
for(int j =0;j<18;j++){
new Thread(readRun).start();
}
//启动2个写线程
for(int j =18;j<20;j++){
new Thread(writeRun).start();
}
}
}
以上代码如果是并行的读写锁读操作 执行很快 否则会很慢(串行读20s左右)。
倒计时器 CountDownLatch
CountDownLatch是一个非常实用的多线程工具类,这个工具通常用来控制线程的等待,它可以让某一个线程等待直到倒计时结束,再开始执行。public CountDownLatch(int count)
构造器包含了实用计数器作为计数个数 即需要特定数量的线程来完成任务。countDown.await()
方法会要求主线程等待所有检查操作完成后才能继续执行countDown.countDown()
方法没执行一次计数器减1,代表可以少一个线程去执行特定的操作。
实例代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38/**
* 程序计数器操作demo
* 每执行一次countDown()方法计数器减少一
* @author 圈圈
* @version 1.0
*/
public class CountDownLatchDemo implements Runnable{
//构建计数器对象 传入参数为10 即代表需要有10个线程来执行检查任务
static final CountDownLatch latch = new CountDownLatch(10);
static final CountDownLatchDemo demo = new CountDownLatchDemo();
public void run() {
// TODO Auto-generated method stub
try {
//模拟检查任务
Thread.sleep(2000);
System.out.println("check Completed!");
//计数减少1
latch.countDown();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(10);
for(int i=0;i<10;i++){
//从线程池中启动十个线程
pool.submit(demo);
}
//等待检查 阻塞主线程 对run方法进行计数 计数减为零主线程才可以继续执行 否则一直阻塞
latch.await();
System.out.println("Fire!");
///停止线程池
pool.shutdown();
}
}
CyclicBarrier循环栅栏
一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)cyclibarrier.await()。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
使用场景:
需要所有的子任务都完成时,才执行主任务,这个时候就可以选择使用CyclicBarrier。
CyclicBarrier是另外一种多线程并发控制的实用工具。和CountDownLatch类似,它也可以实现线程间的计数等待,且功能比CountDownLatch更为强大和复杂,循环栅栏 —循环的意思即为假设每次可以计数拦截10个线程,第二批又可以重新从10开始计数如此地循环下去。其构造方法如下:public CyclicBarrier(int parties,Runnable barrierAction)
arg1位计数总数,arg2位计数完成后会执行的任务
实例代码如下 我们模拟一个一群士兵集结完毕后 回调将军线程并提示集结完毕的指令场景:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82/**
* CyclicBarrier
* 循环栅栏
* @author 圈圈
* @version 1.0
*/
public class CyclicBarrierDemo {
//模拟循环栅栏阻塞后 执行将军的命令
public static class Soldier implements Runnable{
private String name;
private final CyclicBarrier cyclibarrier;
public Soldier(String name, CyclicBarrier cyclibarrier) {
this.name = name;
this.cyclibarrier = cyclibarrier;
}
public void run() {
// TODO Auto-generated method stub
//等待所有线程到齐 await()方法执行之前所有参与者将一直等待
try {
cyclibarrier.await();
doWork();
//等待所有士兵完成工作 重用等待
cyclibarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
//BrokenBarrierException 该异常为表示当前cyclicBarrier已经破损,就地散伙不再等待了
e.printStackTrace();
}
}
void doWork(){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(name+" 任务完成!");
}
}
//BarrierRun类 用于CyclicBarrier执行完的回调函数 await()执行2次这里也调用2次
public static class BarrierRun implements Runnable{
boolean flag;
int N;
public BarrierRun(boolean flag, int n) {
super();
this.flag = flag;
N = n;
}
public void run() {
// TODO Auto-generated method stub
if(flag){
System.out.println("司令:【士兵"+N+"个任务完成");
}else{
//阻塞await方法调用会执行这个命令等待士兵线程过来并执行完
System.out.println("司令:【士兵】"+N+"个集结完毕!");
flag=true;
}
}
}
public static void main(String[] args) {
final int N= 10;
//构建十个士兵线程组
Thread[] allSoldier = new Thread[N];
boolean flag =false;
//构建cyclicBarrier 循环栅栏对象,传入阻塞放行后会执行的线程 await()方法每调用一次这个方法返回方法就会执行一次
CyclicBarrier cyclicBarrier = new CyclicBarrier(N, new BarrierRun(flag, N));
System.out.println("集合队伍!");
//启动十个线程
for(int i=0;i<N;i++){
System.out.println("士兵"+i+"报道");
allSoldier[i]= new Thread(new Soldier("士兵"+i, cyclicBarrier));
allSoldier[i].start();
}
}
}
线程阻塞工具类 LockSupport
LockSupport是一个非常方便实用的线程阻塞工具类,它可以在线程内任意位置让线程阻塞和解除阻塞,它弥补了suspend()
方法在resume()
方法之后执行导致无法继续执行的情况,和wait()
方法相比较它不需要获得某个对象的锁,也不会抛出InterruptedException异常。
LockSupport方法的静态方法park()
可以阻塞当前线程,park(obj)
还可以等待某一对象的阻塞,类似的还有parkNanos()
,parkUntil()
等方法,它们实现了一个限时的等待。调用静态方法unpark(thread)
可以解除某个线程的阻塞。
实例代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76/**
* 并发包下的
* LockSupport
* 线程阻塞工具类 LockSupport
* 它的优势在于可以随时阻塞,不需要获得锁或者担心不阻塞在阻塞之前执行
* 且不会抛出异常
* @author 圈圈
* @version 1.0
*/
public class LockSupportDemo {
public static class LockSupportDemo1 implements Runnable{
public static LockSupportDemo1 obj = new LockSupportDemo1();
public void run() {
//阻塞
synchronized(obj){
System.out.println(Thread.currentThread().getName()+"阻塞了");
//阻塞
// LockSupport.park();
//park方法还可以传入等待对象 通过jps --> jstack pid 的CLI命令可以看到等待的对象
LockSupport.park(this);
System.out.println(Thread.currentThread().getName()+"解除阻塞");
}
}
}
/**
* LockSupport还支持中断影响,虽然它不会抛出异常 但是可以通过判断方法获得中断标记并返回
* @author 圈圈
* @version 1.0
*/
public static class LockSupportDemo2 implements Runnable{
public static LockSupportDemo2 obj = new LockSupportDemo2();
public void run() {
//阻塞
synchronized(obj){
System.out.println(Thread.currentThread().getName()+"阻塞了");
//阻塞
LockSupport.park();
//park方法还可以传入等待对象 通过jstack pid 的CLI命令可以看到等待的对象
// LockSupport.park(this);
if(Thread.interrupted()){
System.out.println(Thread.currentThread().getName()+"中断");
}
System.out.println(Thread.currentThread().getName()+"解除阻塞");
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new LockSupportDemo1(),"locksupport1");
Thread t2 = new Thread(new LockSupportDemo1(),"locksupport2");
t1.start();
Thread.sleep(2000);
t2.start();
//unpark方法用于解除特定线程的阻塞
LockSupport.unpark(t1);
Thread.sleep(2000);
LockSupport.unpark(t2);
t1.join();t2.join();
/**
* 执行以上这段代码发现永久不会因为park()方法导致线程永久性地挂起
* 这是因为LockSupport实现了一个类似信号量的机制,它为每个线程准备了一个许可,如果许可可用park函数立即返回,否则就阻塞。而Unpark函数实际上就是使得某一不可用许可变为可用,但是和信号量不同的是,一个线程有且仅有一个许可。
* 这个特点就使得即使unpark()方法发生在park()方法之前 park()方法也可以继续执行不会阻塞。
*/
Thread t3 = new Thread(new LockSupportDemo2(),"locksupport3");
Thread t4 = new Thread(new LockSupportDemo2(),"locksupport4");
t3.start();
Thread.sleep(1000);
t4.start();
//中断就解除阻塞 马上响应返回
t3.interrupt();
LockSupport.unpark(t4);
}
}
下一篇我们将介绍非常重要的内容,线程池。