线程复用:线程池
一个系统中,如果盲目大量地创建线程对系统是有伤害的。大量的线程回收也会给GC增加很大的压力,延长GC的停顿时间,同数据库连接池一样,为了避免创建和销毁线程的高开销,对线程进行复用,需要时从线程池中获取线程,用完还给线程池,让线程池 空闲线程 和 工作线程 处于一个动态平衡之中。
不要重复发明轮子:JDK对线程池的支持
为了更好地控制多线程,JDK提供了一套Executor框架,帮助开发人员进行线程控制,其本质就是线程池。包含了许许多多的方法,以上成员均在java.util.concurrent包中,都是jdk并发包的核心类。其中ThreadPoolExecutor表示一个线程池。
Executor接口提供了各种类型的线程池,主要有以下工厂方法:
子接口ExecutorService:public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduleExecutorService newScheduledThreadPool (int corePoolSizze)
以上工厂方法提供了不同特性的线程池 具体说明如下:
- newFixedThreadPool()方法:返回一个固定数量的线程池,如果有空闲线程立即执行,如果没有则新任务会暂存在一个等待队列中,待有线程空闲时便处理任务队列中的任务。
- newSingleThreadExecutor()方法:返回一个单线程的线程池,如果没有空闲线程复用那么进入等待队列等待。
- newCachedThreadPool()方法:该方法返回一个动态的线程池,如果没有空闲线程可以复用,那么就创建一个新线程,创建完毕后返回复用
- newSingleThreadScheduledExecutor()方法:返回一个线程池大小为一,可以定时执行任务的线程池(或者周期性执行任务)
- newScheduledThreadPool()方法:该方法也返回一个可以执行定时任务 指定大小的线程池。
一个基本的newFixedThreadPool()
示例如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public static class PoolDemo1 implements Runnable{
public void run() {
//打印当前线程部分信息
System.out.println("Thread ID="+Thread.currentThread().getId()+"");
try {
//休息1s
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
PoolDemo1 poolDemo1 = new PoolDemo1();
//构建线程池固定大小
//ExecutorService pool = Executors.newFixedThreadPool(5);
//构建动态的线程池 这样的话十个线程ID都不一样新建了10个
ExecutorService pool = Executors.newCachedThreadPool();
for(int i=0;i<10;i++){
//由于线程池只有5个固定线程 所以 启动5个后会停顿1s然后线程的ID可以看出接下来的5个是和原来的一样的
pool.submit(poolDemo1);
}
//停止操作
pool.shutdown();
}
}
计划任务newScheduledThreadPool:
另一个值得注意的方法是newScheduledThreadPool 它返回一个ScheduledExcutorService对象,可以根据时间需要对线程进行调度。它的一些主要方法如下:public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit)
;public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)
;public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
;
与其他几个线程不同,ScheduledExecutorService并不一定会立即安排执行任务,它是起到了计划任务的作用,会在指定的时间对任务进行周期性地调度。schedule会在给定时间进行一次调度,而二三两个方法都是进行周期性地调度,但是有小小的区别。
FixedRate()任务即意思是每一个任务在一个周期性任务开始执行时间为起点,调度固定频率周期的任务,而FixDelay则是在上一个任务结束以后,在经过delay时间进行任务调度。代码:
1 | ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10); |
另外一个值得注意的问题就是异常处理,因为遇到异常后续任务都会停止调度,因此必须保证异常被及时处理,为周期性任务提供稳定的调度条件。
线程池实现原理
newFixedThreadPool或者newCachedThreadPool等线程池内部实现均采用了ThreadPoolExecutor实现。1
2
3
4
5public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ThreadPoolExecutor构造方法为:1
2
3
4
5
6
7
8
9
10
11 public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler
) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
以上几个参数意义如下:
- corePoolSize :指定了线程池的核心线程数量
- maximumPoolSize:线程池最大数量
- long keepAliveTime:当线程数量超过了corePoolSize时候 多余线程的存活时间
- workQueue 阻塞队列,被提交但是尚未执行的任务。
- threadFactory 线程工厂 一般用默认的即可
- handler 拒绝策略。当任务太多时候采用什么拒绝策略。
以上的参数大部分很容易理解,其中workQueue它是一个阻塞队列BlockingQueue接口的对象,仅用于存放Runnable对象。根据队列功能分类,在ThreadPoolExecutor的构造函数中可以使用如下几种阻塞队列BlockingQueue:
直接提交队列 SynchronousQueue :该队列是一种特殊的阻塞队列,它没有任何容量,每一个插入操作都要等待一个对应的删除操作,反之删除操作也要等待插入操作,如果是此种阻塞队列,提交的任务不会被真实的保存而是直接交由线程去执行。如果没有空闲线程就新建,达到最大值就采用拒绝策略,使用此种阻塞队列就需要线程池的最大容量特别大否则很容易就去执行拒绝策略。
有界的任务队列 ArrayBlockigQueue(int capacity) :这是构造一个有指定capacity容量的阻塞队列,在线程数小于corePoolSize时候会创建新线程,若大于corePoolSize那么就会将新任务加入等待队列,如果等待队列已满且线程数小于MaximumPoolSize的话那么就新建线程去执行任务,大于MaxmumPoolSize的话就执行拒绝策略。可见有界队列仅在任务队列满时候,才有可能将线程数提升到corePoolSize之上。换言之除非系统非常繁忙,否则确保核心线程数维持在CorePoolSize左右。
无界的任务队列 LinkedBlockingQueue
(int capacity or null) :与有界队列相比,无界任务队列LinkedBlockingQueue不存在无法入队的情况(除非系统资源耗尽),当线程数大于CorePoolSize时候新的任务都会进入此队列进行等待,如果任务的创建和执行速度差异很大,该阻塞队列会快速增长直至耗尽系统资源。优先任务队列 PriorityBlockingQueue :PriorityBlockingQueue是一种特殊的无界队列,它可以控制任务执行的先后顺序,它不同于其他先进先出的阻塞队列,它可以根据任务 自身的优先级顺序先后执行,确保系统性能的同时也能有很好的质量保证。
根据如上所述,观看newFixedThreadPool()方法可以看出:1
2
3
4
5public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
它采用的是无界的非阻塞队列,也就是说如果大量地任务进行提交而之前的来不及处理时会耗尽系统资源。newSingleThreadExecutor()
方法返回单线程池,它是newFixedThreadPool()方法的一种退化,只是简单地将线程池输了设置为1。newCachedThreadPool()
方法返回corePoolSize为0,maximumPoolSize为无穷大的线程池,这意味着没有任务的时候线程池内无线程,有任务就会使用直接提交队列synchronousBlockingQueue
,超大并发提交任务时候会极快地耗尽系统资源。
注意:使用自定义的线程池时候,要根据实际情况选择对应的阻塞队列,这样对系统的影响也不同。
JDK的四种线程池拒绝策略
- AbortPolicy策略:该策略直接抛出异常,简单粗暴阻止工作。
- Cal- lerRunsPolicy 策略:只要线程未关闭,就在当前调用者线程中运行被丢弃的任务。但是这样做会极大地降低调用线程的性能。
- Dis- cardOledestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前事务。
- Dis- cardPolicy策略:该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,那么可能是最好的一种方案了。
以上的内置策略都实现了RejectedExeutionHandler接口,若以上策略仍无法满足实际应用的需要,完全可以自己扩展。
ThreadPoolExecutor.AbortPolicy
:丢弃任务并抛出RejectedExecutionException异常。ThreadPoolExecutor.DiscardPolicy
:也是丢弃任务,但是不抛出异常。ThreadPoolExecutor.DiscardOldestPolicy
:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)ThreadPoolExecutor.CallerRunsPolicy
:由调用线程处理该任务
以上我们介绍的都是默认的ThreadFactory(Executors.defaultThreadFactory())
我们也可以自定ThreadFactory 通过重写new ThredFactory(){//...method}
就行。
线程池的扩展
ThreadPoolExecutor也是一个可扩展的线程池,它提供了beforeExecute()
和afterExecute()
和terminated()
三个接口对线程池进行控制。ThreadPoolExecutor.Worker.runTask()
方法内部实现可以看出Worker.runTask()
方法会被线程池以多线程模式异步调用,这就使得beforeExecute()和afterExecute接口也将同时多线程访问。下面演示一个通过beforeExecute()和afterExecute()的例子。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56/**
* 线程池执行实例
* @author 圈圈
* @version 1.0
*/
public class ThreadPoolExecutorDemo {
public static class Executor1 implements Runnable{
public String name;
public Executor1(String name){
this.name=name;
}
public Executor1(){
}
public void run() {
// TODO Auto-generated method stub
try {
Thread.sleep(1000);
System.out.println("正在执行的线程ID= "+Thread.currentThread().getId()+" 线程名为:"+name);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
//构建一个线程池执行器实例
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()){
//线程池运行前调用
protected void beforeExecute(Thread t, Runnable r) {
// TODO Auto-generated method stub
super.beforeExecute(t, r);
System.out.println("准备执行"+((Executor1)r).name);
}
//运行后调用
protected void afterExecute(Runnable r, Throwable t) {
// TODO Auto-generated method stub
super.afterExecute(r, t);
System.out.println(((Executor1)r).name+" 执行完毕");
}
//线程池退出时调用
protected void terminated() {
System.out.println("线程池退出!");
};
};
//用自定义线程池执行器 执行5个线程execute而非submit方法
for(int i=0;i<5;i++){
executor.execute(new Executor1("线程"+i));
Thread.sleep(800);
}
//这是比较安全的一个关闭线程池的方法 当然只是发出一个信号 等待执行完成后再停止
executor.shutdown();
}
}
合理安排线程池大小:优化线程池数量
只要避免极大和极小两种情况,线程池影响不会太大。《Java Courrency in prictice》中提到估算线程池大小的经验公式:
Ncpu=CPU的数量 Ucpu =目标CPU的使用率 0-1之间;
W/C = 等待时间与计算时间的比率
那么最优线程池的线程数量大小等于
Nthreads = Ncpu Ucpu (1+W/C)
在 Java中 可以通过Runtime.getRuntime().avaliableProcessors()
获取CPU的数量。
线程池中隐蔽的错误
使用线程池虽然是好事,但是也会让许多异常被“吃掉”了。这样使得排查极为麻烦。例如两数相除分母为0可能使用submit不会打印错误,这时候就要使用execute()方法 可以去堆栈信息中查询异常:
这样可以扩展我们的ThreadPoolExecutor 让它在调度任务之前执行打印我们的异常信息。详情看书P115.
分而治之:Fork/Join框架
分而治之一直是非常有效地处理大数据的方法,著名的MapReduce也是采取分治思想,简单来说就是你没有处理1000个数据的能力,然后你就100个处理分阶段批次处理10次,然后对结果进行合成以达到你要处理1000个数据的能力。Linux中通过fork来创建子进程Java使用ForkJoinPool连接池来管理Fork/Join处理的资源效率,Java中由于线程池的优化,提交任务和线程数量不是一对一的关系,大多数情况下一个物理线程要执行多个逻辑任务,因此每个线程必须要有一个任务队列,例如线程A已经执行完任务了,它就会帮线程B执行任务,线程B就从队列的顶部获取任务,线程A则从队列底部获取任务,这样也避免冲突执行优化。
下面是一个ForkJoinPool的一个重要接口:public<T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
这里的ForkJoinTask就是支持fork和join 的等待任务。ForkJoinTask有两个重要子类,RecursiveAction(无返回) 和 RecursiveTask(有返回类型)。它们分别表示没有返回值的任务和可以携带返回值的任务。
下面演示一个例子,由于需要返回值 所以采用RecurisiceTask1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87/**
* 分治思想
* Fork / Join 测试
* Fork/join 框架的使用 这里实例用来计算数列求和
* @author 圈圈
* @version 1.0
*/
public class ForkJoinDemo extends RecursiveTask<Long>{
/**
*
*/
private static final long serialVersionUID = 1L;
//设置任务分解的规模 如果大于它就分解 如果小于就直接记录
public static final int THEREHOLD= 10000;
private long start;
private long end;
public ForkJoinDemo(long start, long end) {
super();
this.start = start;
this.end = end;
}
//实现抽象方法 返回计算结果
protected Long compute() {
long sum =0 ;
//判断是否还需要分区
boolean canCompute = (end-start)<THEREHOLD;
if(canCompute){
//如果不需要分区 直接计算
for(long i = start;i<end;i++){
sum+=i;
}
}else{
//大于10000就需要分区
//分成100个等规模的小任务 并fork提交
long step = (start+end)/100;
//构建用于分区的子任务集合list
ArrayList<ForkJoinDemo> list =new ArrayList<ForkJoinDemo>();
//指针位置
long pos = start;
for (int i=0;i<100;i++) {
long lastOne = pos+step;
if(lastOne>end) lastOne=end;
//fork分支的子任务
ForkJoinDemo subTask = new ForkJoinDemo(pos, lastOne);
pos+=step+1;
list.add(subTask);
//执行分支
subTask.fork();
}
//分支执行完 join 收集结果
for(ForkJoinDemo fork:list){
//聚集统计总和
sum+=fork.join();
}
}
//返回聚集后的结果
return sum;
}
public static void main(String[] args) {
//构建Fork/Join Pool线程池
ForkJoinPool fjpool = new ForkJoinPool();
//构建要分支计算的起始值
ForkJoinDemo demo = new ForkJoinDemo(0, 200000L);
//任务提交给线程池
ForkJoinTask<Long> result = fjpool.submit(demo);
//取值
try {
Long result1 = result.get();
System.out.println("总和为:"+result1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
使用fork join 时候需要注意,如果任务划分层次很深,一直无返回结果 那么可能有两种情况:
1、系统内线程数越积越多 导致性能下降卡顿。
2、函数调用层次太深,导致栈溢出。
此外,ForkJoin 使用一个无锁的栈来管理空闲线程,如果一个工作线程暂时取不到可用的任务,则可能会被挂起,被压入线程池维护的栈中,待将来有任务可用时再从栈中唤醒这些线程。
JDK的并发容器
JDK为大家准备了一批好用且平易近人的容器类,它们可以大大地减少开发工作量。如下列举一些:
- ConcurrentHashMap: 这是一个高效的并发HashMap。你可以理解为一个线程安全的HashMap。
- CopyOnWriteArrayList:这是一个List,从名字来看就和ArrayList是一族的。在读多写少的并发场合性能远远好于Vector;
- ConcurrentLinkedQueue:高效的并发队列,可以理解为高效的线程安全的LinkedList
- BlockingQueue:之前有介绍过几种阻塞队列,这是一个接口,提供了 多种实现以及子接口,非常适合用于作为数据共享的通道。
- ConcurrentSkipListMap:跳表的实现,这是一个Map,通过跳表的数据结构进行快速查找。
除以上介绍的外。java.util 的Collections工具类也提供了线程安全的转换集合。
ConCurrentLinkedQueue:高并发环境中性能最好的队列就是它了。因为它有很复杂的实现。
高效读取:不变模式下的CopyOnWriteArrayList
很多应用场景中读操作远远大于写操作,比如有些系统级别的信息 往往只需要加载或者修改很少的次数,但是会被系统内所有模块频繁的访问 这种场景就需要读效率高 写就无所谓了。CopyOnWriteArrayList 这在读写锁上 读写分离的思想上又提升了一步,它读与写之间是可以并行的,但是写与写是互斥的。这样一来读操作性能就大幅提升了。那是怎么实现的呢?顾名思义 从CopyOnWrite可以看出 写操作是将原值复制一份 修改副本 写完之后再将修改后的副本替换掉原来的数据。这样保证写操作不会影响读操作了。
查看源码其读操作没有任何锁限制,非常简单。写操作:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
是通过重入锁锁住然后创建副本 然后通过setArray修改变量值。 由于array是volatile的所以对读线程可见,读线程能立即察觉。
数据共享通道:BlockingQueue
BlockingQueue是一个接口,其之所以适合作为数据共享通道还在于他的blocking阻塞上,它会让服务线程在队列为空时,进行等待,当有新的消息进入队列后自动将线程唤醒。这时候就有队列的两个入队方法offer()
如果队列满了返回false 不阻塞
put() 如果队列满了 就等待阻塞(notFull.await 直到调用take()方法的extract()方法 有空位时候 就有notFull.signal())
对应的两个出队方法:poll()
如果队列为空返回Null 不阻塞take()
如果队列为空 则等待阻塞(对应字段 Condition notEmpty.await()) 直到队列有可用元素(insert()方法有notEmpty.signal())。
BlockingQueue的使用非常普遍,在后续的“5.3”生产者消费者一节中我们还会看到他们的身影,看他们如何使用BlockingQueue解耦生产者和消费者。
随机数据结构:跳表(ConcurrentSkipListMap)
JDK并发包中除了常见的哈希表以外,还有一种有趣的数据结构—跳表。跳表也是一种用来快速查找的数据结构,有点类似于平衡树。它们都可以对元素进行快速的查找,但一个重要的区别是:对平衡树的插入和删除操作会改变树的整体结构,对跳表的插入和删除操作只改变部分的数据结构,这时候需要一个全局的锁来保证平衡树的安全,对于跳表只需要部分的锁即可。这样在高并发环境下就有更好的性能,就查询性能而言 跳表也是O(log n)所以JDK用跳表实现了一个Map。
跳表的另外一个特点是随机算法。跳表的本质是维护了多个链表,并且链表是分层的。例如最底层维护了一张跳表所有元素,往上一层是其子集,一个元素插入哪些层完全是随机的,如果运气不好性能会很糟糕,但实际情况而言它的表现是非常好的。
跳表内所有元素是排好序的,通过跳跃式的算法进行查找,例如:
查找元素7
h –> 3 —> 6
h –1 – 3—6 —-8
h–1–2–3–4–5–6–7–8
跳表会从顶层往下查找,查找元素7发现第一层6,快速跳跃小于7的元素,深入第二层6的位置发现8比7大就直接深入底层查找到了7。整个过程比一般链表从 1 开始逐步遍历要好的多。因此很显然,跳表是一种用空间换时间的方法。
使用跳表实现Map和哈希表实现Map区别为 哈希表Map不会保存元素顺序,而跳表会保存,且跳表内所有元素都是有序的,所以如果应用需求需要有序数据 那么跳表是不二选择 具体实现为 ConcurrentSkipListMap。 其内部数据结构主要是 Node<K,V> Node所有操作均采用CAS,另外还有一个就是Index 通过Index进行全网组织。