MENU

JAVA线程池详解

2020-05-12 • Code

使用场景

Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead,and they provide a means of bounding and managing the resources,including threads, consumed when executing a collection of tasks.

根据上面官方的解释,线程池主要是为了解决下面的两个问题

  1. 提高执行大量异步任务时的性能
  2. 管理执行异步任务的线程

关键类

▲线程池相关的类
  • ThreadPoolExecutor

    线程池的核心类,实现了对线程的创建、执行、回收等各种管理功能。

  • Executors

线程池的工具类,提供了多种静态方法用来快速创建不同类型的线程池。

关键参数

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

上面是 ThreadPoolExecuto r的构造器,其中定义了线程池的几个关键参数。

corePoolSize

线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize 。如果当前线程数为 corePoolSize 继续提交的任务被保存到阻塞队列中,等待被执行。

maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,直到当前线程数等于 maximumPoolSize。

keepAliveTime

当线程池中的线程数大于 corePoolSize 时,多余线程的存活时间。如果线程空闲的时间达到keepAliveTime,则会被销毁,直到线程池中的线程数不超过 corePoolSize 。但是如果调用了 allowCoreThreadTimeOut(boolean) 方法,keepAliveTime 参数也会起作用,直到线程池中的线程数为0。

unit

keepAliveTime 参数的时间单位。

workQueue

任务缓存队列,用来存放等待执行的任务。如果当前线程数为 corePoolSize ,继续提交的任务就会被保存到任务缓存队列中,等待被执行。这里的 BlockingQueue 可以有以下几种选择:

  • SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。如果线程池中始终没有空闲线程(任务提交的平均速度快于被处理的速度),可能出现无限制的线程增长

  • LinkedBlockingQueue:基于链表结构的阻塞队列,如果不设置初始化容量,其容量为 Integer.MAX_VALUE ,即为无界队列,按 FIFO 顺序执行任务。如果线程池中线程数达到了corePoolSize,且始终没有空闲线程(任务提交的平均速度快于被处理的速度),任务缓存队列可能出现无限制的增长。

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,构造时需指定容量大小,按FIFO顺序执行任务。可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列。

  • PriorityBlockingQueue: 会按照元素的优先级对元素进行排序,按照优先级顺序出队,每次出队的元素都是优先级最高的元素。注意,此阻塞队列为无界阻塞队列,即容量没有上限。

  • DelayQueue:基于PriorityQueue,一种延时阻塞队列,DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue也是一个无界队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

threadFactory

线程工厂,创建新线程时使用的线程工厂。

handler

任务拒绝策略,当阻塞队列满了,且线程池中的线程数达到maximumPoolSize,如果继续提交任务,就会采取任务拒绝策略处理该任务,线程池提供了4种任务拒绝策略:

  • AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,默认策略。

  • CallerRunsPolicy:由调用execute方法的线程执行该任务。

  • DiscardPolicy:丢弃任务,但是不抛出异常。

  • DiscardOldestPolicy:丢弃阻塞队列最前面的任务,然后重新尝试执行任务(重复此过程)。

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义拒绝策略,如记录日志或持久化存储不能处理的任务。

线程池状态

// 线程池的运行状态,总共有5个状态,用高3位来表示
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

从上面的源码可以知道线程池有以下 5 个状态,但是线程池中并没有使用单独的变量来表示线程池的运行状态,而是使用一个AtomicInteger类型的变量ctl来表示线程池的控制状态,其将线程池运行状态与工作线程的数量打包在一个整型中,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量,对ctl的操作主要参考以下几个函数

// 通过与的方式,获取ctl的高3位,也就是线程池的运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//通过与的方式,获取ctl的低29位,也就是线程池中工作线程的数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
//通过或的方式,将线程池状态和线程池中工作线程的数量打包成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
//SHUTDOWN状态的值是0,比它大的均是线程池停止或清理状态,比它小的是运行状态
private static boolean isRunning(int c) {
		return c < SHUTDOWN;
}

RUNNING

高3位为111,接受新任务并处理阻塞队列中的任务

SHUTDOWN

高3位为000,不接受新任务但会处理阻塞队列中的任务

STOP

高3位为001,不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务

TIDYING

高3位为010,所有任务都已终止,工作线程数量为0,线程池将转化到TIDYING状态,即将要执行terminated()钩子方法

