一聚教程网:一个值得你收藏的教程网站

热门教程

Java线程池重复利用空闲线程代码示例解析

时间:2022-06-29 02:20:52 编辑:袖梨 来源:一聚教程网

本篇文章小编给大家分享一下Java线程池重复利用空闲线程代码示例解析,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。

在Java开发中,经常需要创建线程去执行一些任务,实现起来也非常方便,但如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。此时,我们很自然会想到使用线程池来解决这个问题。

使用线程池的好处:

降低资源消耗。java中所有的池化技术都有一个好处,就是通过复用池中的对象,降低系统资源消耗。设想一下如果我们有n多个子任务需要执行,如果我们为每个子任务都创建一个执行线程,而创建线程的过程是需要一定的系统消耗的,最后肯定会拖慢整个系统的处理速度。而通过线程池我们可以做到复用线程,任务有多个,但执行任务的线程可以通过线程池来复用,这样减少了创建线程的开销,系统资源利用率得到了提升。

降低管理线程的难度。多线程环境下对线程的管理是最容易出现问题的,而线程池通过框架为我们降低了管理线程的难度。我们不用再去担心何时该销毁线程,如何最大限度的避免多线程的资源竞争。这些事情线程池都帮我们代劳了。

提升任务处理速度。线程池中长期驻留了一定数量的活线程,当任务需要执行时,我们不必先去创建线程,线程池会自己选择利用现有的活线程来处理任务。

很显然,线程池一个很显著的特征就是“长期驻留了一定数量的活线程”,避免了频繁创建线程和销毁线程的开销,那么它是如何做到的呢?我们知道一个线程只要执行完了run()方法内的代码,这个线程的使命就完成了,等待它的就是销毁。既然这是个“活线程”,自然是不能很快就销毁的。为了搞清楚这个“活线程”是如何工作的,下面通过追踪源码来看看能不能解开这个疑问。

学习过线程池都知道,可以通过工厂类Executors来创个多种类型的线程池,部分类型如下:

public static ExecutorService newFixedThreadPool(int var0) {
    return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newScheduledThreadPool(int var0) {
    return new ScheduledThreadPoolExecutor(var0);
}

无论哪种类型的线程池,最终都是直接或者间接通过ThreadPoolExecutor这个类来实现的。而ThreadPoolExecutor的有多个构造方法,最终都是调用含有7个参数的构造函数。

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
* {@code maximumPoolSize <= 0}
* {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

① corePoolSize

顾名思义,其指代核心线程的数量。当提交一个任务到线程池时,线程池会创建一个核心线程来执行任务,即使其他空闲的核心线程能够执行新任务也会创建新的核心线程,而等到需要执行的任务数大于线程池核心线程的数量时就不再创建,这里也可以理解为当核心线程的数量等于线程池允许的核心线程最大数量的时候,如果有新任务来,就不会创建新的核心线程。

如果你想要提前创建并启动所有的核心线程,可以调用线程池的prestartAllCoreThreads()方法。

② maximumPoolSize

顾名思义,其指代线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。所以只有队列满了的时候,这个参数才有意义。因此当你使用了无界任务队列的时候,这个参数就没有效果了。

③ keepAliveTime

顾名思义,其指代线程活动保持时间,即当线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率,不然线程刚执行完一个任务,还没来得及处理下一个任务,线程就被终止,而需要线程的时候又再次创建,刚创建完不久执行任务后,没多少时间又终止,会导致资源浪费。

注意:这里指的是核心线程池以外的线程。还可以设置allowCoreThreadTimeout = true这样就会让核心线程池中的线程有了存活的时间。

④ TimeUnit

顾名思义,其指代线程活动保持时间的单位:可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。

⑤ workQueue

顾名思义,其指代任务队列:用来保存等待执行任务的阻塞队列。

⑥ threadFactory

顾名思义,其指代创建线程的工厂:可以通过线程工厂给每个创建出来的线程设置更加有意义的名字。

⑦ RejectedExecutionHandler

顾名思义,其指代拒绝执行程序,可以理解为饱和策略:当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK1.5中Java线程池框架提供了以下4种策略。

AbortPolicy:直接抛出异常RejectedExecutionException。

CallerRunsPolicy:只用调用者所在线程来运行任务,即由调用 execute方法的线程执行该任务。

DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

DiscardPolicy:不处理,丢弃掉,即丢弃且不抛出异常。

这7个参数共同决定了线程池执行一个任务的策略:

当一个任务被添加进线程池时:

线程数量未达到 corePoolSize,则新建一个线程(核心线程)执行任务

线程数量达到了 corePools,则将任务移入队列等待

队列已满,新建线程(非核心线程)执行任务

队列已满,总线程数又达到了 maximumPoolSize,就会由上面那位星期天(RejectedExecutionHandler)抛出异常

说白了就是先利用核心线程,核心线程用完,新来的就加入等待队列,一旦队列满了,那么只能开始非核心线程来执行了。

上面的策略,会在阅读代码的时候体现出来,并且在代码中也能窥探出真正复用空闲线程的实现原理。

接下来我们就从线程池执行任务的入口分析。

一个线程池可以接受任务类型有Runnable和Callable,分别对应了execute和submit方法。目前我们只分析execute的执行过程。

上源码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) { //第一步:如果线程数量小于核心线程数
        if (addWorker(command, true))//则启动一个核心线程执行任务
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {//第二步:当前线程数量大于等于核心线程数,加入任务队列,成功的话会进行二次检查
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);//启动非核心线程执行,注意这里任务是null,其实里面会去取任务队列里的任务执行
    }
    else if (!addWorker(command, false))//第三步:加入不了队列(即队列满了),尝试启动非核心线程
        reject(command);//如果启动不了非核心线程执行,说明到达了最大线程数量的限制,会使用第7个参数抛出异常
}

