Linux 多线程编程(二)2019-08-10

Linux 多线程编程(二)2019-08-10,第1张

三种专门用于线程同步的机制:POSIX信号量,互斥量和条件变量.

在Linux上信号量API有两组,一组是System V IPC信号量,即PV *** 作,另外就是POSIX信号量,POSIX信号量的名字都是以sem_开头.

phshared参数指定信号量的类型,若其值为0,就表示这个信号量是当前进程的局部信号量,否则该信号量可以在多个进程之间共享.value值指定信号量的初始值,一般与下面的sem_wait函数相对应.

其中比较重要的函数sem_wait函数会以原子 *** 作的方式将信号量的值减一,如果信号量的值为零,则sem_wait将会阻塞,信号量的值可以在sem_init函数中的value初始化sem_trywait函数是sem_wait的非阻塞版本sem_post函数将以原子的 *** 作对信号量加一,当信号量的值大于0时,其他正在调用sem_wait等待信号量的线程将被唤醒.

这些函数成功时返回0,失败则返回-1并设置errno.

生产者消费者模型:

生产者对应一个信号量:sem_t producer

消费者对应一个信号量:sem_t customer

sem_init(&producer,2)----生产者拥有资源,可以工作

sem_init(&customer,0)----消费者没有资源,阻塞

在访问公共资源前对互斥量设置(加锁),确保同一时间只有一个线程访问数据,在访问完成后再释放(解锁)互斥量.

互斥锁的运行方式:串行访问共享资源

信号量的运行方式:并行访问共享资源

互斥量用pthread_mutex_t数据类型表示,在使用互斥量之前,必须使用pthread_mutex_init函数对它进行初始化,注意,使用完毕后需调用pthread_mutex_destroy.

pthread_mutex_init用于初始化互斥锁,mutexattr用于指定互斥锁的属性,若为NULL,则表示默认属性。除了用这个函数初始化互斥所外,还可以用如下方式初始化:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER。

pthread_mutex_destroy用于销毁互斥锁,以释放占用的内核资源,销毁一个已经加锁的互斥锁将导致不可预期的后果。

pthread_mutex_lock以原子 *** 作给一个互斥锁加锁。如果目标互斥锁已经被加锁,则pthread_mutex_lock则被阻塞,直到该互斥锁占有者把它给解锁.

pthread_mutex_trylock和pthread_mutex_lock类似,不过它始终立即返回,而不论被 *** 作的互斥锁是否加锁,是pthread_mutex_lock的非阻塞版本.当目标互斥锁未被加锁时,pthread_mutex_trylock进行加锁 *** 作;否则将返回EBUSY错误码。注意:这里讨论的pthread_mutex_lock和pthread_mutex_trylock是针对普通锁而言的,对于其他类型的锁,这两个加锁函数会有不同的行为.

pthread_mutex_unlock以原子 *** 作方式给一个互斥锁进行解锁 *** 作。如果此时有其他线程正在等待这个互斥锁,则这些线程中的一个将获得它.

三个打印机轮流打印:

输出结果:

如果说互斥锁是用于同步线程对共享数据的访问的话,那么条件变量就是用于在线程之间同步共享数据的值.条件变量提供了一种线程之间通信的机制:当某个共享数据达到某个值时,唤醒等待这个共享数据的线程.

条件变量会在条件不满足的情况下阻塞线程.且条件变量和互斥量一起使用,允许线程以无竞争的方式等待特定的条件发生.

其中pthread_cond_broadcast函数以广播的形式唤醒所有等待目标条件变量的线程,pthread_cond_signal函数用于唤醒一个等待目标条件变量线程.但有时候我们可能需要唤醒一个固定的线程,可以通过间接的方法实现:定义一个能够唯一标识目标线程的全局变量,在唤醒等待条件变量的线程前先设置该变量为目标线程,然后采用广播的方式唤醒所有等待的线程,这些线程被唤醒之后都检查该变量以判断是否是自己.

采用条件变量+互斥锁实现生产者消费者模型:

运行结果:

阻塞队列+生产者消费者

运行结果:

