java并发编程(7) 共享模型之工具 - JDK中的线程池

java并发编程(7) 共享模型之工具 - JDK中的线程池,第1张

java并发编程(7) 共享模型之工具 - JDK中的线程

文章目录

前言1. ThreadPoolExecutor

1. 线程池状态2. 构造方法3. 工作方式4. 工厂方法

1. newFixedThreadPool2. newCachedThreadPool3. newSingleThreadExecutor4. 使用提供的线程池的弊端 5. 提交任务

1. submit2. invokeAll3. invokeAny 6. 关闭线程池

1. shutdown2. shutdownNow3. 其他方法 7. 异步模式之工作线程

1. 定义2. 饥饿3. 饥饿解决4. 创建多少线程合适

CPU 密集型运算I/O 密集型运算 8. 任务调度线程池

1. Timer 的缺点(定时任务)2. 使用 ScheduledExecutorService 改写 9. 异常处理

1. 方法1:主动捉异常(newScheduledThreadPool例子)2. 方法2:使用 Future(newFixedThreadPool例子) 10. 定时任务11. tomcat线程池 2. Fork/Join

1. 概念2. 使用3. 优化4. 文件拆分


前言

从(7)开始讨论一些工具,这篇文章讨论JDK中的线程池。文章根据《Java并发编程的艺术》这本书以及黑马的视频 黑马多线程 做的笔记。


1. ThreadPoolExecutor


1. 线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING,这里的第一位是符号位这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子 *** 作 进行赋值

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));

// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是使用或 *** 作合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }



2. 构造方法
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        this.ctl = new AtomicInteger(ctlOf(-536870912, 0));
        this.mainLock = new ReentrantLock();
        this.workers = new HashSet();
        this.termination = this.mainLock.newCondition();
        if (corePoolSize >= 0 && maximumPoolSize > 0 && maximumPoolSize >= corePoolSize && keepAliveTime >= 0L) {
            if (workQueue != null && threadFactory != null && handler != null) {
                this.corePoolSize = corePoolSize;
                this.maximumPoolSize = maximumPoolSize;
                this.workQueue = workQueue;
                this.keepAliveTime = unit.toNanos(keepAliveTime);
                this.threadFactory = threadFactory;
                this.handler = handler;
            } else {
                throw new NullPointerException();
            }
        } else {
            throw new IllegalArgumentException();
        }
    }

参数:这里的参数比我们自定义的线程池多了一个救急线程,救急线程是在当核心线程和阻塞队列都满了,就是用救急线程来进行处理,处理完之后就销毁

corePoolSize 核心线程数目 (最多保留的线程数)maximumPoolSize 最大线程数目,根据核心线程和和救急的线程数加起来得出的keepAliveTime 生存时间 - 针对救急线程。核心线程一直保留在线程池中被使用,救急线程过了生存时间就会被销毁unit 时间单位 - 针对救急线程workQueue 阻塞队列threadFactory 线程工厂 - 可以为线程创建时起个好名字handler 拒绝策略



3. 工作方式

    线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。

    当线程数达到 corePoolSize(核心) 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue(任务队列) 队列排 队,直到有空闲的线程。

    如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。(这里的救急线程数等于最大线程数 - 核心线程数)

    如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。这里的拒接策略是在救急线程也执行不完的情况下才拒绝的,和前面自定义的线程池不同。拒绝策略 jdk 提供了 4 种实现,其它著名框架也提供了实现

    AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略CallerRunsPolicy 让调用者运行任务(谁调用的谁来执行)DiscardPolicy 放弃本次任务DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之,感觉这一应该算是一个替换策略了Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题Netty 的实现,是创建一个新线程来执行任务ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略

    当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。



4. 工厂方法

基于上面这个原理,JDK 的线程池提供了很多的工厂方法来创建各种用途的线程池,但是在真正要用到线程池的时候,建议自己根据需求来设置参数,这也是阿里巴巴开发手册中提到的。


1. newFixedThreadPool

里面的构造方法

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
   //参数中核心线程数 = 最大线程数
   return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new linkedBlockingQueue(),
                                  threadFactory);
}


