Java线程池

Java线程池,第1张

线程池:控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动排在队列中的任务,
如果线程数量超过了最大数量,超出数量的线程进入队列,等待其他线程执行完毕,再从队列中取出任务执行。
优点:
(1)线程复用:避免了反复创建销毁线程的消耗,提高了线程的响应速度。
(2)控制最大并发数:避免并发数过高,服务崩溃。
(3)集中管理线程:使用线程池对线程进行统一管理和处理。

线程池里的7大参数:

        /**
         * 线程池的七大参数:
         * 1.corePoolSize:核心线程数
         * 2.maximumPoolSize:最大线程数
         * 3.keepaliveTime:时间大小
         * 4.unit:时间单位
         * 5.workQueue:任务队列
         * 6.threadFactory:线程工厂
         * 7.handler:拒绝策略
         */    
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

newFixedThreadPool(int)线程池:

package org.lee.study.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lee
 * @date 2022/4/20 15:19
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        /**
         * 1.newFixedThreadPool();线程池中有固定的线程数:
         * 通过源码得知,核心线程数和最大线程数都是固定的,适合执行长期固定任务。
         * 缺点:通过源码得知,等待队列大小为int的最大值,会造成任务堆积,不建议使用。
         */
        int threadPoolNumber = 10;
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        try {
            for (int i = 1; i <= threadPoolNumber; i++) {
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}
//源码:传入的int值为核心线程数和最大线程数
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue());
    }


//任务队列的大小为Integer.MAX_VALUE
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
newSingleThreadExecutor()
package org.lee.study.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lee
 * @date 2022/4/20 15:19
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        /**
         * 2.newSingleThreadExecutor();线程池中只有一个线程。
         * 通过源码得知,核心线程数和最大线程数都是1。
         * 缺点:通过源码得知,等待队列大小为int的最大值,会造成任务堆积,不建议使用。
         */
        int threadPoolNumber = 10;
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            for (int i = 1; i <= threadPoolNumber; i++) {
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue()));
    }

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
newCachedThreadPool
package org.lee.study.concurrent;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lee
 * @date 2022/4/20 15:19
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        /**
         * 3.newCachedThreadPool();线程池中的数量可以动态变化。
         * 通过源码得知,核心线程数是0,最大线程数是Integer.MAX_VALUE。
         * 缺点:通过源码得知,最大线程池数大小为int的最大值,会造成任务堆积,造成OOM,不建议使用。
         */
        int threadPoolNumber = 10;
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            for (int i = 1; i <= threadPoolNumber; i++) {
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}
此处略有不同的地方是,任务队列的数据类型是SynchronousQueue队列,源码默认是false,使用的是TransferStack;
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue());
    }


    /**
     * Creates a {@code SynchronousQueue} with nonfair access policy.
     *创建具有非公平访问策略的 SynchronousQueue。
     */
    public SynchronousQueue() {
        this(false);
    }

    /**
     * Creates a {@code SynchronousQueue} with the specified fairness policy.
     *
     * @param fair if true, waiting threads contend in FIFO order for
     *        access; otherwise the order is unspecified.
     */
    public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue() : new TransferStack();
    }


 以上3种创建线程池的方式仅供了解,实际开发中不建议使用,都有造成OOM的风险。实际开发中使用的自定义创建线程池的方式。

package org.lee.study.concurrent;

import java.util.concurrent.*;

/**
 * @author lee
 * @date 2022/4/20 15:19
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        int threadPoolNumber = 10;
        /**
         * 线程池的七大参数:
         * 1.corePoolSize:核心线程数
         * 2.maximumPoolSize:最大线程数
         * 3.keepaliveTime:时间大小
         * 4.unit:时间单位
         * 5.workQueue:任务队列
         * 6.threadFactory:线程工厂
         * 7.handler:拒绝策略
         */
        ExecutorService executorService = new ThreadPoolExecutor(2,
                5,
                2L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
        try {
            for (int i = 1; i <= threadPoolNumber; i++) {
                executorService.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}

通过源码了解线程池的工作模式

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

通过源码里面的注释可以得知:

1.核心线程数没满,新建线程执行任务。如果核心线程数满了,新任务将进入队列。

2.任务队列满了,将使用最大线程数。

3.最大线程数满了,将使用拒绝策略。

4.当一个线程无事可做并且当前线程数大于核心线程数,将根据最大的存活时间进行回收,所有线程池的任务完成后,线程池的大小会回缩到核心线程数的大小。

线程池的拒绝策略

/**
*通过源码可知,拒绝策略的其中一种实现了RejectedExecutionHandler类,这是其中一种直接抛出异常。
*/    
public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

 通过这个类得知,一共有以下4中拒绝策略。

 1.AbortPolicy拒绝策略:丢弃任务并直接抛出异常。

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

2.CallerRunsPolicy拒绝策略:如果任务被拒绝了,将由调用线程直接执行此任务。

举例:用main方法写了一个线程池,任务队列满了的情况下,将由main主线程来执行这个任务。


        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }

3.DiscardOldestPolicy拒绝策略:如果线程池没有关闭,将队列第一个元素出队(抛弃),然后重新提交这个任务。

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

4.DiscardPolicy拒绝策略:Does nothing, which has the effect of discarding task r.

丢弃任务什么也不做。

    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/721947.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-25
下一篇 2022-04-25

发表评论

登录后才能评论

评论列表(0条)

保存