# 线程池
# 为什么要线程池
举个例子
举个例子来说,线程就像是某个公司的客服小姐姐,每天都要接很多客户的电话,如果同时有1000个客户打电话进来咨询,按正常的逻辑,那就需要1000个客服小姐姐,但是在现实中往往需要考虑成本问题,招这么多人费用太多了,于是就可以这样优化,可以招100个人成立一个客服中心,如果同时超过100个人则提示让客户等待,等有空闲的客服小姐姐时就去响应客户。实现效益最大化。这就是一个池化技术在现实生活中类似的例子。
在Java中,如果每当一个请求到达就创建一个新线程,开销是相当大的。在实际使用中,每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在实际处理实际的用户请求的时间和资源要多的多。除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。
如果在一个JVM中创建太多的线程,可能会导致系统由于过度消耗内存或者“切换过度”而导致系统资源不足。为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁线程的次数,特别是一些资源耗费比较大的线程的创建和销毁,尽量利用已有对象来进行服务,这就是“池化资源”技术产生的原因。
线程池主要用来解决线程生命周期开销问题和资源不足问题,通过对多个任务重用线程,线程创建的开销被分摊到多个任务上了,而且由于在请求到达时线程已经存在,所以消除了创建所带来的延迟。这样,就可以立即请求服务,使应用程序响应更快。另外,通过适当的调整线程池中的线程数据可以防止出现资源不足的情况。
# ThreadPoolExecutor
ThreadPoolExecutor是Executors类的实现,Executors类里面提供了一些静态工厂,生成一些常用的线程池,主要有以下几个:常用线程池
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
参数名、说明:
- corePoolSize 线程池维护线程的最少数量(核心线程数量)
- 默认情况下核心线程会一直存活,即使处于闲置状态也不会受存keepAliveTime限制。除非将
allowCoreThreadTimeOut设置为true。
- 默认情况下核心线程会一直存活,即使处于闲置状态也不会受存keepAliveTime限制。除非将
- maximumPoolSize 线程池维护线程的最大数量(最大线程数量)
- 线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。
当任务队列为没有设置大小的LinkedBlockingDeque时,这个值无效。
- 线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。
- keepAliveTime 线程池维护线程所允许的空闲时间
- 非核心线程的闲置超时时间,超过这个时间就会被回收。
- unit 时间单位
- 指定keepAliveTime的单位,如TimeUnit.SECONDS。当将allowCoreThreadTimeOut设置为true时对corePoolSize生效。
- workQueue 任务队列,用来存放我们所定义的任务处理线程
SynchronousQueueLinkedBlockingDeque,是基于双向链表实现的双向并发阻塞队列,该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(添加或删除);并且该阻塞队列是支持线程安全。可以指定队列的容量,如果不指定默认容量大小是Integer.MAX_VALUE。ArrayBlockingQueue,是基于数组实现的有界阻塞队列,此队列按先进先出的原则对元素进行排序。新元素插入到队列的尾部,获取元素的操作则从队列的头部进行。PriorityBlockingQueue,是带优先级的无界阻塞队列,每次出队都返回优先级最高或者最低的元素(规则可以通过实现Comparable接口自己制定),内部是使用平衡二叉树实现的,遍历不保证有序。
- threadFactory 线程创建工厂
- 线程工厂,提供创建新线程的功能。ThreadFactory是一个接口,只有一个方法;
- 通过线程工厂可以对线程的一些
属性进行定制。 - 在ThreadPoolExecutor中已经包含四种处理策略
public interface ThreadFactory {
Thread newThread(Runnable r);
}
2
3
默认工厂:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager var1 = System.getSecurityManager();
this.group = var1 != null?var1.getThreadGroup():Thread.currentThread().getThreadGroup();
this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
}
public Thread newThread(Runnable var1) {
Thread var2 = new Thread(this.group, var1, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
if(var2.isDaemon()) {
var2.setDaemon(false);
}
if(var2.getPriority() != 5) {
var2.setPriority(5);
}
return var2;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
- RejectedExecutionHandler 线程池对拒绝任务的处理策略
- RejectedExecutionHandler也是一个接口,只有一个方法
- 当线程池中的资源已经全部使用,添加新线程被拒绝时,会调用RejectedExecutionHandler的
rejectedExecution方法。
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}
2
3
# 线程池规则
线程池的线程执行规则跟
任务队列有很大的关系。
任务队列没有大小限制:- 如果线程数量<=核心线程数量,那么直接启动一个核心线程来执行任务,不会放入队列中。
- 如果线程数量>核心线程数,但<=最大线程数,并且任务队列是LinkedBlockingDeque的时候,超过核心线程数量的任务会放在任务队列中排队。
- 如果线程数量>核心线程数,但<=最大线程数,并且任务队列是SynchronousQueue的时候,线程池会创建新线程执行任务,这些任务也不会被放在任务队列中。这些线程属于非核心线程,在任务完成后,闲置时间达到了超时时间就会被清除。
- 如果线程数量>核心线程数,并且>最大线程数,当任务队列是LinkedBlockingDeque,会将超过核心线程的任务放在任务队列中排队。也就是当任务队列是LinkedBlockingDeque并且没有大小限制时,线程池的最大线程数设置是无效的,他的线程数最多不会超过核心线程数。
- 如果线程数量>核心线程数,并且>最大线程数,当任务队列是SynchronousQueue的时候,会因为线程池拒绝添加任务而抛出异常。
任务队列大小有限时:- 当LinkedBlockingDeque塞满时,新增的任务会直接创建新线程来执行,当创建的线程数量超过最大线程数量时会抛异常。
- SynchronousQueue没有数量限制。因为他根本不保持这些任务,而是直接交给线程池去执行。当任务数量超过最大线程数时会直接抛异常。
# execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
- Proceed in 3 steps:
*
- 1. If fewer than corePoolSize threads are running, try to
- start a new thread with the given command as its first
- task. The call to addWorker atomically checks runState and
- workerCount, and so prevents false alarms that would add
- threads when it shouldn't, by returning false.
*
- 2. If a task can be successfully queued, then we still need
- to double-check whether we should have added a thread
- (because existing ones died since last checking) or that
- the pool shut down since entry into this method. So we
- recheck state and if necessary roll back the enqueuing if
- stopped, or start a new thread if there are none.
*
- 3. If we cannot queue task, then we try to add a new
- thread. If it fails, we know we are shut down or saturated
- and so reject the task.
*/
int c = ctl.get();
/**
- workerCountOf方法取出低29位的值,表示当前活动的线程数;
- 如果当前活动的线程数小于corePoolSize,则新建一个线程放入线程池中,并把该任务放到线程中
*/
if (workerCountOf(c) < corePoolSize) {
/**
- addWorker中的第二个参数表示限制添加线程的数量 是根据据corePoolSize 来判断还是maximumPoolSize来判断;
- 如果是ture,根据corePoolSize判断
- 如果是false,根据maximumPoolSize判断
*/
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:
- 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
- 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
- 如 果
workerCount >= corePoolSize && workerCount <maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务; - 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
这里要注意一下
addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。
所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);
也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。
# addWork
addWorker方法的主要作用是在线程池中创建一个新的线程并执行,firstTask参数用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize ,false表示新增线程前需要判断当前活动的线程数是否少于maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# Worker
线程池中的每一个对象被封装成一个Worker对象,ThreadPool维护的就是一组Worker对象。
Worker类继承了AQS,并实现了Runnable接口,其中包含了两个重要属性:firstTask用来保存传入的任务,thread是在调用构造方法是通过ThreadFactory来创建的线程,是用来处理任务的线程。
Worker(Runnable firstTask)
public void run()
void interruptIfStarted()
2
3
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
- This class will never be serialized, but we provide a
- serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/*- Thread this worker is running in. Null if factory fails. */
final Thread thread;
/*- Initial task to run. Possibly null. */
Runnable firstTask;
/*- Per-thread task counter */
volatile long completedTasks;
/**
- Creates with given first task and thread from ThreadFactory.
- @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker,这行代码是为了防止线程刚创建就被中断。
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 为什么要继承 AbstractQueuedSynchronizer 类
答案就一点:为了方便获取锁。
- 如果不继承 AbstractQueuedSynchronizer 类,Worker 自身没有锁的能力,需要在类中定义 Lock 属性来获取锁。
- 如果继承 AbstractQueuedSynchronizer 类,Worker 自身拥有锁的能力,不需要再在类中定义 Lock 属性来获取锁。
# 中断 Worker 的线程
/**
* 中断所有线程。
*/
private void interruptWorkers() {
// 遍历Worker集合
for (Worker w : workers)
// 中断Worker的线程
w.interruptIfStarted();
}
2
3
4
5
6
7
8
9
在调用
List<Runnable> shutdownNow()方法关闭线程池的时候,内部就会调用 interruptWorkers() 方法,interruptWorkers() 方法内部再去调用 Worker 的 void interruptIfStarted() 方法中断 Worker 的线程。
# 线程池中任务有三种排队策略
- 直接提交
- 有界队列
- 无界队列
# 在ThreadPoolExecutor中已经包含四种处理策略
# CallerRunsPolicy
线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); }}
这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。(开始我总不想丢弃任务的执行,但是对某些应用场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执行的,很可能导致程序没法继续跑了。需要视业务情景而定吧。)
# AbortPolicy
处理程序遭到拒绝将抛出运行时 RejectedExecutionException,直接抛出异常,简单粗暴
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException();}
这种策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是一种勇气,为了保证后续任务的正常进行,丢弃一些也是可以接收的,记得做好记录)
# DiscardPolicy
不能执行的任务将被删除,什么都不做,既不抛出异常,也不会执行。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
# DiscardOldestPolicy
如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程),当任务被拒绝添加时,会抛弃任务队列中最旧的任务(也就是最先加入队列的任务),再把这个新任务添加进去。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) {e.getQueue().poll();e.execute(r); }}
该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。
# 常用线程池
# newCachedThreadPool
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
这种类型的线程池特点是:
- 工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。
- 如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。
- 在使用CachedThreadPool时,一定要注意控制任务的数量,否则,由于大量线程同时运行,很有会造成系统瘫痪。
# newFixedThreadPool
创建一个指定工作线程数量的线程池。每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。
FixedThreadPool是一个典型且优秀的线程池,它具有线程池提高程序效率和节省创建线程时所耗的开销的优点。但是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工作线程,还会占用一定的系统资源。
# newSingleThreadExecutor
创建一个单线程化的Executor,即只创建唯一的工作者线程来执行任务,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
# newScheduleThreadPool
创建一个定长的线程池,而且支持定时的以及周期性的任务执行,支持定时及周期性任务执行
# Executor线程池只看这一篇就够了
线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。
- 线程实现方式
Thread、Runnable、Callable
//实现Runnable接口的类将被Thread执行,表示一个基本任务
public interface Runnable {
//run方法就是它所有内容,就是实际执行的任务
public abstract void run();
}
//Callable同样是任务,与Runnable接口的区别在于它接口泛型,同时它执行任务候带有返回值;
//Callable的使用通过外层封装成Future来使用
public interface Callable<V> {
//相对于run方法,call方法带有返回值
V call() throws Exception;
}
2
3
4
5
6
7
8
9
10
11
注意:启动Thread线程只能用start(JNI方法)来启动,start方法通知虚拟机,虚拟机通过调用器映射到底层操作系统,通过操作系统来创建线程来执行当前任务的run方法
# Executor框架
Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。从图中可以看出Exectuor下有一个重要的子接口ExecutorService,其中定义了线程池的具体行为:
- execute(Runnable runnable):执行Runnable类型的任务
- submit(task):用来提交Callable或者Runnable任务,并返回代表此任务的Future对象
- shutdown():在完成已经提交的任务后封闭办事,不在接管新的任务
- shutdownNow():停止所有正在履行的任务并封闭办事
- isTerminated():是一个钩子函数,测试是否所有任务都履行完毕了
- isShutdown():是一个钩子函数,测试是否该ExecutorService是否被关闭
# ExecutorService中的重点属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
2
3
# ctl
对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它包含两部分信息:线程池的运行状态(runState)和线程池内有效线程的数量(workerCount)。
这里可以看到,使用Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY 就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。
# ctl相关方法
//获取运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取活动线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
//获取运行状态和活动线程数的值
private static int ctlOf(int rs, int wc) { return rs | wc; }
2
3
4
5
6
# 线程池的状态
- RUNNING = 1 << COUNT_BITS; //高3位为111
- SHUTDOWN = 0 << COUNT_BITS; //高3位为000
- STOP = 1 << COUNT_BITS; //高3位为001
- TIDYING = 2 << COUNT_BITS; //高3位为010
- TERMINATED = 3 << COUNT_BITS; //高3位为011
1、RUNNING
状态说明:线程池处于RUNNING状态,能够接收新任务,以及对已添加的任务进行处理。
状态切换:线程池的初始化状态是RUNNING。换句话说,线程池一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。
2、SHUTDOWN
状态说明:线程池处于SHUTDOWN状态,不接收新任务,能够处理已经添加的任务。
状态切换:调用shutdown()方法时,线程池由RUNNING -> SHUTDOWN。
3、STOP
状态说明:线程池处于STOP状态,不接收新任务,不处理已提交的任务,并且会中断正在处理的任务。
状态切换:调用线程池中的shutdownNow()方法时,线程池由(RUNNING or SHUTDOWN) -> STOP。
4、TIDYING
状态说明:当所有的任务已经停止,ctl记录“任务数量”为0,线程池会变为TIDYING状态。当线程池处于TIDYING状态时,会执行钩子函数 terminated()。 terminated()在ThreadPoolExecutor类中是空, 的,若用户想在线程池变为TIDYING时,进行相应处理,可以通过重载 terminated()函数来实现。
状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行任务也为空时,就会由SHUTDOWN -> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP-> TIDYING。
5、TERMINATED
状态说明:线程池线程池彻底停止,线程池处于TERMINATED状态,
状态切换:线程池处于TIDYING状态时,执行完terminated()之后, 就会由TIDYING->TERMINATED。
# 线程池使用
RunTask类:
public class RunTask implements Runnable {
public void run() {
System.out.println("Thread name:"+Thread.currentThread().getName());
}
}
ExecutorSample类:
public class ExecutorSample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i=0;i<20;i++){
//提交任务无返回值
executor.execute(new RunTask());
//任务执行完成后有返回值
Future<Object> future = executor.submit(new RunTask());
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 线程池的具体使用
- ThreadPoolExecutor 默认线程池
- ScheduledThreadPoolExecutor 定时线程池
ThreadPoolExecutor
# 线程池的创建
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
2
3
4
5
6
7
8
9
10
- corePoolSize:线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
- maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。
- keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize时候,如果这时候没有新的任务提交,核心线程外的线程不会立即被销毁,而是会等待,直到等待的时间超过了keepAliveTime
- unit:keepAliveTime的单位时间
- workQueue:用于保存等待被执行的任务的阻塞队列,且任务必须实现Runnable接口,在JDK中提供了如下阻塞队列:
- ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务。
- LinkedBlockingQueue:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQueue。
- SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常高于LinkedBlockingQueue。
- PriorityBlockingQueue:具有优先级的无界阻塞队列。
- threadFactory:ThreadFactory 类型的变量,用来创建新线程。默认使用ThreadFactory.defaultThreadFactory来创建线程, 会使新创建线程具有相同的NORM_PRIORITY优先级并且都是非守护线程,同时也设置了线程名称。
- handler:线程池的饱和策略。当阻塞队列满了,且没有空闲的工作队列,如果继续提交任务,必须采用一种策略处理该任务.
# 线程池的监控
- public long getTaskCount() //线程池已执行与未执行的任务总数
- public long getCompletedTaskCount() //已完成的任务数
- public int getPoolSize() //线程池当前的线程数
- public int getActiveCount() //线程池中正在执行任务的线程数量
# 线程池的原理
- 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意这一个步骤需要获取全局锁)。
- 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
- 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意这一个步骤需要获取全局锁)。
- 如果创建的新线程将使当前运行的线程超出maximumPoolSize,任务将被执行饱和策略。
- ThreadPoolExecutor 采用上述的设计思路,是为执行execute()方法时,尽可能避免获取全局锁(一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后,几乎所有的execute()方法调用都是在执行步骤2,而步骤2不需要获取全局锁。
# 阿里规范要求不能使用Executors
第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
第二:提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
# 阿里线程池的创建
我们可以通过ThreadPoolExecutor来创建一个线程池。
new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime, milliseconds,runnableTaskQueue, threadFactory,handler);
- corePoolSize - 线程池核心池的大小。
- maximumPoolSize - 线程池的最大线程数。
- keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
- unit - keepAliveTime 的时间单位,有日,小时,分钟,秒等等
- workQueue - 用来储存等待执行任务的队列。
- threadFactory - 线程工厂。
- handler - 拒绝策略(饱和策略)。