从图中可以看到keepAliveTime为0,以及传入的核心线程数等于最大线程数,所以是没有紧急线程的,而且任务队列没有上限,总结以下特点:

核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间阻塞队列是无界的,可以放任意数量的任务适用于任务量已知,相对耗时的任务

下面来写下使用的测试代码:其中重写的 newThread 这个方法是用来给线程起名字的

@Slf4j
public class Test2 {
    public static void main(String[] args) {
        //核心线程数 = 最大线程数 = 2
        ExecutorService service = Executors.newFixedThreadPool(2, new ThreadFactory() {
            private AtomicInteger t = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "mypool_t-" + t.getAndIncrement());
            }
        });
        service.execute(()->{
            log.debug("1");
        });
        service.execute(()->{
            log.debug("2");
        });
        service.execute(()->{
            log.debug("3");
        });
//        DEBUG [mypool_t-1] (21:27:21,196) (Test2.java:30) - 1
//        DEBUG [mypool_t-2] (21:27:21,196) (Test2.java:33) - 2
//        DEBUG [mypool_t-1] (21:27:21,198) (Test2.java:36) - 3
    }
}



2. newCachedThreadPool
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
	   return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
	                                   60L, TimeUnit.SECONDS,
	                                   new SynchronousQueue(),
	                                   threadFactory);
	}

先来看构造方法里面的参数:

corePoolSize 核心线程数:0maximumPoolSize 最大线程数:Integer 的最大值keepAliveTime 救急线程的生存时间:60L时间单位:S任务队列:SynchronousQueue线程工厂没有传入的参数:拒绝策略,使用默认的 AbortPolicy

总结上面的参数,可以看到以下特点:

核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着

全部都是救急线程(60s 后可以回收)救急线程可以无限创建

队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)


下面来看一个例子,测试 SynchronousQueue 这个队列的工作机制:

