public class Demo { public static void main(String[] args) { Data data=new Data(); //两个生产者线程 for (int i = 0; i < 2; i++) { new Thread(()->{ try { data.increment(); } catch (InterruptedException e) { e.printStackTrace(); } },"生产者"+String.valueOf(i)).start(); } //两个消费者线程 for (int i = 2; i < 4; i++) { new Thread(()->{ try { data.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } },"消费者"+String.valueOf(i)).start(); } } } class Data{ //共享资源 private int number=0; Lock lock=new ReentrantLock(); Condition condition= lock.newCondition(); public void increment() throws InterruptedException { //上锁 lock.lock(); try { //循环判断条件,不用if避免虚假唤醒 while (number!=0){ //不满足条件就等待 condition.await(); } // *** 作 生产 number++; System.out.println(Thread.currentThread().getName()+"线程t"+number); //通知唤醒等待的消费者线程 condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } //解锁 lock.unlock(); } public void decrement() throws InterruptedException { //上锁 lock.lock(); try { while (number==0){ condition.await(); } number--; System.out.println(Thread.currentThread().getName()+"线程t"+number); condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } lock.unlock(); } }2.阻塞队列实现
public class 阻塞队列版 { public static void main(String[] args) { MyResource myResource=new MyResource(new ArrayBlockingQueue(10)); new Thread(()->{ System.out.println("生产线程启动"); try { myResource.myProd(); } catch (InterruptedException e) { e.printStackTrace(); } },"Prod").start(); new Thread(()->{ System.out.println("消费线程启动"); try { myResource.myConsumer(); } catch (InterruptedException e) { e.printStackTrace(); } },"Consumer").start(); try { Thread.sleep(5000); System.out.println(); System.out.println(); System.out.println(); System.out.println(); myResource.stop(); System.out.println("5秒时间到,老板main线程叫停"); } catch (InterruptedException e) { } } } class MyResource{ //默认开启,开始生产消费 private volatile boolean FLAG=true; private AtomicInteger atomicInteger=new AtomicInteger(); BlockingQueueblockingQueue=null; public MyResource(BlockingQueue blockingQueue1){ blockingQueue=blockingQueue1; System.out.println(blockingQueue1.getClass().getName()); } //生产方法 public void myProd() throws InterruptedException { String data=null; boolean result; while (FLAG){ data=atomicInteger.incrementAndGet()+""; result=blockingQueue.offer(data,2L, TimeUnit.SECONDS); if(result){ System.out.println(Thread.currentThread().getName()+"t 插入队列"+data+"成功"); }else{ System.out.println(Thread.currentThread().getName()+"t 插入队列"+data+"失败"); } TimeUnit.SECONDS.sleep(1); } System.out.println("生产结束"); } //消费方法 public void myConsumer() throws InterruptedException { String rs=null; while (FLAG){ rs = blockingQueue.poll(2L, TimeUnit.SECONDS); if (rs==null||rs.equalsIgnoreCase("")) { FLAG=false; System.out.println(Thread.currentThread().getName()+"超过两秒钟没取到商品,退出"); System.out.println(); System.out.println(); System.out.println(); return; // } System.out.println(Thread.currentThread().getName()+"t 消费队列"+rs+"成功"); } } public void stop(){ this.FLAG=false; } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)