这个问题需要的知识主要包括:

1 多进程间进行通信;

2 使用同步信号量(semaphore)和互斥信号量(mutex)进行数据保护。

参考代码如下,可以参照注释辅助理解:

#include <stdio.h>

#include <stdlib.h>

#include <unistd.h>

#include <pthread.h>

#include <semaphore.h>

#define N 2   // 消费者或者生产者的数目

#define M 10 // 缓冲数目

int in = 0   // 生产者放置产品的位置

int out = 0 // 消费者取产品的位置

int buff[M] = {0} // 缓冲初始化为0, 开始时没有产品

sem_t empty_sem // 同步信号量, 当满了时阻止生产者放产品

sem_t full_sem   // 同步信号量, 当没产品时阻止消费者消费

pthread_mutex_t mutex // 互斥信号量, 一次只有一个线程访问缓冲

int product_id = 0   //生产者id

int prochase_id = 0 //消费者id

/* 打印缓冲情况 */

void print()

{

int i

for(i = 0 i < M i++)

   printf("%d ", buff[i])

printf("\n")

}

/* 生产者方法 */ 

void *product()

{

int id = ++product_id

while(1)

{

   // 用sleep的数量可以调节生产和消费的速度,便于观察

   sleep(1)

   //sleep(1)

  

   sem_wait(&empty_sem)

   pthread_mutex_lock(&mutex)

  

   in = in % M

   printf("product%d in %d. like: \t", id, in)

  

   buff[in] = 1  

   print()  

   ++in

  

   pthread_mutex_unlock(&mutex)

   sem_post(&full_sem)  

}

}

/* 消费者方法 */

void *prochase()

{

int id = ++prochase_id

while(1)

{

   // 用sleep的数量可以调节生产和消费的速度,便于观察

   sleep(1)

//sleep(1)

  

   sem_wait(&full_sem)

   pthread_mutex_lock(&mutex)

  

   out = out % M

   printf("prochase%d in %d. like: \t", id, out)

  

   buff[out] = 0

   print()

   ++out

  

   pthread_mutex_unlock(&mutex)

   sem_post(&empty_sem)

}

}

int main()

