标签: 多线程同步生产者与消费者写者与读者
目录(?)[+]
在了解了《同步与互斥的区别 》之后,我们来看看几个经典的线程同步的例子。相信通过具体场景可以让我们学会分析和解决这类线程同步的问题,以便以后应用在实际的项目中。
一、生产者-消费者问题
问题描述:
一组生产者进程和一组消费者进程共享一个初始为空、大小为 n 的缓冲区,只有缓冲区没满时,生产者才能把消息放入到缓冲区,否则必须等待;只有缓冲区不空时,消费者才能从中取出消息,否则必须等待。由于缓冲区是临界资源,它只允许一个生产者放入消息,或者一个消费者从中取出消息。
分析:
关系分析:生产者和消费者对缓冲区互斥访问是互斥关系,同时生产者和消费者又是一个相互协作的关系,只有生产者生产之后,消费者才能消费,它们也是同步关系。
整理思路:这里比较简单,只有生产者和消费者两个进程,且这两个进程存在着互斥关系和同步关系。那么需要解决的是互斥和同步的PV *** 作的位置。
信号量设置:信号量mutex作为互斥信号量,用于控制互斥访问缓冲池,初值为1;信号量full用于记录当前缓冲池中“满”缓冲区数,初值为 0;信号量empty用于记录当前缓冲池中“空”缓冲区数,初值为n。
代码示例:(semaphore类的封装见下文)
#include<iostream>
#include<unistd.h> // sleep
#include<pthread.h>
#include"semaphore.h"
using namespace std
#define N 5
semaphore mutex("/", 1) // 临界区互斥信号量
semaphore empty("/home", N) // 记录空缓冲区数,初值为N
semaphore full("/home/songlee",0)// 记录满缓冲区数,初值为0
int buffer[N]// 缓冲区,大小为N
int i=0
int j=0
void* producer(void* arg)
{
empty.P()// empty减1
mutex.P()
buffer[i] = 10 + rand() % 90
printf("Producer %d write Buffer[%d]: %d\n",arg,i+1,buffer[i])
i = (i+1) % N
mutex.V()
full.V() // full加1
}
void* consumer(void* arg)
{
full.P() // full减1
mutex.P()
printf(" \033[131m")
printf("Consumer %d read Buffer[%d]: %d\n",arg,j+1,buffer[j])
printf("\033[0m")
j = (j+1) % N
mutex.V()
empty.V()// empty加1
}
int main()
{
pthread_t id[10]
// 开10个生产者线程,10个消费者线程
for(int k=0k<10++k)
pthread_create(&id[k], NULL, producer, (void*)(k+1))
for(int k=0k<10++k)
pthread_create(&id[k], NULL, consumer, (void*)(k+1))
sleep(1)
return 0
}12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455561234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
编译运行输出结果:
Producer 1 write Buffer[1]: 83
Producer 2 write Buffer[2]: 26
Producer 3 write Buffer[3]: 37
Producer 5 write Buffer[4]: 35
Producer 4 write Buffer[5]: 33
Consumer 1 read Buffer[1]: 83
Producer 6 write Buffer[1]: 35
Consumer 2 read Buffer[2]: 26
Consumer 3 read Buffer[3]: 37
Consumer 4 read Buffer[4]: 35
Consumer 5 read Buffer[5]: 33
Consumer 6 read Buffer[1]: 35
Producer 7 write Buffer[2]: 56
Producer 8 write Buffer[3]: 22
Producer 10 write Buffer[4]: 79
Consumer 9 read Buffer[2]: 56
Consumer 10 read Buffer[3]: 22
Producer 9 write Buffer[5]: 11
Consumer 7 read Buffer[4]: 79
Consumer 8 read Buffer[5]: 1112345678910111213141516171819201234567891011121314151617181920
二、读者-写者问题
问题描述:
有读者和写者两组并发线程,共享一个文件,当两个或以上的读线程同时访问共享数据时不会产生副作用,但若某个写线程和其他线程(读线程或写线程)同时访问共享数据时则可能导致数据不一致的错误。因此要求:
允许多个读者可以同时对文件执行读 *** 作;
只允许一个写者往文件中写信息;
任一写者在完成写 *** 作之前不允许其他读者或写者工作;
写者执行写 *** 作前,应让已有的读者和写者全部退出。
分析:
关系分析:由题目分析可知,读者和写者是互斥的,写者和写者也是互斥的,而读者和读者不存在互斥问题。
整理思路:写者是比较简单的,它与任何线程互斥,用互斥信号量的 PV *** 作即可解决。读者的问题比较复杂,它必须实现与写者的互斥,多个读者还可以同时读。所以,在这里用到了一个计数器,用它来判断当前是否有读者读文件。当有读者的时候写者是无法写文件的,此时读者会一直占用文件,当没有读者的时候写者才可以写文件。同时,不同的读者对计数器的访问也应该是互斥的。
信号量设置:首先设置一个计数器count,用来记录当前的读者数量,初值为0;设置互斥信号量mutex,用于保护更新 count 变量时的互斥;设置互斥信号量rw用于保证读者和写者的互斥访问。
代码示例:
#include<iostream>
#include<unistd.h> // sleep
#include<pthread.h>
#include"semaphore.h"
using namespace std
int count = 0 // 记录当前的读者数量
semaphore mutex("/",1) // 用于保护更新count变量时的互斥
semaphore rw("/home",1)// 用于保证读者和写者的互斥
void* writer(void* arg)
{
rw.P() // 互斥访问共享文件
printf(" Writer %d start writing...\n", arg)
sleep(1)
printf(" Writer %d finish writing...\n", arg)
rw.V() // 释放共享文件
}
void* reader(void* arg)
{
mutex.P() // 互斥访问count变量
if(count == 0) // 当第一个读线程读文件时
rw.P() // 阻止写线程写
++count// 读者计数器加1
mutex.V() // 释放count变量
printf("Reader %d start reading...\n", arg)
sleep(1)
printf("Reader %d finish reading...\n", arg)
mutex.P() // 互斥访问count变量
--count// 读者计数器减1
if(count == 0) // 当最后一个读线程读完文件
rw.V() // 允许写线程写
mutex.V() // 释放count变量
}
int main()
{
pthread_t id[8]// 开6个读线程,2个写线程
pthread_create(&id[0], NULL, reader, (void*)1)
pthread_create(&id[1], NULL, reader, (void*)2)
pthread_create(&id[2], NULL, writer, (void*)1)
pthread_create(&id[3], NULL, writer, (void*)2)
pthread_create(&id[4], NULL, reader, (void*)3)
pthread_create(&id[5], NULL ,reader, (void*)4)
sleep(2)
pthread_create(&id[6], NULL, reader, (void*)5)
pthread_create(&id[7], NULL ,reader, (void*)6)
sleep(4)
return 0
}123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
编译运行的结果如下:
Reader 2 start reading...
Reader 1 start reading...
Reader 3 start reading...
Reader 4 start reading...
Reader 1 finish reading...
Reader 2 finish reading...
Reader 3 finish reading...
Reader 4 finish reading...
Writer 1 start writing...
Writer 1 finish writing...
Writer 2 start writing...
Writer 2 finish writing...
Reader 5 start reading...
Reader 6 start reading...
Reader 5 finish reading...
Reader 6 finish reading...1234567891011121314151612345678910111213141516
三、哲学家进餐问题
问题描述:
一张圆桌上坐着 5 名哲学家,桌子上每两个哲学家之间摆了一根筷子,桌子的中间是一碗米饭,如图所示:
哲学家们倾注毕生精力用于思考和进餐,哲学家在思考时,并不影响他人。只有当哲学家饥饿的时候,才试图拿起左、右两根筷子(一根一根拿起)。如果筷子已在他人手上,则需等待。饥饿的哲学家只有同时拿到了两根筷子才可以开始进餐,当进餐完毕后,放下筷子继续思考。
分析:
关系分析:5名哲学家与左右邻居对其中间筷子的访问是互斥关系。
整理思路:显然这里有 5 个线程,那么要如何让一个哲学家拿到左右两个筷子而不造成死锁或饥饿现象?解决方法有两个,一个是让他们同时拿两个筷子;二是对每个哲学家的动作制定规则,避免饥饿或死锁现象的发生。
信号量设置:定义互斥信号量数组chopstick[5] = {1,1,1,1,1}用于对 5 根筷子的互斥访问。
示例代码:
Linux内核设计与实现 十、内核同步方法
手把手教Linux驱动5-自旋锁、信号量、互斥体概述
== 基础概念: ==
并发 :多个执行单元同时进行或多个执行单元微观串行执行,宏观并行执行
竞态 :并发的执行单元对共享资源(硬件资源和软件上的全局变量)的访问而导致的竟态状态。
临界资源 :多个进程访问的资源
临界区 :多个进程访问的代码段
== 并发场合: ==
1、单CPU之间进程间的并发 :时间片轮转,调度进程。 A进程访问打印机,时间片用完,OS调度B进程访问打印机。
2、单cpu上进程和中断之间并发 :CPU必须停止当前进程的执行中断
3、多cpu之间
4、单CPU上中断之间的并发
== 使用偏向: ==
==信号量用于进程之间的同步,进程在信号量保护的临界区代码里面是可以睡眠的(需要进行进程调度),这是与自旋锁最大的区别。==
信号量又称为信号灯,它是用来协调不同进程间的数据对象的,而最主要的应用是共享内存方式的进程间通信。本质上,信号量是一个计数器,它用来记录对某个资源(如共享内存)的存取状况。它负责协调各个进程,以保证他们能够正确、合理的使用公共资源。它和spin lock最大的不同之处就是:无法获取信号量的进程可以睡眠,因此会导致系统调度。
1、==用于进程与进程之间的同步==
2、==允许多个进程进入临界区代码执行,临界区代码允许睡眠;==
3、信号量本质是==基于调度器的==,在UP和SMP下没有区别;进程获取不到信号量将陷入休眠,并让出CPU;
4、不支持进程和中断之间的同步
5、==进程调度也是会消耗系统资源的,如果一个int型共享变量就需要使用信号量,将极大的浪费系统资源==
6、信号量可以用于多个线程,用于资源的计数(有多种状态)
==信号量加锁以及解锁过程:==
sema_init(&sp->dead_sem, 0)/ 初始化 /
down(&sema)
临界区代码
up(&sema)
==信号量定义:==
==信号量初始化:==
==dowm函数实现:==
==up函数实现:==
信号量一般可以用来标记可用资源的个数。
举2个生活中的例子:
==dowm函数实现原理解析:==
(1)down
判断sem->count是否 >0,大于0则说明系统资源够用,分配一个给该进程,否则进入__down(sem)
(2)__down
调用__down_common(sem, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT)其中TASK_UNINTERRUPTIBLE=2代表进入睡眠,且不可以打断;MAX_SCHEDULE_TIMEOUT休眠最长LONG_MAX时间;
(3)list_add_tail(&waiter.list, &sem->wait_list)
把当前进程加入到sem->wait_list中;
(3)先解锁后加锁
进入__down_common前已经加锁了,先把解锁,调用schedule_timeout(timeout),当waiter.up=1后跳出for循环;退出函数之前再加锁;
Linux内核ARM构架中原子变量的底层实现研究
rk3288 原子 *** 作和原子位 *** 作
原子变量适用于只共享一个int型变量;
1、原子 *** 作是指不被打断的 *** 作,即它是最小的执行单位。
2、最简单的原子 *** 作就是一条条的汇编指令(不包括一些伪指令,伪指令会被汇编器解释成多条汇编指令)
==常见函数:==
==以atomic_inc为例介绍实现过程==
在Linux内核文件archarmincludeasmatomic.h中。 执行atomic_read、atomic_set这些 *** 作都只需要一条汇编指令,所以它们本身就是不可打断的。 需要特别研究的是atomic_inc、atomic_dec这类读出、修改、写回的函数。
所以atomic_add的原型是下面这个宏:
atomic_add等效于:
result(%0) tmp(%1) (v->counter)(%2) (&v->counter)(%3) i(%4)
注意:根据内联汇编的语法,result、tmp、&v->counter对应的数据都放在了寄存器中 *** 作。如果出现上下文切换,切换机制会做寄存器上下文保护。
(1)ldrex %0, [%3]
意思是将&v->counter指向的数据放入result中,并且(分别在Local monitor和Global monitor中)设置独占标志。
(2)add %0, %0, %4
result = result + i
(3)strex %1, %0, [%3]
意思是将result保存到&v->counter指向的内存中, 此时 Exclusive monitors会发挥作用,将保存是否成功的标志放入tmp中。
(4) teq %1, #0
测试strex是否成功(tmp == 0 ??)
(5)bne 1b
如果发现strex失败,从(1)再次执行。
Spinlock 是内核中提供的一种比较常见的锁机制,==自旋锁是“原地等待”的方式解决资源冲突的==,即,一个线程获取了一个自旋锁后,另外一个线程期望获取该自旋锁,获取不到,只能够原地“打转”(忙等待)。由于自旋锁的这个忙等待的特性,注定了它使用场景上的限制 —— 自旋锁不应该被长时间的持有(消耗 CPU 资源),一般应用在==中断上下文==。
1、spinlock是一种死等机制
2、信号量可以允许多个执行单元进入,spinlock不行,一次只能允许一个执行单元获取锁,并且进入临界区,其他执行单元都是在门口不断的死等
3、由于不休眠,因此spinlock可以应用在中断上下文中;
4、由于spinlock死等的特性,因此临界区执行代码尽可能的短;
==spinlock加锁以及解锁过程:==
spin_lock(&devices_lock)
临界区代码
spin_unlock(&devices_lock)
==spinlock初始化==
==进程和进程之间同步==
==本地软中断之间同步==
==本地硬中断之间同步==
==本地硬中断之间同步并且保存本地中断状态==
==尝试获取锁==
== arch_spinlock_t结构体定义如下: ==
== arch_spin_lock的实现如下: ==
lockval(%0) newval(%1) tmp(%2) &lock->slock(%3) 1 <<TICKET_SHIFT(%4)
(1)ldrex %0, [%3]
把lock->slock的值赋值给lockval;并且(分别在Local monitor和Global monitor中)设置独占标志。
(2)add %1, %0, %4
newval =lockval +(1<<16)相当于next+1;
(3)strex %2, %1, [%3]
newval =lockval +(1<<16)相当于next+1;
意思是将newval保存到 &lock->slock指向的内存中, 此时 Exclusive monitors会发挥作用,将保存是否成功的标志放入tmp中。
(4) teq %2, #0
测试strex是否成功
(5)bne 1b
如果发现strex失败,从(1)再次执行。
通过上面的分析,可知关键在于strex的 *** 作是否成功的判断上。而这个就归功于ARM的Exclusive monitors和ldrex/strex指令的机制。
(6)while (lockval.tickets.next != lockval.tickets.owner)
如何lockval.tickets的next和owner是否相等。相同则跳出while循环,否则在循环内等待判断;
* (7)wfe()和smp_mb() 最终调用#define barrier() asm volatile ("": : :"memory") *
阻止编译器重排,保证编译程序时在优化屏障之前的指令不会在优化屏障之后执行。
== arch_spin_unlock的实现如下: ==
退出锁时:tickets.owner++
== 出现死锁的情况: ==
1、拥有自旋锁的进程A在内核态阻塞了,内核调度B进程,碰巧B进程也要获得自旋锁,此时B只能自旋转。 而此时抢占已经关闭,(单核)不会调度A进程了,B永远自旋,产生死锁。
2、进程A拥有自旋锁,中断到来,CPU执行中断函数,中断处理函数,中断处理函数需要获得自旋锁,访问共享资源,此时无法获得锁,只能自旋,产生死锁。
== 如何避免死锁: ==
1、如果中断处理函数中也要获得自旋锁,那么驱动程序需要在拥有自旋锁时禁止中断;
2、自旋锁必须在可能的最短时间内拥有
3、避免某个获得锁的函数调用其他同样试图获取这个锁的函数,否则代码就会死锁;不论是信号量还是自旋锁,都不允许锁拥有者第二次获得这个锁,如果试图这么做,系统将挂起;
4、锁的顺序规则(a) 按同样的顺序获得锁;b) 如果必须获得一个局部锁和一个属于内核更中心位置的锁,则应该首先获取自己的局部锁 c) 如果我们拥有信号量和自旋锁的组合,则必须首先获得信号量;在拥有自旋锁时调用down(可导致休眠)是个严重的错误的;)
== rw(read/write)spinlock: ==
加锁逻辑:
1、假设临界区内没有任何的thread,这个时候任何的读线程和写线程都可以键入
2、假设临界区内有一个读线程,这时候信赖的read线程可以任意进入,但是写线程不能进入;
3、假设临界区有一个写线程,这时候任何的读、写线程都不可以进入;
4、假设临界区内有一个或者多个读线程,写线程不可以进入临界区,但是写线程也无法阻止后续的读线程继续进去,要等到临界区所有的读线程都结束了,才可以进入,可见:==rw(read/write)spinlock更加有利于读线程;==
== seqlock(顺序锁): ==
加锁逻辑:
1、假设临界区内没有任何的thread,这个时候任何的读线程和写线程都可以键入
2、假设临界区内没有写线程的情况下,read线程可以任意进入;
3、假设临界区有一个写线程,这时候任何的读、写线程都不可以进入;
4、假设临界区内只有read线程的情况下,写线程可以理解执行,不会等待,可见:==seqlock(顺序锁)更加有利于写线程;==
读写速度 : CPU >一级缓存 >二级缓存 >内存 ,因此某一个CPU0的lock修改了,其他的CPU的lock就会失效;那么其他CPU就会依次去L1 L2和主存中读取lock值,一旦其他CPU去读取了主存,就存在系统性能降低的风险;
mutex用于互斥 *** 作。
互斥体只能用于一个线程,资源只有两种状态(占用或者空闲)
1、mutex的语义相对于信号量要简单轻便一些,在锁争用激烈的测试场景下,mutex比信号量执行速度更快,可扩展
性更好,
2、另外mutex数据结构的定义比信号量小、
3、同一时刻只有一个线程可以持有mutex
4、不允许递归地加锁和解锁
5、当进程持有mutex时,进程不可以退出。
• mutex必须使用官方API来初始化。
• mutex可以睡眠,所以不允许在中断处理程序或者中断下半部中使用,例如tasklet、定时器等
==常见 *** 作:==
struct mutex mutex_1
mutex_init(&mutex_1)
mutex_lock(&mutex_1)
临界区代码;
mutex_unlock(&mutex_1)
==常见函数:==
=
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)