代码并不多,主要分三个步骤,其中有两个静态方法经常被用到,主要用来判断线程池的状态和有效线程数量:

// 获取运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }

// 获取活动线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }

总结一下,execute的执行逻辑就是:

如果 当前活动线程数 < 指定的核心线程数,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于核心线程);

如果 当前活动线程数 >= 指定的核心线程数,且缓存队列未满,则将任务添加到缓存队列中;

如果 当前活动线程数 >= 指定的核心线程数,且缓存队列已满,则创建并启动一个线程来执行新提交的任务(此时新建的线程相当于非核心线程);

从代码中我们也可以看出,即便当前活动的线程有空闲的,只要这个活动的线程数量小于设定的核心线程数,那么依旧会启动一个新线程来执行任务。也就是说不会去复用任何线程。在execute方法里面我们没有看到线程复用的影子,那么我们继续来看看addWorker方法。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    //前面都是线程池状态的判断,暂时不理会,主要看下面两个关键的地方
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask); // 新建一个Worker对象,这个对象包含了待执行的任务,并且新建一个线程
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start(); // 启动刚创建的worker对象里面的thread执行
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

方法虽然有点长,但是我们只考虑两个关键的地方,先是创建一个worker对象,创建成功后,对线程池状态判断成功后,就去执行该worker对象的thread的启动。也就是说在这个方法里面启动了一个关联到worker的线程,但是这个线程是如何执行我们传进来的runnable任务的呢?接下来看看这个Worker对象到底做了什么。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker. */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

最重要的构造方法:

Worker(Runnable firstTask) { // worker本身实现了Runnable接口
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask; // 持有外部传进来的runnable任务
        //创建了一个thread对象,并把自身这个runnable对象给了thread,一旦该thread执行start方法,就会执行worker的run方法
        this.thread = getThreadFactory().newThread(this); 
    }
在addWorker方法中执行的t.start会去执行worker的run方法:

public void run() {
        runWorker(this);
    }
run方法又执行了ThreadPoolExecutor的runWorker方法,把当前worker对象传入。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask; // 取出worker的runnable任务
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 循环不断的判断任务是否为空,当第一个判断为false的时候,即task为null,这个task啥时候为null呢?
        // 要么w.firstTask为null,还记得我们在execute方法第二步的时候,执行addWorker的时候传进来的runnable是null吗?
        // 要么是执行了一遍while循环,在下面的finally中执行了task=null;
        // 或者执行第二个判断,一旦不为空就会继续执行循环里的代码。
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run(); // 任务不为空,就会执行任务的run方法,也就是runnable的run方法
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null; // 执行完成置null,继续下一个循环
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

方法比较长,归纳起来就三步:

1,从worker中取出runnable(这个对象有可能是null,见注释中的解释);

2,进入while循环判断,判断当前worker中的runnable,或者通过getTask得到的runnable是否为空,不为空的情况下,就执行run;

3,执行完成把runnable任务置为null。

假如我们不考虑此方法里面的while循环的第二个判断,在我们的线程开启的时候,顺序执行了runWorker方法后,当前worker的run就执行完成了。

既然执行完了那么这个线程也就没用了,只有等待虚拟机销毁了。那么回顾一下我们的目标:Java线程池中的线程是如何被重复利用的?好像并没有重复利用啊,新建一个线程,执行一个任务,然后就结束了,销毁了。没什么特别的啊,难道有什么地方漏掉了,被忽略了?

仔细回顾下该方法中的while循环的第二个判断(task = getTask)!=null

