java阻塞队列 线程同步合作

java阻塞队列 线程同步合作,第1张

Queue接口与List Set同一级别 都是继承了Collection接口 LinkedList实现了Queue接口 Queue接口窄化了对LinkedList的方法的访问权限(即在方法中的参数类型如果是Queue时 就完全只能访问Queue接口所定义的方法了 而不能直接访问 LinkedList的非Queue的方法) 以使得只有恰当的方法才可以使用 BlockingQueue 继承了Queue接口

队列是一种数据结构.它有两个基本 *** 作 在队列尾部加人一个元素 和从队列头部移除一个元素就是说 队列以一种先进先出的方式管理数据 如果你试图向一个已经满了的阻塞队列中添加一个元素或者是从一个空的阻塞队列中移除一个元索 将导致线程阻塞.在多线程进行合作时 阻塞队列是很有用的工具 工作者线程可以定期地把中间结果存到阻塞队列中而其他工作者线线程把中间结果取出并在将来修改它们 队列会自动平衡负载 如果第一个线程集运行得比第二个慢 则第二个线程集在等待结果时就会阻塞 如果第一个线程集运行得快 那么它将等待第二个线程集赶上来 下表显示了jdk 中的阻塞队列的 *** 作

add        增加一个元索                     如果队列已春慎桥满 则抛出一个IIIegaISlabEepeplian异常

remove   移除并返回队列头部的元素    如果队列为空 则抛出一个NoSuchElementException异常

element  返回队列头部的元素             如果队列为空 则抛出一个NoSuchElementException异常

offer       添加一个元素并返回true       如果队列已满 则返回false

poll         移除并返问队列头部的元素    如果队列扒猛为空 则返回null

peek       返回队列头部的元素             如果队列为空 则返回null

put         添加一个元素                      如果队列满 则阻塞

take        移除并返回队列头部的元素     如果队列为空 则阻塞

remove element offer poll peek 其实是属于Queue接口

阻塞队列的 *** 作可以根据它们的响应方式分为以下三类 aad removee和element *** 作在你试图为一个已满的队列增加元素或从空队列取得元素时抛出异常 当然 在多线程程序中 队列在任何时间都可能变成满的或空的 所以你可能想使用offer poll peek方法 这些方法在无法完成任务时只是给出一个出错示而不会抛出异常

注意 poll和peek方法出错进返回null 因此 向队列中插入null值是不合法的

还有带超时的offer和poll方法变种 例如 下面的调用

boolean success = q offer(x TimeUnit MILLISECONDS)

尝试在 毫秒内向队列尾部插入一个元素 如果成功 立即返回true 否则 当到达超时进 返回false 同样地 调用

Object head = q poll( TimeUnit MILLISECONDS)

如果在 毫秒内成功地移除了队列头元素 则立即返回头元素 否则在到达超时时 返回null

最后 我们有阻塞 *** 作put和take put方法在队列满时阻塞 take方法在队列空时阻塞

ncurrent包提供了阻塞队列的 个变种 默认情况下 LinkedBlockingQueue的容量是没有上限的(说的不准确 在不指定时容量为Integer MAX_VALUE 不要然的话在put时怎么会受阻呢) 但是也可以选择指定其最大容量 它是基孝仔于链表的队列 此队列按 FIFO(先进先出)排序元素

ArrayBlockingQueue在构造时需要指定容量 并可以选择是否需要公平性 如果公平参数被设置true 等待时间最长的线程会优先得到处理(其实就是通过将ReentrantLock设置为true来达到这种公平性的 即等待时间最长的线程会先 *** 作) 通常 公平性会使你在性能上付出代价 只有在的确非常需要的时候再使用它 它是基于数组的阻塞循环队列 此队列按 FIFO(先进先出)原则对元素进行排序

PriorityBlockingQueue是一个带优先级的队列 而不是先进先出队列 元素按优先级顺序被移除 该队列也没有上限(看了一下源码 PriorityBlockingQueue是对PriorityQueue的再次包装 是基于堆数据结构的 而PriorityQueue是没有容量限制的 与ArrayList一样 所以在优先阻塞队列上put时是不会受阻的 虽然此队列逻辑上是无界的 但是由于资源被耗尽 所以试图执行添加 *** 作可能会导致 OutOfMemoryError) 但是如果队列为空 那么取元素的 *** 作take就会阻塞 所以它的检索 *** 作take是受阻的 另外 往入该队列中的元素要具有比较能力

