java线程池笔记

1、线程池继承图

classDiagram
    ThreadPoolExecutor <|-- AbstractExecutorService
    AbstractExecutorService <|-- ExecutorService
    ExecutorService <|-- Executor
    <<interface>> Executor
    <<interface>> ExecutorService
    <<abstract>> AbstractExecutorService

2、创建线程池

构造函数-核心参数

int corePoolSize 核心线程数 int maximumPoolSize 最大线程数 long keepAliveTime 空闲线程保存时间 TimeUnit unit 空闲线程保存时间单位 BlockingQueue workQueue 等待队列 ThreadFactory threadFactory 线程工厂 用于新建thread RejectedExecutionHandler handler 饱和策略

核心参数

        //ctl原子类高三位代表线程池状态,低29位维护线程池中线程数量
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        //COUNT_BITS:29
        //CAPACITY:11111111111111111111111111111
        //Integer.SIZE=32,也就是29位代表线程数量
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

        //线程池的五种状态 线程池一般也是按照下面状态流转的
        //RUNNING:111 线程池正在运行中,处理任务并接收任务
        //SHUTDOWN:000 处理任务但是不再接收任务
        //STOP:001 不接收任务,且不再处理任务并且会中断正在执行的任务
        //TIDYING:010 线程池中所有任务都接收,worker为0,并且会调用terminated方法
        //TERMINATED:100 terminated方法调用之后会变成此状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 两个方法分别是获取线程池状态(ctl高三位),工作的线程个数(ctl低29位)
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
        //参数rs表示runState,参数wc表示workerCount,即根据runState和workerCount打包合并成ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

核心执行方法:

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 * 在未来的某个时刻执行给定的任务。这个任务用一个新线程执行,或者用一个线程池中已经存在的线程执行
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 * 如果任务无法被提交执行,要么是因为这个Executor已经被shutdown关闭,要么是已经达到其容量上限,任务会被当前的RejectedExecutionHandler处理
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution                 RejectedExecutionException是一个RuntimeException
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     * 如果运行的线程少于corePoolSize,尝试开启一个新线程去运行command,command作为这个线程的第一个任务
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * 如果任务成功放入队列,我们仍需要一个双重校验去确认是否应该新建一个线程(因为可能存在有些线程在我们上次检查后死了) 或者 从我们进入这个方法后,pool被关闭了
     * 所以我们需要再次检查state,如果线程池停止了需要回滚入队列,如果池中没有线程了,新开启 一个线程
     * 
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     * 如果无法将任务入队列(可能队列满了),需要新开区一个线程(自己:往maxPoolSize发展)
     * 如果失败了,说明线程池shutdown 或者 饱和了,所以我们拒绝任务
     */
    int c = ctl.get();

    /**
     * 1、如果当前线程数少于corePoolSize(可能是由于addWorker()操作已经包含对线程池状态的判断,如此处没加,而入workQueue前加了)
     */
    if (workerCountOf(c) < corePoolSize) {
        //addWorker()成功,返回
        if (addWorker(command, true))
            return;

        /**
         * 没有成功addWorker(),再次获取c(凡是需要再次用ctl做判断时,都会再次调用ctl.get())
         * 失败的原因可能是:
         * 1、线程池已经shutdown,shutdown的线程池不再接收新任务
         * 2、workerCountOf(c) < corePoolSize 判断后,由于并发,别的线程先创建了worker线程,导致workerCount>=corePoolSize
         */
        c = ctl.get();
    }

    /**
     * 2、如果线程池RUNNING状态,且入队列成功
     */
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();//再次校验位

        /**
         * 再次校验放入workerQueue中的任务是否能被执行
         * 1、如果线程池不是运行状态了,应该拒绝添加新任务,从workQueue中删除任务
         * 2、如果线程池是运行状态,或者从workQueue中删除任务失败(刚好有一个线程执行完毕,并消耗了这个任务),确保还有线程执行任务(只要有一个就够了)
         */
        //如果再次校验过程中,线程池不是RUNNING状态,并且remove(command)--workQueue.remove()成功,拒绝当前command
        if (! isRunning(recheck) && remove(command))
            reject(command);
        //如果当前worker数量为0,通过addWorker(null, false)创建一个线程,其任务为null
        //为什么只检查运行的worker数量是不是0呢?? 为什么不和corePoolSize比较呢??
        //只保证有一个worker线程可以从queue中获取任务执行就行了??
        //因为只要还有活动的worker线程,就可以消费workerQueue中的任务
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);  //第一个参数为null,说明只为新建一个worker线程,没有指定firstTask
                                     //第二个参数为true代表占用corePoolSize,false占用maxPoolSize
    }
    /**
     * 3、如果线程池不是running状态 或者 无法入队列
     *   尝试开启新线程,扩容至maxPoolSize,如果addWork(command, false)失败了,拒绝当前command
     */
    else if (!addWorker(command, false))
        reject(command);
}

3、基本使用

Executors提供了四种常见的线程池

newFixedThreadPool 固定线程池,核心线程池个数固定,采用LinkedBlockingQueue作为等待队列,LinkedBlockingQueue基本上算是无限的,也就是说任务进来到达核心线程个数上线之后,就一直等待。

newSingleThreadExecutor 单线程线程池,相当于newFixedThreadPool 设置的核心线程数为1,保证只有一个线程在执行任务,其余的都在等待,任务执行完成后继续下一个

newCachedThreadPool 可缓存线程池 核心线程数为9,最大线程数为Integer.MAX_VALUE,空闲时间为60s,等待队列使用了SynchronousQueue,SynchronousQueue是一种无缓冲的等待队列,也就是说真正执行的线程数可以动态伸缩,比较适合任务多,执行快的。

newScheduledThreadPool是可调度的线程池,可以在设定的时间内执行,其实现方式与上面三个是不一样的,实现类为ScheduledThreadPoolExecutor。

参考博客

results matching ""

    No results matching ""