生产者消费者是极其经典的并发同步模型,描述了在共享固定大小缓冲区下,生产者生产一定量数据放入缓冲区,而消费者则从缓冲区消费取出一定量数据。
semaphore 实现生产者消费者问题还有一个名字是有限缓冲问题。
生产者消费者更加强调了两边写缓冲线程的角色,而有限缓冲则将目光聚焦到中间的缓冲
semaphore(信号量)是一个变量或者抽象数据类型,被用于控制在并发系统临界区问题多个线程对公共资源的访问。
与自旋锁、栅栏一起作为同步手段。
一个普通的信号量是单纯取决于程序员定义条件而改变的变量。
可以将信号量看作是特定资源的数量记录,并耦合调整该记录的安全 *** 作。
生产者消费者的有限缓冲实际上就可以看作是特定资源,因此可以使用信号量来记录有限缓冲的数量。
以下是c++版本的实现,当然为了控制对buffer写 *** 作时可能碰到的多个生产者消费者同时取放数据导致的写冲突,也在其中加了互斥锁
#include
#include
#include
std::counting_semaphore<N> number_of_queueing_portions{0};
std::counting_semaphore<N> number_of_empty_positions{N};
std::mutex buffer_manipulation;
void producer(){
for(;;) {
Portion portion = produce_next_portion();
number_of_empty_positions.acquire();
{
std::lock_guard<std::mutex> g(buffer_manipulation);
add_portion_to_buffer(portion);
}
number_of_queueing_portions.release();
}
}
void consumer(){
for(;;) {
number_of_queueing_portions.acquire();
Portion portion;
{
std::lock_guard<std::mutex> g(buffer_manipulation);
portion = take_portion_from_buffer();
}
number_of_empty_positions.release();
process_portion_taken(portion);
}
}
int main(int argc, char const *argv[])
{
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
monitor实现
monitor(管程)是允许线程具有互斥、等待(堵塞)某个条件为false的能力的抽象数据结构。
还具有通知其他线程他们特定条件已经满足的机制,以及让他们暂时放弃独占访问,以便等待某些条件满足,然后重新获取独占访问并恢复他们的任务。
管程由互斥锁以及特定条件变量组成。
条件变量本质上是等待特定条件的线程的容器
管程和信号量一个明显的区别就是信号量是对共享资源数量的记录,wait(),notify()是仅有能修改共享资源数量的记录的方法,并且这种修改是互斥的
管程则是条件变量(等待满足特定条件线程容器)以及 *** 作该变量的程序
class Bounded_buffer{
Portion buffer[N];
unsigned head, tail;
unsigned count;
std::condition_variable nonempty, nonfull;
std::mutex mtx;
public:
void append(Portion x){
std::unique_lock<std::mutex> lck(mtx);
nonfull.wait(lck, [&]{return !(N==count);});
assert(0<=count && count < N)
buffer[tail++]=x;
tail%=N;
++count;
nonempty.notify_one();
}
Portion remove() {
std::unique_lock<std::mutex> lck(mtx);
nonempty.wait(lck,[&]{return !(0==count);});
assert(0 < count && count <= N)
Portion x = buffer[head++];
head %= N;
--count;
nonfull.notify_one();
return x;
}
Bounded_buffer() {
head = 0; tail = 0; count = 0;
}
}
channel实现
channel是一种通过消息传递实现进程间通信和同步的模型。
消息可以通过通道发送,而另一个进程或线程能够接收通过它引用channel发送的消息,比如流。
通道不同实现可以被缓冲,也可以不被缓冲,可以是异步或者同步的。
channel实现在golang中是极为优雅的
var element = 0
func produce() int {
element++
return element
}
func consume(e int) {
// consume element
}
const (
producerCount = 2
consumerCount = 2
bufferSize = 1
)
func main() {
ch:=make(chan int, bufferSize)
for i := 0; i < producerCount; i++ {
go func() {
ch<-produce()
}()
}
for j := 0; j < consumerCount; j++ {
go func() {
consume(<-ch)
}()
}
// assume main will wait other goroutines
}
无semaphore、monitors实现
在单生产者和消费者的时候,可以定义一个容量为b(b>=1)的buffer。
使k为大于b的常数并为b的倍数,s、r为0到k-1之间的整数。
在初始化时s=r且buffer为空。
生产者放入消息到buffer[s mod b],消费者取出消息buffer[r mod b]
生产者消费者最重要的是互斥有限缓冲的写 *** 作以及同步生产者消费者。
在原子 *** 作下的buffer中同一个位置不可能同时存在有资源以及没有资源两种情况,因此无需互斥。
单生产者消费者,仅仅只需要考虑生产者和消费者之间同步,而忙等待判断是否为空或者超出容量做到了这点
当然还要考虑到在调度器切换时,可能一个线程读取了变量值,切换到第二个线程更改了该值,再切回来,那么第一个线程将使用旧值,而不是当前值,因此需要通过原子 *** 作来解决这个问题
enum {
N = 4
};
Message buffer[N];
std::atomic<unsigned> count{0};
void producer() {
unsigned tail{0};
for (;;) {
Message msg = produceMessage();
while (N == count);
buffer[tail++] = msg;
tail%=N;
count.fetch_add(1,std::memory_order_relaxed);
}
}
void consumer(){
unsigned head {0};
for(;;){
while (count==0);
Message msg = buffer[head++];
head%=N;
count.fetch_sub(1,std::memory_order_relaxed);
consumeMessage(msg);
}
}
Ref注意代码中while的使用,这是为了防止一些竞争情况的出现。
比如某消费者在数据放入缓冲区时被唤醒,但另一个消费者在管程上等待了一段时间并移除了该数据。
所以如果是if,可能会出现放入缓冲区的数据项过多,或移除空缓冲区中的元素的情况。
- https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
- https://en.wikipedia.org/wiki/Monitor_(synchronization)
- https://en.wikipedia.org/wiki/Semaphore
- https://en.wikipedia.org/wiki/Channel_(programming)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)