TERMINATED

高3位为011,terminated()方法已经执行结束

复用原理

runWorker 方法

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
	//复用的关键在这个while 循环,只要满足  task != null(Worker线程第一次使用是满足) || getTask() !=null 这 2 个条件就能一直复用这个Worker线程
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//真正执行提交的任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask 方法

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
              //如果线程池处于shutdown状态,并且队列为空,或者线程池处于stop或者terminate状态,在线程池数量-1,返回null,回收线程
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
						 //标识当前线程在空闲时,是否应该超时回收
            // 如果allowCoreThreadTimeOut为ture,或当前线程数大于核心池大小,则需要超时回收
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

getTask()方法的返回情况:

  1. 线程池处于RUNNING状态,阻塞队列不为空,返回成功获取的task对象
  2. 线程池处于SHUTDOWN状态,阻塞队列不为空,返回成功获取的task对象
  3. 线程池状态大于等于STOP,返回null,回收线程
  4. 线程池处于SHUTDOWN状态,并且阻塞队列为空,返回null,回收线程
  5. worker数量大于maximumPoolSize,返回null,回收线程
  6. 线程空闲时间超时,返回null,回收线程

代码示例

PriorityBlockingQueue

public static void main(String[] args) throws Exception {
		ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(0, 3, 0, TimeUnit.MICROSECONDS, new PriorityBlockingQueue<>(3));
		//故意乱序
		poolExecutor.execute(new OrderTask(3));
		poolExecutor.execute(new OrderTask(1));
		poolExecutor.execute(new OrderTask(2));
	}

	//实现 Comparable 接口 ,也可不实现构造PriorityBlockingQueue 传入一个比较器即可
	static class OrderTask implements Runnable, Comparable<OrderTask> {
		//排序依据
	    int order;
		public OrderTask(int order) {
			this.order = order;
		}
		@Override
		public void run() {
			System.out.println("this taks order = " + order);
		}

		@Override
		public int compareTo(OrderTask o) {
			return order - o.order;
		}
	}
//out 按order排序再执行的
this taks order = 1
this taks order = 2
this taks order = 3

CallerRunsPolicy

public static void main(String[] args) throws Exception {
        //构造一个核心线程和最大线程数都是 1 的线程池,而且任务队列容量也是1 这样的话该线程池就只能最多处理 2 个任务
		ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<>(1),
				new ThreadPoolExecutor.CallerRunsPolicy());
		//提交 3 个任务 会有一个被拒绝		
		for (int i = 0; i < 3; i++) {
			poolExecutor.execute(() -> {
				System.out.println(Thread.currentThread().getName());
			});
		}
	}
//out  在 CallerRunsPolicy 这个策略下超出线程池处理能力的任务会在 execute 方法调用者的线程里执行 ,这里就是 main 线程。
main
pool-1-thread-1
pool-1-thread-1

ForkJoinPool

public static void main(String[] args) throws Exception {
		List<String> tses = new ArrayList<>();
		for (int i = 0; i < 45; i++) {
			tses.add(i + ".ts");
		}
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		ForkJoinTask<List<String>> forkJoinTask = new TseqTask(tses, 10);
		ForkJoinTask<List<String>> task = forkJoinPool.submit(forkJoinTask);
		List<String> res = task.get();
		System.out.println(res);
	}

	static class TseqTask extends RecursiveTask<List<String>> {
		private List<String> tses;
		private int splitSize;
		public TseqTask(List<String> tses, int splitSize) {
			this.tses = tses;
			this.splitSize = splitSize;
		}

		@Override
		protected List<String> compute() {
			if (tses.size() <= splitSize) {
			//此处模拟耗时操作
				return tses;
			}
			List<TseqTask> subTasks = new ArrayList<>();
			for (int i = 0; i < tses.size(); i += splitSize) {
				int to = i + splitSize > tses.size() ? tses.size()  : i + splitSize;
				TseqTask subTsTask = new TseqTask(tses.subList(i, to), splitSize);
				subTasks.add(subTsTask);
				subTsTask.fork();
			}
			List<String> res = new ArrayList<>();
			for (TseqTask subTask : subTasks) {
				try {
					res.addAll(subTask.get());
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			return res;
		}
	}

参考资料

最后编辑于: 2020-05-12 01:27:35
文章列表 文章二维码
QR Code for this page
Tipping QR Code