{

pthread_t id1[N]

pthread_t id2[N]

int i

int ret[N]

// 初始化同步信号量

int ini1 = sem_init(&empty_sem, 0, M) 

int ini2 = sem_init(&full_sem, 0, 0)  

if(ini1 && ini2 != 0)

{

   printf("sem init failed \n")

   exit(1)

//初始化互斥信号量 

int ini3 = pthread_mutex_init(&mutex, NULL)

if(ini3 != 0)

{

   printf("mutex init failed \n")

   exit(1)

// 创建N个生产者线程

for(i = 0 i < N i++)

{

   ret[i] = pthread_create(&id1[i], NULL, product, (void *)(&i))

   if(ret[i] != 0)

   {

    printf("product%d creation failed \n", i)

    exit(1)

   }

}

//创建N个消费者线程

for(i = 0 i < N i++)

{

   ret[i] = pthread_create(&id2[i], NULL, prochase, NULL)

   if(ret[i] != 0)

   {

    printf("prochase%d creation failed \n", i)

    exit(1)

   }

}

//销毁线程

for(i = 0 i < N i++)

{

   pthread_join(id1[i],NULL)

   pthread_join(id2[i],NULL)

}

exit(0) 

}

在Linux下编译的时候,要在编译命令中加入选项-lpthread以包含多线程支持。比如存储的C文件为demo.c,要生成的可执行文件为demo。可以使用命令:

gcc demo.c -o demo -lpthread

程序中为便于观察,使用了sleep(1)来暂停运行,所以查看输出的时候可以看到,输出是每秒打印一次的。

磁盘结构与数据存储方式, 数据是如何存储的,又通过怎样的方式被访问?

机械硬盘主要由磁盘盘片、磁头、主轴与传动轴等组成;数据就存放在磁盘盘片中

现代硬盘寻道都是采用CHS( Cylinder Head Sector )的方式,硬盘读取数据时,读写磁头沿径向移动,移到要读取的扇区所在磁道的上方,这段时间称为 寻道时间(seek time) 因读写磁头的起始位置与目标位置之间的距离不同,寻道时间也不同 。磁头到达指定磁道后,然后通过盘片的旋转,使得要读取的扇区转到读写磁头的下方,这段时间称为 旋转延迟时间(rotational latencytime) 。然后再读写数据,读写数据也需要时间,这段时间称为 传输时间(transfer time)

固态硬盘主要由主控芯片、闪存颗粒与缓存组成;数据就存放在闪存芯片中

通过主控芯片进行寻址, 因为是电信号方式, 没有任何物理结构, 所以寻址速度非常快且与数据存储位置无关

如何查看系统IO状态

查看磁盘空间

调用 open , fwrite 时到底发生了什么?

在一个IO过程中,以下5个API/系统调用是必不可少的

Create 函数用来打开一个文件,如果该文件不存在,那么需要在磁盘上创建该文件

Open 函数用于打开一个指定的文件。如果在 Open 函数中指定 O_CREATE 标记,那么 Open 函数同样可以实现 Create 函数的功能

Clos e函数用于释放文件句柄

Write 和 Read 函数用于实现文件的读写过程

O_SYNC (先写缓存, 但是需要实际落盘之后才返回, 如果接下来有读请求, 可以从内存读 ), write-through

O_DSYNC (D=data, 类似O_SYNC, 但是只同步数据, 不同步元数据)

O_DIRECT (直接写盘, 不经过缓存)

O_ASYNC (异步IO, 使用信号机制实现, 不推荐, 直接用aio_xxx)

O_NOATIME (读取的时候不更新文件 atime(access time))

sync() 全局缓存写回磁盘

fsync() 特定fd的sync()

fdatasync() 只刷数据, 不同步元数据

mount noatime(全局不记录atime), re方式(只读), sync(同步方式)

一个IO的传奇一生 这里有一篇非常好的资料,讲述了整个IO过程;

下面简单记录下自己的理解的一次常见的Linux IO过程, 想了解更详细及相关源码,非常推荐阅读上面的原文

Linux IO体系结构

[站外图片上传中...(image-38a7b-1644137945193)]

Superblock 超级描述了整个文件系统的信息。为了保证可靠性,可以在每个块组中对superblock进行备份。为了避免superblock冗余过多,可以采用稀疏存储的方式,即在若干个块组中对superblock进行保存,而不需要在所有的块组中都进行备份

GDT 组描述符表 组描述符表对整个组内的数据布局进行了描述。例如,数据块位图的起始地址是多少?inode位图的起始地址是多少?inode表的起始地址是多少?块组中还有多少空闲块资源等。组描述符表在superblock的后面

数据块位图 数据块位图描述了块组内数据块的使用情况。如果该数据块已经被某个文件使用,那么位图中的对应位会被置1,否则该位为0

Inode位图 Inode位图描述了块组内inode资源使用情况。如果一个inode资源已经使用,那么对应位会被置1

Inode表 (即inode资源)和数据块。这两块占据了块组内的绝大部分空间,特别是数据块资源

一个文件是由inode进行描述的。一个文件占用的数据块block是通过inode管理起来的 。在inode结构中保存了直接块指针、一级间接块指针、二级间接块指针和三级间接块指针。对于一个小文件,直接可以采用直接块指针实现对文件块的访问;对于一个大文件,需要采用间接块指针实现对文件块的访问

最简单的调度器。它本质上就是一个链表实现的 fifo 队列,并对请求进行简单的 合并 处理。

调度器本身并没有提供任何可以配置的参数

读写请求被分成了两个队列, 一个用访问地址作为索引,一个用进入时间作为索引,并且采用两种方式将这些request管理起来;

在请求处理的过程中,deadline算法会优先处理那些访问地址临近的请求,这样可以最大程度的减少磁盘抖动的可能性。

只有在有些request即将被饿死的时候,或者没有办法进行磁盘顺序化 *** 作的时候,deadline才会放弃地址优先策略,转而处理那些即将被饿死的request

deadline算法可调整参数

read_expire : 读请求的超时时间设置(ms)。当一个读请求入队deadline的时候,其过期时间将被设置为当前时间+read_expire,并放倒fifo_list中进行排序

write_expire :写请求的超时时间设置(ms)

fifo_batch :在顺序(sort_list)请求进行处理的时候,deadline将以batch为单位进行处理。每一个batch处理的请求个数为这个参数所限制的个数。在一个batch处理的过程中,不会产生是否超时的检查,也就不会产生额外的磁盘寻道时间。这个参数可以用来平衡顺序处理和饥饿时间的矛盾,当饥饿时间需要尽可能的符合预期的时候,我们可以调小这个值,以便尽可能多的检查是否有饥饿产生并及时处理。增大这个值当然也会增大吞吐量,但是会导致处理饥饿请求的延时变长

writes_starved :这个值是在上述deadline出队处理第一步时做检查用的。用来判断当读队列不为空时,写队列的饥饿程度是否足够高,以时deadline放弃读请求的处理而处理写请求。当检查存在有写请求的时候,deadline并不会立即对写请求进行处理,而是给相关数据结构中的starved进行累计,如果这是第一次检查到有写请求进行处理,那么这个计数就为1。如果此时writes_starved值为2,则我们认为此时饥饿程度还不足够高,所以继续处理读请求。只有当starved >= writes_starved的时候,deadline才回去处理写请求。可以认为这个值是用来平衡deadline对读写请求处理优先级状态的,这个值越大,则写请求越被滞后处理,越小,写请求就越可以获得趋近于读请求的优先级

front_merges :当一个新请求进入队列的时候,如果其请求的扇区距离当前扇区很近,那么它就是可以被合并处理的。而这个合并可能有两种情况,一个是向当前位置后合并,另一种是向前合并。在某些场景下,向前合并是不必要的,那么我们就可以通过这个参数关闭向前合并。默认deadline支持向前合并,设置为0关闭

在调度一个request时,首先需要选择一个一个合适的cfq_group。Cfq调度器会为每个cfq_group分配一个时间片,当这个时间片耗尽之后,会选择下一个cfq_group。每个cfq_group都会分配一个vdisktime,并且通过该值采用红黑树对cfq_group进行排序。在调度的过程中,每次都会选择一个vdisktime最小的cfq_group进行处理。

一个cfq_group管理了7棵service tree,每棵service tree管理了需要调度处理的对象cfq_queue。因此,一旦cfq_group被选定之后,需要选择一棵service tree进行处理。这7棵service tree被分成了三大类,分别为RT、BE和IDLE。这三大类service tree的调度是按照优先级展开的

通过优先级可以很容易的选定一类Service tree。当一类service tree被选定之后,采用service time的方式选定一个合适的cfq_queue。每个Service tree是一棵红黑树,这些红黑树是按照service time进行检索的,每个cfq_queue都会维护自己的service time。分析到这里,我们知道,cfq算法通过每个cfq_group的vdisktime值来选定一个cfq_group进行服务,在处理cfq_group的过程通过优先级选择一个最需要服务的service tree。通过该Service tree得到最需要服务的cfq_queue。该过程在 cfq_select_queue 函数中实现

一个cfq_queue被选定之后,后面的过程和deadline算法有点类似。在选择request的时候需要考虑每个request的延迟等待时间,选择那种等待时间最长的request进行处理。但是,考虑到磁盘抖动的问题,cfq在处理的时候也会进行顺序批量处理,即将那些在磁盘上连续的request批量处理掉

cfq调度算法的参数

back_seek_max :磁头可以向后寻址的最大范围,默认值为16M

back_seek_penalty :向后寻址的惩罚系数。这个值是跟向前寻址进行比较的

fifo_expire_async :设置异步请求的超时时间。同步请求和异步请求是区分不同队列处理的,cfq在调度的时候一般情况都会优先处理同步请求,之后再处理异步请求,除非异步请求符合上述合并处理的条件限制范围内。当本进程的队列被调度时,cfq会优先检查是否有异步请求超时,就是超过fifo_expire_async参数的限制。如果有,则优先发送一个超时的请求,其余请求仍然按照优先级以及扇区编号大小来处理

fifo_expire_sync :这个参数跟上面的类似,区别是用来设置同步请求的超时时间

slice_idle :参数设置了一个等待时间。这让cfq在切换cfq_queue或service tree的时候等待一段时间,目的是提高机械硬盘的吞吐量。一般情况下,来自同一个cfq_queue或者service tree的IO请求的寻址局部性更好,所以这样可以减少磁盘的寻址次数。这个值在机械硬盘上默认为非零。当然在固态硬盘或者硬RAID设备上设置这个值为非零会降低存储的效率,因为固态硬盘没有磁头寻址这个概念,所以在这样的设备上应该设置为0,关闭此功能

group_idle :这个参数也跟上一个参数类似,区别是当cfq要切换cfq_group的时候会等待一段时间。在cgroup的场景下,如果我们沿用slice_idle的方式,那么空转等待可能会在cgroup组内每个进程的cfq_queue切换时发生。这样会如果这个进程一直有请求要处理的话,那么直到这个cgroup的配额被耗尽,同组中的其它进程也可能无法被调度到。这样会导致同组中的其它进程饿死而产生IO性能瓶颈。在这种情况下,我们可以将slice_idle = 0而group_idle = 8。这样空转等待就是以cgroup为单位进行的,而不是以cfq_queue的进程为单位进行,以防止上述问题产生

low_latency :这个是用来开启或关闭cfq的低延时(low latency)模式的开关。当这个开关打开时,cfq将会根据target_latency的参数设置来对每一个进程的分片时间(slice time)进行重新计算。这将有利于对吞吐量的公平(默认是对时间片分配的公平)。关闭这个参数(设置为0)将忽略target_latency的值。这将使系统中的进程完全按照时间片方式进行IO资源分配。这个开关默认是打开的

target_latency :当low_latency的值为开启状态时,cfq将根据这个值重新计算每个进程分配的IO时间片长度

quantum :这个参数用来设置每次从cfq_queue中处理多少个IO请求。在一个队列处理事件周期中,超过这个数字的IO请求将不会被处理。这个参数只对同步的请求有效

slice_sync :当一个cfq_queue队列被调度处理时,它可以被分配的处理总时间是通过这个值来作为一个计算参数指定的。公式为: time_slice = slice_sync + (slice_sync/5 * (4 - prio)) 这个参数对同步请求有效

slice_async :这个值跟上一个类似,区别是对异步请求有效

slice_async_rq :这个参数用来限制在一个slice的时间范围内,一个队列最多可以处理的异步请求个数。请求被处理的最大个数还跟相关进程被设置的io优先级有关

通常在Linux上使用的IO接口是同步方式的,进程调用 write / read 之后会阻塞陷入到内核态,直到本次IO过程完成之后,才能继续执行,下面介绍的异步IO则没有这种限制,但是当前Linux异步IO尚未成熟

目前Linux aio还处于较不成熟的阶段,只能在 O_DIRECT 方式下才能使用(glibc_aio),也就是无法使用默认的Page Cache机制

正常情况下,使用aio族接口的简要方式如下:

io_uring 是 2019 年 5 月发布的 Linux 5.1 加入的一个重大特性 —— Linux 下的全新的异步 I/O 支持,希望能彻底解决长期以来 Linux AIO 的各种不足

io_uring 实现异步 I/O 的方式其实是一个生产者-消费者模型:

逻辑卷管理

RAID0

RAID1

RAID5(纠错)

条带化

Linux系统性能调整:IO过程

Linux的IO调度

一个IO的传奇一生

理解inode

Linux 文件系统是怎么工作的?

Linux中Buffer cache性能问题一探究竟

Asynchronous I/O and event notification on linux

AIO 的新归宿:io_uring

Linux 文件 I/O 进化史(四):io_uring —— 全新的异步 I/O


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/yw/8963685.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-23
下一篇 2023-04-23

发表评论

登录后才能评论

评论列表(0条)

保存