云迈博客

您现在的位置是:首页 > 灌水专栏 > 正文

灌水专栏

线程池的相关知识

刘思佳2022-08-29灌水专栏173
概念定义线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、调优和监控。使用场景单个任务处

概念

定义

线程池”,顾名思义就是一个线程缓存,线程是稀缺资源,如果被无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、调优和监控。

使用场景

  • 单个任务处理时间短。
  • 需要处理的任务数量大。

    优势

  1. 重用存在的线程,减少线程创建,消亡的开销,提高性能。

  2. 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。

  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

    相关类

    Executor类

    Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。

    ExecutorService类

    相关方法:

  4. execute(Runnable command):履行Ruannable类型的任务。

  5. submit(task):可用来提交Callable或Runnable任务,并返回代表此任务的Future 对象。

  6. shutdown():在完成已提交的任务后封闭办事,不再接管新任务。

  7. shutdownNow():停止所有正在履行的任务并封闭办事。

  8. isTerminated():测试是否所有任务都履行完毕了。

  9. isShutdown():测试是否该ExecutorService已被关闭。

    Executors类

    创建线程池的工具类,主要是一些jdk的内置线程池。

    newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
     return new ThreadPoolExecutor(nThreads, nThreads,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>());
    }

    特点:只有核心线程,没有worker线程,面对大流量的时候,处理速度有一点满,但是有一点它的阻塞队列是一个无上限的阻塞链表,很容易导致OOM,建议不要使用。
    newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
     return new FinalizableDelegatedExecutorService
         (new ThreadPoolExecutor(1, 1,
                                 0L, TimeUnit.MILLISECONDS,
                                 new LinkedBlockingQueue<Runnable>()));
    }

    特点:只有一个核心线程,没有worker线程,处理速度很慢,但是也使用了一个无上限的阻塞链表,很容易导致OOM。
    newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                   60L, TimeUnit.SECONDS,
                                   new SynchronousQueue<Runnable>());
    }

    特点:核心线程数为0,最大线程数可以无限大,来多少我接多少(皮包骨公司,无限接受),流量大会导致CPU飙升飚满。
    总结:一般来说我们都不会使用JDK自带的创建线程池的方式,或多或少都满足不了高并发的场景,如果大概知道相关的并发量是可以使用的。

    ThreadPoolExecutor类

    自定义线程池类

    相关参数

    ```java
    /**

    • ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,
    • 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),
    • 这里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存
    • workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),
    • 这个常量表示workerCount的上限值,大约是5亿。
      */
      private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
      // Integer.SIZE = 32
      private static final int COUNT_BITS = Integer.SIZE - 3;
      // CAPACITY = 2^29 - 1
      private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// 获取运行状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取活动线程数
private static int workerCountOf(int c) { return c & CAPACITY; }
// 获取运行状态和活动线程数的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

### 线程池的状态
```java
// 高3位为111
private static final int RUNNING    = -1 << COUNT_BITS;
// 高3位为000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 高3位为001
private static final int STOP       =  1 << COUNT_BITS;
// 高3位为010
private static final int TIDYING    =  2 << COUNT_BITS;
// 高3位为011
private static final int TERMINATED =  3 << COUNT_BITS;
  • RUNNING
    • 状态说明:能够接受新任务,也能执行已添加进入队列里面的任务。
    • 状态转变:线程池的初始状态,线程池一创建就处于该状态。
  • SHUTDOWN
    • 状态说明:只能够执行已经添加进入队列里面的任务。
    • 状态转变:调用线程池的shutDown方法。
  • STOP
    • 状态说明:什么都不会去执行,并且还说中断在执行的任务。
    • 状态转变:调用线程池的shutDownNow()方法。
  • TIDYING
    • 状态说明:(1) 所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING 状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。
    • 状态转变:1) 线程池处于shutdown状态,并且队列任务为空;2)当线程池处于stop状态时,且线程池执行任务为空。
  • TERMINATED
    • 状态说明:线程池彻底终止
    • 状态转变:线程池处在TIDYING状态时,执行完terminated()之后。

      构造方法

      // corePoolSize : 核心线程数,在线程池中将一直维护一定数量的核心线程处理任务。
      // maximumPoolSize : 最大线程数,辅助核心线程数处理任务。
      // keepAliveTime : 线程允许的空闲时间 (除核心线程之外的线程允许的空闲时间)
      // unit : 空闲时间单位
      // workQueue : 保存等待被执行的任务的阻塞队列
      // threadFactory : 创建线程的方式,一般都使用默认的
      // handler : 线程饱和策略
      public ThreadPoolExecutor(int corePoolSize,
                               int maximumPoolSize,
                               long keepAliveTime,
                               TimeUnit unit,
                               BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory,
                               RejectedExecutionHandler handler) {
      if (corePoolSize < 0 ||
         maximumPoolSize <= 0 ||
         maximumPoolSize < corePoolSize ||
         keepAliveTime < 0)
         throw new IllegalArgumentException();
      if (workQueue == null || threadFactory == null || handler == null)
         throw new NullPointerException();
      this.acc = System.getSecurityManager() == null ?
         null :
      AccessController.getContext();
      this.corePoolSize = corePoolSize;
      this.maximumPoolSize = maximumPoolSize;
      this.workQueue = workQueue;
      this.keepAliveTime = unit.toNanos(keepAliveTime);
      this.threadFactory = threadFactory;
      this.handler = handler;
      }

      原理解析

      阻塞线程

  1. ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
  2. LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吐量通常要高于ArrayBlockingQuene;
  3. SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
  4. priorityBlockingQuene:具有优先级的无界阻塞队列;

    饱和策略

  • AbortPolicy:直接抛出异常,默认策略;
  • DiscardPolicy:用调用者所在的线程来执行任务;
  • CallerRunsPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  • DiscardOldestPolicy:直接丢弃任务
  • 自定义饱和策略:实现RejectedExecutionHandler接口(记录日志或持久化数据任务)

    方法分析

    额外的方法

    public long getTaskCount() //线程池已执行与未执行的任务总数
    public long getCompletedTaskCount() //已完成的任务数
    public int getPoolSize() //线程池当前的线程数
    public int getActiveCount() //线程池中正在执行任务的线程数量

    execute()方法

    ```java

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// clt记录着runState和workerCount
int c = ctl.get();
// workerCountOf方法取出低29位的值,表示当前活动的线程数;
// 判断当前的线程数是否小于corePoolSize
// 如果是,使用入参线程去addWorker方法结束
/*

* addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
* 如果为true,根据corePoolSize来判断;
* 如果为false,则根据maximumPoolSize来判断
*/
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
// 如果大于或等于corePollSize
// 判断当前线程是否在运行并且添加队列成功
if (isRunning(c) && workQueue.offer(command)) {
    // 重新获取ctl值
    int recheck = ctl.get();
    // 判断是否运行状态,不是运行状态
    // 第一步移除队列里面的元素
    // 拒绝策略
    if (! isRunning(recheck) && remove(command))
        reject(command);
    // 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
    else if (workerCountOf(recheck) == 0)
        //   1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
        // 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPolSize,添加线程时根据maximumPoolSize来判断;
        // 如果判断workerCount大于0,则直接返回,在workQueue中新增的comman 会在将来的某个时刻被执行。
        addWorker(null, false);
}
/*
* 如果执行到这里,有两种情况:
* 1. 线程池已经不是RUNNING状态;
* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
* 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
* 如果失败则拒绝该任务
*/
else if (!addWorker(command, false))
    reject(command);

}

execute的执行流程
![image.png](https://cdn.nlark.com/yuque/0/2022/png/1257745/1659702496639-b73b126d-b72b-4a90-b028-7648248ec9e1.png#clientId=u174dc389-8141-4&crop=0&crop=0&crop=1&crop=1&from=paste&height=470&id=ua86b7e9b&margin=%5Bobject%20Object%5D&name=image.png&originHeight=587&originWidth=1236&originalType=binary&ratio=1&rotation=0&showTitle=false&size=94526&status=done&style=none&taskId=u50cd5812-7bea-49c0-beab-3614a91bf56&title=&width=988.8)
#### addWorker()方法
```java
private boolean addWorker(Runnable firstTask, boolean core) {
    // goto写法 用于重试
    retry:
    for (;;) {
        // 获取运行状态
        int c = ctl.get();
        int rs = runStateOf(c);

        /**
         * rs:表示线程的运行状态
         * rs >= SHUTDOWN : 表示不在接受新任务
         * 接着判断以下3个条件,只要有1个不满足,则返回false
         * 1. rs处于关闭状态,只会执行已添加的任务
         * 2. firstTask为空
         * 3. 阻塞队列不为空
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // 自旋
        for (;;) {
            // 获取线程数
            int wc = workerCountOf(c);
            // 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1), 返回false;
            // 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,如果为false则根据maximumPoolSize来比较。
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 尝试增加workerCount,如果成功,则跳出第一个for循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 39    // 如果增加workerCount失败,则重新获取ctl的值
            c = ctl.get();  // Re-read ctl
               // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建worker对象 
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 获取线程状态
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN 表示运行状态
                // rs为RUNNING或SHUTDOWN状态并且firstTask为null,向线程池里面添加线程
                // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize记录着线程池中出现过的最大线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 失败回退 从wokers移除w 线程数减1 尝试结束线程池
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。

Worker类

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable  {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        // 正在运行woker线程
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        // 传入的任务
        Runnable firstTask;
        /** Per-thread task counter */
        // 完成的任务数 监控用
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            // 禁止线程中断
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象。Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时通过ThreadFactory来创建的线程,是用来处理任务的线程。
在调用构造方法时 ,需要把任务传入 ,这里通过getThreadFactory().newThread(this); 来新建一个线程, newThread方法传入的参数是this,因为Worker本身继承了Runnable接口,也就是一个线程,所以一个Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
  2. 如果正在执行任务,则不应该中断线程;
  3. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务, 这时可以对该线程进行中断;
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态;
  5. 之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。

所以,Worker继承自AQS,用于判断线程是否空闲以及是否可以被中断。
此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为什么这么做呢? 是因为AQS中默认的state是0,如果刚创建了一个Worker对象,还没有执行任务时,这时就不应该被中断,看一下tryAquire方法:

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

tryAcquire方法是根据state是否是0来判断的,所以,setState(-1);将state设置为-1是为了禁止在执行任务前对线程进行中断。
正因为如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0。

runWorker()方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 获取第一个任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 把state从-1改为0 意思是可以允许中断
    w.unlock(); // allow interrupts
    // 是否因为异常退出循环
    boolean completedAbruptly = true;
    try {
        // 自旋,如果task不为空 或者 从队列里面key拿到任务
        // 这里就表明队列里面的优先级是最低的
        while (task != null || (task = getTask()) != null) {
            // 加锁操作
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 如果当前线程池状态等于stop 就中断
            // 如果线程池正在停止,那么要保证当前线程是中断状态
            // 如果不是的话,则要保证当前线程不是中断状态;
            // Thread.interrupted()方法会复位中断的状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                // 这设置为空 等下次循环就会从队列里面获取
                task = null;
                // 完成任务数加一
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

执行过程:

  1. while循环如果task为空,就会不断的getTask获取任务
  2. getTask从队列中取出任务。
  3. 如果线程是中断就保持中断,不是就保持运行。
  4. 调用 task.run() 执行任务。
  5. 如果task为null则跳出循环,执行processWorkerExit()方法;
  6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。
在执行任务之前和执行任务之后可以做一些事情。
completedAbruptly 变 量 来 表 示 在 执 行 任 务 过 程 中 是 否 出 现 了 异 常 , 在processWorkerExit方法中会对该变量的值进行判断。

getTask()方法

private Runnable getTask() {
    // imeOut变量的值表示上次从阻塞队列中取任务时是否超时
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        /*
         * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
         * 1. rs >= STOP,线程池是否正在stop;
         * 2. 阻塞队列是否为空。
         * 如果以上条件满足,则将workerCount减1并返回null。
         * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
         */
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        /*
         * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
         * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
         * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将 workerCount减1;
         * 如果减1失败,则返回重试。
         * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
         */
        // Are workers subject to culling?
        // 对于超过核心线程数量的这些线程,需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /*
             * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
             * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
             */
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

processWorkerExit()方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
       // 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
    // 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workeCount进行了减1操作,这里就不必再减了。
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //统计完成的任务数
        completedTaskCount += w.completedTasks;
        // 从workers中移除,也就表示着从线程池中移除了一个工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 根据线程池状态进行判断是否结束线程池
    tryTerminate();

    int c = ctl.get();
    /*
    * 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
    * 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
    * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
    */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束。

ScheduledThreadPoolExecutor类

schedule:延迟多长时间之后只执行一次;
scheduledAtFixedRate固定:延迟指定时间后执行一次,之后按照固定的时长周期执行;
scheduledWithFixedDelay非固定:延迟指定时间后执行一次,之后按照:上一次任务执行时长 + 周期的时长 的时间去周期执行;

方法分析

delayedExecute方法

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 如果线程池不是RUNNING状态,则使用拒绝策略把提交任务拒绝掉
    if (isShutdown())
        reject(task);
    else {
        //与ThreadPoolExecutor不同,这里直接把任务加入延迟队列
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            // 和ThreadPoolExecutor不一样,corePoolSize没有达到会增加Worker;
            // 增加Worker,确保提交的任务能够被执行
            ensurePrestart();
    }
}

offer方法

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            // 容量扩增50%。
            grow();
        size = i + 1;
        // 第一个元素,其实这里也可以统一进行sift-up操作,没必要特判。
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            // 插入堆尾。
            siftUp(i, e);
        }
        // 如果新加入的元素成为了堆顶,则原先的leader就无效了。
        if (queue[0] == e) {
            leader = null;
            // 由于原先leader已经无效被设置为null了,这里随便唤醒一个线程(未必是原先的leader)来取走堆顶任务。
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

siftUp方法

private void siftUp(int k, RunnableScheduledFuture<?> key) {
     // 找到父节点的索引
    while (k > 0) {
        // 获取父节点
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
         // 如果key节点的执行时间大于父节点的执行时间,不需要再排序了
        if (key.compareTo(e) >= 0)
            break;
        // 如果key.compareTo(e) < 0,说明key节点的执行时间小于父节点的执行时间,需要把父节点移到后面
        queue[k] = e;
        setIndex(e, k);
        // 设置索引为k
        k = parent;
    }
     // key设置为排序后的位置中
    queue[k] = key;
    setIndex(key, k);
}

run方法

public void run() {
    // 是否周期性,就是判断period是否为0。
    boolean periodic = isPeriodic();
    // 检查任务是否可以被执行。
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    // 如果非周期性任务直接调用run运行即可。
    else if (!periodic)
        ScheduledFutureTask.super.run();
    // 如果成功runAndRest,则设置下次运行时间并调用reExecutePeriodic。
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        // 需要重新将任务(outerTask)放到工作队列中。此方法源码会在后文介绍ScheduledThreadPoolExecutor本身API时提及。
        reExecutePeriodic(outerTask);
    }
}

时间计算模式

 private void setNextRunTime() {
     long p = period;
     if (p > 0)
         time += p;
     else
         time = triggerTime(-p);
 }


long triggerTime(long delay) {
    return now() +
        ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

发表评论

评论列表

  • 这篇文章还没有收到评论,赶紧来抢沙发吧~