最后 DelayQueue(基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列 只有在延迟期满时才能从中提取元素 该队列的头部是延迟期满后保存时间最长的 Delayed 元素 如果延迟都还没有期满 则队列没有头部 并且poll将返回null 当一个元素的 getDelay(TimeUnit NANOSECONDS) 方法返回一个小于或等于零的值时 则出现期满 poll就以移除这个元素了 此队列不允许使用 null 元素 下面是延迟接口

Java代码

public interface Delayed extends Comparable<Delayed>{

long getDelay(TimeUnit unit)

}

public interface Delayed extends Comparable<Delayed>{

long getDelay(TimeUnit unit)

}

放入DelayQueue的元素还将要实现pareTo方法 DelayQueue使用这个来为元素排序

下面的实例展示了如何使用阻塞队列来控制线程集 程序在一个目录及它的所有子目录下搜索所有文件 打印出包含指定关键字的文件列表 从下面实例可以看出 使用阻塞队列两个显著的好处就是 多线程 *** 作共同的队列时不需要额外的同步 另外就是队列会自动平衡负载 即那边(生产与消费两边)处理快了就会被阻塞掉 从而减少两边的处理速度差距 下面是具体实现

Java代码

public class BlockingQueueTest {

public static void main(String[] args) {

Scanner in = new Scanner(System in)

System out print( Enter base directory (e g /usr/local/jdk /src): )

String directory = in nextLine()

System out print( Enter keyword (e g volatile): )

String keyword = in nextLine()

final int FILE_QUEUE_SIZE = // 阻塞队列大小

final int SEARCH_THREADS = // 关键字搜索线程个数

// 基于ArrayBlockingQueue的阻塞队列

BlockingQueue<File>queue = new ArrayBlockingQueue<File>(

FILE_QUEUE_SIZE)

//只启动一个线程来搜索目录

FileEnumerationTask enumerator = new FileEnumerationTask(queue

new File(directory))

new Thread(enumerator) start()

//启动 个线程用来在文件中搜索指定的关键字

for (int i = i <= SEARCH_THREADSi++)

new Thread(new SearchTask(queue keyword)) start()

}

}

class FileEnumerationTask implements Runnable {

//哑元文件对象 放在阻塞队列最后 用来标示文件已被遍历完

public static File DUMMY = new File( )

private BlockingQueue<File>queue

private File startingDirectory

public FileEnumerationTask(BlockingQueue<File>queue File startingDirectory) {

this queue = queue

this startingDirectory = startingDirectory

}

public void run() {

try {

enumerate(startingDirectory)

queue put(DUMMY)//执行到这里说明指定的目录下文件已被遍历完

} catch (InterruptedException e) {

}

}

// 将指定目录下的所有文件以File对象的形式放入阻塞队列中

public void enumerate(File directory) throws InterruptedException {

File[] files = directory listFiles()

for (File file : files) {

if (file isDirectory())

enumerate(file)

else

//将元素放入队尾 如果队列满 则阻塞

queue put(file)

}

}

}

class SearchTask implements Runnable {

private BlockingQueue<File>queue

private String keyword

public SearchTask(BlockingQueue<File>queue String keyword) {

this queue = queue

this keyword = keyword

}

public void run() {

try {

boolean done = false

while (!done) {

//取出队首元素 如果队列为空 则阻塞

File file = queue take()

if (file == FileEnumerationTask DUMMY) {

//取出来后重新放入 好让其他线程读到它时也很快的结束

queue put(file)

done = true

} else

search(file)

}

} catch (IOException e) {

e printStackTrace()

} catch (InterruptedException e) {

}

}

public void search(File file) throws IOException {

Scanner in = new Scanner(new FileInputStream(file))

int lineNumber =

while (in hasNextLine()) {

lineNumber++

String line = in nextLine()

if (ntains(keyword))

System out printf( %s:%d:%s%n file getPath() lineNumber

line)

}

in close()

}

lishixinzhi/Article/program/Java/hx/201311/26657

典型地,suspend() 和 resume() 被用在等待另一个线程产生的结果的情形:测试发现结果还没有产生后,让线程阻塞,另一个线程产生了结果后,调用 resume() 使其恢复。但suspend()方法很容易引起死锁问题,已经不推荐使用了。wait() 和 notify() 方法:两尘早穗个方法配套使用,wait() 使得线程进入阻塞状态,它有两种形式,一种允许 指定以毫秒为单位的一段时间作为参数,另一种没有参数,前者当对应的 notify() 被调用或者超出指定时间时线程重新进入可执行状态,后者则必须对应的 notify() 被调用。 初看起来它们与 suspend() 和 resume() 方法对没有什么分别,但是事实上它们是截然不同的。区别的核心在于,前面叙述的所有方法,阻塞时都不会释放占用的锁(如果占用了的话),派卜而这一对方法则相反。 上述的核心区别导致了一系列的细节上的区别。 首先,前面叙述的所有方法都隶属于 Thread 类,但是这一对却直接隶属于 Object 类,也就是说,所有对象都拥有这一对方法。初看起来这十分不可思议,但是实际上却是很自然的,因为这一对方法阻塞时要释放占用的锁,而锁是任何对象都具有的,调用任意对象的 wait() 方法导致线程阻塞,并且该对象上的锁被释放。而调用 任意对象的notify()方法则导致因调用该对象的 wait() 方法而阻塞的线程中随机选择的一个解除阻塞(但要等到获得锁后才真正可执行)。 其次,前面叙述的所有方法都可在任何位置调用,但是这一对方法却必须在 synchronized 方法或块中调用,理由也很简单,只有在 synchronized 方法或块中当前线程才占有锁,才有锁可以释放。同样的道理,调用这一对方法的对象上的锁必须为当前线程所拥有,这样才有锁可以释放。因此,这一对方法调用必须放置在这样的 synchronized 方法或块中,该方法或块的上锁对象就是调用这一对方法的对象。若不满足这一条件,则程序虽然仍能编译,但在运行时会出现IllegalMonitorStateException 异常。 wait() 和 notify() 方法的上述特性决定了它们经常和synchronized 方法或块一起使用,将它们和 *** 作系统的进程间通信机制作一个比较就会发现它们的相似性:synchronized方法或块提供了类似于 *** 作系统原语的功能,它们的执行不会受到多线程机制的干扰,而这一对方法则相当于 block 和wakeup 原语(这一对方法均声明为 synchronized)。它们的结合使得我们可以实现 *** 作系统上一系列精妙的进程间通信的算法(如信号量睁哗算法),并用于解决各种复杂的线程间通信问题。 关于 wait() 和 notify() 方法最后再说明两点: 第一:调用 notify() 方法导致解除阻塞的线程是从因调用该对象的 wait() 方法而阻塞的线程中随机选取的,我们无法预料哪一个线程将会被选择,所以编程时要特别小心,避免因这种不确定性而产生问题。 第二:除了 notify(),还有一个方法 notifyAll() 也可起到类似作用,唯一的区别在于,调用 notifyAll() 方法将把因调用该对象的 wait() 方法而阻塞的所有线程一次性全部解除阻塞。当然,只有获得锁的那一个线程才能进入可执行状态。 谈到阻塞,就不能不谈一谈死锁,略一分析就能发现,suspend() 方法和不指定超时期限的 wait() 方法的调用都可能产生死锁。遗憾的是,Java 并不在语言级别上支持死锁的避免,我们在编程中必须小心地避免死锁。 以上我们对 Java 中实现线程阻塞的各种方法作了一番分析,我们重点分析了 wait() 和 notify() 方法,因为它们的功能最强大,使用也最灵活,但是这也导致了它们的效率较低,较容易出错。实际使用中我们应该灵活使用各种方法,以便更好地达到我们的目的。


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/12313102.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-05-24
下一篇 2023-05-24

发表评论

登录后才能评论

评论列表(0条)

保存