# 阿里线程池创建方式
根据阿里巴巴java开发规范,推荐了3种线程池创建方式
- 推荐方式1:首先引入:commons-lang3包
- 推荐方式 2:首先引入:com.google.guava包
- 推荐方式 3:spring配置线程池方式:自定义线程工厂bean需要实现ThreadFactory,可参考该接口的其它默认实现类,使用方式直接注入bean,调用execute(Runnable task)方法即可
# enough
# 开启线程的三种方式
- 1)继承Thread类,重写run()方法,在run()方法体中编写要完成的任务
new Thread().start(); - 2)实现Runnable接口,实现run()方法
new Thread(new MyRunnable()).start(); - 3)实现Callable接口MyCallable类,实现call()方法,使用FutureTask类来包装Callable对象,使用FutureTask对象作为Thread对象的target创建并启动线程;调用FutureTask对象的get()方法来获得子线程执行结束后的返回值。
FutureTask<Integer> ft = new FutureTask<Integer>(new MyCallable());
new Thread(ft).start();
2
# run()和start()方法区别
run()方法只是线程的主体方法,和普通方法一样,不会创建新的线程。
只有调用start()方法,才会启动一个新的线程,新线程才会调用run()方法,线程才会开始执行。
# 在Java中wait和seelp方法的不同
wait()方法属于Object类,调用该方法时,线程会放弃对象锁,只有该对象调用notify()方法后本线程才进入对象锁定池准备获取对象锁进入运行状态。
sleep()方法属于Thread类,sleep()导致程序暂停执行指定的时间,让出CPU,但它的监控状态依然保存着,当指定时间到了又会回到运行状态,sleep()方法中线程不会释放对象锁。
# 谈谈wait/notify关键字的理解
- notify: 唤醒在此对象监视器上等待的单个线程
- notifyAll(): 通知所有等待该竞争资源的线程
- wait: 释放obj的锁,导致当前的线程等待,直接其他线程调用此对象的notify()或notifyAll()方法
当要调用wait()或notify()/notifyAll()方法时,一定要对竞争资源进行加锁,一般放到synchronized(obj)代码中。
当调用obj.notify/notifyAll后,调用线程依旧持有obj锁,因此等待线程虽被唤醒,但仍无法获得obj锁,直到调用线程退出synchronized块,释放obj锁后,其他等待线程才有机会获得锁继续执行。
# 什么导致线程阻塞
- 1)线程执行了Thread.sleep(int millsecond)方法,放弃CPU,睡眠一段时间,一段时间过后恢复执行;
- 2)线程执行一段同步代码,但无法获得相关的同步锁,只能进入阻塞状态,等到获取到同步锁,才能恢复执行;
- 3)线程执行了一个对象的wait()方法,直接进入阻塞态,等待其他线程执行notify()/notifyAll()操作;
- 4)线程执行某些IO操作,因为等待相关资源而进入了阻塞态,如
System.in,但没有收到键盘的输入,则进入阻塞态。 - 5)线程礼让,Thread.yield()方法,暂停当前正在执行的线程对象,把执行机会让给相同或更高优先级的线程,但并不会使线程进入阻塞态,线程仍处于可执行态,随时可能再次分得CPU时间。
- 线程自闭,join()方法,在当前线程调用另一个线程的join()方法,则当前线程进入阻塞态,直到另一个线程运行结束,当前线程再由阻塞转为就绪态。
- 6)线程执行suspend()使线程进入阻塞态,必须resume()方法被调用,才能使线程重新进入可执行状态
# 线程如何关闭
- 1 ) 使用标志位
- 2)使用stop()方法,但该方法就像关掉电脑电源一样,可能会发生预料不到的问题
- 3)使用中断interrupt()
public class Thread {
// 中断当前线程
public void interrupt();
// 判断当前线程是否被中断
public boolen isInterrupt();
// 清除当前线程的中断状态,并返回之前的值
public static boolen interrupted();
}
2
3
4
5
6
7
8
但调用interrupt()方法只是传递中断请求消息,并不代表要立马停止目标线程。