玄机就在getTask方法中。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // timed变量用于判断是否需要进行超时控制。
        // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
        // 对于超过核心线程数量的这些线程或者允许核心线程进行超时控制的时候,需要进行超时控制
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 如果需要进行超时控制,且上次从缓存队列中获取任务时发生了超时(timedOut开始为false,后面的循环末尾超时时会置为true)
        // 或者当前线程数量已经超过了最大线程数量,那么尝试将workerCount减1,即当前活动线程数减1,
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 如果减1成功,则返回null,这就意味着runWorker()方法中的while循环会被退出,其对应的线程就要销毁了,也就是线程池中少了一个线程了
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 注意workQueue中的poll()方法与take()方法的区别
            //poll方式取任务的特点是从缓存队列中取任务,最长等待keepAliveTime的时长,取不到返回null
            //take方式取任务的特点是从缓存队列中取任务,若队列为空,则进入阻塞状态,直到能取出对象为止
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true; // 能走到这里说明已经超时了
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

注释已经很清楚了,getTask的作用就是,在当前线程中:

1,如果当前线程池线程数量大于核心线程数量或者设置了对核心线程进行超时控制的话(此时相当于对所有线程进行超时控制),就会去任务队列获取超时时间内的任务(队列的poll方法),获取到的话就会继续执行任务,也就是执行runWorker方法中的while循环里的任务的run方法,执行完成后,又继续进入getTask从任务队列中获取下一个任务。如果在超时时间内没有获取到任务,就会走到getTask的倒数第三行,设置timeOut标记为true,此时继续进入getTask的for循环中,由于超时了,那么就会进入尝试去去对线程数量-1操作,-1成功了,就直接返回一个null的任务,这样就回到了当前线程执行的runWorker方法中,该方法的while循环判断getTask为空,直接退出循环,这样当前线程就执行完成了,意味着要被销毁了,这样自然就会被回收器择时回收了。也就是线程池中少了一个线程了。因此只要线程池中的线程数大于核心线程数(或者核心线程也允许超时)就会这样一个一个地销毁这些多余的线程。

2,如果当前活动线程数小于等于核心线程数(或者不允许核心线程超时),同样也是去缓存队列中取任务,但当缓存队列中没任务了,就会进入阻塞状态(队列的take方法),直到能取出任务为止(也就是队列中被新添加了任务时),因此这个线程是处于阻塞状态的,并不会因为缓存队列中没有任务了而被销毁。这样就保证了线程池有N个线程是活的,可以随时处理任务,从而达到重复利用的目的。

综上所述,线程之所以能达到复用,就是在当前线程执行的runWorker方法中有个while循环,while循环的第一个判断条件是执行当前线程关联的Worker对象中的任务,执行一轮后进入while循环的第二个判断条件getTask(),从任务队列中取任务,取这个任务的过程要么是一直阻塞的,要么是阻塞一定时间直到超时才结束的,超时到了的时候这个线程也就走到了生命的尽头。

然而在我们开始分析execute的时候,这个方法中的三个部分都会调用addWorker去执行任务,在addWorker方法中都会去新建一个线程来执行任务,这样的话是不是每次execute都是去创建线程了?事实上,复用机制跟线程池的阻塞队列有很大关系,我们可以看到,在execute在核心线程满了,但是队列不满的时候会把任务加入到队列中,一旦加入成功,之前被阻塞的线程就会被唤醒去执行新的任务,这样就不会重新创建线程了。

我们用个例子来看下:

假设我们有这么一个ThreadPoolExecutor,核心线程数设置为5(不允许核心线程超时),最大线程数设置为10,超时时间为20s,线程队列是LinkedBlockingDeque(相当于是个无界队列)。

当我们给这个线程池陆续添加任务,前5个任务执行的时候,会执行到我们之前分析的execute方法的第一步部分,会陆续创建5个线程做为核心线程执行任务,当前线程里面的5个关联的任务执行完成后,会进入各自的while循环的第二个判断getTask中去取队列中的任务,假设当前没有新的任务过来也就是没有执行execute方法,那么这5个线程就会在workQueue.take()处一直阻塞的。这个时候,我们执行execute加入一个任务,即第6个任务,这个时候会进入execute的第二部分,将任务加入到队列中,一旦加入队列,之前阻塞的5个线程其中一个就会被唤醒取出新加入的任务执行了。(这里有个execute的第二部分的后半段执行重复校验的代码即addWorker(传入null任务),目前还没搞明白是怎么回事)。

在我们这个例子中,由于队列是无界的,所以始终不会执行到execute的第三部分即启动非核心线程,假如我们设置队列为有界的,那么必然就会执行到这里了。

热门栏目