最近,有一批任务需要把两批的fastq合并到一起并压缩成一个fastq文件才能继续往下做,由于存储空间有限又不能直接全部跑上,只能按样本逐个分批跑。众所周知,一般fastq是成对存在的,所需要对read1和read2分别合并一次,然而这次任务的fastq文件比较大,合并然后压缩一次需要1天左右,那对于一组fastq就要2-3天,这也太耗时间了,所以我在想能不能read1和read2 同时跑上,这就可以节省一半的时间了。
平时也能遇到很多类似的任务,特别是在进程数有限的情况下,如果这些小任务单独占用一个进程,而任务很多就很耗时间,如果能在一个进程下实现多个线程并行执行,就能大大提高运行效率。关于进程和线程的知识可以参考知乎的这篇文章【 Shell“ 多线程”,提高工作效率 】,整理的也比较有条理,能比较容易读懂。
当然,某些博主也写过类似的文章,例如这篇【 shell后台限制多并发控制后台任务强度进行文件拷贝 】但是实在是太高深莫测了,看不懂,一时半会儿也学不会。本文将示例Shell实现多线程的简单版本,其实不用太复杂。
其实只需要两个步骤, 第一步是给需要并行运行的命令行在结尾加上"&",代表放到后台运行,第二步是在在所有并行任务的后面加上一句“wait”,意思是等所有通过“&”放到后台运行的任务跑完后再继续执行后面的任务 ,这些就能实现所有带有“&”的行并行执行了。
看完脚本是不是觉得很简单?
上面的脚本适合并行任务少的,可以手动加&和wait,但是如果有几十个甚至上百个的小任务就比较麻烦了。但不用担心,可以写个循环,批量运行。
循环的结果也是跟上面类似的,只是多了个循环结构。
如果需要执行的任务只有一行,可以把大括号去掉。
关于for和while的循环可以查看之前的文章【 Shell常用循环示例(for和while批量处理)2022-05-25 】
需要注意的是多线程并行还是需要有限制的,毕竟都是在一个进程里运行,如果线程太多了会卡顿的,建议控制在100个以内,当然还有毕竟高级和复杂的方法可以实现限制。因为上面的脚本已经够我用了,没继续往下学,以后可以再补充。
三种专门用于线程同步的机制:POSIX信号量,互斥量和条件变量.
在Linux上信号量API有两组,一组是System V IPC信号量,即PV *** 作,另外就是POSIX信号量,POSIX信号量的名字都是以sem_开头.
phshared参数指定信号量的类型,若其值为0,就表示这个信号量是当前进程的局部信号量,否则该信号量可以在多个进程之间共享.value值指定信号量的初始值,一般与下面的sem_wait函数相对应.
其中比较重要的函数sem_wait函数会以原子 *** 作的方式将信号量的值减一,如果信号量的值为零,则sem_wait将会阻塞,信号量的值可以在sem_init函数中的value初始化sem_trywait函数是sem_wait的非阻塞版本sem_post函数将以原子的 *** 作对信号量加一,当信号量的值大于0时,其他正在调用sem_wait等待信号量的线程将被唤醒.
这些函数成功时返回0,失败则返回-1并设置errno.
生产者消费者模型:
生产者对应一个信号量:sem_t producer
消费者对应一个信号量:sem_t customer
sem_init(&producer,2)----生产者拥有资源,可以工作
sem_init(&customer,0)----消费者没有资源,阻塞
在访问公共资源前对互斥量设置(加锁),确保同一时间只有一个线程访问数据,在访问完成后再释放(解锁)互斥量.
互斥锁的运行方式:串行访问共享资源
信号量的运行方式:并行访问共享资源
互斥量用pthread_mutex_t数据类型表示,在使用互斥量之前,必须使用pthread_mutex_init函数对它进行初始化,注意,使用完毕后需调用pthread_mutex_destroy.
pthread_mutex_init用于初始化互斥锁,mutexattr用于指定互斥锁的属性,若为NULL,则表示默认属性。除了用这个函数初始化互斥所外,还可以用如下方式初始化:pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER。
pthread_mutex_destroy用于销毁互斥锁,以释放占用的内核资源,销毁一个已经加锁的互斥锁将导致不可预期的后果。
pthread_mutex_lock以原子 *** 作给一个互斥锁加锁。如果目标互斥锁已经被加锁,则pthread_mutex_lock则被阻塞,直到该互斥锁占有者把它给解锁.
pthread_mutex_trylock和pthread_mutex_lock类似,不过它始终立即返回,而不论被 *** 作的互斥锁是否加锁,是pthread_mutex_lock的非阻塞版本.当目标互斥锁未被加锁时,pthread_mutex_trylock进行加锁 *** 作;否则将返回EBUSY错误码。注意:这里讨论的pthread_mutex_lock和pthread_mutex_trylock是针对普通锁而言的,对于其他类型的锁,这两个加锁函数会有不同的行为.
pthread_mutex_unlock以原子 *** 作方式给一个互斥锁进行解锁 *** 作。如果此时有其他线程正在等待这个互斥锁,则这些线程中的一个将获得它.
三个打印机轮流打印:
输出结果:
如果说互斥锁是用于同步线程对共享数据的访问的话,那么条件变量就是用于在线程之间同步共享数据的值.条件变量提供了一种线程之间通信的机制:当某个共享数据达到某个值时,唤醒等待这个共享数据的线程.
条件变量会在条件不满足的情况下阻塞线程.且条件变量和互斥量一起使用,允许线程以无竞争的方式等待特定的条件发生.
其中pthread_cond_broadcast函数以广播的形式唤醒所有等待目标条件变量的线程,pthread_cond_signal函数用于唤醒一个等待目标条件变量线程.但有时候我们可能需要唤醒一个固定的线程,可以通过间接的方法实现:定义一个能够唯一标识目标线程的全局变量,在唤醒等待条件变量的线程前先设置该变量为目标线程,然后采用广播的方式唤醒所有等待的线程,这些线程被唤醒之后都检查该变量以判断是否是自己.
采用条件变量+互斥锁实现生产者消费者模型:
运行结果:
阻塞队列+生产者消费者
运行结果:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)