生产者消费者问题

生产者消费者问题,第1张

生产者消费者是极其经典的并发同步模型,描述了在共享固定大小缓冲区下,生产者生产一定量数据放入缓冲区,而消费者则从缓冲区消费取出一定量数据。

生产者消费者问题还有一个名字是有限缓冲问题。

生产者消费者更加强调了两边写缓冲线程的角色,而有限缓冲则将目光聚焦到中间的缓冲

semaphore 实现

semaphore(信号量)是一个变量或者抽象数据类型,被用于控制在并发系统临界区问题多个线程对公共资源的访问。

与自旋锁、栅栏一起作为同步手段。

一个普通的信号量是单纯取决于程序员定义条件而改变的变量。

可以将信号量看作是特定资源的数量记录,并耦合调整该记录的安全 *** 作。

生产者消费者的有限缓冲实际上就可以看作是特定资源,因此可以使用信号量来记录有限缓冲的数量。

以下是c++版本的实现,当然为了控制对buffer写 *** 作时可能碰到的多个生产者消费者同时取放数据导致的写冲突,也在其中加了互斥锁

#include 
#include 
#include 

std::counting_semaphore<N> number_of_queueing_portions{0};
std::counting_semaphore<N> number_of_empty_positions{N};
std::mutex buffer_manipulation;

void producer(){
    for(;;) {
        Portion portion = produce_next_portion();
        number_of_empty_positions.acquire();
        {
            std::lock_guard<std::mutex> g(buffer_manipulation);
            add_portion_to_buffer(portion);
        }
        number_of_queueing_portions.release();
    }
}

void consumer(){
    for(;;) {
        number_of_queueing_portions.acquire();
        Portion portion;
        {
            std::lock_guard<std::mutex> g(buffer_manipulation);
            portion = take_portion_from_buffer();
        }
        number_of_empty_positions.release();
        process_portion_taken(portion);
    }
}

int main(int argc, char const *argv[])
{
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
    return 0;
}
monitor实现

monitor(管程)是允许线程具有互斥、等待(堵塞)某个条件为false的能力的抽象数据结构。

还具有通知其他线程他们特定条件已经满足的机制,以及让他们暂时放弃独占访问,以便等待某些条件满足,然后重新获取独占访问并恢复他们的任务。

管程由互斥锁以及特定条件变量组成。

条件变量本质上是等待特定条件的线程的容器

管程和信号量一个明显的区别就是信号量是对共享资源数量的记录,wait(),notify()是仅有能修改共享资源数量的记录的方法,并且这种修改是互斥的

管程则是条件变量(等待满足特定条件线程容器)以及 *** 作该变量的程序

class Bounded_buffer{
    Portion buffer[N];
    unsigned head, tail;
    unsigned count;
    std::condition_variable nonempty, nonfull;
    std::mutex mtx;

public:
    void append(Portion x){
        std::unique_lock<std::mutex> lck(mtx);
        nonfull.wait(lck, [&]{return !(N==count);});
        assert(0<=count && count < N)
        buffer[tail++]=x;
        tail%=N;
        ++count;
        nonempty.notify_one();
    }

    Portion remove() {
        std::unique_lock<std::mutex> lck(mtx);
        nonempty.wait(lck,[&]{return !(0==count);});
        assert(0 < count && count <= N)
        Portion x = buffer[head++];
        head %= N;
        --count;
        nonfull.notify_one();
        return x;
    }

    Bounded_buffer() {
        head = 0; tail = 0; count = 0;
    }
}
channel实现

channel是一种通过消息传递实现进程间通信和同步的模型。

消息可以通过通道发送,而另一个进程或线程能够接收通过它引用channel发送的消息,比如流。

通道不同实现可以被缓冲,也可以不被缓冲,可以是异步或者同步的。

channel实现在golang中是极为优雅的

var element = 0

func produce() int {
	element++
	return element
}

func consume(e int) {
	// consume element
}

const (
	producerCount = 2
	consumerCount = 2
	bufferSize = 1
)

func main() {
	ch:=make(chan int, bufferSize)
	for i := 0; i < producerCount; i++ {
		go func() {
			ch<-produce()
		}()
	}

	for j := 0; j < consumerCount; j++ {
		go func() {
			consume(<-ch)
		}()
	}
  
  // assume main will wait other goroutines
}
无semaphore、monitors实现

在单生产者和消费者的时候,可以定义一个容量为b(b>=1)的buffer。

使k为大于b的常数并为b的倍数,s、r为0到k-1之间的整数。

在初始化时s=r且buffer为空。

生产者放入消息到buffer[s mod b],消费者取出消息buffer[r mod b]

生产者消费者最重要的是互斥有限缓冲的写 *** 作以及同步生产者消费者。

在原子 *** 作下的buffer中同一个位置不可能同时存在有资源以及没有资源两种情况,因此无需互斥。

单生产者消费者,仅仅只需要考虑生产者和消费者之间同步,而忙等待判断是否为空或者超出容量做到了这点

当然还要考虑到在调度器切换时,可能一个线程读取了变量值,切换到第二个线程更改了该值,再切回来,那么第一个线程将使用旧值,而不是当前值,因此需要通过原子 *** 作来解决这个问题

enum {
    N = 4
};
Message buffer[N];
std::atomic<unsigned> count{0};

void producer() {
    unsigned tail{0};
    for (;;) {
        Message msg = produceMessage();
        while (N == count);
        buffer[tail++] = msg;
        tail%=N;
        count.fetch_add(1,std::memory_order_relaxed);
    }
}

void consumer(){
    unsigned head {0};
    for(;;){
        while (count==0);
        Message msg = buffer[head++];
        head%=N;
        count.fetch_sub(1,std::memory_order_relaxed);
        consumeMessage(msg);
    }
}

注意代码中while的使用,这是为了防止一些竞争情况的出现。

比如某消费者在数据放入缓冲区时被唤醒,但另一个消费者在管程上等待了一段时间并移除了该数据。

所以如果是if,可能会出现放入缓冲区的数据项过多,或移除空缓冲区中的元素的情况。

Ref
  1. https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
  2. https://en.wikipedia.org/wiki/Monitor_(synchronization)
  3. https://en.wikipedia.org/wiki/Semaphore
  4. https://en.wikipedia.org/wiki/Channel_(programming)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/674692.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-19
下一篇 2022-04-19

发表评论

登录后才能评论

评论列表(0条)

保存