@Slf4j
public class Test3 {
    public static void main(String[] args) {
        SynchronousQueue integers = new SynchronousQueue<>();
        new Thread(() -> {
            try {
                log.debug("putting {} ", 1);
                integers.put(1);
                log.debug("{} putted...", 1);

                log.debug("putting...{} ", 2);
                integers.put(2);
                log.debug("{} putted...", 2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t1").start();

        sleep.mySleep(1);

        new Thread(() -> {
            try {
                log.debug("taking {}", 1);
                integers.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t2").start();

        sleep.mySleep(1);

        new Thread(() -> {
            try {
                log.debug("taking {}", 2);
                integers.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t3").start();
    }
    //19:10:02.917 [t1] DEBUG com.jianglianghao.HeiMaJUC.Unit8.Test3 - putting 1 
    //19:10:03.924 [t2] DEBUG com.jianglianghao.HeiMaJUC.Unit8.Test3 - taking 1
    //19:10:03.924 [t1] DEBUG com.jianglianghao.HeiMaJUC.Unit8.Test3 - 1 putted...
    //19:10:03.924 [t1] DEBUG com.jianglianghao.HeiMaJUC.Unit8.Test3 - putting...2 
    //19:10:04.938 [t3] DEBUG com.jianglianghao.HeiMaJUC.Unit8.Test3 - taking 2
    //19:10:04.939 [t1] DEBUG com.jianglianghao.HeiMaJUC.Unit8.Test3 - 2 putted...
}


测试结果如下,这里的线程 t1 负责放数据,t2 和 t3 负责取数,可以看出来,t1 调用 integers.put(1); 的时候,并没有立刻输出 1 putted..., 而是等了一秒线程 t2 来取数的时候才继续往下运行。


总结:整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况



3. newSingleThreadExecutor

单线程线程池

特点:

核心线程和最大线程都是 1没有救济线程使用的也是默认的拒绝策略:AbortPolicy

使用场景: 希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

区别:

自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作

Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改

FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法

Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改

对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改



4. 使用提供的线程池的弊端

对于 newFixedThreadPool 和 newSingleThreadExecutor,任务队列是无解的,所以对于大量的任务都会装到队列里面对于 newCachedThreadPool ,允许创建的最大线程数为 Integer.MAX_VALUE,所以就有可能导致线程过多创建的问题

实际上,阿里巴巴开发手册也给出了线程池要自己定义参数的需求,不要用自带的创建线程池的方法, 可以有效避免 OOM(内存溢出) 的问题,问题还是出自提供的线程池没有对数量进行一个约束。还有一点就是自定义线程池可以根据自己的实际情况,业务需求,电脑的性能来决定到底用多少线程。



5. 提交任务
// 执行任务
void execute(Runnable command);

// 提交任务 task,用返回值 Future 获得任务执行结果
 Future submit(Callable task);

// 提交 tasks 中所有任务
 List> invokeAll(Collection> tasks)
 throws InterruptedException;
 
// 提交 tasks 中所有任务,带超时时间
 List> invokeAll(Collection> tasks,
 long timeout, TimeUnit unit)
 throws InterruptedException;
 
// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
 T invokeAny(Collection> tasks)
 throws InterruptedException, ExecutionException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
 T invokeAny(Collection> tasks,
 long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;



1. submit

submit.get() 可以获取返回的结果

@Slf4j
public class Test4 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future submit = pool.submit(() -> {
            log.debug("running....");
            Thread.sleep(1000);
            return "ok";
        });
        //获取结果或者异常
        String s = submit.get();
        log.debug("{}", s);
        //DEBUG [pool-1-thread-1] (23:15:20,189) (Test4.java:21) - running....
        //DEBUG [main] (23:15:21,203) (Test4.java:28) - ok
    }
}



2. invokeAll

多个任务一起执行返回List对象

@Slf4j
public class Test4 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        //extracted(pool);

        List> futures = pool.invokeAll(
        //添加三个任务
			Arrays.asList(
                () -> {
                    log.debug("beagin");
                    Thread.sleep(1000);
                    return "1";
                },
                () -> {
                    log.debug("beagin");
                    Thread.sleep(500);
                    return "2";
                },
                () -> {
                    log.debug("beagin");
                    Thread.sleep(2000);
                    return "3";
                }
        ));
        //遍历所有的返回结果,然后打印出来
        futures.forEach(
                f -> {
                    try {
                        log.debug("{}", f.get());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
        );
        //DEBUG [pool-1-thread-1] (23:29:48,670) (Test4.java:24) - beagin
        //DEBUG [pool-1-thread-2] (23:29:48,670) (Test4.java:29) - beagin
        //DEBUG [pool-1-thread-2] (23:29:49,172) (Test4.java:34) - beagin
        //DEBUG [main] (23:29:51,179) (Test4.java:42) - 1
        //DEBUG [main] (23:29:51,179) (Test4.java:42) - 2
        //DEBUG [main] (23:29:51,180) (Test4.java:42) - 3
    }

}



3. invokeAny

执行所有的任务,谁先执行完成就返回谁的结果,其他的全部取消

@Slf4j
public class Test4 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        String s = pool.invokeAny(Arrays.asList(
                () -> {
                    log.debug("beagin");
                    Thread.sleep(1000);
                    return "1";
                },
                () -> {
                    log.debug("beagin");
                    Thread.sleep(500);
                    return "2";
                },
                () -> {
                    log.debug("beagin");
                    Thread.sleep(2000);
                    return "3";
                }
        ));
        log.debug("{}", s);
        //DEBUG [pool-1-thread-1] (23:33:46,263) (Test4.java:22) - beagin
        //DEBUG [pool-1-thread-2] (23:33:46,263) (Test4.java:27) - beagin
        //DEBUG [pool-1-thread-2] (23:33:46,773) (Test4.java:32) - beagin
        //DEBUG [main] (23:33:46,774) (Test4.java:37) - 2
        //只返回了 2 的结果

    }
}



6. 关闭线程池 1. shutdown
void shutdown();

public void shutdown() {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
          checkShutdownAccess();
          // 修改线程池状态
          advanceRunState(SHUTDOWN);
          // 仅会打断空闲线程
          interruptIdleWorkers();
          onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
      } finally {
          mainLock.unlock();
      }
      // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
      tryTerminate();
  }

测试例子:

@Slf4j
public class TestShutDown {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        log.debug("提交任务1");
        Future result1 = pool.submit(() -> {
            log.debug("task 1 running...");
            Thread.sleep(1000);
            log.debug("task 1 finish...");
            return 1;
        });
        log.debug("提交任务2");
        Future result2 = pool.submit(() -> {
            log.debug("task 2 running...");
            Thread.sleep(1000);
            log.debug("task 2 finish...");
            return 2;
        });
        log.debug("提交任务3");
        Future result3 = pool.submit(() -> {
            log.debug("task 3 running...");
            Thread.sleep(1000);
            log.debug("task 3 finish...");
            return 3;
        });

        log.debug("shutdown");
        pool.shutdown();
    }
}

测试结果:可以看到任务 1、2、3都被提交之后再关闭,那么提交过的任务都会被执行,当然在 shutdown 之后再提交就会抛异常了。注意一点就是 shutdown 不会阻塞主线程的打印。

//这行代码起到延时的作用,也就是主线程shutdown之后等待线程池多久才继续往下执行
pool.awaitTermination(3, TimeUnit.SECONDS);



`

2. shutdownNow
List shutdownNow();

public List shutdownNow() {

       List tasks;
       final ReentrantLock mainLock = this.mainLock;
       mainLock.lock();
       try {
           checkShutdownAccess();
           // 修改线程池状态
           advanceRunState(STOP);
           // 打断所有线程
           interruptWorkers();
           // 获取队列中剩余任务
           tasks = drainQueue();
       } finally {
           mainLock.unlock();
       }
       // 尝试终结
       tryTerminate();
       return tasks;
   }

测试代码:

@Slf4j
public class TestShutDown {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        log.debug("提交任务1");
        Future result1 = pool.submit(() -> {
            log.debug("task 1 running...");
            Thread.sleep(1000);
            log.debug("task 1 finish...");
            return 1;
        });
        log.debug("提交任务2");
        Future result2 = pool.submit(() -> {
            log.debug("task 2 running...");
            Thread.sleep(1000);
            log.debug("task 2 finish...");
            return 2;
        });
        log.debug("提交任务3");
        Future result3 = pool.submit(() -> {
            log.debug("task 3 running...");
            Thread.sleep(1000);
            log.debug("task 3 finish...");
            return 3;
        });

        log.debug("shutdown");
        //pool.shutdown();
        //pool.awaitTermination(3, TimeUnit.SECONDS);
        List runnables = pool.shutdownNow();
        log.debug("other.... {}" , runnables);
    }
}


执行结果:可以看到由于线程池大小为2,所以,任务1和2可以被线程执行,但是任务3要加入任务队列中阻塞。此时调用 shutdownNow 就会导致任务3不会被执行,而是直接返回,其他正在执行的线程执行完成后返回。所以后面打印出返回的任务 3 了。



3. 其他方法
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();

// 线程池状态是否是 TERMINATED
boolean isTerminated();

// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
//情,可以利用此方法等待,简单来说就是如果想要让调用线程使用了shutdown 之后等待线
//程池执行完成后一段时间才继续向下运行,就可以用这个方法
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;



7. 异步模式之工作线程 1. 定义

让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。

例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那 么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message(来一个消息就创建一个新线程处理))

注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率

例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成 服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工



2. 饥饿

固定大小线程池会有饥饿现象

两个工人是同一个线程池中的两个线程

他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作

客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待后厨做菜:没啥说的,做就是了

但是现在同时来了两个客人,这时候工人 A 和 工人 B 都去处理点餐了,没人做饭,这时候就出问题了

针对上面的问题,我们可以用不同线程池来处理不同的问题,这样就可以避免了线程过多创建的问题,如果我们只用增加线程的数量,那么当客人达到成百上千的时候线程的数量就太多了,下面我们来测试一下这种现象:

这个程序中,我们定义一个大小为2的线程池,然后我们设计是一个线程处理点餐,一个线程处理做菜,但是现在两个线程都去点餐了:

@Slf4j
public class TestStarvation {
    static final List MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    static Random RANDOM = new Random();

    static String cooking() {
        return MENU.get(RANDOM.nextInt(MENU.size()));
    }

    public static void main(String[] args) {
        //大小为2的线程
        ExecutorService waiterPool = Executors.newFixedThreadPool(2);
		//一个线程执行点餐
        waiterPool.execute(() -> {
            log.debug("处理点餐...");
            Future f = waiterPool.submit(() -> {
            	//这里也需要一个线程来做菜
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });

        waiterPool.execute(() -> {
            log.debug("处理点餐...");
            Future f = waiterPool.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}

结果截图:



3. 饥饿解决

针对上面的问题,我们可以采用多个线程池的思路,不同的线程池对应不同的工作职责,这时候点菜和做菜就不会阻塞了。同时多个线程池也使得不必要创建太多的线程数。我们当然可以通过提高线程池的大小来解决问题,但是这样就线程数只会越来越多,造成不必要的麻烦。下面就是解决的方法:

@Slf4j
public class TestStarvation {
    static final List MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
    static Random RANDOM = new Random();

    static String cooking() {
        return MENU.get(RANDOM.nextInt(MENU.size()));
    }

    public static void main(String[] args) {
        //调用2个线程分别做不同的事
        ExecutorService waiterPool = Executors.newFixedThreadPool(1);
        ExecutorService cookPool = Executors.newFixedThreadPool(1);

        waiterPool.execute(() -> {
            log.debug("处理点餐...");
            Future f = cookPool.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
        waiterPool.execute(() -> {
            log.debug("处理点餐...");
            Future f = cookPool.submit(() -> {
                log.debug("做菜");
                return cooking();
            });
            try {
                log.debug("上菜: {}", f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}


结果输出:



4. 创建多少线程合适

过小会导致程序不能充分地利用系统资源、容易导致饥饿过大会导致更多的线程上下文切换,占用更多内存



CPU 密集型运算

通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障( *** 作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费



I/O 密集型运算

CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O *** 作时、远程RPC 调用时,包括进行数据库 *** 作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。

经验公式如下:
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间

例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,那么调用公式就是:4 * 100% * 100% / 50% = 8

例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,那么调用公式就是 4 * 100% * 100% / 10% = 40



8. 任务调度线程池 1. Timer 的缺点(定时任务)

在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。

@Slf4j
public class TestTimer {
    public static void main(String[] args) {
        Timer timer = new Timer();
        TimerTask task1 = new TimerTask() {
            @Override
            public void run() {
                log.debug("task 1");
                sleep.mySleep(2); //任务1睡眠导致任务2也延迟相应的睡眠时间
                //int i = 1/0;  出异常导致任务2也不执行了
            }
        };
        TimerTask task2 = new TimerTask() {
            @Override
            public void run() {
                log.debug("task 2");
            }
        };

        log.debug("start..........");
        // 使用 timer 添加两个任务,希望它们都在 1s 后执行
       // 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行
        timer.schedule(task1, 1000);
        timer.schedule(task2, 1000);
    }
}


输出:

//从 1s后 开始执行
20:46:09.444 c.TestTimer [main] - start... 
//任务1先执行,睡眠2s
20:46:10.447 c.TestTimer [Timer-0] - task 1 
//此时再执行任务2,可以看到和任务1是相隔了2s的
20:46:12.448 c.TestTimer [Timer-0] - task 2



2. 使用 ScheduledExecutorService 改写

ScheduledExecutorService 里面的方法可以使得几个任务同时执行,任务不必一个一个执行,而这个类也有几种用法:


1、 普通使用

  ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        pool.schedule(()->{
            log.debug("task 1");
            //sleep.mySleep(2);
            int i = 1/0;
        }, 1, TimeUnit.SECONDS);

        pool.schedule(()->{
            log.debug("task 2");
        }, 1, TimeUnit.SECONDS);
    }
    //DEBUG [pool-1-thread-1] (20:31:54,492) (TestTimer.java:25) - task 1
    //DEBUG [pool-1-thread-2] (20:31:54,492) (TestTimer.java:30) - task 2
    //同时执行,不会因为睡眠或者异常而影响其他任务的执行



2、定时执行 scheduleAtFixedRate
期间的sleep等延时 *** 作会延长定时 *** 作的时间间隔才可以往下执行

 ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        log.debug("start....");
        pool.scheduleAtFixedRate(()->{
            log.debug("running");
            //initalDelay:延迟多少秒开始工作
        }, 1, 1, TimeUnit.SECONDS);
        //DEBUG [main] (20:37:55,855) (TestTimer.java:24) - start....
        //DEBUG [pool-1-thread-1] (20:37:56,909) (TestTimer.java:26) - running
        //DEBUG [pool-1-thread-1] (20:37:57,917) (TestTimer.java:26) - running
        //DEBUG [pool-1-thread-1] (20:37:58,910) (TestTimer.java:26) - running
        //DEBUG [pool-1-thread-1] (20:37:59,907) (TestTimer.java:26) - running
        //DEBUG [pool-1-thread-1] (20:38:00,905) (TestTimer.java:26) - running
        //DEBUG [pool-1-thread-1] (20:38:01,904) (TestTimer.java:26) - running



3、延时执行 scheduleWithFixedDelay
scheduleWithFixedDelay中每个任务都要等待上一个任务执行完成之后才可以往下执行,用法:

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
	log.debug("start...");
	pool.scheduleWithFixedDelay(()-> {
	 log.debug("running...");
	 sleep(2);
}, 1, 1, TimeUnit.SECONDS);

输出:一开始 1s 后运行,然后由于每个线程都要睡眠2s,所以从第二个线程开始要等待 3s 才可以继续执行

21:40:55.078 c.TestTimer [main] - start... 
21:40:56.140 c.TestTimer [pool-1-thread-1] - running... 
21:40:59.143 c.TestTimer [pool-1-thread-1] - running... 
21:41:02.145 c.TestTimer [pool-1-thread-1] - running... 
21:41:05.147 c.TestTimer [pool-1-thread-1] - running...

最后,总结这个线程池: 整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务



9. 异常处理 1. 方法1:主动捉异常(newScheduledThreadPool例子)
  ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
	   log.debug("start....");
	    //延时执行
	    pool.scheduleWithFixedDelay(()->{
	        log.debug("running");
	        try{
	        	//异常
	            int i = 1/0;
	        } catch (Exception e){
	        	//我们自己捕捉异常
	            log.debug("{}", e.getMessage());
	        }
	    }, 1, 1, TimeUnit.SECONDS);
	    //下面是输出结果,可以看到异常被打印出来了
	    //DEBUG [main] (20:45:37,593) (TestTimer.java:31) - start....
	    //DEBUG [pool-1-thread-1] (20:45:38,643) (TestTimer.java:34) - running
	    //DEBUG [pool-1-thread-1] (20:45:38,645) (TestTimer.java:38) - / by zero
	    //DEBUG [pool-1-thread-1] (20:45:39,661) (TestTimer.java:34) - running
	    //DEBUG [pool-1-thread-1] (20:45:39,662) (TestTimer.java:38) - / by zero



2. 方法2:使用 Future(newFixedThreadPool例子)
  ExecutorService pool = Executors.newFixedThreadPool(1);
	    Future submit = pool.submit(() -> {
	          log.debug("running....");
	          int i = 1 / 0;
	          return true;
	      });
	      //出异常了就会封装到ExecutorService对象中,调用get()方法就可以获取到了
	      log.debug("{}", submit.get());

		

结果输出:下面是捕获到的异常

21:54:58.208 c.TestTimer [pool-1-thread-1] - task1 
Exception in thread "main" java.util.concurrent.ExecutionException: 
java.lang.ArithmeticException: / by zero 
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) 
	at java.util.concurrent.FutureTask.get(FutureTask.java:192) 
	at cn.itcast.n8.TestTimer.main(TestTimer.java:31) 
Caused by: java.lang.ArithmeticException: / by zero 
	 at cn.itcast.n8.TestTimer.lambda$main
 public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    // 使任务从新进入阻塞队列
                    if (!queue.force(command, timeout, unit)) {
                        //如果还是失败才会抛出异常
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.");
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    Thread.interrupted();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }
        }
    }
(TestTimer.java:28) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

总结以下:建议自己捕获异常,可以自己在里面做一些自定义的处理



10. 定时任务

在 springboot 中可以使用 @Scheduled 这个注解来配置定时任务

@Slf4j
public class TestScheduled {
    //在周四18:00:00定时执行任务
    public static void main(String[] args) {
        //获取当前时间
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime time = now.withHour(18).withMinute(0).withSecond(0).withNano(0).with(DayOfWeek.THURSDAY);
        //一周时间
        long period = 1000 * 60 * 60 *  24 * 7;
        //判断如果当前时间大于本周周四,就推迟到下周执行
        if(now.compareTo(time) > 0){
            time = time.plusWeeks(1L);
        }
        //计算两个LocalDateTime时间的差值
        long initDelayTime = Duration.between(now, time).toMillis();
        //scheduleAtFixedRate(task,time,period)
        //task-所要安排的任务 time-首次执行任务的时间 period-执行一次task的时间间隔,单位毫秒
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
        pool.scheduleAtFixedRate(()->{
            log.debug("running....");
        }, initDelayTime, period, TimeUnit.SECONDS);
    }
}



11. tomcat线程池

下面来看看 tomcat 在哪用到了线程池

LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲Acceptor 只负责【接收新的 socket 连接】Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理Executor 线程池中的工作线程最终负责【处理请求】



Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同

如果总线程数达到 maximumPoolSize(最大线程数)

这时不会立刻抛 RejectedExecutionException 异常,而是再次尝试将任务放入队列如果还失败,才抛出 RejectedExecutionException 异常

源码 tomcat-7.0.42

public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
    if ( parent.isShutdown() )
        throw new RejectedExecutionException(
        "Executor not running, can't force a command into the queue"
    );
    return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task  is rejected
   
}

TaskQueue.java

	//下面四个线程一个线程执行一步 *** 作
   DEBUG [ForkJoinPool-1-worker-1] (22:04:05,767) (TestForkJoin2.java:51) - fork() 5 + {4}
   DEBUG [ForkJoinPool-1-worker-2] (22:04:05,767) (TestForkJoin.java:48) - fork() 4 + {3}
    DEBUG [ForkJoinPool-1-worker-3] (22:04:05,767) (TestForkJoin.java:48) - fork() 3 + {2}
    DEBUG [ForkJoinPool-1-worker-0] (22:04:05,767) (TestForkJoin.java:48) - fork() 2 + {1}
	
	//线程3获取最下面的1
    DEBUG [ForkJoinPool-1-worker-3] (22:04:05,773) (TestForkJoin.java:42) - join() 1
    //线程0获取最下面的1交给线程0进行加法,依次类推
    DEBUG [ForkJoinPool-1-worker-0] (22:04:05,773) (TestForkJoin.java:50) - join() 2 + {1} = 3
    DEBUG [ForkJoinPool-1-worker-3] (22:04:05,773) (TestForkJoin.java:50) - join() 3 + {2} = 6
    DEBUG [ForkJoinPool-1-worker-2] (22:04:05,773) (TestForkJoin.java:50) - join() 4 + {3} = 10
    DEBUG [ForkJoinPool-1-worker-1] (22:04:05,773) (TestForkJoin2.java:55) - join() 5 + {4} = 15
    //15new MyTask(5)  5+ new MyTask(4)  4 + new MyTask(3)  3 + new MyTask(2)  2 + new MyTask(1)



连接Connector 配置:


Executor 线程配置:



2. Fork/Join 1. 概念

Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算

所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解

Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率

Fork/Join 默认会创建与 cpu 核心数大小相同的线程池


2. 使用

提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下面定义了一个对 1~n 之间的整数求和的任务:使用递归 + 多线程的方式,每一步交给线程池去执行

public class TestForkJoin {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println(pool.invoke(new MyTask(5)));
        //任务拆分:MyTask(5) = 5 + MyTask(4) = 5 + 4 + MyTask(3) = ...
        //其实就是递归 + 多线程
        //15
    }
}

//如果是继承的RecursiveAction就是没有返回结果
//进行计算1-n之间的数字的和
class MyTask extends RecursiveTask {

    private int n;

    public MyTask(int n){
        this.n = n;
    }

    @Override
    protected Integer compute() {
        //终止条件
        if(n == 1){
            return 1;
        }
        MyTask t1 = new MyTask(n - 1);
        //让一个线程去执行这个任务
        t1.fork();
        //获取t1的结果,如果初始构造是MyTask(5),那么第一次调用result =5 + MyTask(4)
        int result = t1.join() + n;
        return result;
    }
}

带过程的代码:

public class TestForkJoin2 {

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println(pool.invoke(new MyTask(5)));
    }
}

// 1~n 之间整数的和
@Slf4j
class MyTask extends RecursiveTask {

    private int n;

    public MyTask(int n) {
        this.n = n;
    }

    @Override
    public String toString() {
        return "{" + n + '}';
    }

    @Override
    protected Integer compute() {
        // 如果 n 已经为 1,可以求得结果了
        if (n == 1) {
            log.debug("join() {}", n);
            return n;
        }

        // 将任务进行拆分(fork)
        AddTask1 t1 = new AddTask1(n - 1);
        t1.fork();
        log.debug("fork() {} + {}", n, t1);

        // 合并(join)结果
        int result = n + t1.join();
        log.debug("join() {} + {} = {}", n, t1, result);
        return result;
    }
}

输出结果:

DEBUG [ForkJoinPool-1-worker-0] (22:14:59,628) (TestForkJoin.java:118) - join() 1 + 2 = 3
DEBUG [ForkJoinPool-1-worker-3] (22:14:59,628) (TestForkJoin.java:118) - join() 4 + 5 = 9
DEBUG [ForkJoinPool-1-worker-2] (22:14:59,628) (TestForkJoin.java:127) - fork() {1,2} + {3,3} = ?
DEBUG [ForkJoinPool-1-worker-1] (22:14:59,628) (TestForkJoin.java:127) - fork() {1,3} + {4,5} = ?
DEBUG [ForkJoinPool-1-worker-0] (22:14:59,630) (TestForkJoin.java:114) - join() 3
DEBUG [ForkJoinPool-1-worker-2] (22:14:59,633) (TestForkJoin.java:130) - join() {1,2} + {3,3} = 6
DEBUG [ForkJoinPool-1-worker-1] (22:14:59,633) (TestForkJoin.java:130) - join() {1,3} + {4,5} = 15
15



3. 优化

当然,对于上面的代码我们还可以进一步优化,上面的方法任务都是相互依赖的,一个任务结果没有返回那么下一个任务是无法执行的,现在使用新的方法:使用二分配合多线程

@Slf4j
class AddTask3 extends RecursiveTask {

    int begin;
    int end;
	
    public AddTask3(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    public String toString() {
        return "{" + begin + "," + end + '}';
    }

    @Override
    protected Integer compute() {
    	//当begin 和 end 相同的时候直接返回就行了
        if (begin == end) {
            log.debug("join() {}", begin);
            return begin;
        }
        //如果是相邻的也没必要拆分了,因为此时求出来的中间值都是不变的
        //比如4和5,那么两个区间就是[4,4] 和 [5,5],这时候相加就是4+5
        if (end - begin == 1) {
            log.debug("join() {} + {} = {}", begin, end, end + begin);
            return end + begin;
        }
        //求出中间值
        int mid = (end + begin) / 2;
		//这里负责求 begin-mid
        AddTask3 t1 = new AddTask3(begin, mid);
        t1.fork();
        //这里负责求 mid+1 - end
        AddTask3 t2 = new AddTask3(mid + 1, end);
        t2.fork();
        log.debug("fork() {} + {} = ?", t1, t2);

        int result = t1.join() + t2.join();
        log.debug("join() {} + {} = {}", t1, t2, result);
        return result;
    }
}

输出结果:



4. 文件拆分

如果一个大文件就可以拆分成不同GB级别的文件流给不同线程处理,会更快。Stream并行流。





如有错误,欢迎指出!!!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5722525.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存