指的是import java.util.concurrent包下的类,通常用于解决多线程协调问题
- lock和衍生的ReentrantLock
- 各种容器的安全类:CopyOnWriteArrayList,ConcurrentHashMap…
- 不安全集合转安全集合:Collections.synchronizedLis()
- …
- 如果生产者不为空,通知消费者消费
- 当商品被消费完了,通知消费者消费
- 生产者和消费者的拉锯
//生产者 class Producer { final Produce produce; public Producer(Produce produce) { this.produce = produce; } //生产 public void produce() { for (int i = 0; i < 100; i++) { produce.add(); } } } //消费者 class Consumer { final Produce produce; public Consumer(Produce produce) { this.produce = produce; } //消费 public void consume() { for (int i = 0; i < 1000; i++) { produce.dec(); } } } //产品 class Produce { QueueConditiondata = new linkedList<>(); int count = 0; public synchronized void add(){ while (data.size()>=10){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } data.offer(count); System.out.println("生产了产品:" + count++); notifyAll(); } public synchronized void dec(){ while (data.size()==0){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("消费了产品:" + data.poll()); notifyAll(); } }
- 通常是通过lock.newCondition创建
//产品 class Produce { Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); Queuedata = new linkedList<>(); int count = 0; public void add() { lock.lock(); try { while (data.size() >= 10) { try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } data.offer(count); System.out.println("生产了产品:" + count++); condition.signalAll(); } finally { lock.unlock(); } } public void dec() { lock.lock(); try { while (data.size() == 0) { try { condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("消费了产品:" + data.poll()); condition.signalAll(); } finally { lock.unlock(); } } }
- 多个Condition可以实现精准唤醒
package com.hzy.juc.pc; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class C { public static void main(String[] args) { Data3 data3=new Data3(); new Thread(()->{ for (int i = 0; i < 10; i++) { data3.printA(); } },"A").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { data3.printB(); } },"B").start(); new Thread(()->{ for (int i = 0; i < 10; i++) { data3.printC(); } },"C").start(); } } class Data3{ private Lock lock=new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); private int number=1; //1a 2b 3c public void printA(){ lock.lock(); try { while (number!=1) condition1.await(); { } System.out.println(Thread.currentThread().getName()+"=>AAAAAA"); number=2; condition2.signal();//唤醒指定的人干活 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB(){ lock.lock(); try { while (number!=2) condition2.await(); { } System.out.println(Thread.currentThread().getName()+"=>BBBBBB"); number=3; condition3.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC(){ lock.lock(); try { while (number!=3) condition3.await(); { } System.out.println(Thread.currentThread().getName()+"=>CCCCCC"); number=1; condition1.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }计数器 CountDownLatch(减法计数器)
- 线程安全
- 多个线程countDown,为0await向下执行
public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { //总数是6,必须要执行任务的时候,再使用! CountDownLatch countDownLatch=new CountDownLatch(6); for (int i = 1; i <= 6; i++) { new Thread(()->{ System.out.println(Thread.currentThread().getName()+"go out"); countDownLatch.countDown(); },String.valueOf(i)).start(); } countDownLatch.await();//等待计数器归0然后再向下执行 System.out.println("Close Door"); } }CyclicBarrier(加法计数器)
public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{ System.out.println("召唤神龙成功!"); }); for (int i = 1; i <= 7 ; i++) { final int temp=i; new Thread(()->{ System.out.println(Thread.currentThread().getName()+"收集了"+temp+"颗龙珠"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }Semaphore(信号量)
多个信号量可以让多个线程交替使用
多个共享资源互斥的使用!并发限流,控制最大的线程数!
acquire();获取,假设已经满了,等待被释放为止!
release();释放,会将当前的信号量释放+1,然后唤醒等待的线程
//最多同时三个访问 Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 10; i++) { new Thread(()->{ try { //拿到许可证,若没有则等待 semaphore.acquire(); //一秒后释放许可证 System.out.println(Thread.currentThread().getName()+"执行中"); TimeUnit.SECONDS.sleep(1); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }读写锁
- 读是共享锁,写是排它锁
- 分为writeLock().lock()和readLock().lock();
private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();队列 阻塞队列
- FIFO(先入先出)
- 满了阻塞,空了阻塞
- 场景:并发处理,线程池
- add&remove
- offer&poll
ArrayBlockingQueue blockingQueue=new ArrayBlockingQueue(int num);同步队列
- 只能放一个,存进去必须等待取出来(交替打印)
public class SynchronousQueueDemo { public static void main(String[] args) { BlockingQueue线程池(重点)synchronousQueue= new SynchronousQueue(); new Thread(()->{ try { System.out.println(Thread.currentThread().getName()+"=>"+"put 1"); synchronousQueue.put("1"); System.out.println(Thread.currentThread().getName()+"=>"+"put 2"); synchronousQueue.put("2"); System.out.println(Thread.currentThread().getName()+"=>"+"put 3"); synchronousQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } },"T1").start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName()+"=>"+synchronousQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } },"T2").start(); } }
池化技术:事先准备好一些资源,用完之后还给我(不回收)
- 节省开辟线程的消耗
- 提高想要速度
- 并与管理
看阿里巴巴开发手册并发编程这块有一条:线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,通过源码分析禁用的原因。
阅读Executors源码,发现只不过是写好了参数。
-
newFixedThreadPool
作用:该方法返回一个固定线程数量的线程池,线程数量自定义。该方法创建的线程池最大线程数量等于核心线程数量。如果新提交的任务没有空闲的线程去处理,就会被放入阻塞队列中。
缺点:该线程池使用的阻塞队列是linkedBlockingQueue:链表阻塞队列,默认容量为Integer.MAX_VALUE,容量过大,可能会堆积大量的任务,从而造成OOM(内存溢出)
-
newSingleThreadExecutor
作用:该方法创建了只有一个线程的线程池,如果提交的任务没有空闲的线程去处理,就会被放入阻塞队列中
缺点:该线程池使用的阻塞队列是linkedBlockingQueue:链表阻塞队列,默认容量为Integer.MAX_VALUE,容量过大,可能会堆积大量的任务,从而造成OOM(内存溢出)
-
newCachedThreadPool
作用:该方法返回一个可根据实际需求调整线程数量的线程池。如果提交的任务没有空闲的线程处理,就会创建新的线程去处理该任务。如果有线程空闲时间超过60秒,就会被销毁
缺点:该线程池允许创建的最大线程数量为Integer.MAX_VALUE,可能会创建出大量线程,导致OOM(内存溢出)
-
newScheduleThreadPool
作用:该方法可以创建自定义核心线程容量的线程池,而且该线程池支持定时以及周期性的任务执行。
缺点:该线程池允许创建的最大线程数量为Integer.MAX_VALUE,可能会创建出大量线程,导致OOM(内存溢出)
FixedThreadPool是复用固定数量的线程处理一个共享的无边界队列
- 它是一种固定大小的线程池;
- corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
- keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;
- 阻塞队列采用了linkedBlockingQueue,它是一个无界队列;
- 由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;
- 由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。
- 它只会创建一条工作线程处理任务;
- 采用的阻塞队列为linkedBlockingQueue;
- 它是一个可以无限扩大的线程池;
- 它比较适合处理执行时间比较小的任务;
- corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;
- keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;
- 采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
scheduledThreadPool.schedule(() -> System.out.println("hello"), int time, TimeUnit.MINUTES);:time分钟后打印hello
- 它接收SchduledFutureTask类型的任务,有两种提交任务的方式:
- scheduledAtFixedRate
- scheduledWithFixedDelay
- SchduledFutureTask接收的参数:
- time:任务开始的时间
- sequenceNumber:任务的序号
- period:任务执行的时间间隔
- 它采用DelayQueue存储等待的任务
- DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
- DelayQueue也是一个无界队列;
- 工作线程的执行过程:
- 工作线程会从DelayQueue取已经到期的任务去执行;
- 执行结束后重新设置任务的到期时间,再次放回DelayQueue
ForkJoin在JDK1.7,并行执行任务!提高效率,大数据!(把大任务拆分成小任务)
public class Main { public static void main(String[] args) throws Exception { // 创建2000个随机数组成的数组: long[] array = new long[2000]; long expectedSum = 0; for (int i = 0; i < array.length; i++) { array[i] = random(); expectedSum += array[i]; } System.out.println("Expected sum: " + expectedSum); // fork/join: ForkJoinTaskFuturetask = new SumTask(array, 0, array.length); long startTime = System.currentTimeMillis(); Long result = ForkJoinPool.commonPool().invoke(task); long endTime = System.currentTimeMillis(); System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms."); } static Random random = new Random(0); static long random() { return random.nextInt(10000); } } class SumTask extends RecursiveTask { static final int THRESHOLD = 500; long[] array; int start; int end; SumTask(long[] array, int start, int end) { this.array = array; this.start = start; this.end = end; } @Override protected Long compute() { if (end - start <= THRESHOLD) { // 如果任务足够小,直接计算: long sum = 0; for (int i = start; i < end; i++) { sum += this.array[i]; // 故意放慢计算速度: try { Thread.sleep(1); } catch (InterruptedException e) { } } return sum; } // 任务太大,一分为二: int middle = (end + start) / 2; System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end)); SumTask subtask1 = new SumTask(this.array, start, middle); SumTask subtask2 = new SumTask(this.array, middle, end); invokeAll(subtask1, subtask2); Long subresult1 = subtask1.join(); Long subresult2 = subtask2.join(); Long result = subresult1 + subresult2; System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result); return result; } }
- 类似Ajax
- 对将来的某个事件的结果进行建模
CompletableFutureCompletableFuturecompletableFuture = CompletableFuture.supplyAsync(()->{ System.out.println(Thread.currentThread().getName()+"supplyAsync"+":Integer"); return 1024; }); System.out.println(completableFuture.whenComplete((t, u) -> { //t:正常的返回结果 //u:错误信息 System.out.println(t + "t" + u); }).exceptionally((e) -> { e.getMessage();//打印错误信息 return 233;// 可以获取到错误的返回结果 }).get());
Java8引入的,针对Future做了优化,可以传入回调对象,当任务结束时,会自动回调某个对象的方法,并且可以串行执行,使用静态方法即可创建一个异步任务:
CompletableFuturecompletableFuture1 = CompletableFuture.supplyAsync(() -> { System.out.println("hello"); return "hello"; }); CompletableFuture completableFuture2 = CompletableFuture.supplyAsync(() -> { System.out.println("world"); return "world"; }); CompletableFuture completableFuture = CompletableFuture.allOf(completableFuture1, completableFuture2); completableFuture.thenAccept(c -> { System.out.println("任务都完成了"); });
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)